In [0]:
from datetime import datetime

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp, from_unixtime, from_utc_timestamp
from pyspark.sql.functions import date_format, regexp_replace, split, expr, sha2, count, when, trim
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StringType

# Raw credit-card transactions, read with Spark's default (line-delimited) JSON reader.
# NOTE(review): hard-coded workspace path — consider moving it to a config cell.
filepath = 'dbfs:/Workspace/Users/imran.syafi/cc_sample_transaction.json'

spark = SparkSession.builder.master("local").appName("test").getOrCreate()

df_spark = spark.read.json(filepath)

# `personal_detail` is a JSON-encoded string column; every nested field is read as a string.
personal_detail_schema = StructType()
for personal_field in ("person_name", "gender", "address", "lat", "long", "city_pop", "job", "dob"):
    personal_detail_schema = personal_detail_schema.add(personal_field, StringType())

df_parsed = df_spark.withColumn("personal", from_json(col("personal_detail"), personal_detail_schema))

# `personal.address` is itself a JSON-encoded string, so it gets its own struct.
address_schema = StructType()
for address_field in ("street", "city", "state", "zip"):
    address_schema = address_schema.add(address_field, StringType())

df_with_address = df_parsed.withColumn("address_struct", from_json(col("personal.address"), address_schema))

# Promote nested fields to top-level columns: target column name -> nested source path.
# Columns are added in this (insertion) order.
flattened_columns = {
    "person_name": "personal.person_name",
    "gender": "personal.gender",
    "lat": "personal.lat",
    "long": "personal.long",
    "city_pop": "personal.city_pop",
    "job": "personal.job",
    "dob": "personal.dob",
    "street": "address_struct.street",
    "city": "address_struct.city",
    "state": "address_struct.state",
    "zip": "address_struct.zip",
}
df_flat = df_with_address
for target_name, source_path in flattened_columns.items():
    df_flat = df_flat.withColumn(target_name, col(source_path))

# The nested / raw JSON columns are redundant once flattened.
df_flat = df_flat.drop("personal", "personal_detail", "address_struct")

# NOTE(review): the display below renders raw PII (names, DOB, street) before any
# masking — consider limiting or removing it before sharing the notebook.
df_flat.printSchema()
df_flat.display()
df_flat.count()

## Timestamp Conversion

In [0]:
# Convert the three event-time columns into Kuala Lumpur wall-clock strings of the
# form "yyyy-MM-dd HH:mm:ss.SSSSSS +0800".
#
# to_timestamp() (parsing a zone-less string) and date_format() (rendering) both use
# the Spark session time zone, while from_utc_timestamp() shifts an instant so that
# its *UTC* rendering shows the target wall clock. The result is therefore only
# correct when the session zone is UTC, so pin it explicitly.
spark.conf.set("spark.sql.session.timeZone", "UTC")

DISPLAY_TZ = "Asia/Kuala_Lumpur"
# Emitted as a literal: the pattern letter `Z` would print the *session* offset
# (+0000) next to a KL wall-clock time. Keep in sync with DISPLAY_TZ
# (Asia/Kuala_Lumpur is a fixed UTC+08:00 with no DST).
DISPLAY_TZ_OFFSET = "+0800"
DISPLAY_TS_FORMAT = f"yyyy-MM-dd HH:mm:ss.SSSSSS '{DISPLAY_TZ_OFFSET}'"


def to_display_time(ts_col):
    """Format a UTC timestamp Column as a DISPLAY_TZ wall-clock string (DISPLAY_TS_FORMAT)."""
    return date_format(from_utc_timestamp(ts_col, DISPLAY_TZ), DISPLAY_TS_FORMAT)


def replace_with_display_time(frame, column_name, ts_col):
    """Return ``frame`` with ``column_name`` replaced by ``to_display_time(ts_col)``.

    The formatted value is computed before the original column is dropped (``ts_col``
    may reference it). The replacement ends up as the last column, as with the
    drop + rename pattern.
    """
    tmp_name = f"{column_name}_fmt"
    return (
        frame.withColumn(tmp_name, to_display_time(ts_col))
        .drop(column_name)
        .withColumnRenamed(tmp_name, column_name)
    )


# NOTE(review): the raw string is assumed to be UTC — confirm with the data owner.
df_flat = replace_with_display_time(
    df_flat,
    "trans_date_trans_time",
    to_timestamp("trans_date_trans_time", "yyyy-MM-dd HH:mm:ss"),
)

# Epoch milliseconds / microseconds. timestamp_millis / timestamp_micros keep the
# sub-second part, whereas from_unixtime works in whole seconds (so ".SSSSSS" always
# printed 000000). The explicit BIGINT cast also admits string-typed JSON values.
df_flat = replace_with_display_time(
    df_flat,
    "merch_last_update_time",
    expr("timestamp_millis(CAST(merch_last_update_time AS BIGINT))"),
)
df_flat = replace_with_display_time(
    df_flat,
    "merch_eff_time",
    expr("timestamp_micros(CAST(merch_eff_time AS BIGINT))"),
)

## Name Cleaning

In [0]:
# Clean `person_name` and split it into `first` / `last` name columns.
#   1. Every character that is not a letter, digit or comma becomes a comma, so any
#      punctuation or whitespace acts as a separator. \p{L} / \p{N} (Java regex
#      classes) keep accented and non-Latin letters intact; an ASCII-only class would
#      split e.g. "José" into "Jos".
#   2. Noise tokens are removed.
#   3. The non-empty comma-separated pieces are re-joined with single spaces.
# Whitespace cannot survive step 1, so no separate whitespace-stripping pass is needed.
# NOTE(review): the noise tokens are specific to this dataset — confirm the list.
df_flat = df_flat.withColumn(
    "person_name_cleaned",
    regexp_replace("person_name", r"[^\p{L}\p{N},]", ","),
).withColumn(
    "person_name_cleaned",
    regexp_replace("person_name_cleaned", r"(eeeee|NOOOO)", ""),
)

df_flat = df_flat.withColumn(
    "name_parts_filtered",
    expr("filter(split(person_name_cleaned, ','), x -> x != '')"),
).withColumn(
    "cleaned_name",
    expr("concat_ws(' ', name_parts_filtered)"),
)

# concat_ws skips nulls and never returns null, and split() of a string always has at
# least one element, so both lookups are safe. A missing/empty name yields "" for both,
# and a single-token name yields the same value for first and last.
df_flat = df_flat.withColumn(
    "first",
    expr("split(cleaned_name, ' ')[0]"),
).withColumn(
    "last",
    expr("element_at(split(cleaned_name, ' '), -1)"),
)

df_flat = df_flat.drop("person_name", "person_name_cleaned", "name_parts_filtered", "cleaned_name")


## Handling PII data

In [0]:
def mask_dob(dob_str):
    """Mask a date of birth down to its year.

    Args:
        dob_str: Date string in ``YYYY-MM-DD`` form; may be None.

    Returns:
        ``"<year>-XX-XX"`` for a parseable date, otherwise None.
    """
    try:
        parsed = datetime.strptime(dob_str, "%Y-%m-%d")
    except (TypeError, ValueError):
        # TypeError: None / non-string input; ValueError: malformed or impossible date.
        return None
    return f"{parsed.year}-XX-XX"

def mask_name(name):
    """Keep the first character of ``name`` and replace every other character with '*'.

    Empty or missing (None) names yield an empty string.
    """
    if not name:
        return ""
    head, tail = name[0], name[1:]
    return head + "*" * len(tail)

def hash_column(column_name, salt=None):
    """Return a SHA-256 hex-digest Column for ``column_name``.

    The value is cast to string first: Spark's ``sha2`` only accepts string/binary
    input, so hashing a numeric column (e.g. a card number inferred as bigint) would
    otherwise fail analysis. For string columns the cast is a no-op.

    Args:
        column_name: Name of the column to hash.
        salt: Optional secret prepended to every value before hashing. Without it the
            result is a plain, unkeyed SHA-256. Low-entropy values such as card numbers
            can then be recovered by brute force, so pass a secret (e.g. from a secrets
            manager) for production data.

    Returns:
        A Column of SHA-256 hex strings (null where the input is null).
    """
    value = col(column_name).cast("string")
    if salt:
        from pyspark.sql.functions import concat, lit
        value = concat(lit(salt), value)
    return sha2(value, 256)

# Spark UDF wrappers around the pure-Python masking helpers; both produce strings.
mask_dob_udf = udf(mask_dob, StringType())
mask_name_udf = udf(mask_name, StringType())

# First add the masked / hashed versions next to the raw PII columns...
df_pii_handled = (
    df_flat
    .withColumn("dob_masked", mask_dob_udf(col("dob")))
    .withColumn("first_name_masked", mask_name_udf(col("first")))
    .withColumn("last_name_masked", mask_name_udf(col("last")))
    .withColumn("cc_num_hashed", hash_column("cc_num"))
    .withColumn("street_hashed", hash_column("street"))
)

# ...then drop the raw values and give the replacements the original column names.
df_pii_handled = df_pii_handled.drop("first", "last", "dob", "cc_num", "street")

pii_renames = [
    ("first_name_masked", "first"),
    ("last_name_masked", "last"),
    ("cc_num_hashed", "cc_num"),
    ("dob_masked", "dob"),
    ("street_hashed", "street"),
]
for masked_name, public_name in pii_renames:
    df_pii_handled = df_pii_handled.withColumnRenamed(masked_name, public_name)

df_pii_handled.display()
df_pii_handled.printSchema()

## Data Quality Checks

This is a simple check I have refined over the years. It quickly reveals holes in the data (nulls and blank strings per column). From there, I reach out to the stakeholders to agree on the way forward, e.g. filling the gaps with 0, the mean, the median, or another default value (the right choice depends on what the field actually represents).

In [0]:
# Column completeness profile: for every column, how many values are present, null,
# or blank strings. All per-column counts come from a single aggregation (one scan
# of the data) instead of two Spark jobs per column.
total_rows = df_pii_handled.count()

profiled_columns = df_pii_handled.columns
# Only string columns can hold blank values. trim() on other types either never yields
# "" (numbers, dates) or fails analysis (structs / arrays / maps), so those report 0.
string_columns = {
    field.name
    for field in df_pii_handled.schema.fields
    if isinstance(field.dataType, StringType)
}

profile_exprs = []
for idx, c in enumerate(profiled_columns):
    # Positional aliases avoid clashes with arbitrary column names.
    profile_exprs.append(count(col(c)).alias(f"non_nulls_{idx}"))
    if c in string_columns:
        profile_exprs.append(
            count(when(trim(col(c)) == "", True)).alias(f"empty_strings_{idx}")
        )
profile = df_pii_handled.agg(*profile_exprs).collect()[0]

summary_data = []
for idx, c in enumerate(profiled_columns):
    non_null_count = profile[f"non_nulls_{idx}"]
    summary_data.append({
        "name": c,
        "total": total_rows,
        "non_nulls": non_null_count,
        "nulls": total_rows - non_null_count,
        "empty_strings": profile[f"empty_strings_{idx}"] if c in string_columns else 0,
    })

summary_df = pd.DataFrame(summary_data)
display(summary_df)


### Visualization and Analysis


In [0]:
# Collect the PII-handled table to the driver as pandas for summary stats and plots.
df_pandas = df_pii_handled.toPandas()

# Coerce the two key fields to numbers; unparseable values become NaN and those rows are dropped.
for numeric_column in ("is_fraud", "amt"):
    df_pandas[numeric_column] = pd.to_numeric(df_pandas[numeric_column], errors="coerce")
df_pandas = df_pandas.dropna(subset=["is_fraud", "amt"])

fraud_mask = df_pandas["is_fraud"] == 1

total_transactions = len(df_pandas)
total_fraud_cases = df_pandas["is_fraud"].sum()
total_amount = df_pandas["amt"].sum()
fraud_amount = df_pandas.loc[fraud_mask, "amt"].sum()

# Fraud share by transaction count and by transaction value, in percent.
fraud_rate = total_fraud_cases / total_transactions * 100
fraud_amount_pct = fraud_amount / total_amount * 100

print("High-Level Summary Metrics:")
print(f"Total Transactions: {total_transactions}")
print(f"Total Fraud Cases: {total_fraud_cases} ({fraud_rate:.2f}%)")
print(f"Total Transaction Amount: ${total_amount:.2f}")
print(f"Fraudulent Transaction Amount: ${fraud_amount:.2f} ({fraud_amount_pct:.2f}%)\n")


In [0]:
# Share of fraudulent transactions per merchant category, highest first.
fraud_rate_by_category = (
    df_pandas.groupby("category")["is_fraud"]
    .mean()
    .sort_values(ascending=False)
    .reset_index()
)

fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(data=fraud_rate_by_category, x="is_fraud", y="category", palette="rocket", ax=ax)
ax.set_xlabel("Fraud Rate")
ax.set_ylabel("Merchant Category")
ax.set_title("Fraud Rate by Merchant Category")
fig.tight_layout()
plt.show()

## Transaction Amount Distribution (Fraud vs Non-Fraud)

In [0]:
# Map the numeric fraud flag to readable labels and plot by label. Relabelling tick
# positions 0/1 afterwards would mislabel the axis if one class is missing from the data.
fraud_labels = {0: "Non-Fraud", 1: "Fraud"}
amount_by_status = df_pandas.assign(fraud_status=df_pandas["is_fraud"].map(fraud_labels))

fig, ax = plt.subplots(figsize=(10, 6))
sns.boxplot(
    data=amount_by_status,
    x="fraud_status",
    y="amt",
    order=list(fraud_labels.values()),
    palette="Set2",
    ax=ax,
)
ax.set_title("Transaction Amount Distribution: Fraud vs Non-Fraud")
ax.set_ylabel("Transaction Amount ($)")
ax.set_xlabel("Fraud Status")
plt.show()

## 7-Day Rolling Average of Fraud Cases
This lets us see whether fraud cases are on the rise, so we can plan ahead.

In [0]:
# Calendar date of each transaction, i.e. the wall-clock date written in the timestamp string.
df_pandas["date"] = pd.to_datetime(df_pandas["trans_date_trans_time"]).dt.date

# Daily fraud counts on a gap-free calendar: resample("D") inserts days without any
# transactions as 0, so the 7-row window below spans exactly 7 calendar days.
# Grouping by date alone skips empty days and would stretch the window.
daily_fraud_counts = (
    df_pandas.assign(day=pd.to_datetime(df_pandas["date"]))
    .set_index("day")["is_fraud"]
    .resample("D")
    .sum()
)
daily_fraud = daily_fraud_counts.rolling(window=7).mean()

fig, ax = plt.subplots(figsize=(12, 5))
daily_fraud.plot(ax=ax)
ax.set_title("7-Day Rolling Avg of Fraud Cases")
ax.set_xlabel("Date")
ax.set_ylabel("Fraud Cases Count")
ax.grid(True)
plt.show()

## Top 10 Fraudulent Merchants
A simple way to single out merchants for further action, such as suspension or penalties.

In [0]:
# Merchants with the most fraudulent transactions.
fraud_rows = df_pandas.loc[df_pandas["is_fraud"] == 1]
top_merchants = fraud_rows["merchant"].value_counts().head(10)

fig, ax = plt.subplots(figsize=(10, 6))
top_merchants.plot(kind="barh", color="orange", ax=ax)
ax.set_xlabel("Fraud Cases")
ax.set_title("Top 10 Merchants with Most Fraud Cases")
ax.invert_yaxis()  # value_counts is descending, so this puts the top merchant at the top
fig.tight_layout()
plt.show()

# Handling PII Data
Clearly explain your chosen methods for managing
personally identifiable information (PII).
---------------------------------------------
1) Date of Birth: I kept only the year (formatted as “YYYY-XX-XX”) so that analyses such as age-based trends remain possible, but removed the month and day to reduce the risk of re-identifying someone, especially by anyone who also has access to the other fields in the data.

2) Name (First and Last): I masked every character except the first letter (e.g., “Edward” → “E*****”) to reduce identifiability, while keeping the initial for coarse tasks such as grouping or sorting.

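3) Credit Card Number and Street Address: I hashed these fields with SHA-256. This pseudonymizes them: the raw values are no longer visible, but identical inputs still map to identical hashes, so the fields remain usable for joins, counts and other analysis. Note that an unsalted hash of a low-entropy value such as a card number can be reversed by brute force, so a secret salt or key (e.g. HMAC) should be used for production data.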

**Note:** These choices largely depend on company policy, the stakeholders involved, and the specific use case of the data. Some fields that the company considers PII may not be handled here.

# Data Quality Assurance
Describe how you identify and process dirty
data.

There's a part of this notebook (the Data Quality Checks section) that reflects a technique I have refined over the years. It helps me quickly identify holes in the data. From there, I reach out to the stakeholders to discuss the way forward, e.g. filling the gaps with 0, the mean, the median, or another default value (the right choice depends on what the field actually represents).

More complex dirty data, such as malformed names or other sensitive fields, needs dedicated handling (for example, the Name Cleaning step in this notebook).