In [1]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, regexp_replace, trim, split, to_timestamp, from_unixtime, date_format, unix_timestamp, when, length, from_utc_timestamp, concat, lit, year, expr, lower, concat_ws, sha2, lpad, round, repeat
from pyspark.sql.types import *
import pandas as pd

In [2]:
# Build (or reuse) the Spark session; master "local[*]" runs locally on all available cores
session_builder = SparkSession.builder.master("local[*]").appName("LocalETL")
spark = session_builder.getOrCreate()

In [3]:

# Load the credit card transaction JSON file into a DataFrame
df = spark.read.format("json").load("cc_sample_transaction.json")

In [4]:
# Schema used to parse the personal detail JSON string; every field is read as a string
personal_schema = StructType([
    StructField(field_name, StringType())
    for field_name in ("person_name", "gender", "address", "lat", "long", "city_pop", "job", "dob")
])

# Schema used to parse the address JSON string; every field is read as a string
address_schema = StructType([
    StructField(field_name, StringType())
    for field_name in ("street", "city", "state", "zip")
])

# Target schema of the output record: column order and data type of every final column
# (all fields nullable)
final_schema = (
    StructType()
    .add("Unnamed: 0", IntegerType(), True)
    .add("trans_date_trans_time", TimestampType(), True)
    .add("trans_date_trans_time_utc+8", TimestampType(), True)
    .add("cc_num", StringType(), True)
    .add("merchant", StringType(), True)
    .add("category", StringType(), True)
    .add("amt", DoubleType(), True)
    .add("first", StringType(), True)
    .add("last", StringType(), True)
    .add("gender", StringType(), True)
    .add("street", StringType(), True)
    .add("city", StringType(), True)
    .add("state", StringType(), True)
    .add("zip", StringType(), True)
    .add("lat", DoubleType(), True)
    .add("long", DoubleType(), True)
    .add("city_pop", IntegerType(), True)
    .add("job", StringType(), True)
    .add("dob", DateType(), True)
    .add("trans_num", StringType(), True)
    .add("merch_lat", DoubleType(), True)
    .add("merch_long", DoubleType(), True)
    .add("is_fraud", IntegerType(), True)
    .add("merch_zipcode", StringType(), True)
    .add("merch_last_update_time", TimestampType(), True)
    .add("merch_last_update_time_utc+8", TimestampType(), True)
    .add("merch_eff_time", TimestampType(), True)
    .add("merch_eff_time_utc+8", TimestampType(), True)
    .add("cc_bic", StringType(), True)
)

In [5]:
# Timestamp patterns and the target timezone used by the timestamp transformations
ts_sec_fmt = "yyyy-MM-dd HH:mm:ss"
ts_final = "yyyy-MM-dd HH:mm:ss.SSSSSS"
tzone = "Asia/Kuala_Lumpur"

# Parse the 'personal_detail' JSON string, then the 'address' JSON string nested inside it,
# and lift each parsed field to a top-level column
personal_struct = col("personal_parsed")
address_struct = col("address_parsed")

df_personal_parse = (
    df
    .withColumn("personal_parsed", from_json(col("personal_detail"), personal_schema))
    .withColumn("person_name", personal_struct.getField("person_name"))
    .withColumn("gender", personal_struct.getField("gender"))
    .withColumn("address_parsed", from_json(personal_struct.getField("address"), address_schema))
    .withColumn("street", address_struct.getField("street"))
    .withColumn("city", address_struct.getField("city"))
    .withColumn("state", address_struct.getField("state"))
    .withColumn("zip", address_struct.getField("zip"))
    .withColumn("lat", personal_struct.getField("lat"))
    .withColumn("long", personal_struct.getField("long"))
    .withColumn("city_pop", personal_struct.getField("city_pop"))
    .withColumn("job", personal_struct.getField("job"))
    .withColumn("dob", personal_struct.getField("dob"))
)

# Normalise the person name (separator characters -> space, collapse whitespace runs, trim),
# then take the first two space-separated tokens as first and last name
separators_as_spaces = regexp_replace(col("person_name"), r"[,@/!;:\-\\|]", " ")
single_spaced = regexp_replace(separators_as_spaces, r"\s+", " ")
name_tokens = split(col("person_name_clean"), " ")

df_name_split = (
    df_personal_parse
    .withColumn("person_name_clean", trim(single_spaced))
    .withColumn("first", name_tokens.getItem(0))
    .withColumn("last", name_tokens.getItem(1))
)

# Remove the intermediate parsed struct columns now that their fields are top-level columns
df_struct_drop = df_name_split.drop("personal_parsed", "address_parsed")

# Clean up all fields: trim every column and turn placeholder values into NULL.
# A value is treated as missing when, after trimming and lower-casing, it equals
# "na", "null" or the empty string.
# This is done in a single select rather than one withColumn call per column:
# each withColumn adds another projection to the plan, and the PySpark docs
# recommend select() when many columns are rewritten at once.
cols_to_clean = df_struct_drop.columns

df_struct_drop = df_struct_drop.select([
    when(lower(trim(col(column_name))).isin("na", "null", ""), None)
    .otherwise(trim(col(column_name)))
    .alias(column_name)  # keep the original column name and position
    for column_name in cols_to_clean
])

# Transform string fields
# - merchant: strip a leading literal "fraud_" prefix. An anchored regex is used instead of
#   SQL LIKE 'fraud_%' because '_' is a single-character wildcard in LIKE, so that pattern
#   also matched names such as "fraudulent ..." and cut off their first six characters.
# - zip: cast to string and left-pad with "0" to 5 characters.
df_str_trans = (df_struct_drop
                .withColumn("merchant", regexp_replace(col("merchant"), r"^fraud_", ""))
                .withColumn("zip", lpad(col("zip").cast("string"), 5, "0")))

# Cast the amount and the coordinate columns to fixed-precision decimals
amount_type = DecimalType(10, 2)
coordinate_type = DecimalType(9, 6)

df_flt_trans = (
    df_str_trans
    .withColumn("amt", col("amt").cast(amount_type))
    .withColumn("lat", col("lat").cast(coordinate_type))
    .withColumn("long", col("long").cast(coordinate_type))
    .withColumn("merch_lat", col("merch_lat").cast(coordinate_type))
    .withColumn("merch_long", col("merch_long").cast(coordinate_type))
)

# Process and convert timestamp fields
# Values that are one character short (12 for merch_last_update_time, 15 for merch_eff_time)
# are multiplied by 10 before conversion.
# NOTE(review): presumably these are epoch milliseconds / microseconds that lost a
# trailing digit - confirm against the source data.
last_update_raw = col("merch_last_update_time")
eff_time_raw = col("merch_eff_time")
last_update_padded = when(length(last_update_raw) == 12, last_update_raw * 10).otherwise(last_update_raw)
eff_time_padded = when(length(eff_time_raw) == 15, eff_time_raw * 10).otherwise(eff_time_raw)

df_ts_trans = (
    df_flt_trans
    # Transaction time: parse the string, then derive the shifted "utc+8" column
    .withColumn("trans_date_trans_time", to_timestamp(col("trans_date_trans_time"), ts_sec_fmt))
    .withColumn("trans_date_trans_time_utc+8", from_utc_timestamp(col("trans_date_trans_time"), tzone))
    # Merchant last-update time: value / 1000 gives seconds, which is cast to a timestamp
    .withColumn("merch_last_update_time", (last_update_padded.cast("double") / 1000).cast("timestamp"))
    .withColumn("merch_last_update_time_utc+8", from_utc_timestamp(col("merch_last_update_time"), tzone))
    # Merchant effective time: value / 1000000 gives seconds, which is cast to a timestamp
    .withColumn("merch_eff_time", (eff_time_padded.cast("double") / 1000000).cast("timestamp"))
    .withColumn("merch_eff_time_utc+8", from_utc_timestamp(col("merch_eff_time"), tzone))
)

# Apply PII masking
card_last_four = col("cc_num").substr(-4, 4)
zip_first_three = col("zip").substr(1, 3)
birth_year = year(col("dob")).cast("string")

df_pii_apply = (
    df_ts_trans
    # Card number: fixed mask followed by the last four characters
    .withColumn("cc_num", concat_ws("", lit("XXXX-XXXX-XXXX-"), card_last_four))
    # Names: replaced by their SHA-256 hex digest
    .withColumn("first", sha2(col("first"), 256))
    .withColumn("last", sha2(col("last"), 256))
    # Street: replaced by a constant placeholder
    .withColumn("street", lit("Address Hidden"))
    # Zip: first three characters followed by "XX"
    .withColumn("zip", concat(zip_first_three, lit("XX")))
    # Coordinates: rounded to one decimal place
    .withColumn("lat", round(col("lat"), 1))
    .withColumn("long", round(col("long"), 1))
    # Date of birth: only the year is kept, joined with "01" and "01"
    .withColumn("dob", concat_ws("-", birth_year, lit("01"), lit("01")))
)


In [6]:
# Project the masked DF onto final_schema: columns in schema order, each cast to its declared type
final_columns = []
for field in final_schema:
    final_columns.append(col(field.name).cast(field.dataType))

df_final = df_pii_apply.select(final_columns)
df_final.show()
# Count = 1296675

+----------+---------------------+---------------------------+-------------------+--------------------+-------------+------+--------------------+--------------------+------+--------------+--------------------+-----+-----+----+------+--------+--------------------+----------+--------------------+---------+-----------+--------+-------------+----------------------+----------------------------+--------------------+--------------------+-----------+
|Unnamed: 0|trans_date_trans_time|trans_date_trans_time_utc+8|             cc_num|            merchant|     category|   amt|               first|                last|gender|        street|                city|state|  zip| lat|  long|city_pop|                 job|       dob|           trans_num|merch_lat| merch_long|is_fraud|merch_zipcode|merch_last_update_time|merch_last_update_time_utc+8|      merch_eff_time|merch_eff_time_utc+8|     cc_bic|
+----------+---------------------+---------------------------+-------------------+--------------------+---

In [7]:
# Set chunk size for batch write DF into CSV file
chunk_size = 200000
total_rows = df_final.count()
output_path = "cc_data.csv"

# Write DF to CSV file in chunks, each chunk selected by a range of the "Unnamed: 0" column.
# The first chunk truncates the file (mode "w") and writes the header; later chunks append.
# With a plain append for every chunk, re-running this cell (or an existing cc_data.csv)
# would leave a second header and duplicate rows in the file.
# NOTE(review): the range filter only covers "Unnamed: 0" values in [0, total_rows); rows with
# a NULL or larger index value would not be written - confirm the column is a contiguous
# 0-based index.
for start in range(0, total_rows, chunk_size):
    df_chunk = df_final.filter((col("Unnamed: 0") >= start) & (col("Unnamed: 0") < start + chunk_size))

    df_pd = df_chunk.toPandas()
    is_first_chunk = start == 0
    df_pd.to_csv(output_path, mode="w" if is_first_chunk else "a", header=is_first_chunk, index=False)