# Intro
Here's my plan. I split the work into these parts:

1. Set up the PySpark application
2. Load the data, flatten the JSON
3. Exploratory Data Analysis
4. Data Cleaning
5. Visualization

# Part 1: Set up the PySpark application

In [5]:
import os
from pyspark.sql import SparkSession
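try:
    # stop a session left over from an earlier run of this cell
    spark.stop()
    print("theres a current session running, stop it")
except NameError:
    # `spark` is not defined yet on a fresh kernel, nothing to stop
    pass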

os.environ['JAVA_HOME'] = r'C:\Program Files\Microsoft\jdk-17.0.16.8-hotspot'
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("JSON loader") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

print("yea done")

theres a current session running, stop it
yea done


# Part 2: Load the data

In [6]:
df = spark.read.json("cc_sample_transaction.json")
df.show(5, truncate=False)

+----------+------+-------------+-----------+----------------+--------+----------------+----------------------+------------------+-----------+-------------+----------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------+--------------------------------+
|Unnamed: 0|amt   |category     |cc_bic     |cc_num          |is_fraud|merch_eff_time  |merch_last_update_time|merch_lat         |merch_long |merch_zipcode|merchant                          |personal_detail                                                                                                                                                                                                                                                                      |trans_

# Part 3: EDA

Looking at this data, I'll start with the usual checks:
* have a look at the schema.
* how many rows of data are we dealing with?

Then we'll look for things that need cleaning up, which off the top of my head are:

* first of all, the personal_detail data seems to be nested, so I'll try to flatten it
* check if the data types are correct
* missing values, nulls, empty strings, etc.
* check for format issues, usually with the time format

I noticed some specific ones mentioned in the questions:

* Timestamp conversion
* Name derivation

I'm not really familiar with credit card transaction data, but looking at the data, there are a few things we can check:

* the transaction amount should be positive, right?
* are the location lat/long values valid?

Since we have labels for fraud, we can also try to see:

* the distribution of the fraud vs the non-fraud transactions
* what kinds of transactions are usually labeled as fraud, and is there a pattern?
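
The sanity checks in the lists above can be written down as small predicates before touching Spark. This is only a sketch in plain Python: the function names and the sample values are made up for illustration, and "valid" here just means latitude within [-90, 90], longitude within [-180, 180] and an amount above zero. The same conditions should translate to Spark filter expressions later on.

```python
from collections import Counter


def amount_is_positive(amt):
    """True when the amount is present and strictly greater than zero."""
    return amt is not None and amt > 0


def coords_in_range(lat, lon):
    """True when both coordinates are present and inside the valid ranges."""
    if lat is None or lon is None:
        return False
    return -90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0


def fraud_share(labels):
    """Fraction of rows per label, e.g. {False: 0.75, True: 0.25}."""
    counts = Counter(labels)
    total = sum(counts.values())
    if total == 0:
        return {}
    return {label: count / total for label, count in counts.items()}


# made-up values, not taken from the dataset
print(amount_is_positive(4.97), amount_is_positive(-1.0))
print(coords_in_range(36.0, -81.2), coords_in_range(95.0, 0.0))
print(fraud_share([False, False, False, True]))
```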

In [7]:
# check the schema
df.printSchema()

# check the number of rows
df.count()

root
 |-- Unnamed: 0: string (nullable = true)
 |-- amt: string (nullable = true)
 |-- category: string (nullable = true)
 |-- cc_bic: string (nullable = true)
 |-- cc_num: string (nullable = true)
 |-- is_fraud: string (nullable = true)
 |-- merch_eff_time: string (nullable = true)
 |-- merch_last_update_time: string (nullable = true)
 |-- merch_lat: string (nullable = true)
 |-- merch_long: string (nullable = true)
 |-- merch_zipcode: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- personal_detail: string (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- trans_num: string (nullable = true)



1296675

In [8]:
# flatten the personal_detail column (a JSON string)
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
import json

# get one example of the personal_detail JSON to build the structure
sample_json = df.select("personal_detail").limit(1).collect()[0]["personal_detail"]

# create the schema for personal_detail
personal_detail_schema = StructType([
    StructField(key, StringType(), True) for key in json.loads(sample_json).keys()
])

# parse the personal_detail json
df_flat = df.withColumn("personal_detail_json", from_json(col("personal_detail"), personal_detail_schema))

# create columns for the parsed personal_detail fields
for field in personal_detail_schema.fieldNames():
    df_flat = df_flat.withColumn(field, col("personal_detail_json")[field])

# remove the temporary personal_detail parsing column and original personal_detail column
df_flat = df_flat.drop("personal_detail_json", "personal_detail")

# now flatten the address field (which is also JSON)
sample_address = df_flat.select("address").limit(1).collect()[0]["address"]

# create schema for address JSON
address_schema = StructType([
    StructField(key, StringType(), True) for key in json.loads(sample_address).keys()
])

# parse the address json
df_flat = df_flat.withColumn("address_json", from_json(col("address"), address_schema))

# create columns with "addr_" prefix just in case there's naming issues
for field in address_schema.fieldNames():
    df_flat = df_flat.withColumn(f"addr_{field}", col("address_json")[field])

# remove the temporary address
df_flat = df_flat.drop("address_json", "address")
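
To make it clearer what the cell above is doing, here is the same two-level flattening in plain Python. It's a sketch, not the Spark code: `flatten_personal_detail` is a name I made up and the sample record is invented, though the keys match the columns that show up after flattening. One thing to keep in mind is that the Spark version builds its schema from a single sample row, so a key that is missing from that row would not get a column.

```python
import json


def flatten_personal_detail(raw, address_prefix="addr_"):
    """Parse the outer JSON string, then the nested address JSON string,
    and return one flat dict with the address keys prefixed."""
    detail = json.loads(raw)
    address_raw = detail.pop("address", None)
    flat = dict(detail)
    if address_raw is not None:
        # the address may itself be a JSON string or an already-parsed object
        if isinstance(address_raw, str):
            address = json.loads(address_raw)
        else:
            address = address_raw
        for key, value in address.items():
            flat[f"{address_prefix}{key}"] = value
    return flat


# invented record, only the key names follow the dataset
sample = json.dumps({
    "person_name": "Jane,Doe",
    "gender": "F",
    "address": json.dumps({
        "street": "1 Example St",
        "city": "Sampletown",
        "state": "NC",
        "zip": "00000",
    }),
})
print(flatten_personal_detail(sample))
```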

In [9]:
from pyspark.sql.functions import col, to_date, from_unixtime, when
from pyspark.sql.types import DoubleType

# Check current data types
for field in df_flat.schema.fields:
    print(f"{field.name}: {field.dataType}")

Unnamed: 0: StringType()
amt: StringType()
category: StringType()
cc_bic: StringType()
cc_num: StringType()
is_fraud: StringType()
merch_eff_time: StringType()
merch_last_update_time: StringType()
merch_lat: StringType()
merch_long: StringType()
merch_zipcode: StringType()
merchant: StringType()
trans_date_trans_time: StringType()
trans_num: StringType()
person_name: StringType()
gender: StringType()
lat: StringType()
long: StringType()
city_pop: StringType()
job: StringType()
dob: StringType()
addr_street: StringType()
addr_city: StringType()
addr_state: StringType()
addr_zip: StringType()


In [10]:
# one by one changing column types can be tedious so let's make a list of expected data types
numeric_columns = ['amt', 'merch_lat', 'merch_long', 'lat', 'long', 'city_pop']
boolean_columns = ['is_fraud']
timestamp_columns = ['trans_date_trans_time'] 
unix_timestamp_columns = ['merch_eff_time', 'merch_last_update_time']
date_columns = ['dob']

df_typed = df_flat

# text to number
for column in numeric_columns:
    df_typed = df_typed.withColumn(column, col(column).cast(DoubleType()))

# convert is_fraud to bool
for column in boolean_columns:
    df_typed = df_typed.withColumn(column,
                                   when(col(column) == "1", True)
                                   .when(col(column) == "0", False)
                                   .otherwise(None))

# convert normal timestamps to timestamp format and adjust to UTC+8
from pyspark.sql.functions import to_timestamp, expr
for column in timestamp_columns:
    df_typed = df_typed.withColumn(column, 
                                   to_timestamp(col(column), "yyyy-MM-dd HH:mm:ss") + expr("INTERVAL 8 HOURS"))

# convert unix timestamps (assumed to be in microseconds, hence the / 1000000) to normal timestamps and adjust to UTC+8
for column in unix_timestamp_columns:
    df_typed = df_typed.withColumn(column, 
                                   from_unixtime(col(column).cast(DoubleType()) / 1000000) + expr("INTERVAL 8 HOURS"))

# convert strings to date
for column in date_columns:
    df_typed = df_typed.withColumn(column, to_date(col(column), "yyyy-MM-dd"))

# end, let's see the data types
for field in df_typed.schema.fields:
    print(f"{field.name}: {field.dataType}")

Unnamed: 0: StringType()
amt: DoubleType()
category: StringType()
cc_bic: StringType()
cc_num: StringType()
is_fraud: BooleanType()
merch_eff_time: StringType()
merch_last_update_time: StringType()
merch_lat: DoubleType()
merch_long: DoubleType()
merch_zipcode: StringType()
merchant: StringType()
trans_date_trans_time: TimestampType()
trans_num: StringType()
person_name: StringType()
gender: StringType()
lat: DoubleType()
long: DoubleType()
city_pop: DoubleType()
job: StringType()
dob: DateType()
addr_street: StringType()
addr_city: StringType()
addr_state: StringType()
addr_zip: StringType()
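
Note that merch_eff_time and merch_last_update_time still show up as StringType above. `from_unixtime` returns a formatted string, so if a real timestamp column is wanted, wrapping the result in `to_timestamp` would be one option. The conversion itself can be sanity-checked in plain Python. This is a minimal sketch, assuming the raw values are epoch microseconds (that is what dividing by 1000000 implies) and that UTC+8 is the target offset; `epoch_micros_to_utc8` is a helper name I made up.

```python
from datetime import datetime, timedelta, timezone

UTC_PLUS_8 = timezone(timedelta(hours=8))
EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_micros_to_utc8(value):
    """Convert epoch microseconds (int or numeric string) to an aware
    datetime in UTC+8. Returns None for missing values."""
    if value is None:
        return None
    micros = int(value)
    return (EPOCH_UTC + timedelta(microseconds=micros)).astimezone(UTC_PLUS_8)


# the epoch itself, and 1.5 seconds after it
print(epoch_micros_to_utc8("0").isoformat())
print(epoch_micros_to_utc8(1_500_000).isoformat())
```

Keeping the value timezone-aware avoids having to remember that an 8 hour shift was already added to a naive timestamp.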


In [11]:
df_typed.show(10, truncate=False)

+----------+------+-------------+-----------+----------------+--------+-------------------+----------------------+------------------+------------------+-------------+----------------------------------+---------------------+--------------------------------+------------------------+------+-------+------------------+--------+---------------------------------+----------+------------------------------+--------------+----------+--------+
|Unnamed: 0|amt   |category     |cc_bic     |cc_num          |is_fraud|merch_eff_time     |merch_last_update_time|merch_lat         |merch_long        |merch_zipcode|merchant                          |trans_date_trans_time|trans_num                       |person_name             |gender|lat    |long              |city_pop|job                              |dob       |addr_street                   |addr_city     |addr_state|addr_zip|
+----------+------+-------------+-----------+----------------+--------+-------------------+----------------------+--------------

In [12]:
from pyspark.sql.functions import trim, when, col, substring_index

# get first and last name
df_with_names = df_typed.withColumn(
    "first_name", 
    trim(substring_index(col("person_name"), ",", 1))
).withColumn(
    "last_name",
    trim(when(
        substring_index(substring_index(col("person_name"), ",", 2), ",", -1) == "",
        None
    ).otherwise(
        substring_index(substring_index(col("person_name"), ",", 2), ",", -1)
    ))
)

df_cleaned = df_with_names

In [13]:
df_with_names.select("person_name", "first_name", "last_name").show(10, truncate=False)

+------------------------+---------------+---------------+
|person_name             |first_name     |last_name      |
+------------------------+---------------+---------------+
|Jennifer,Banks,eeeee    |Jennifer       |Banks          |
|Stephanie,Gill,eeeee    |Stephanie      |Gill           |
|Edward@Sanchez          |Edward@Sanchez |Edward@Sanchez |
|Jeremy/White, !         |Jeremy/White   |!              |
|Tyler@Garcia            |Tyler@Garcia   |Tyler@Garcia   |
|Jennifer,Conner,eeeee   |Jennifer       |Conner         |
|Kelsey, , Richards NOOOO|Kelsey         |               |
|Steven, Williams        |Steven         |Williams       |
|Heather, , Chase NOOOO  |Heather        |               |
|Melissa@Aguilar         |Melissa@Aguilar|Melissa@Aguilar|
+------------------------+---------------+---------------+
only showing top 10 rows


some of the data seems to have special characters and also extra commas. should've seen that earlier

In [14]:
# simple name quality checks
from pyspark.sql.functions import col, trim, length, regexp_replace

# count null first names
null_first = df_cleaned.filter(col("first_name").isNull()).count()

# count null last names  
null_last = df_cleaned.filter(col("last_name").isNull()).count()

# count empty first names
empty_first = df_cleaned.filter(trim(col("first_name")) == "").count()

# count empty last names
empty_last = df_cleaned.filter(trim(col("last_name")) == "").count()

# check special characters
clean_pattern = regexp_replace(col("person_name"), "[^a-zA-Z, ]", "")
has_symbols = df_cleaned.filter(length(col("person_name")) != length(clean_pattern)).count()

total = df_cleaned.count()

print(f"Total rows: {total}")
print(f"Null first names: {null_first}")
print(f"Empty first names: {empty_first}")
print(f"Null last names: {null_last}")
print(f"Empty last names: {empty_last}")
print(f"Names with special characters: {has_symbols}")
print(f"Clean names: {total - has_symbols}")

# show some examples of names that have special characters
if has_symbols > 0:
    print("\nSample names with special characters:")
    df_cleaned.filter(length(col("person_name")) != length(clean_pattern)).select("person_name").show(5, truncate=False)

Total rows: 1296675
Null first names: 0
Empty first names: 0
Null last names: 0
Empty last names: 216717
Names with special characters: 647993
Clean names: 648682

Sample names with special characters:
+---------------+
|person_name    |
+---------------+
|Edward@Sanchez |
|Jeremy/White, !|
|Tyler@Garcia   |
|Melissa@Aguilar|
|Eddie|Mendez!!!|
+---------------+
only showing top 5 rows
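
The special character check above compares string lengths before and after stripping everything that is not a letter, comma or space. A direct regex search gives the same answer and is maybe easier to read (in Spark, `Column.rlike` with the same character class should do the equivalent). A small plain-Python sketch, with `has_special_chars` being a name I made up, run on the ten names shown earlier:

```python
import re

# anything that is not a letter, a comma or a space counts as "special"
SPECIAL_CHARS = re.compile(r"[^a-zA-Z, ]")


def has_special_chars(name):
    """True when the name contains at least one special character."""
    return SPECIAL_CHARS.search(name) is not None


names = [
    "Jennifer,Banks,eeeee",
    "Stephanie,Gill,eeeee",
    "Edward@Sanchez",
    "Jeremy/White, !",
    "Tyler@Garcia",
    "Jennifer,Conner,eeeee",
    "Kelsey, , Richards NOOOO",
    "Steven, Williams",
    "Heather, , Chase NOOOO",
    "Melissa@Aguilar",
]
flagged = [name for name in names if has_special_chars(name)]
print(len(flagged), flagged)
```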


the cleanup plan is:
 1. replace special characters (like @, |, !) with commas
 2. remove extra spaces and empty parts between commas
 3. remove unwanted suffixes like "NOOOO" and "eeeee" (it seems like those are the only suffixes, but not sure)
 4. keep only the content before the second comma
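
Before writing this in Spark, the plan can be tried out on a few strings in plain Python. This is a sketch under a few assumptions: `split_person_name` and `NOISE_SUFFIXES` are names I made up, the suffix list only covers the two seen so far, and the suffixes are stripped before the empty parts are dropped (a slightly different order than the list above, same result on these samples). Names with hyphens or apostrophes would get split by step 1, so that rule may need loosening.

```python
import re

# only the two suffixes seen so far; extend if more turn up
NOISE_SUFFIXES = ("NOOOO", "eeeee")


def split_person_name(raw):
    """Return (first_name, last_name) from a noisy person_name string.
    Either value is None when that part is missing."""
    # step 1: anything that is not a letter, comma or space becomes a comma
    text = re.sub(r"[^a-zA-Z, ]", ",", raw)
    # step 3: strip the known noise suffixes
    for suffix in NOISE_SUFFIXES:
        text = text.replace(suffix, "")
    # step 2: split on commas, trim spaces, drop the empty parts
    parts = [part.strip() for part in text.split(",")]
    parts = [part for part in parts if part]
    # step 4: keep only the first two parts
    first = parts[0] if parts else None
    last = parts[1] if len(parts) > 1 else None
    return first, last


for raw in ["Jennifer,Banks,eeeee", "Edward@Sanchez", "Jeremy/White, !",
            "Kelsey, , Richards NOOOO", "Eddie|Mendez!!!"]:
    print(repr(raw), "->", split_person_name(raw))
```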

In [15]:
from pyspark.sql.functions import col, regexp_replace, substring_index, when, trim

# First, let's examine some examples to understand the pattern
print("Original person_name examples:")
df_typed.select("person_name").show(10, truncate=False)

# clean person_name step by step, each regexp_replace handles one kind of noise
df_cleaned = df_typed.withColumn(
    "person_name_cleaned", 
    regexp_replace(col("person_name"), "[^a-zA-Z, ]", ",")
).withColumn(
    "person_name_cleaned",
    regexp_replace(col("person_name_cleaned"), ",\\s*,", ",")  # remove empty parts like ", ,"
).withColumn(
    "person_name_cleaned",
    regexp_replace(col("person_name_cleaned"), "\\s+", " ")   # replace multiple spaces with single space
).withColumn(
    "person_name_cleaned",
    regexp_replace(col("person_name_cleaned"), ",\\s+", ",")  # remove spaces after commas
).withColumn(
    "person_name_cleaned",
    regexp_replace(col("person_name_cleaned"), ",?(NOOOO|eeeee).*$", "")  # remove NOOOO and eeeee suffixes
).withColumn(
    "person_name_cleaned",
    substring_index(col("person_name_cleaned"), ",", 2)
)

# Now extract first and last name from the cleaned field
df_cleaned = df_cleaned.withColumn(
    "first_name", 
    trim(substring_index(col("person_name_cleaned"), ",", 1))
).withColumn(
    "last_name",
    trim(when(
        substring_index(col("person_name_cleaned"), ",", -1) == substring_index(col("person_name_cleaned"), ",", 1),
        None
    ).otherwise(
        substring_index(col("person_name_cleaned"), ",", -1)
    ))
)

# Show the results of our cleaning
print("\nCleaned person_name examples:")
df_cleaned.select("person_name", "person_name_cleaned", "first_name", "last_name").show(80, truncate=False)

# Optionally, drop the temporary column if you want to keep your dataframe clean
df_cleaned = df_cleaned.drop("person_name_cleaned")
df_cleaned = df_cleaned.drop("person_name")

Original person_name examples:
+------------------------+
|person_name             |
+------------------------+
|Jennifer,Banks,eeeee    |
|Stephanie,Gill,eeeee    |
|Edward@Sanchez          |
|Jeremy/White, !         |
|Tyler@Garcia            |
|Jennifer,Conner,eeeee   |
|Kelsey, , Richards NOOOO|
|Steven, Williams        |
|Heather, , Chase NOOOO  |
|Melissa@Aguilar         |
+------------------------+
only showing top 10 rows

Cleaned person_name examples:
+--------------------------+---------------------+-----------+----------+
|person_name               |person_name_cleaned  |first_name |last_name |
+--------------------------+---------------------+-----------+----------+
|Jennifer,Banks,eeeee      |Jennifer,Banks       |Jennifer   |Banks     |
|Stephanie,Gill,eeeee      |Stephanie,Gill       |Stephanie  |Gill      |
|Edward@Sanchez            |Edward,Sanchez       |Edward     |Sanchez   |
|Jeremy/White, !           |Jeremy,White         |Jeremy     |White     |
|Tyler@Garcia    

In [16]:
# so, here is what we have
df_cleaned.show(100, truncate=False)

+----------+------+-------------+-----------+-------------------+--------+-------------------+----------------------+------------------+------------------+-------------+-------------------------------------------+---------------------+--------------------------------+------+-------+------------------+---------+---------------------------------------------+----------+-------------------------------+------------------------+----------+--------+-----------+----------+
|Unnamed: 0|amt   |category     |cc_bic     |cc_num             |is_fraud|merch_eff_time     |merch_last_update_time|merch_lat         |merch_long        |merch_zipcode|merchant                                   |trans_date_trans_time|trans_num                       |gender|lat    |long              |city_pop |job                                          |dob       |addr_street                    |addr_city               |addr_state|addr_zip|first_name |last_name |
+----------+------+-------------+-----------+---------------