### Install kagglehub to download data from Kaggle

In [1]:
!pip install kagglehub > /dev/null 2>&1

### Download Kaggle data

In [2]:
import kagglehub
import shutil
import os

path = kagglehub.dataset_download("jinquan/cc-sample-data", force_download=True)

print("Path to file:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/jinquan/cc-sample-data?dataset_version_number=1...


100%|██████████| 210M/210M [06:21<00:00, 579kB/s]    

Extracting files...





Path to file: /home/jovyan/.cache/kagglehub/datasets/jinquan/cc-sample-data/versions/1


In [3]:
for file in os.listdir(path):
    shutil.move(f'{path}/{file}', f'/home/jovyan/work/kaggle/data/{file}')

### Import libraries

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, BooleanType, DecimalType
from pyspark.sql.functions import col, from_unixtime, from_json, to_date, to_timestamp, split, regexp_replace, trim, split, from_utc_timestamp, date_format, udf, sum, md5, concat_ws, replace, broadcast, pandas_udf
from pyspark import StorageLevel

### Create Spark session and define schemas for the JSON data

In [5]:
spark = SparkSession.builder \
    .appName("cc_trans") \
    .config("spark.ui.port", "4040") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "3g") \
    .config("spark.executor.cores", "3") \
    .config("spark.sql.shuffle.partitions", "24") \
    .config("spark.network.timeout", "600s") \
    .config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED") \
    .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED") \
    .config("spark.sql.legacy.timeParserPolicy", "CORRECTED") \
    .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") \
    .config("spark.sql.parquet.compression.codec", "zstd") \
    .getOrCreate()

schema = StructType([
    StructField("Unnamed: 0", StringType()),
    StructField("trans_date_trans_time", StringType()),
    StructField("cc_bic", StringType()),
    StructField("cc_num", StringType()),
    StructField("merchant", StringType()),
    StructField("category", StringType()),
    StructField("amt", DecimalType(10, 2)),
    StructField("personal_detail", StringType()),
    StructField("trans_num", StringType()),
    StructField("merch_lat", DecimalType(9, 6)),
    StructField("merch_long", DecimalType(9, 6)),
    StructField("is_fraud", StringType()),
    StructField("merch_zipcode", StringType()),
    StructField("merch_eff_time", StringType()),
    StructField("merch_last_update_time", StringType())
])

personal_detail_schema = StructType([
    StructField("person_name", StringType()),
    StructField("gender", StringType()),
    StructField("address", StringType()),
    StructField("lat", DecimalType(7, 4)),
    StructField("long", DecimalType(7, 4)),
    StructField("city_pop", StringType()),
    StructField("job", StringType()),
    StructField("dob", StringType()),
])

address_schema = StructType([
    StructField("street", StringType()),
    StructField("city", StringType()),
    StructField("state", StringType()),
    StructField("zip", StringType())
])

### Load JSON file into dataframe

In [6]:
df = spark.read.format("json").schema(schema).load(f"/home/jovyan/work/kaggle/data/cc_sample_transaction.json")
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()

1296675

In [7]:
df.show(2, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Unnamed: 0             | 0                                                                                                                                                                                                                                                                                     
 trans_date_trans_time  | 2019-01-01 00:00:18                                                                                                                                                                                                                                                                   
 cc_bic                 | CITIUS33CHI                                                

In [8]:
df.printSchema()
df.unpersist()

root
 |-- Unnamed: 0: string (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_bic: string (nullable = true)
 |-- cc_num: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: decimal(10,2) (nullable = true)
 |-- personal_detail: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- merch_lat: decimal(9,6) (nullable = true)
 |-- merch_long: decimal(9,6) (nullable = true)
 |-- is_fraud: string (nullable = true)
 |-- merch_zipcode: string (nullable = true)
 |-- merch_eff_time: string (nullable = true)
 |-- merch_last_update_time: string (nullable = true)



DataFrame[Unnamed: 0: string, trans_date_trans_time: string, cc_bic: string, cc_num: string, merchant: string, category: string, amt: decimal(10,2), personal_detail: string, trans_num: string, merch_lat: decimal(9,6), merch_long: decimal(9,6), is_fraud: string, merch_zipcode: string, merch_eff_time: string, merch_last_update_time: string]

### Handle nested JSON in _personal_details_ column

In [9]:
df = df.withColumn('personal_detail', from_json(col('personal_detail'), personal_detail_schema))
df = df.withColumn('address', from_json(col('personal_detail.address'), address_schema))

df = df.select(
    col("Unnamed: 0"),
    col("trans_date_trans_time"),
    col("cc_num"),
    col("merchant"),
    col("category"),
    col("amt"),
    col("personal_detail.person_name").alias("person_name"),
    col("personal_detail.gender").alias("gender"),
    col("address.street").alias("street"),
    col("address.city").alias("city"),
    col("address.state").alias("state"),
    col("address.zip").alias("zip"),
    col("personal_detail.lat").alias("lat"),
    col("personal_detail.long").alias("long"),
    col("personal_detail.city_pop").alias("city_pop"),
    col("personal_detail.job").alias("job"),
    col("personal_detail.dob").alias("dob"),
    col("trans_num"),
    col("merch_lat"),
    col("merch_long"),
    col("is_fraud"),
    col("merch_zipcode"),
    col("merch_eff_time"),
    col("merch_last_update_time"),
    col("cc_bic")
)

### Change data types for better performance and memory usage

In [10]:
df = df.withColumn("Unnamed: 0", col("Unnamed: 0").cast(IntegerType()))
df = df.withColumn("is_fraud", col("is_fraud").cast(BooleanType()))
df = df.withColumn("city_pop", col("city_pop").cast(IntegerType()))
df = df.withColumn('dob', to_date('dob', 'yyyy-MM-dd'))

### Convert to timestamp data type, then convert to UTC +8 timezone
* _merch_eff_time_ and _merch_last_update_time_ have values in Unix timestamp
* 13-digit timestamp represents milliseconds and 16-digit timestamp represents microseconds

In [11]:
df = df.withColumn('trans_date_trans_time', to_timestamp('trans_date_trans_time', 'yyyy-MM-dd HH:mm:ss')) \
    .withColumn("trans_date_trans_time", from_utc_timestamp(col("trans_date_trans_time"), "Asia/Kuala_Lumpur"))

df = df.withColumn("merch_eff_time", to_timestamp(from_unixtime(col("merch_eff_time") / 1000000), 'yyyy-MM-dd HH:mm:ss')) \
    .withColumn("merch_eff_time", from_utc_timestamp(col("merch_eff_time"), "Asia/Kuala_Lumpur"))

df = df.withColumn("merch_last_update_time", to_timestamp(from_unixtime(col("merch_last_update_time") / 1000), 'yyyy-MM-dd HH:mm:ss')) \
    .withColumn("merch_last_update_time", from_utc_timestamp(col("merch_last_update_time"), "Asia/Kuala_Lumpur"))

### Split _person_name_ column into _first_name_ and _last_name_, and data cleaning

In [12]:
df = df.withColumn("first", split(col("person_name"), ",")[0]) \
       .withColumn("last", split(col("person_name"), ",")[1])

df.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[Unnamed: 0: int, trans_date_trans_time: timestamp, cc_num: string, merchant: string, category: string, amt: decimal(10,2), person_name: string, gender: string, street: string, city: string, state: string, zip: string, lat: decimal(7,4), long: decimal(7,4), city_pop: int, job: string, dob: date, trans_num: string, merch_lat: decimal(9,6), merch_long: decimal(9,6), is_fraud: boolean, merch_zipcode: string, merch_eff_time: timestamp, merch_last_update_time: timestamp, cc_bic: string, first: string, last: string]

In [13]:
df.filter(col("first").isNull() | col("last").isNull() | (col("first")=='') | (col("last")=='') | (col("first")==' ') | (col("last")==' ')).select(col("person_name"), col("first"), col("last")).show(1)

+--------------+--------------+----+
|   person_name|         first|last|
+--------------+--------------+----+
|Edward@Sanchez|Edward@Sanchez|NULL|
+--------------+--------------+----+
only showing top 1 row



In [14]:
df = df.withColumn("first", split(col("person_name"), "[,@]")[0]) \
       .withColumn("last", split(col("person_name"), "[,@]")[1])

In [15]:
df.filter(col("first").isNull() | col("last").isNull() | (col("first")=='') | (col("last")=='') | (col("first")==' ') | (col("last")==' ')).select(col("person_name"), col("first"), col("last")).show(1)

+--------------------+------+----+
|         person_name| first|last|
+--------------------+------+----+
|Kelsey, , Richard...|Kelsey|    |
+--------------------+------+----+
only showing top 1 row



* _person_name_ also has other types of delimiter, not only _commas_
* Assuming that first name and last name can only contain alphabetic characters
* Delimiter can be repetitive with or without _space_

In [16]:
df = df.withColumn("person_name", regexp_replace(col("person_name"), r"[^A-Za-z]+", " ")) \
        .withColumn("person_name", trim(regexp_replace(col("person_name"), r"\s+", " "))) \
        .withColumn("first", split(col("person_name"), " ")[0]) \
        .withColumn("last", split(col("person_name"), " ")[1])

In [17]:
df.filter(col("first").isNull() | col("last").isNull() | (col("first")=='') | (col("last")=='')).select(col("person_name"), col("first"), col("last")).show(1)
df.unpersist()

+-----------+-----+----+
|person_name|first|last|
+-----------+-----+----+
+-----------+-----+----+



DataFrame[Unnamed: 0: int, trans_date_trans_time: timestamp, cc_num: string, merchant: string, category: string, amt: decimal(10,2), person_name: string, gender: string, street: string, city: string, state: string, zip: string, lat: decimal(7,4), long: decimal(7,4), city_pop: int, job: string, dob: date, trans_num: string, merch_lat: decimal(9,6), merch_long: decimal(9,6), is_fraud: boolean, merch_zipcode: string, merch_eff_time: timestamp, merch_last_update_time: timestamp, cc_bic: string, first: string, last: string]

### Mask sensitive PII data

In [18]:
@pandas_udf(StringType())
def hash_pii(series: pd.Series) -> pd.Series:
    return series.apply(lambda v: hashlib.sha256(v.encode("utf‑8")).hexdigest() if v is not None else None)

In [19]:
df = df.withColumnRenamed('cc_num', 'pii_cc_num')
df = df.withColumn('cc_num', hash_pii("pii_cc_num"))
df.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[Unnamed: 0: int, trans_date_trans_time: timestamp, pii_cc_num: string, merchant: string, category: string, amt: decimal(10,2), person_name: string, gender: string, street: string, city: string, state: string, zip: string, lat: decimal(7,4), long: decimal(7,4), city_pop: int, job: string, dob: date, trans_num: string, merch_lat: decimal(9,6), merch_long: decimal(9,6), is_fraud: boolean, merch_zipcode: string, merch_eff_time: timestamp, merch_last_update_time: timestamp, cc_bic: string, first: string, last: string, cc_num: string]

In [20]:
df.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------
 Unnamed: 0             | 0                                                                
 trans_date_trans_time  | 2019-01-01 08:00:18                                              
 pii_cc_num             | 2703186189652095                                                 
 merchant               | fraud_Rippin, Kub and Mann                                       
 category               | misc_net                                                         
 amt                    | 4.97                                                             
 person_name            | Jennifer Banks eeeee                                             
 gender                 | F                                                                
 street                 | 561 Perry Cove                                                   
 city                   | Moravian Falls                                        

* If you are using Athena, Trino or any other tools that will need to point to S3 bucket to create tables, you can define the table structure to use the masked column instead of the original one

### Check for columns that have null value

In [21]:
null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_columns = [c for c in df.columns if null_counts.collect()[0][c] > 0]

print("Columns with null values:\n", null_columns)

Columns with null values:
 ['merch_zipcode']


In [22]:
df.filter(col('merch_zipcode').isNull()).show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------
 Unnamed: 0             | 1                                                                
 trans_date_trans_time  | 2019-01-01 08:00:44                                              
 pii_cc_num             | 630423337322                                                     
 merchant               | fraud_Heller, Gutmann and Zieme                                  
 category               | grocery_pos                                                      
 amt                    | 107.23                                                           
 person_name            | Stephanie Gill eeeee                                             
 gender                 | F                                                                
 street                 | 43039 Riley Greens Suite 393                                     
 city                   | Orient                                                

* These particular records do not have _merch_zipcode_ field in the source file. We can just ignore this.

### Handling duplicated records

In [23]:
print('Total records before deduplication:', df.count())

Total records before deduplication: 1296675


In [24]:
df = df.withColumn("md5", md5(concat_ws("|", *[col(c) for c in df.columns])))
df = df.dropDuplicates(["md5"])
df = df.drop("md5")

In [25]:
print('Total records after deduplication:', df.count())

Total records after deduplication: 1296675


### Merge table with a lookup table
* Download external data from Kaggle to map state code to their full names
* Use broadcast join method to reduce shuffle operations and speed up the join

In [26]:
path = kagglehub.dataset_download("francescopettini/us-state-names-codes-and-abbreviations", force_download=True)

print("Path to file:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/francescopettini/us-state-names-codes-and-abbreviations?dataset_version_number=2...


100%|██████████| 830/830 [00:00<00:00, 1.73MB/s]

Extracting files...
Path to file: /home/jovyan/.cache/kagglehub/datasets/francescopettini/us-state-names-codes-and-abbreviations/versions/2





In [27]:
for file in os.listdir(path):
    shutil.move(f'{path}/{file}', f'/home/jovyan/work/kaggle/{file}')

In [28]:
state_name = spark.read.csv(f"/home/jovyan/work/kaggle/{file}", header=True, inferSchema=True)

In [29]:
state_name.show(5)

+----+----------+------------+----------+
|Code|     State|Abbreviation|Alpha code|
+----+----------+------------+----------+
|   1|   Alabama|        Ala.|        AL|
|   2|    Alaska|        NULL|        AK|
|   4|   Arizona|       Ariz.|        AZ|
|   5|  Arkansas|        Ark.|        AR|
|   6|California|      Calif.|        CA|
+----+----------+------------+----------+
only showing top 5 rows



In [30]:
state_name = state_name.withColumnRenamed("State", "state").withColumnRenamed("Alpha code", "state_code").select(col("state"), col("state_code"))
df = df.withColumnRenamed("state", "state_code")

In [31]:
df_joined = df.alias("main").join(broadcast(state_name).alias("lookup"), "state_code", "left") \
            .select("main.*", "lookup.state")

df_joined.persist(StorageLevel.MEMORY_AND_DISK)
df.unpersist()

DataFrame[Unnamed: 0: int, trans_date_trans_time: timestamp, pii_cc_num: string, merchant: string, category: string, amt: decimal(10,2), person_name: string, gender: string, street: string, city: string, state_code: string, zip: string, lat: decimal(7,4), long: decimal(7,4), city_pop: int, job: string, dob: date, trans_num: string, merch_lat: decimal(9,6), merch_long: decimal(9,6), is_fraud: boolean, merch_zipcode: string, merch_eff_time: timestamp, merch_last_update_time: timestamp, cc_bic: string, first: string, last: string, cc_num: string]

In [32]:
df_joined.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------
 state_code             | KY                                                               
 Unnamed: 0             | 493806                                                           
 trans_date_trans_time  | 2019-08-04 02:41:57                                              
 pii_cc_num             | 4707867759656333                                                 
 merchant               | fraud_Torphy-Kertzmann                                           
 category               | health_fitness                                                   
 amt                    | 10.46                                                            
 person_name            | Susan Shah                                                       
 gender                 | F                                                                
 street                 | 9016 Jordan Bypass Apt. 516                           

### Saving into file

In [33]:
df_joined = df_joined.select(
    col("Unnamed: 0"),
    col("trans_date_trans_time"),
    col("cc_num"),
    col("merchant"),
    col("category"),
    col("amt"),
    col("gender"),
    col("street"),
    col("city"),
    col("state"),
    col("zip"),
    col("lat"),
    col("long"),
    col("city_pop"),
    col("job"),
    col("dob"),
    col("trans_num"),
    col("merch_lat"),
    col("merch_long"),
    col("is_fraud"),
    col("merch_zipcode"),
    col("merch_eff_time"),
    col("merch_last_update_time"),
    col("cc_bic"),
    col("pii_cc_num")
)

df_joined.show(1, truncate=False, vertical=True)

df_joined.coalesce(1).write.mode("overwrite").parquet('/home/jovyan/work/kaggle/output/')

df_joined.unpersist()

-RECORD 0----------------------------------------------------------------------------------
 Unnamed: 0             | 493806                                                           
 trans_date_trans_time  | 2019-08-04 02:41:57                                              
 cc_num                 | b72b3338da0c4b30c7647123824da6badeb23fb91e98a44d23cc60e1e4eb9b35 
 merchant               | fraud_Torphy-Kertzmann                                           
 category               | health_fitness                                                   
 amt                    | 10.46                                                            
 gender                 | F                                                                
 street                 | 9016 Jordan Bypass Apt. 516                                      
 city                   | Cranks                                                           
 state                  | Kentucky                                              

DataFrame[Unnamed: 0: int, trans_date_trans_time: timestamp, cc_num: string, merchant: string, category: string, amt: decimal(10,2), gender: string, street: string, city: string, state: string, zip: string, lat: decimal(7,4), long: decimal(7,4), city_pop: int, job: string, dob: date, trans_num: string, merch_lat: decimal(9,6), merch_long: decimal(9,6), is_fraud: boolean, merch_zipcode: string, merch_eff_time: timestamp, merch_last_update_time: timestamp, cc_bic: string, pii_cc_num: string]

* A lot of persist() and unpersist() are used to avoid recomputation
* Recomputation happens if actions such as count(), show(), and collect(), are triggered, 