## 1. Load Data

In [0]:
from pyspark.sql.functions import explode, col, when, regexp_replace
from pyspark.sql.types import MapType, StringType

# Load transactions
transactions = spark.read.csv("/FileStore/transactions_data.csv", header=True, inferSchema=True)
# Remove "$" and cast variables
transactions = transactions.withColumn("amount", regexp_replace(col("amount"), r"[\$,]", "").cast("double"))
transactions.createOrReplaceTempView("transactions")

# Load cards
cards = spark.read.csv("/FileStore/cards_data.csv", header=True, inferSchema=True)
# Remove "$" and cast variables
cards = cards.withColumn("credit_limit", regexp_replace(col("credit_limit"), r"[\$,]", "").cast("double"))
cards.createOrReplaceTempView("cards")

# Load users
users = spark.read.csv("/FileStore/users_data.csv", header=True, inferSchema=True)
# Remove "$" and cast variables
users = users.withColumn("per_capita_income", regexp_replace(col("per_capita_income"), r"[\$,]", "").cast("double"))
users.createOrReplaceTempView("users")

# Load fraud labels
fraud_labels = spark.read.schema("target MAP<STRING, STRING>").json("/FileStore/train_fraud_labels.json")
fraud_labels = fraud_labels.selectExpr("explode(target) AS (transaction_id, fraud)")
fraud_labels = fraud_labels.withColumn("fraud", when(col("fraud") == "Yes", 1).otherwise(0))
fraud_labels.createOrReplaceTempView("fraud_labels")

## 2. Merge Data

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW merged_transactions AS
SELECT 
    t.id AS transaction_id,
    t.date,
    t.amount,
    t.merchant_city,
    t.merchant_state,
    t.zip,
    c.card_brand,
    c.card_type,
    c.credit_limit,
    u.current_age,
    u.per_capita_income,
    f.fraud
FROM transactions t
LEFT JOIN cards c ON t.card_id = c.id
LEFT JOIN users u ON t.client_id = u.id
LEFT JOIN fraud_labels f ON t.id = f.transaction_id;

## 3. Exploratory Data Analysis

### 3.1 Total Transactions and Fraudulent Transactions

In [0]:
%sql
SELECT 
    merchant_state,
    COUNT(*) AS total_transactions,
    SUM(fraud) AS fraudulent_transactions,
    ROUND(SUM(fraud) / COUNT(*) * 100, 2) AS fraud_percentage
FROM merged_transactions
GROUP BY merchant_state
ORDER BY fraud_percentage DESC
LIMIT 10;

merchant_state,total_transactions,fraudulent_transactions,fraud_percentage
Tuvalu,5,5,100.0
Haiti,386,253,65.54
Italy,7081,3061,43.23
,1563700,8779,0.56
OH,484122,316,0.07
SD,31090,8,0.03
MO,195854,32,0.02
IA,161261,32,0.02
VA,230685,26,0.01
SC,172753,17,0.01


Databricks visualization. Run in Databricks to view.

### 3.2 Fraud Distribution by State

In [0]:
%sql
SELECT 
    merchant_state,
    COUNT(*) AS total_transactions,
    SUM(fraud) AS fraudulent_transactions,
    ROUND(SUM(fraud) / COUNT(*) * 100, 2) AS fraud_percentage
FROM merged_transactions
GROUP BY merchant_state
ORDER BY fraud_percentage DESC
LIMIT 10;

merchant_state,total_transactions,fraudulent_transactions,fraud_percentage
Tuvalu,5,5,100.0
Haiti,386,253,65.54
Italy,7081,3061,43.23
,1563700,8779,0.56
OH,484122,316,0.07
SD,31090,8,0.03
MO,195854,32,0.02
IA,161261,32,0.02
VA,230685,26,0.01
SC,172753,17,0.01


### 3.3 Average Transaction Amount by Fraud Status

In [0]:
%sql
SELECT 
    fraud,
    ROUND(AVG(amount), 2) AS avg_transaction_amount
FROM merged_transactions
GROUP BY fraud;

fraud,avg_transaction_amount
,43.03
1.0,110.23
0.0,42.85


## 4. Derived Features

### 4.1 Add derived features

In [0]:
from pyspark.sql.functions import hour, dayofmonth, month, year, when

merged_transactions = spark.sql("SELECT * FROM merged_transactions")

# Create new time-related features
merged_transactions = merged_transactions.withColumn("hour", hour("date")) \
                                         .withColumn("day", dayofmonth("date")) \
                                         .withColumn("month", month("date")) \
                                         .withColumn("year", year("date")) \
                                         .withColumn("time_of_day", when((col("hour") >= 6) & (col("hour") < 18), "Day").otherwise("Night"))


### 4.2 Transactions by Time of Day

In [0]:
%sql
SELECT 
    time_of_day,
    COUNT(*) AS total_transactions,
    SUM(fraud) AS fraudulent_transactions
FROM (
    SELECT 
        CASE 
            WHEN hour(date) >= 6 AND hour(date) < 18 THEN 'Day'
            ELSE 'Night'
        END AS time_of_day,
        fraud
    FROM merged_transactions
) subquery
GROUP BY time_of_day
ORDER BY total_transactions DESC;

time_of_day,total_transactions,fraudulent_transactions
Day,10179626,11714
Night,3126289,1618


## 5. Clustering Analysis

### 5.1 Cluster Preparation

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Assemble features for clustering
assembler = VectorAssembler(inputCols=["amount", "credit_limit", "per_capita_income"], outputCol="features")
clustering_data = assembler.transform(merged_transactions)

kmeans = KMeans().setK(3).setSeed(42)  # Use 3 clusters
model = kmeans.fit(clustering_data)

# Predict clusters
clusters = model.transform(clustering_data)

# Cluster distribution
clusters.groupBy("prediction").count().show()

+----------+-------+
|prediction|  count|
+----------+-------+
|         1|3760457|
|         2| 199927|
|         0|9345531|
+----------+-------+



### 5.2 Inspect Characteristics

In [0]:
from pyspark.sql.functions import avg, sum, count, col

clusters.groupBy("prediction").agg(
    avg("amount").alias("avg_amount"),
    avg("credit_limit").alias("avg_credit_limit"),
    avg("per_capita_income").alias("avg_per_capita_income")
).show()

+----------+------------------+------------------+---------------------+
|prediction|        avg_amount|  avg_credit_limit|avg_per_capita_income|
+----------+------------------+------------------+---------------------+
|         1|51.317508284232524|26503.728852104945|   32236.042375168763|
|         2| 66.14250561454936| 64119.17460373036|    82768.49722148584|
|         0| 39.12400122689624|10103.170410434677|    19399.78950002948|
+----------+------------------+------------------+---------------------+



### 5.3 Identify High-Risk Groups

In [0]:
clusters.groupBy("prediction").agg(
    sum("fraud").alias("fraudulent_transactions"),
    count("*").alias("total_transactions"),
    (sum("fraud") / count("*") * 100).alias("fraud_rate")
).show()

+----------+-----------------------+------------------+-------------------+
|prediction|fraudulent_transactions|total_transactions|         fraud_rate|
+----------+-----------------------+------------------+-------------------+
|         1|                   3061|           3760457|0.08139968094303432|
|         2|                    165|            199927| 0.0825301234950757|
|         0|                  10106|           9345531|0.10813724763205002|
+----------+-----------------------+------------------+-------------------+



## 6. Fraud Detection Model

### 6.1 Model Preparation

In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

assembler = VectorAssembler(inputCols=["amount", "current_age", "per_capita_income", "credit_limit"], outputCol="features")
transactions_model_data = assembler.transform(merged_transactions)

train_data, test_data = transactions_model_data.randomSplit([0.8, 0.2], seed=1234)

# Filter out rows with null or NaN values in the label column
train_data = train_data.filter(col("fraud").isNotNull())
test_data = test_data.filter(col("fraud").isNotNull())

# Train Random Forest model
rf = RandomForestClassifier(labelCol="fraud", featuresCol="features")
model = rf.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

evaluator = BinaryClassificationEvaluator(labelCol="fraud", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)
print(f"ROC-AUC: {roc_auc}")

ROC-AUC: 0.6669766021787534


### 6.2 Results Summary

In [0]:
predictions.groupBy("fraud", "prediction").count().show()

+-----+----------+-------+
|fraud|prediction|  count|
+-----+----------+-------+
|    1|       0.0|   2626|
|    0|       0.0|1780696|
+-----+----------+-------+

