In [1]:
from zipfile import ZipFile

# extract zip data
with ZipFile("../data/raw/PS_20174392719_1491204439457_log.csv (1).zip", 'r') as zObject:
    zObject.extractall(path="../data/raw/")

In [2]:
import os
import sys


# os.environ['PYSPARK_PYTHON'] = sys.executable
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"]= "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"]= "notebook"
os.environ["PYSPARK_PYTHON"]= "python"

In [3]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession \
        .builder.appName('PreprocessData') \
        .master("local[*]")\
        .getOrCreate()

In [5]:
df = spark.read \
    .option("header", "true") \
    .option("index", "true") \
    .csv('../data/raw/PS_20174392719_1491204439457_log.csv')

In [10]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType

df = df.withColumn('amount', F.col('amount').cast(DoubleType()))\
       .withColumn('oldbalanceOrg', F.col('oldbalanceOrg').cast(DoubleType()))\
       .withColumn('newbalanceOrig', F.col('newbalanceOrig').cast(DoubleType()))\
       .withColumn('isFraud', F.col('isFraud').cast(IntegerType()))\
       .withColumn('isFlaggedFraud', F.col('isFlaggedFraud').cast(IntegerType()))

df.printSchema()

root
 |-- step: string (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [6]:
df.printSchema()

root
 |-- step: string (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)



In [7]:
df.show(10)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [8]:
from pyspark.sql.functions import col,isnan, when, count

# check missing values
df.select(\
    [count( \
        when( isnan(c) | col(c).isNull(), c) \
    ).alias(c) for c in df.columns])\
   .show()

+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+



In [6]:
from datetime import datetime, timedelta
from pyspark.sql import functions as F

dt = datetime(2023, 5, 1, 0)
# print(dt + timedelta(days = 4, hours=700))

def step_to_date(step, dt = dt):
    step = int(step)
    new_dt = dt + timedelta(hours=step)

    new_dt_str = new_dt.__str__()
    return new_dt_str

In [7]:
from pyspark.sql.types import TimestampType, StringType, IntegerType, LongType

step_to_date_UDF = F.udf(lambda z: step_to_date(z),StringType())

In [8]:
# df.select(F.col("step"), \
#     step_to_date_UDF(F.col("step")).alias("datetime") ) \
#     .withColumn("datetime", F.to_timestamp("datetime"))\
#    .show()

df_transform = df.withColumn('datetime',\
              F.to_timestamp(\
                  step_to_date_UDF(F.col('step'))\
              ))\
             .select(
                 'datetime',
                 'type',
                'amount',
                'nameOrig',
                'oldbalanceOrg',
                'newbalanceOrig',
                'nameDest',
                'oldbalanceDest',
                'newbalanceDest',
                'isFraud',
                'isFlaggedFraud',
                )

df_transform.show()

+-------------------+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|           datetime|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+-------------------+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|2023-05-01 01:00:00| PAYMENT|  9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|2023-05-01 01:00:00| PAYMENT|  1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|2023-05-01 01:00:00|TRANSFER|    181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|2023-05-01 01:00:00|CASH_OUT|    181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|          

In [9]:
df_transform = df_transform.withColumn("id_transaction", F.monotonically_increasing_id().cast(IntegerType()))

df_transform.show()

+-------------------+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+
|           datetime|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|id_transaction|
+-------------------+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+
|2023-05-01 01:00:00| PAYMENT|  9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|             0|
|2023-05-01 01:00:00| PAYMENT|  1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|             1|
|2023-05-01 01:00:00|TRANSFER|    181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|             2|
|2023-05-01 01:00:00|CASH_OU

In [10]:
dim_type = df_transform.select('type').distinct().withColumn("id_type", F.monotonically_increasing_id().cast(IntegerType()))

dim_type.show()

+--------+-------+
|    type|id_type|
+--------+-------+
|TRANSFER|      0|
| CASH_IN|      1|
|CASH_OUT|      2|
| PAYMENT|      3|
|   DEBIT|      4|
+--------+-------+



In [11]:
dim_orig = df_transform.select('nameOrig', 'oldbalanceOrg', 'newbalanceOrig').distinct().withColumn("id_orig", F.monotonically_increasing_id().cast(IntegerType()))

dim_orig.show()

+-----------+-------------+--------------+-------+
|   nameOrig|oldbalanceOrg|newbalanceOrig|id_orig|
+-----------+-------------+--------------+-------+
|C1000211739|    126268.96|     120654.21|      0|
|C1000622818|      21174.0|           0.0|      1|
|C1000875171|    1611454.8|    1636596.75|      2|
|C1001477450|          0.0|           0.0|      3|
|C1001568425|      27484.0|       15171.8|      4|
|C1002880109|   1945231.72|    2055317.36|      5|
|C1003353224|      24205.0|           0.0|      6|
|C1003440677|    197144.07|     172401.41|      7|
|C1003718398|     63832.92|      63360.19|      8|
|C1003855553|          0.0|           0.0|      9|
|C1003978811|   6729339.89|    6865597.95|     10|
|C1004583006|      30076.0|           0.0|     11|
|C1004882514|          0.0|           0.0|     12|
|C1004916517|     123354.0|           0.0|     13|
|C1005606914|          0.0|           0.0|     14|
| C100576908|       1365.0|           0.0|     15|
|C1005800772|          0.0|    

In [12]:
dim_date = df_transform.select('datetime').distinct().withColumn("id_date", F.monotonically_increasing_id().cast(IntegerType()))

dim_date.show()

+-------------------+-------+
|           datetime|id_date|
+-------------------+-------+
|2023-05-01 05:00:00|      0|
|2023-05-02 16:00:00|      1|
|2023-05-07 08:00:00|      2|
|2023-05-01 10:00:00|      3|
|2023-05-06 02:00:00|      4|
|2023-05-02 20:00:00|      5|
|2023-05-01 15:00:00|      6|
|2023-05-03 20:00:00|      7|
|2023-05-02 08:00:00|      8|
|2023-05-03 04:00:00|      9|
|2023-05-06 06:00:00|     10|
|2023-05-03 23:00:00|     11|
|2023-05-05 15:00:00|     12|
|2023-05-05 21:00:00|     13|
|2023-05-06 08:00:00|     14|
|2023-05-04 14:00:00|     15|
|2023-05-06 01:00:00|     16|
|2023-05-06 15:00:00|     17|
|2023-05-02 19:00:00|     18|
|2023-05-05 19:00:00|     19|
+-------------------+-------+
only showing top 20 rows



In [15]:
df_transform_type = df_transform.join(dim_type, 'type')

df_transform_type.show()
                                        

+--------+-------------------+----------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+-------+
|    type|           datetime|    amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|id_transaction|id_type|
+--------+-------------------+----------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+-------+
|TRANSFER|2023-05-01 01:00:00|     181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|             2|      0|
|TRANSFER|2023-05-01 01:00:00|  215310.3|C1670993182|        705.0|           0.0|C1100439041|       22425.0|           0.0|      0|             0|            19|      0|
|TRANSFER|2023-05-01 01:00:00| 311685.89|C1984094095|      10835.0|           0.0| C932583850|        6267.0|    2719172.89|      0|             

In [18]:
df_transform_orig = df_transform_type.alias('a').join(dim_orig.alias('b'),
                        (df_transform.nameOrig == dim_orig.nameOrig) & 
                        (df_transform.oldbalanceOrg == dim_orig.oldbalanceOrg) &
                        (df_transform.newbalanceOrig == dim_orig.newbalanceOrig)
                        ).select('a.*', 'b.id_orig')

df_transform_orig.show()

+--------+-------------------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+-------+-------+
|    type|           datetime|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|id_transaction|id_type|id_orig|
+--------+-------------------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+-------+-------+
| PAYMENT|2023-05-02 22:00:00|  8424.74|C1000001725|        783.0|           0.0|M1974356374|           0.0|           0.0|      0|             0|       1013585|      3|  64247|
|CASH_OUT|2023-05-12 16:00:00| 58347.84|C1000008393|      10794.0|           0.0| C615558732|    2551590.32|    2609938.16|      0|             0|        574967|      2| 313386|
| CASH_IN|2023-05-07 12:00:00|108365.47|C1000018372| 1.83412097E7| 1.844957517E7|C1247890187|     315345.28|  

In [19]:
df_transform_date = df_transform_orig.join(dim_date, 'datetime')

df_transform_date.show()
                                        

+-------------------+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+-------+-------+-------+
|           datetime|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|id_transaction|id_type|id_orig|id_date|
+-------------------+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+-------+-------+-------+
|2023-05-02 22:00:00| PAYMENT|  8424.74|C1000001725|        783.0|           0.0|M1974356374|           0.0|           0.0|      0|             0|       1013585|      3|  64247|    141|
|2023-05-12 16:00:00|CASH_OUT| 58347.84|C1000008393|      10794.0|           0.0| C615558732|    2551590.32|    2609938.16|      0|             0|        574967|      2| 313386|    290|
|2023-05-07 12:00:00| CASH_IN|108365.47|C1000018372| 1.83412097E7| 1.8

In [50]:
df_transform.printSchema()

root
 |-- datetime: timestamp (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)
 |-- id: integer (nullable = false)
 |-- id_date: string (nullable = true)



In [62]:
df_transform = df_transform.withColumn('id_date', F.date_format(F.col('datetime'), "yyyyMMdd").cast(IntegerType()))

df_transform.show()

+-------------------+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+---+--------+
|           datetime|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud| id| id_date|
+-------------------+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+---+--------+
|2023-05-01 01:00:00| PAYMENT|  9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|  0|20230501|
|2023-05-01 01:00:00| PAYMENT|  1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|  1|20230501|
|2023-05-01 01:00:00|TRANSFER|    181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|  2|20230501|
|2023-05-01 01:00:00|CASH_OUT|    181.0|

In [63]:
df_transform.tail(1)

[Row(datetime=datetime.datetime(2023, 5, 31, 23, 0), type='CASH_OUT', amount='850002.52', nameOrig='C1280323807', oldbalanceOrg='850002.52', newbalanceOrig='0.0', nameDest='C873221189', oldbalanceDest='6510099.11', newbalanceDest='7360101.63', isFraud='1', isFlaggedFraud='0', id=1546705, id_date=20230531)]

In [54]:
df_transform.printSchema()

root
 |-- datetime: timestamp (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)
 |-- id: integer (nullable = false)
 |-- id_date: string (nullable = true)



In [40]:
# df_transform.write.options(header='True', delimiter=',') \
#  .csv("../data/preprocessed/PS_20174392719_1491204439457_log.csv")

df_transform.write.parquet("../data/preprocessed/online_transactions.parquet")

In [20]:
spark.stop()

In [7]:
import os
import sys

from datetime import datetime, timedelta

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType

spark = SparkSession \
        .builder.appName('PreprocessData') \
        .master("local[*]").config("spark.executor.memory", "2g")\
        .getOrCreate()


# src_file = sys.argv[1]
# output_filename = sys.argv[2]
src_path = f"../datasets/PS_20174392719_1491204439457_log.csv"

print("input file:", src_path)
# print("output file:", output_filename)

print("######################################")
print("READING CSV FILE")
print("######################################")

df = spark.read \
    .option("header", True)\
    .csv(src_path)

dt = datetime(2023, 5, 1, 0)

def step_to_date(step, dt = dt):
    step = int(step)
    new_dt = dt + timedelta(hours=step)

    new_dt_str = new_dt.__str__()
    return new_dt_str

step_to_date_UDF = F.udf(lambda z: step_to_date(z),StringType())

print("######################################")
print("TRANSFORM STEP TO DATETIME")
print("######################################")

df_transform = df.withColumn('dateTransaction',
                             F.to_timestamp(
                                 step_to_date_UDF(F.col('step'))
                            )
                )

print("######################################")
print("TRANSFORM DiffOrg")
print("######################################")

df_transform1 = df_transform.withColumn('DiffOrg',
                        F.when(
                            F.col('type') == 'CASH_IN',
                            F.round(F.col('newbalanceOrig') - F.col('oldbalanceOrg'), 2)
                        )\
                        .otherwise(
                            F.round(F.col('oldbalanceOrg') - F.col('newbalanceOrig'), 2)
                        )
                )

print("######################################")
print("TRANSFORM DiffOrgStatus")
print("######################################")

df_transform2 = df_transform1.withColumn('DiffOrgStatus',
                        F.when(
                            F.col('amount') == F.col('DiffOrg'),
                            1
                        )\
                        .otherwise(
                            0
                        )
                )


print("######################################")
print("DROP UNNECESSARY COLUMNS")
print("######################################")

df_transform3 = df_transform2.drop('step')


print("######################################")
print("SAVE DATA")
print("######################################")

# df_transform3.repartition(1).write.parquet(f"/opt/airflow/datasets/{output_filename}")

df_transform3.repartition(1).printSchema()
# df_transform4 = df_transform3.toPandas().to_parquet(f"/opt/airflow/datasets/{output_filename}", engine="pyarrow" ,index=False)




input file: ../datasets/PS_20174392719_1491204439457_log.csv
######################################
READING CSV FILE
######################################
######################################
TRANSFORM STEP TO DATETIME
######################################
######################################
TRANSFORM DiffOrg
######################################
######################################
TRANSFORM DiffOrgStatus
######################################
######################################
DROP UNNECESSARY COLUMNS
######################################
######################################
SAVE DATA
######################################
root
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isF