In [23]:
# PYSPARK_PYTHON selects the Python executable Spark launches for PySpark; it must be set before the SparkSession is created.
%env PYSPARK_PYTHON=python

env: PYSPARK_PYTHON=python


Initialize the spark session and import all the necessary libraries.

The Spark session time zone is set to `Asia/Kuala_Lumpur` (Malaysia) for the later date transformations.

In [24]:
import math
import os
import re
import secrets

import pandas as pd
import plotly.express as px
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    concat,
    concat_ws,
    convert_timezone,
    current_date,
    datediff,
    desc,
    floor,
    from_json,
    from_unixtime,
    lit,
    months_between,
    sha2,
    substring,
    timestamp_millis,
    to_date,
    udf,
)
from pyspark.sql.types import *



In [25]:
# Spark log verbosity. DEBUG floods the notebook output with JVM internals; only raise it
# while troubleshooting Spark itself.
SPARK_LOG_LEVEL = "WARN"

# Initialize SparkSession.
# The session time zone determines how timestamps are displayed and how epoch/string
# conversions interpret wall-clock time, so later date work happens in Malaysia time.
# NOTE: getOrCreate() returns an already-running session if one exists (e.g. when this cell
# is re-run); static settings such as spark.driver.memory only apply when the JVM first starts.
spark = (
    SparkSession.builder.appName("file_processing")
    .config("spark.driver.memory", "4g")
    .config("spark.log.level", SPARK_LOG_LEVEL)
    .config("spark.sql.session.timeZone", "Asia/Kuala_Lumpur")
    .getOrCreate()
)

Unnest the nested JSON fields (`personal_detail` and the `address` JSON embedded inside it) into a flat, tabular DataFrame.

In [26]:

# Load the raw transactions. `personal_detail` is a JSON-encoded string column, and the
# `address` field inside it is a second, nested JSON string, so both are parsed explicitly.
df = spark.read.json("cc_sample_transaction.json")


def _nullable_string_struct(*field_names):
    """Return a StructType whose fields are all nullable StringType, in the given order."""
    return StructType([StructField(field_name, StringType(), True) for field_name in field_names])


# Every personal-detail field is read as a string; `address` is stringified JSON parsed below.
personal_detail_schema = _nullable_string_struct(
    "person_name", "gender", "address", "lat", "long", "city_pop", "job", "dob",
)
address_schema = _nullable_string_struct("street", "city", "state", "zip")

# Parse personal_detail first, then the address string nested inside it.
df = (
    df
    .withColumn("personal_detail_json", from_json("personal_detail", personal_detail_schema))
    .withColumn("address_json", from_json(col("personal_detail_json.address"), address_schema))
)

# Flatten to one column per field; the address parts become top-level columns.
address_columns = [
    col(f"address_json.{part}").alias(part) for part in ("street", "city", "state", "zip")
]
df = df.select(
    col("Unnamed: 0"),
    "trans_date_trans_time", "cc_num", "merchant", "category", "amt",
    col("personal_detail_json.person_name"),
    col("personal_detail_json.gender"),
    *address_columns,
    col("personal_detail_json.lat").alias("lat"),
    col("personal_detail_json.long").alias("long"),
    col("personal_detail_json.city_pop"),
    col("personal_detail_json.job"),
    col("personal_detail_json.dob"),
    "trans_num", "merch_lat", "merch_long", "is_fraud", "merch_zipcode",
    "merch_last_update_time", "merch_eff_time", "cc_bic",
)





Quick check for duplicate rows: any duplicates found are dropped, and the number of rows removed is printed.

In [None]:
# Compare every column except "Unnamed: 0". That name is what pandas gives an unnamed row
# index, so it is presumably a per-row counter (TODO confirm). If it stays in the
# comparison, every row is unique and no duplicate transaction would ever be dropped.
# Which copy of a duplicate survives is arbitrary.
dedup_columns = [column_name for column_name in df.columns if column_name != "Unnamed: 0"]

initial_count = df.count()
df = df.dropDuplicates(dedup_columns)
dropped_count = initial_count - df.count()
print(f"Number of rows dropped: {dropped_count}")

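Clean the raw names into first and last names

Assuming only the first two 'words' of a name are valid, we replace every character other than letters, digits, spaces and periods with a space. We then collapse and trim the whitespace and split on spaces. The first token becomes `first`, the second becomes `last`, and any further tokens are ignored.
This will result in something like: 
| Raw | Tokens | first / last |
| --- | ------ | ------------ |
| Heather, , Chase NOOOO | [Heather, Chase, NOOOO] | Heather / Chase |
| Melissa@Aguilar | [Melissa, Aguilar] | Melissa / Aguilar |
| Eddie\|Mendez!!! | [Eddie, Mendez] | Eddie / Mendez |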

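We also make four other conversions:
- Convert the millisecond Unix epochs in `merch_last_update_time` and `merch_eff_time` into the session (Malaysia) time zone.
- Shift `trans_date_trans_time` from UTC to Malaysia time.
- Derive each customer's current age from `dob`.
- Bin `amt` into amount ranges.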



In [28]:
def clean_name(name):
    """Split a raw, possibly noisy person name into word tokens.

    Every character other than ASCII letters, digits, spaces, newlines and periods
    is replaced with a space. Runs of whitespace are then collapsed, the ends are
    trimmed, and the result is split on single spaces.

    Args:
        name: Raw name string, or None (Spark passes None for a null column value).

    Returns:
        List of tokens, e.g. "Melissa@Aguilar" -> ["Melissa", "Aguilar"], or None
        when ``name`` is None so the resulting array column is null instead of the
        UDF raising TypeError and failing the Spark task.
    """
    if name is None:
        return None
    # Raw string: "\." inside a normal string literal is an invalid escape sequence
    # (SyntaxWarning on Python 3.12+). The character class itself is unchanged.
    cleaned = re.sub(r'[^a-zA-Z0-9 \n\.]', ' ', name)  # replace special characters with spaces
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()      # collapse and trim whitespace
    return cleaned.split(" ")

def amount_range(amt):
    """Map a transaction amount to a labelled spending bucket.

    Buckets are closed on the left and open on the right:
    [.., 10) -> "0-10", [10, 50) -> "10-50", [50, 100) -> "50-100",
    [100, 500) -> "100-500", [500, 1000) -> "500-1000", >= 1000 -> "1000+".
    Negative amounts therefore also land in "0-10".

    Args:
        amt: Amount as a number or numeric string, or None for a null Spark value.

    Returns:
        The bucket label, or None when ``amt`` is None or NaN. Without the NaN check,
        every comparison would be false and the amount would be labelled "1000+".

    Raises:
        ValueError: if ``amt`` is a string that cannot be parsed as a float.
    """
    if amt is None:
        return None
    amt = float(amt)
    if math.isnan(amt):
        return None
    if amt < 10:
        return "0-10"
    elif amt < 50:
        return "10-50"
    elif amt < 100:
        return "50-100"
    elif amt < 500:
        return "100-500"
    elif amt < 1000:
        return "500-1000"
    else:
        return "1000+"

# Wrap the pure-Python helpers defined above as Spark UDFs (no lambda wrapper needed).
clean_name_udf = udf(clean_name, ArrayType(StringType()))
amount_range_udf = udf(amount_range, StringType())

# Split each name once into a temporary token column. Take the first two tokens as the
# first and last name, then drop the helper column and the raw (PII) name.
df = (
    df.withColumn("_name_tokens", clean_name_udf(col("person_name")))
    .withColumn("first", col("_name_tokens")[0])
    .withColumn("last", col("_name_tokens")[1])
    .drop("_name_tokens", "person_name")
)

# Merchant times are Unix epochs in milliseconds (assumed 13 digits; TODO confirm with the
# data source). timestamp_millis keeps the millisecond part exactly and returns a real
# TIMESTAMP, displayed in the session time zone (Asia/Kuala_Lumpur). A null epoch stays null.
for epoch_ms_column in ("merch_last_update_time", "merch_eff_time"):
    df = df.withColumn(epoch_ms_column, timestamp_millis(col(epoch_ms_column).cast("long")))

# Transaction times are assumed to be recorded in UTC; shift them to Malaysia local time.
df = df.withColumn(
    "trans_date_trans_time",
    convert_timezone(lit("UTC"), lit("Asia/Kuala_Lumpur"), col("trans_date_trans_time")),
)

# Current age in whole years. months_between is calendar-aware, so the age increments on the
# birthday itself; dividing a day count by 365.25 can be off by one around birthdays.
df = df.withColumn(
    "age",
    floor(months_between(current_date(), to_date(col("dob"), "yyyy-MM-dd")) / 12),
)

# Bucket each transaction amount into the labelled ranges produced by amount_range.
df = df.withColumn("amt_range", amount_range_udf(col("amt")))


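For PII handling we mask private fields by replacing them with SHA-256 hashes, so the raw values are not exposed to whoever receives this data. Note that hashing is pseudonymisation rather than anonymisation. An unsalted hash of a low-entropy value, such as a first name or a date of birth, can be reversed by hashing every candidate value, so a secret salt should be mixed in before the data is shared.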

In [29]:
# Pseudonymise direct identifiers with a *salted* SHA-256 hash.
# An unsalted hash of a low-entropy value (a first name, a date of birth, a card number) can
# be reversed by hashing every candidate value, so a secret salt is mixed in first.
# Set the PII_HASH_SALT environment variable to get hashes that are stable across runs.
# Otherwise a random salt is generated for this session, and the hashes cannot be joined
# with those from other runs.
PII_HASH_SALT = os.environ.get("PII_HASH_SALT") or secrets.token_hex(16)
PII_COLUMNS = ["first", "last", "dob", "street", "cc_num"]

for pii_column in PII_COLUMNS:
    # concat() yields null if any input is null, so missing values stay null after hashing.
    # The cast lets non-string columns (e.g. a numeric cc_num) be hashed too.
    df = df.withColumn(
        pii_column,
        sha2(concat(lit(PII_HASH_SALT), col(pii_column).cast("string")), 256),
    )

Data preparation for the fraud analysis: count fraudulent transactions by merchant, category, amount range and state.

In [38]:

# Keep only the fraudulent transactions once, then count them along each dimension, largest first.
fraud_only = df.filter(col("is_fraud") == '1')


def _fraud_counts(*group_columns):
    """Count fraudulent rows per combination of ``group_columns``, sorted by count descending."""
    return fraud_only.groupBy(*group_columns).count().orderBy(desc('count'))


most_frauds_merchant = _fraud_counts("merchant", "is_fraud")
most_frauds_category = _fraud_counts("category", "is_fraud")
most_frauds_amount = _fraud_counts("amt_range", "is_fraud")
fraud_by_location = _fraud_counts("state")



In [40]:
# Collect the per-merchant fraud counts to the driver for plotting.
# plotly.express is already imported as `px` in the imports cell, so it is not re-imported here.
most_frauds_merchant_pd = most_frauds_merchant.toPandas()

# Plot only the merchants with the most fraudulent transactions (rows are ordered by count).
TOP_N_MERCHANTS = 20
fig = px.bar(
    most_frauds_merchant_pd.head(TOP_N_MERCHANTS),
    x='merchant',
    y='count',
    color='count',
    title='Top Merchants Involved in Fraudulent Transactions',
    labels={'count': 'Fraud Count'},
)

fig.update_layout(xaxis_tickangle=-45)  # rotate long merchant names so they stay readable
fig.show()


In [41]:
most_frauds_category_pd = most_frauds_category.toPandas()

# One bar per spending category, each in its own colour; ticks rotated for long category names.
fig = px.bar(
    most_frauds_category_pd,
    x='category',
    y='count',
    color='category',
    labels={'count': 'Fraud Count'},
    title='Fraudulent Transactions by Category',
).update_layout(xaxis_tickangle=-45)
fig.show()


In [42]:
most_frauds_amount_pd = most_frauds_amount.toPandas()

# Human-readable axis/legend names for the plotted columns.
amount_axis_labels = {'count': 'Fraud Count', 'amt_range': 'Amount Range'}

fig = px.bar(
    most_frauds_amount_pd,
    x='amt_range',
    y='count',
    color='amt_range',
    labels=amount_axis_labels,
    title='Fraudulent Transactions by Amount Range',
).update_layout(xaxis_tickangle=-45)
fig.show()


In [None]:
# Per-state fraud counts; `state` holds two-letter state codes (e.g. NY, TX), which is the
# format locationmode='USA-states' expects.
fraud_by_location_pd = fraud_by_location.toPandas()

fig = px.choropleth(
    fraud_by_location_pd,
    locations='state',
    locationmode='USA-states',
    scope="usa",
    color='count',
    color_continuous_scale="Viridis",
    labels={'count': 'Fraud Count'},
    title='Fraud Counts by US State',
)
fig.show()


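# Findings From The Data

All figures below are absolute counts of fraudulent transactions. They are not normalised by each group's total transaction volume, so a high count does not by itself indicate a higher fraud *rate*.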

### Top Merchants Involved in Fraudulent Transactions
#### Identify possibly compromised or suspicious merchants.

1. Rau and Sons - 49
2. Kozey-Boehm  - 48
3. Cormier-LLC  - 48

### Top Fraudulent Transactions by Category
#### Which spending categories are most prone to fraud?

1. grocery_pos  - 1,743
2. shopping_net - 1,713
3. misc_net     - 915

### Most Common Fraudulent Transactions by Amount Range
#### Identify possible patterns in fraudulent transactions to flag as suspicious.

1. $500 - $1,000 - 2,698
2. $100 - $500   - 2,206
3. $10 - $50     - 1,082

### Most Common US State for Fraud to Occur In
#### Which states record the most fraudulent transactions?

1. NY - 555
2. TX - 479
3. PA - 458

In [None]:
# Shut down the SparkSession (and its underlying SparkContext) once the analysis is finished.
spark.stop()
