In [56]:
from pyspark.sql import SparkSession

# Build (or reuse, if one is already active) the Spark session for this analysis.
session_builder = SparkSession.builder.appName("CreditCardTxnAnalysis")
spark = session_builder.getOrCreate()



In [57]:
# Load the raw transactions: the first row is the header and column types are inferred.
df = spark.read.csv("credit_card_transactions.csv", header=True, inferSchema=True)

In [58]:
# Print the schema to verify the column types chosen by inferSchema.
df.printSchema()

root
 |-- txn_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- merchant: string (nullable = true)
 |-- location: string (nullable = true)
 |-- txn_type: string (nullable = true)
 |-- is_fraud: integer (nullable = true)



# Data Cleaning


In [None]:
from pyspark.sql.functions import col, isnan, when, count

# Count missing values per column.
# NaN can only occur in floating-point columns, so isnan() is applied to float/double
# columns only; every other column is checked for NULL alone. Calling isnan() on
# timestamp/date/boolean columns fails analysis, and on string columns it forces an
# implicit string-to-double cast.
float_cols = {name for name, dtype in df.dtypes if dtype in ("float", "double")}

df.select([
    count(
        when(
            (col(c).isNull() | isnan(col(c))) if c in float_cols else col(c).isNull(),
            c,
        )
    ).alias(c)
    for c in df.columns
]).show()

In [60]:
# Discard rows where any of the key fields (txn_id, user_id, timestamp) is NULL.
required_cols = ["txn_id", "user_id", "timestamp"]
df = df.na.drop(subset=required_cols)

In [61]:
# Keep amount as a 64-bit double and filter out non-positive amounts.
# Casting to 32-bit "float" would discard precision (the column is already read as
# double) and leak rounding noise into later sums and exports.
# Rows with a NULL amount are removed too, because NULL > 0 does not evaluate to true.
df = df.withColumn("amount", col("amount").cast("double"))
df = df.filter(col("amount") > 0)


In [63]:
# Convert the timestamp string column to timestamp type.
# "H" (like "M" and "d") accepts both one- and two-digit values, so "9/2/2025 9:05"
# and "9/2/2025 21:48" both parse; the fixed-width "HH" rejects single-digit hours.
# Values that still do not match the pattern become NULL (or raise in ANSI mode).

from pyspark.sql.functions import to_timestamp

df = df.withColumn("timestamp", to_timestamp("timestamp", "M/d/yyyy H:mm"))


In [64]:
# Preview the first five parsed timestamps without truncating the column.
df.select("timestamp").show(5, truncate=False)


+-------------------+
|timestamp          |
+-------------------+
|2025-09-02 21:48:00|
|2025-09-02 21:46:00|
|2025-09-02 21:46:00|
|2025-09-02 21:46:00|
|2025-09-02 21:45:00|
+-------------------+
only showing top 5 rows



In [66]:
# Derive calendar features from the parsed timestamp:
#   txn_date      - calendar date of the transaction
#   txn_hour      - hour of day
#   txn_dayofweek - full day name (pattern "EEEE")
from pyspark.sql.functions import to_date, hour, date_format

df = (
    df
    .withColumn("txn_date", to_date("timestamp"))
    .withColumn("txn_hour", hour("timestamp"))
    .withColumn("txn_dayofweek", date_format("timestamp", "EEEE"))
)

In [69]:
# Flag weekend transactions: 1 when the day name is Saturday or Sunday, otherwise 0.
on_weekend = df["txn_dayofweek"].isin("Saturday", "Sunday")
df = df.withColumn("is_weekend", when(on_weekend, 1).otherwise(0))

In [97]:
# Derive region as the second ", "-separated piece of location.

from pyspark.sql.functions import split

location_parts = split(col("location"), ", ")
df = df.withColumn("region", location_parts.getItem(1))

In [74]:
# Flag night-time transactions: 1 when 0 <= txn_hour < 6, otherwise 0.
in_night_hours = (col("txn_hour") >= 0) & (col("txn_hour") < 6)
df = df.withColumn("is_night_txn", when(in_night_hours, 1).otherwise(0))

In [76]:
# Flag high-value transactions: 1 when amount is strictly above 500, otherwise 0.

exceeds_threshold = col("amount") > 500
df = df.withColumn("is_high_value_txn", when(exceeds_threshold, 1).otherwise(0))

In [79]:
# Replace NULL merchant names with the placeholder "UNKNOWN".

merchant_default = {"merchant": "UNKNOWN"}
df = df.na.fill(merchant_default)

In [None]:
# Per-user summary: transaction count, total spend, fraud count, fraud rate (%),
# night-time transaction count and high-value transaction count.

# NOTE: this import rebinds the names `sum` and `round` (and `count`) to the Spark
# column functions for the rest of the notebook.
from pyspark.sql.functions import count, sum, round

txn_count = count("*")
fraud_count = sum("is_fraud")

user_metrics = [
    txn_count.alias("total_txns"),
    round(sum("amount"), 2).alias("total_spent"),
    fraud_count.alias("fraud_txns"),
    round((fraud_count / txn_count) * 100, 2).alias("fraud_rate_percent"),
    sum("is_night_txn").alias("night_txns"),
    sum("is_high_value_txn").alias("high_value_txns"),
]

user_summary = df.groupBy("user_id").agg(*user_metrics)

# Show the ten users with the highest fraud rate.
user_summary.orderBy(col("fraud_rate_percent").desc()).show(10)


In [89]:
# Playing with UDFs & Window Functions 

# Build the per-merchant fraud summary table: fraud count, total count and fraud rate.
# fraud_rate keeps 4 decimals: rounding to 2 decimals before the rate is compared with
# the 0.02 / 0.05 risk thresholds would push e.g. 0.015 up to 0.02 and 0.045 up to 0.05,
# misclassifying merchants that are actually below a threshold.
merchant_risk_df = df.groupBy("merchant").agg(
    sum("is_fraud").alias("fraud_txns"),
    count("*").alias("total_txns")
).withColumn(
    "fraud_rate", round(col("fraud_txns") / col("total_txns"), 4)
)

In [None]:
# Preview the first five rows of the per-merchant fraud summary.
merchant_risk_df.show(5)

In [92]:
# UDF to classify merchant risk based on fraud_rate

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def classify_merchant(fraud_rate):
    """Map a merchant's fraud rate (a fraction, e.g. 0.03 for 3%) to a risk label.

    Args:
        fraud_rate: Fraction of fraudulent transactions, or None when the rate
            could not be computed (Spark passes NULL column values as None).

    Returns:
        "High Risk" for rates >= 0.05, "Moderate Risk" for rates >= 0.02,
        "Safe" otherwise, and None when fraud_rate is None.
    """
    # Comparing None with a float raises TypeError, which would fail the whole
    # Spark job; propagate the missing value instead.
    if fraud_rate is None:
        return None
    if fraud_rate >= 0.05:
        return "High Risk"
    if fraud_rate >= 0.02:
        return "Moderate Risk"
    return "Safe"

# Wrap the Python classifier as a Spark UDF that returns a string label.
merchant_risk_udf = udf(classify_merchant, returnType=StringType())


In [None]:
# Label every merchant with a risk category derived from its fraud_rate, then display.
risk_label = merchant_risk_udf(col("fraud_rate"))
merchant_risk_df = merchant_risk_df.withColumn("risk_category", risk_label)

merchant_risk_df.show()


In [None]:
# Rank individual transactions by amount within each region (largest first) and show
# the three largest per region. Note this ranks single transactions, not per-user totals.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

amount_desc = col("amount").desc()
window_spec = Window.partitionBy("region").orderBy(amount_desc)

df_with_rank = df.withColumn("spend_rank_in_region", row_number().over(window_spec))

(
    df_with_rank
    .select("user_id", "amount", "region", "spend_rank_in_region")
    .filter("spend_rank_in_region <= 3")
    .show()
)


In [100]:
# Flag possible duplicate transactions: a transaction is flagged when it occurs less
# than 60 seconds after the same user's previous transaction (ordered by timestamp).

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, unix_timestamp

# Only run if the timestamp column still exists.
if "timestamp" in df.columns:
    window_spec = Window.partitionBy("user_id").orderBy("timestamp")
    seconds_since_prev = unix_timestamp("timestamp") - unix_timestamp("prev_txn_time")
    within_a_minute = col("time_diff_sec") < 60

    df = (
        df
        .withColumn("prev_txn_time", lag("timestamp").over(window_spec))
        .withColumn("time_diff_sec", seconds_since_prev)
        # A user's first transaction has no predecessor, so its diff is NULL and it gets 0.
        .withColumn("is_possible_duplicate", when(within_a_minute, 1).otherwise(0))
    )


In [103]:
# 1. Total spend per (user, region), then keep the three highest-spending users per region.
from pyspark.sql.functions import sum
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

total_spent = sum("amount").alias("total_spent")
user_spend = df.groupBy("user_id", "region").agg(total_spent)

window_spec = Window.partitionBy("region").orderBy(col("total_spent").desc())

ranked_spend = user_spend.withColumn("rank", row_number().over(window_spec))
top_spenders = ranked_spend.filter("rank <= 3")


In [106]:
import pandas as pd

# Collect the (small) top-spenders result to the driver and write it to an Excel file.
collected_rows = top_spenders.collect()
column_names = top_spenders.columns

pdf = pd.DataFrame(collected_rows, columns=column_names)
pdf.to_excel("top_spenders_summary.xlsx", index=False)