# Credit Card Fraud Detection 

This notebook demonstrates an end-to-end big data solution using Apache Spark on EMR. We:

- Ingest raw transaction data from S3,
- Clean and transform the data (feature engineering),
- Save the transformed data in Parquet (for Athena integration),
- Build a fraud detection model using Spark MLlib on the transformed data, and
- Create visualizations to evaluate the model.

Let's begin by initializing our Spark session.


In [1]:
from pyspark.sql import SparkSession

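# Initialize (or reuse) the SparkSession with S3 settings.
# In an EMR notebook the session already exists ("SparkSession available as 'spark'" below), so getOrCreate()
# returns it, and startup-only settings such as spark.jars.packages do not take effect. The s3a settings only
# apply to s3a:// URIs; EMR reads s3:// paths through EMRFS.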
spark = SparkSession.builder \
    .appName("FraudDetectionDataProcessing") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0,com.amazonaws:aws-java-sdk-bundle:1.11.375") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

print("SparkSession started successfully!")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1741982580669_0001,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


SparkSession started successfully!

# Data Ingestion

We load the raw transaction data from S3. The dataset (Synthetic_Financial_datasets_log.csv) is read with its header row as column names, and Spark infers the column types.
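
Schema inference (`inferSchema=True`) makes Spark read the data an extra time to work out the column types. A minimal sketch of the alternative is below: build an explicit schema as a Spark DDL string and pass it to the reader. The column types are assumptions read off the preview rows (whole-number `step` and flag columns as INT, money columns as DOUBLE), and `RAW_COLUMNS` / `to_spark_ddl` are hypothetical names; confirm the types with `df.printSchema()` before relying on them.

```python
# Hypothetical explicit schema for Synthetic_Financial_datasets_log.csv.
# The types are assumptions based on the preview rows; verify with df.printSchema().
RAW_COLUMNS = [
    ("step", "INT"),
    ("type", "STRING"),
    ("amount", "DOUBLE"),
    ("nameOrig", "STRING"),
    ("oldbalanceOrg", "DOUBLE"),
    ("newbalanceOrig", "DOUBLE"),
    ("nameDest", "STRING"),
    ("oldbalanceDest", "DOUBLE"),
    ("newbalanceDest", "DOUBLE"),
    ("isFraud", "INT"),
    ("isFlaggedFraud", "INT"),
]


def to_spark_ddl(columns):
    """Render (name, type) pairs as a DDL schema string such as "`step` INT, `type` STRING"."""
    # Backticks quote each name so words like `type` are never parsed as keywords.
    return ", ".join(f"`{name}` {dtype}" for name, dtype in columns)


raw_schema_ddl = to_spark_ddl(RAW_COLUMNS)
print(raw_schema_ddl)
```

In the notebook this would replace the inferred read with `df = spark.read.csv(raw_data_path, header=True, schema=raw_schema_ddl)`. With a supplied schema, Spark's CSV reader applies it by position by default (`enforceSchema=True`), so the list order must match the file's column order.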

In [2]:
# Define the S3 path to your dataset.
raw_data_path = "s3://projectfraudcredit/Synthetic_Financial_datasets_log.csv"

# Read the CSV file into a DataFrame with header and inferred schema.
df = spark.read.csv(raw_data_path, header=True, inferSchema=True)

# Preview the initial data.
print("Initial Data:")
df.show(5)

Initial Data:
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M123

# Data Cleaning

We remove duplicate rows, drop rows where 'amount' is null, and add a unique transaction identifier. The identifier is added after deduplication, so rows that differ only in their ID cannot survive as duplicates.
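
`monotonically_increasing_id()` guarantees unique, increasing IDs, but not consecutive ones. The PySpark documentation describes the current implementation as placing the partition ID in the upper 31 bits and the record number within that partition in the lower 33 bits, which is why later outputs show IDs such as 120259328329. The stdlib sketch below decodes an ID under that assumption; `decode_monotonic_id` is a hypothetical helper. The bit layout is an implementation detail, and because IDs depend on partitioning they can change when the job is re-run.

```python
# Hypothetical helper: decode a Spark monotonically_increasing_id() value.
# Assumes the layout described in the PySpark docs: partition ID in the upper
# 31 bits, record number within the partition in the lower 33 bits.
RECORD_BITS = 33
RECORD_MASK = (1 << RECORD_BITS) - 1


def decode_monotonic_id(value):
    """Return (partition_id, record_number) for a monotonically_increasing_id() value."""
    return value >> RECORD_BITS, value & RECORD_MASK


for tid in (0, 2, 25769839836, 120259328329):
    print(tid, "->", decode_monotonic_id(tid))
```

Under that layout, 120259328329 decodes to partition 14, record 244041, and 25769839836 to partition 3, record 36060.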


In [3]:
from pyspark.sql.functions import col, monotonically_increasing_id

# Drop duplicate rows across all columns.
df_cleaned = df.dropDuplicates()

# Filter out rows where 'amount' is null.
df_cleaned = df_cleaned.filter(col("amount").isNotNull())

# Create a unique transaction identifier.
df_cleaned = df_cleaned.withColumn("transaction_id", monotonically_increasing_id())

# Preview the cleaned data.
print("Cleaned Data:")
df_cleaned.show(5)

Cleaned Data:
+----+-------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+
|step|   type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|transaction_id|
+----+-------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+
|  20|CASH_IN|147268.71|C2004857446|      18817.0|     166085.71|  C40874062|           0.0|           0.0|      0|             0|             0|
|  20|CASH_IN| 82455.73|C1240676145|   6689707.73|    6772163.45|C1984982364|    2053118.45|    1970662.72|      0|             0|             1|
|  20|PAYMENT| 17832.11| C521375485|      30304.0|      12471.89|M1921757665|           0.0|           0.0|      0|             0|             2|
|  20|CASH_IN| 70493.83|C2057775424|      6703.71|      77197.54| C374264195|      97419.46|      26925.63|   

# Feature Engineering

We compute additional features. Window functions partitioned by the originator (nameOrig) give each row:
- The originator's transaction frequency, and
- The total, average, and standard deviation of the originator's amounts.

A per-row expression also adds balance_diff, the difference between the originator's old and new balance.
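
The window aggregates can be mirrored in plain Python to see exactly what each row receives. This is a minimal stdlib sketch on made-up toy rows, not a replacement for the Spark job; `originator_features` is a hypothetical name. One detail matters downstream: Spark's `stddev` is the sample standard deviation, which is undefined (NULL in Spark, `None` here) for an originator with a single transaction, as in the first preview row of the output below. Because the modelling step uses `VectorAssembler(handleInvalid="skip")`, every row with a NULL feature is dropped, so single-transaction rows never reach the model.

```python
import statistics
from collections import defaultdict


def originator_features(rows):
    """Attach per-originator aggregates and balance_diff to each row (toy, in-memory)."""
    rows = list(rows)
    amounts_by_orig = defaultdict(list)
    for row in rows:
        amounts_by_orig[row["nameOrig"]].append(row["amount"])

    enriched = []
    for row in rows:
        amounts = amounts_by_orig[row["nameOrig"]]  # the row's window partition
        enriched.append({
            **row,
            "transaction_frequency": len(amounts),
            "total_amount": sum(amounts),
            "avg_amount": sum(amounts) / len(amounts),
            # Sample standard deviation (n - 1 denominator), like Spark's stddev;
            # undefined for a single transaction, so None (Spark returns NULL).
            "std_dev_amount": statistics.stdev(amounts) if len(amounts) > 1 else None,
            # Per-row feature, not a window aggregate.
            "balance_diff": row["oldbalanceOrg"] - row["newbalanceOrig"],
        })
    return enriched


# Toy rows for illustration only.
toy_rows = [
    {"nameOrig": "C_A", "amount": 100.0, "oldbalanceOrg": 500.0, "newbalanceOrig": 400.0},
    {"nameOrig": "C_A", "amount": 300.0, "oldbalanceOrg": 400.0, "newbalanceOrig": 100.0},
    {"nameOrig": "C_B", "amount": 50.0, "oldbalanceOrg": 50.0, "newbalanceOrig": 0.0},
]
for r in originator_features(toy_rows):
    print(r["nameOrig"], r["transaction_frequency"], r["avg_amount"], r["std_dev_amount"], r["balance_diff"])
```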


In [4]:
from pyspark.sql.functions import count, sum as _sum, avg, stddev, expr
from pyspark.sql.window import Window

# Define a window partitioned by the originator (nameOrig)
windowSpec = Window.partitionBy("nameOrig")

# Compute new features.
df_features = df_cleaned.withColumn("transaction_frequency", count("transaction_id").over(windowSpec)) \
    .withColumn("total_amount", _sum("amount").over(windowSpec)) \
    .withColumn("avg_amount", avg("amount").over(windowSpec)) \
    .withColumn("std_dev_amount", stddev("amount").over(windowSpec)) \
    .withColumn("balance_diff", expr("oldbalanceOrg - newbalanceOrig"))

# Preview the DataFrame with engineered features.
print("Data with Engineered Features:")
df_features.show(5)

Data with Engineered Features:
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+---------------------+------------+----------+--------------+-------------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|transaction_id|transaction_frequency|total_amount|avg_amount|std_dev_amount|       balance_diff|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+---------------------+------------+----------+--------------+-------------------+
| 385| CASH_IN|139018.58|C1000021278|   4886789.95|    5025808.54| C174750132|     411015.63|     271997.05|      0|             0|  120259328329|                    1|   139018.58| 139018.58|          NULL|-139018.58999999985|
| 352|CASH_OUT|406150.63|C1000036439|      49511.0|      

# Saving Transformed Data

We save the transformed DataFrame in Parquet format to S3. This data is used to create an Athena external table for efficient querying.
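
A minimal sketch of the Athena side follows. It assumes a hypothetical database and table name (`fraud_db.transaction_features`) and column types inferred from the Spark output (counts and `monotonically_increasing_id()` values as `bigint`, aggregates as `double`); confirm them with `df_features.printSchema()`. `build_athena_ddl` only builds the `CREATE EXTERNAL TABLE` statement. `submit_to_athena`, also hypothetical, sends it through boto3's Athena client (`start_query_execution`), which needs an S3 prefix for query results.

```python
# Assumed Athena column types for df_features; verify with df_features.printSchema().
FEATURE_COLUMNS = [
    ("step", "int"), ("type", "string"), ("amount", "double"),
    ("nameOrig", "string"), ("oldbalanceOrg", "double"), ("newbalanceOrig", "double"),
    ("nameDest", "string"), ("oldbalanceDest", "double"), ("newbalanceDest", "double"),
    ("isFraud", "int"), ("isFlaggedFraud", "int"), ("transaction_id", "bigint"),
    ("transaction_frequency", "bigint"), ("total_amount", "double"), ("avg_amount", "double"),
    ("std_dev_amount", "double"), ("balance_diff", "double"),
]


def build_athena_ddl(table, columns, location):
    """Return a CREATE EXTERNAL TABLE statement over the Parquet files at `location`."""
    # Backticks protect names that clash with reserved words (e.g. `type`).
    column_lines = ",\n".join(f"  `{name}` {dtype}" for name, dtype in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n"
        f"{column_lines}\n"
        ")\n"
        "STORED AS PARQUET\n"
        f"LOCATION '{location}'"
    )


def submit_to_athena(ddl, output_location):
    """Hypothetical helper: run the DDL in Athena and return the query execution ID."""
    import boto3  # imported here so the DDL builder runs without AWS dependencies

    athena = boto3.client("athena")
    response = athena.start_query_execution(
        QueryString=ddl,
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]


ddl = build_athena_ddl(
    "fraud_db.transaction_features",  # hypothetical database.table
    FEATURE_COLUMNS,
    "s3://projectfraudcredit/transformed-data/",
)
print(ddl)
```

Calling `submit_to_athena(ddl, "s3://projectfraudcredit/athena-results/")` (a hypothetical results prefix) would create the table, assuming the `fraud_db` database already exists and the caller has Athena and S3 permissions.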


In [5]:
# Define the S3 path where the transformed data will be saved.
transformed_data_path = "s3://projectfraudcredit/transformed-data/"

# Write the DataFrame as Parquet, overwriting any existing data.
df_features.write.mode("overwrite").parquet(transformed_data_path)

print("Transformed data successfully saved to:", transformed_data_path)

# Read the Parquet output back to confirm the write; an Athena external table would query these same files.
df_features = spark.read.parquet(transformed_data_path)
df_features.show(5)

Transformed data successfully saved to: s3://projectfraudcredit/transformed-data/
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+---------------------+------------+----------+--------------+------------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|transaction_id|transaction_frequency|total_amount|avg_amount|std_dev_amount|      balance_diff|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+---------------------+------------+----------+--------------+------------------+
|  12|CASH_OUT|367527.28|C1000012640|          0.0|           0.0|C1239707538|    2411831.94|     2824848.7|      0|             0|   25769839836|                    1|   367527.28| 367527.28|          NULL|               0.0|
| 204| CAS

In [6]:
sc.install_pypi_package("boto3")

Collecting boto3
  Downloading boto3-1.37.13-py3-none-any.whl (139 kB)
Collecting s3transfer<0.12.0,>=0.11.0
  Downloading s3transfer-0.11.4-py3-none-any.whl (84 kB)
Collecting botocore<1.38.0,>=1.37.13
  Downloading botocore-1.37.13-py3-none-any.whl (13.4 MB)
Installing collected packages: botocore, s3transfer, boto3
Successfully installed boto3-1.37.13 botocore-1.37.13 s3transfer-0.11.4


In [7]:
sc.install_pypi_package("numpy")
sc.install_pypi_package("pandas")
sc.install_pypi_package("scikit-learn")

Collecting numpy
  Downloading numpy-2.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (19.5 MB)
Installing collected packages: numpy
Successfully installed numpy-2.0.2

Collecting pandas
  Downloading pandas-2.2.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.1 MB)
Collecting tzdata>=2022.7
  Downloading tzdata-2025.1-py2.py3-none-any.whl (346 kB)
Collecting python-dateutil>=2.8.2
  Downloading python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB)
Installing collected packages: tzdata, python-dateutil, pandas
  Attempting uninstall: python-dateutil
    Found existing installation: python-dateutil 2.8.1
    Not uninstalling python-dateutil at /usr/lib/python3.9/site-packages, outside environment /mnt/yarn/usercache/livy/appcache/application_1741982580669_0001/container_1741982580669_0001_01_000001/tmp/spark-e261b25f-5ad7-452a-8746-75e933d9a9ec
    Can't uninstall 'python-dateutil'. No files were found to uninstall.
Successfully installed pandas-2.2.3 pyt

# Machine Learning: Fraud Detection Model

We assemble features into a vector, split the data into training and test sets, and build a Logistic Regression model using Spark MLlib. The label for fraud detection is assumed to be in the 'isFraud' column.
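
For a binary label, a fitted Spark `LogisticRegressionModel` computes a margin w·x + b for each row. To our understanding of the implementation, it then emits `rawPrediction = [-margin, margin]`, `probability = [1 - σ(margin), σ(margin)]`, and `prediction = 1` when σ(margin) exceeds `threshold` (0.5 by default). The stdlib sketch below reproduces that arithmetic for one row, which makes the `probability` and `prediction` columns used later easier to read. `score_row` is a hypothetical name, and the weights are toy values, not the fitted model.

```python
import math


def sigmoid(z):
    """Numerically stable logistic function (avoids overflow for large |z|)."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def score_row(coefficients, intercept, features, threshold=0.5):
    """Return (rawPrediction, probability, prediction) for one feature vector."""
    margin = sum(w * x for w, x in zip(coefficients, features)) + intercept
    p_fraud = sigmoid(margin)
    raw_prediction = [-margin, margin]
    probability = [1.0 - p_fraud, p_fraud]  # index 1 = P(isFraud = 1)
    prediction = 1.0 if p_fraud > threshold else 0.0
    return raw_prediction, probability, prediction


# Toy weights and features, for illustration only.
raw, prob, pred = score_row([2.0, -1.0], 0.5, [1.0, 3.0])
print(raw, prob, pred)
```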


In [8]:
# Import the MLlib feature transformer, classifier, and evaluators.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Model Training & Predictions

# Defining the feature columns.
feature_cols = ["amount", "transaction_frequency", "total_amount", "avg_amount", "std_dev_amount", "balance_diff"]

# Creating the VectorAssembler with handleInvalid="skip" to ignore rows with nulls in feature columns.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="skip")
data = assembler.transform(df_features)

# Filtering out rows where the label (isFraud) is null.
data = data.filter(data.isFraud.isNotNull())

# Splitting data into training (70%) and test (30%) sets.
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

# Initialize and train a Logistic Regression model.
lr = LogisticRegression(labelCol="isFraud", featuresCol="features", maxIter=10)
model = lr.fit(train_data)

# Making predictions on test data.
predictions = model.transform(test_data)

# ---------------------------
# Model Evaluation using Spark Evaluators

# Evaluating ROC-AUC using BinaryClassificationEvaluator.
roc_evaluator = BinaryClassificationEvaluator(labelCol="isFraud", metricName="areaUnderROC")
roc_auc = roc_evaluator.evaluate(predictions)
print("Test ROC-AUC:", roc_auc)

# Evaluate accuracy plus weighted (support-averaged) precision, recall, and F1 using MulticlassClassificationEvaluator.
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="isFraud", predictionCol="prediction", metricName="accuracy")
accuracy = accuracy_evaluator.evaluate(predictions)
print("Test Accuracy:", accuracy)

precision_evaluator = MulticlassClassificationEvaluator(labelCol="isFraud", predictionCol="prediction", metricName="weightedPrecision")
precision = precision_evaluator.evaluate(predictions)
print("Test Precision:", precision)

recall_evaluator = MulticlassClassificationEvaluator(labelCol="isFraud", predictionCol="prediction", metricName="weightedRecall")
recall = recall_evaluator.evaluate(predictions)
print("Test Recall:", recall)

f1_evaluator = MulticlassClassificationEvaluator(labelCol="isFraud", predictionCol="prediction", metricName="f1")
f1_score = f1_evaluator.evaluate(predictions)
print("Test F1 Score:", f1_score)

# ---------------------------
# Confusion Matrix (using Pandas & scikit-learn)

# Converting predictions DataFrame to Pandas for a detailed confusion matrix.
preds_pd = predictions.select("isFraud", "prediction").toPandas()

# Import confusion_matrix from scikit-learn.
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(preds_pd["isFraud"], preds_pd["prediction"])
print("Confusion Matrix:")
print(cm)

Test ROC-AUC: 0.9901976243035846
Test Accuracy: 0.9994488333639537
Test Precision: 0.9994491373720265
Test Recall: 0.9994488333639537
Test F1 Score: 0.9993737503002043
Confusion Matrix:
[[5436    0]
 [   3    4]]
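
The headline metrics need reading against the class balance. The confusion matrix shows 5,443 test rows, of which only 7 are fraud. A model that predicts "not fraud" for every row would therefore already reach 5436/5443 ≈ 0.9987 accuracy. The weighted precision, recall, and F1 average over both classes by support, so the majority class dominates them. The stdlib sketch below recomputes the reported values from the matrix and adds fraud-class figures: precision 4/4 = 1.0, recall 4/7 ≈ 0.571, F1 8/11 ≈ 0.727. `metrics_from_confusion` is a hypothetical name. In Spark 3.x, `MulticlassClassificationEvaluator` can also report per-class values directly via `metricName="precisionByLabel"` or `"recallByLabel"` with `metricLabel=1.0`.

```python
def metrics_from_confusion(cm):
    """Per-class and support-weighted metrics from a 2x2 confusion matrix.

    cm uses scikit-learn's layout: rows are true labels, columns are predictions,
    i.e. [[TN, FP], [FN, TP]] with fraud (1) as the positive class.
    """
    (tn, fp), (fn, tp) = cm
    total = tn + fp + fn + tp

    def prf(tp_c, fp_c, fn_c):
        precision = tp_c / (tp_c + fp_c) if tp_c + fp_c else 0.0
        recall = tp_c / (tp_c + fn_c) if tp_c + fn_c else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        return precision, recall, f1

    p1, r1, f1_1 = prf(tp, fp, fn)  # fraud as the positive class
    p0, r0, f1_0 = prf(tn, fn, fp)  # legitimate as the positive class
    w0, w1 = (tn + fp) / total, (fn + tp) / total  # class supports used as weights
    return {
        "accuracy": (tn + tp) / total,
        "all_legit_baseline_accuracy": (tn + fp) / total,
        "fraud_precision": p1,
        "fraud_recall": r1,
        "fraud_f1": f1_1,
        "weighted_precision": w0 * p0 + w1 * p1,
        "weighted_recall": w0 * r0 + w1 * r1,
        "weighted_f1": w0 * f1_0 + w1 * f1_1,
    }


# Confusion matrix printed by the cell above.
for name, value in metrics_from_confusion([[5436, 0], [3, 4]]).items():
    print(f"{name}: {value:.4f}")
```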

# Visualizations

We visualize model performance with Matplotlib and scikit-learn: the ROC curve, the confusion matrix, and the logistic regression feature coefficients. Each plot is uploaded directly to S3.
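
The ROC curve plots the true-positive rate against the false-positive rate as the threshold on the fraud probability is lowered. AUC is the area under that curve, equivalently the chance that a random fraud row scores above a random legitimate row. Below is a minimal stdlib sketch of the idea behind `roc_curve` and `auc`, on toy scores. `roc_points` and `trapezoid_auc` are hypothetical names, and tied scores are moved past the threshold together. With only 7 fraud rows in the test split (the bottom row of the confusion matrix above), the curve has just a handful of vertical steps, so its AUC is a noisy estimate. Spark's `BinaryClassificationEvaluator` also accepts `metricName="areaUnderPR"`, which is often more informative under heavy class imbalance.

```python
def roc_points(labels, scores):
    """Return (fpr, tpr) lists, one point per distinct score, from the highest threshold down."""
    positives = sum(1 for y in labels if y == 1)
    negatives = len(labels) - positives
    if positives == 0 or negatives == 0:
        raise ValueError("ROC needs at least one positive and one negative label")

    ranked = sorted(zip(scores, labels), key=lambda pair: pair[0], reverse=True)
    fpr, tpr = [0.0], [0.0]
    tp = fp = 0
    i = 0
    while i < len(ranked):
        threshold = ranked[i][0]
        # Move every row tied at this score past the threshold together.
        while i < len(ranked) and ranked[i][0] == threshold:
            if ranked[i][1] == 1:
                tp += 1
            else:
                fp += 1
            i += 1
        fpr.append(fp / negatives)
        tpr.append(tp / positives)
    return fpr, tpr


def trapezoid_auc(xs, ys):
    """Area under a piecewise-linear curve via the trapezoidal rule."""
    return sum((x1 - x0) * (y0 + y1) / 2 for x0, x1, y0, y1 in zip(xs, xs[1:], ys, ys[1:]))


# Toy labels and fraud probabilities, for illustration only.
fpr, tpr = roc_points([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
print(fpr, tpr, trapezoid_auc(fpr, tpr))
```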


In [9]:
sc.install_pypi_package("python-dateutil==2.8.2")

Collecting python-dateutil==2.8.2
  Downloading python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
Installing collected packages: python-dateutil
  Attempting uninstall: python-dateutil
    Found existing installation: python-dateutil 2.9.0.post0
    Uninstalling python-dateutil-2.9.0.post0:
      Successfully uninstalled python-dateutil-2.9.0.post0
Successfully installed python-dateutil-2.8.2


In [10]:
sc.install_pypi_package("matplotlib")

Collecting matplotlib
  Downloading matplotlib-3.9.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (8.3 MB)
Collecting contourpy>=1.0.1
  Downloading contourpy-1.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (321 kB)
Collecting pillow>=8
  Downloading pillow-11.1.0-cp39-cp39-manylinux_2_28_x86_64.whl (4.5 MB)
Collecting kiwisolver>=1.3.1
  Downloading kiwisolver-1.4.7-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.6 MB)
Collecting cycler>=0.10
  Downloading cycler-0.12.1-py3-none-any.whl (8.3 kB)
Collecting fonttools>=4.22.0
  Downloading fonttools-4.56.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.6 MB)
Collecting importlib-resources>=3.2.0
  Downloading importlib_resources-6.5.2-py3-none-any.whl (37 kB)
Collecting zipp>=3.1.0
  Downloading zipp-3.21.0-py3-none-any.whl (9.6 kB)
Installing collected packages: zipp, pillow, kiwisolver, importlib-resources, fonttools, cycler, contourpy, matplotlib
Successfully installed contourpy-1.3

In [17]:
import matplotlib.pyplot as plt
import numpy as np
import io
import boto3
from sklearn.metrics import roc_curve, auc, confusion_matrix


s3_bucket = "projectfraudcredit"
s3_client = boto3.client("s3")

def upload_plot_to_s3(s3_key):
    """
    Uploads the current matplotlib figure directly to S3 using an in-memory buffer.
    """
    buf = io.BytesIO()
    plt.savefig(buf, format='png', bbox_inches='tight')
    buf.seek(0)
    s3_client.upload_fileobj(buf, s3_bucket, s3_key)
    print(f"Plot uploaded to: s3://{s3_bucket}/{s3_key}")
    plt.close()

# -------------------------------------
# 1. Prepare Data for Visualizations
# Convert predictions to a Pandas DataFrame and extract the probability of class 1 (fraud).
preds_pd = predictions.select("isFraud", "prediction", "probability").toPandas()
# Each "probability" value is a Spark ML DenseVector [P(not fraud), P(fraud)]; index 1 is the fraud probability.
preds_pd["probability_class1"] = preds_pd["probability"].apply(lambda v: float(v[1]))

# -------------------------------------
# 2. ROC Curve Visualization
fpr, tpr, thresholds = roc_curve(preds_pd["isFraud"], preds_pd["probability_class1"])
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], 'k--', label='Random Guess')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc='best')
upload_plot_to_s3("plots/roc_curve.png")

# -------------------------------------
# 3. Confusion Matrix Visualization
cm = confusion_matrix(preds_pd["isFraud"], preds_pd["prediction"])

# Draw into this figure's own axes instead of relying on figure number 1 (fignum=1).
fig, ax = plt.subplots(figsize=(6, 5))
im = ax.matshow(cm, cmap=plt.cm.Blues)
ax.set_title('Confusion Matrix', pad=20)
fig.colorbar(im, ax=ax)
ax.set_ylabel('True Label')
ax.set_xlabel('Predicted Label')
# Annotate each cell with its count.
for (i, j), value in np.ndenumerate(cm):
    ax.text(j, i, f'{value}', ha='center', va='center', color='red')
upload_plot_to_s3("plots/confusion_matrix.png")

# -------------------------------------
# 4. Logistic Regression Feature Coefficients Visualization
coefficients = model.coefficients.toArray()
feature_names = feature_cols

plt.figure(figsize=(8, 6))
plt.bar(feature_names, coefficients)
plt.xlabel('Features')
plt.ylabel('Coefficient Value')
plt.title('Logistic Regression Feature Coefficients')
plt.xticks(rotation=45)
upload_plot_to_s3("plots/feature_coefficients.png")

Plot uploaded to: s3://projectfraudcredit/plots/roc_curve.png
Plot uploaded to: s3://projectfraudcredit/plots/confusion_matrix.png
Plot uploaded to: s3://projectfraudcredit/plots/feature_coefficients.png
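
One caveat applies to the coefficient plot. Spark's `LogisticRegression` standardizes features during training by default (`standardization=True`), but it returns coefficients on the original feature scale. Amounts in the preview run from 181.0 to 406150.63, while `transaction_frequency` is a small count, so the raw bars are not directly comparable. A common adjustment multiplies each coefficient by its feature's standard deviation, which gives the change in log-odds per one-standard-deviation change in that feature. The stdlib sketch below does this on toy numbers chosen so that the ranking flips; `standardized_coefficients` is a hypothetical name.

```python
import statistics


def standardized_coefficients(coefficients, feature_values):
    """Scale each raw coefficient by the sample standard deviation of its feature.

    feature_values[j] holds the observed values of feature j (same order as coefficients).
    """
    if len(coefficients) != len(feature_values):
        raise ValueError("need one list of feature values per coefficient")
    return [coef * statistics.stdev(values) for coef, values in zip(coefficients, feature_values)]


# Toy numbers for illustration only (not from the fitted model): the feature with
# the smaller raw coefficient has the larger effect per standard deviation.
toy_coefs = [0.5, 0.001]
toy_features = [[1.0, 3.0], [100000.0, 300000.0]]
print(standardized_coefficients(toy_coefs, toy_features))
```

In the notebook, the per-feature standard deviations could come from Spark, for example `train_data.select([stddev(c).alias(c) for c in feature_cols]).first()`, with each value multiplied into `model.coefficients.toArray()` before plotting.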