<a href="https://colab.research.google.com/github/AsifK24-8030/fraud-detection-pyspark/blob/main/Final_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
from pyspark.sql.functions import col, sum as _sum, when

In [2]:
# Step 1: Initialize Spark Session
spark = SparkSession.builder \
    .appName("FraudDetection") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.rpc.message.maxSize", "512") \
    .getOrCreate()

In [3]:
# Loading the dataset(ratings) from a CSV file into a Spark DataFrame
creditcard_df = spark.read.csv(r'/content/creditcard.csv', inferSchema=True, header=True, quote='"')

In [4]:
#Showing first few rows of the DataFrame
creditcard_df.show(10)
print(creditcard_df.columns)

+----+------------------+-------------------+------------------+------------------+-------------------+-------------------+--------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+------------------+-------------------+-------------------+--------------------+-------------------+------+-----+
|Time|                V1|                 V2|                V3|                V4|                 V5|                 V6|                  V7|                V8|                V9|                V10|               V11|               V12|               V13|               V14|               V15|                V16|                V17|                V18|                V19|                V20|                 V21|          

In [5]:
#Showing all maximum Columns
pd.options.display.max_columns = None

In [6]:

# Print schema (like pandas .info())
creditcard_df.printSchema()

root
 |-- Time: double (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double (nulla

In [7]:
# Show first 5 rows
creditcard_df.show(5)

+----+------------------+-------------------+----------------+------------------+-------------------+-------------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+--------------------+-------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+------+-----+
|Time|                V1|                 V2|              V3|                V4|                 V5|                 V6|                 V7|                V8|                V9|                V10|               V11|               V12|               V13|               V14|               V15|               V16|               V17|                V18|               V19|                V20|                 V21|                V22|     

In [8]:
# Show descriptive stats
creditcard_df.describe().show()

+-------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|summary|              Time|                  V1|                  V2|                  V3|                  V4|                  V5|                  V6|                  V7|                  V8|                  V9|                 V10|                 V11|                 V12|                 V13|                 V14|                 V15|

In [9]:
# Calculate null counts per column
null_counts = creditcard_df.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in creditcard_df.columns
])

In [10]:
# Show the result
null_counts.show()

+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+
|Time| V1| V2| V3| V4| V5| V6| V7| V8| V9|V10|V11|V12|V13|V14|V15|V16|V17|V18|V19|V20|V21|V22|V23|V24|V25|V26|V27|V28|Amount|Class|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+
|   0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|     0|    0|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+



In [11]:
#Removing Unecessary Column-Time
creditcard_df = creditcard_df.drop('Time')

In [12]:
# Show first 5 rows
creditcard_df.show(5)


+------------------+-------------------+----------------+------------------+-------------------+-------------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+--------------------+-------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+------+-----+
|                V1|                 V2|              V3|                V4|                 V5|                 V6|                 V7|                V8|                V9|                V10|               V11|               V12|               V13|               V14|               V15|               V16|               V17|                V18|               V19|                V20|                 V21|                V22|               

In [13]:
#Checking if data contains duplicate values or not


from pyspark.sql.functions import count

# Group by all columns and count occurrences
duplicates = (
    creditcard_df.groupBy(creditcard_df.columns)
    .count()
    .filter("count > 1")
)

# Sum up how many duplicate *rows* exist
duplicate_count = duplicates.selectExpr("sum(count - 1) as duplicate_rows").collect()[0]["duplicate_rows"]

print(f"Number of duplicate rows: {duplicate_count}")

from pyspark.sql.functions import count

# Group by all columns and count occurrences
duplicates = (
    creditcard_df.groupBy(creditcard_df.columns)
    .count()
    .filter("count > 1")
)

Number of duplicate rows: 9144


In [14]:
#Removing Duplicated values from dataset
creditcard_df = creditcard_df.drop_duplicates()

In [15]:
#Checking the datset is balanced or imbalanced
counts_df = creditcard_df.groupBy("Class").count()
counts_df.show()

+-----+------+
|Class| count|
+-----+------+
|    1|   473|
|    0|275190|
+-----+------+



In [16]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

#Prepare features (X) and label (y)
feature_columns = [c for c in creditcard_df.columns if c != 'Class']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
final_df = assembler.transform(creditcard_df).select(col('features'), col('Class').alias('label'))

#Split into train/test
train_data, test_data = final_df.randomSplit([0.8, 0.2], seed=42)

#Define classifiers
classifiers = {
    "Logistic Regression": LogisticRegression(featuresCol='features', labelCol='label'),
    "Decision Tree Classifier": DecisionTreeClassifier(featuresCol='features', labelCol='label')
}

#Evaluation metrics
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
evaluator_precision = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='weightedPrecision')
evaluator_recall = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='weightedRecall')
evaluator_f1 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')

#Train, predict, and evaluate
for name, clf in classifiers.items():
    print(f"\n========== {name} ==========")
    model = clf.fit(train_data)
    predictions = model.transform(test_data)

    accuracy = evaluator_accuracy.evaluate(predictions)
    precision = evaluator_precision.evaluate(predictions)
    recall = evaluator_recall.evaluate(predictions)
    f1 = evaluator_f1.evaluate(predictions)

    print(f"Accuracy:  {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall:    {recall}")
    print(f"F1 Score:  {f1}")



Accuracy:  0.9991475160067473
Precision: 0.9990918601023422
Recall:    0.9991475160067473
F1 Score:  0.9991101844316745

Accuracy:  0.9993470335370831
Precision: 0.9993094175751728
Recall:    0.9993470335370831
F1 Score:  0.9992987368163624


Under Sampling

In [17]:
from pyspark.sql.functions import col

# Separate normal and fraud transactions
normal_df = creditcard_df.filter(col('Class') == 0)
fraud_df = creditcard_df.filter(col('Class') == 1)

# Show counts (equivalent to .shape[0])
print(f"Normal transactions count: {normal_df.count()}")
print(f"Fraud transactions count: {fraud_df.count()}")

# Undersample normal transactions (sample 473 rows without replacement)
normal_sample_df = normal_df.sample(withReplacement=False, fraction=473 / normal_df.count(), seed=42)

print(f"Sampled normal transactions count: {normal_sample_df.count()}")

# Combine the undersampled normal and all fraud transactions
new_df = normal_sample_df.union(fraud_df)

# Count classes in the new dataset
new_df.groupBy('Class').count().show()


Normal transactions count: 275190
Fraud transactions count: 473
Sampled normal transactions count: 468
+-----+-----+
|Class|count|
+-----+-----+
|    0|  468|
|    1|  473|
+-----+-----+



In [18]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Step 1: Assemble features into a single vector column
feature_cols = [c for c in new_df.columns if c != 'Class']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = assembler.transform(new_df).select(col("features"), col("Class").alias("label"))

# Step 2: Split data into train and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Step 3: Define classifiers
classifiers = {
    "Logistic Regression": LogisticRegression(featuresCol="features", labelCol="label"),
    "Decision Tree Classifier": DecisionTreeClassifier(featuresCol="features", labelCol="label")
}

# Step 4: Define evaluator for metrics
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

for name, clf in classifiers.items():
    print(f"\n========== {name} ===========")
    model = clf.fit(train_data)
    predictions = model.transform(test_data)

    accuracy = evaluator.setMetricName("accuracy").evaluate(predictions)
    precision = evaluator.setMetricName("weightedPrecision").evaluate(predictions)
    recall = evaluator.setMetricName("weightedRecall").evaluate(predictions)
    f1 = evaluator.setMetricName("f1").evaluate(predictions)

    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")



Accuracy: 0.9278
Precision: 0.9311
Recall: 0.9278
F1 Score: 0.9280

Accuracy: 0.9111
Precision: 0.9199
Recall: 0.9111
F1 Score: 0.9114


In [19]:
dtc = DecisionTreeClassifier(featuresCol="features", labelCol="label")
dtc_model = dtc.fit(data)


In [20]:
# Saving the trained DecisionTree model
dtc_model.save("credit_card_model")


In [21]:
#Loading the Model
from pyspark.ml.classification import DecisionTreeClassificationModel

# Load the saved DecisionTree model
loaded_model = DecisionTreeClassificationModel.load("credit_card_model")


In [22]:
from pyspark.sql import Row

# Example feature values (same as your array)
input_features = [-1.3598071336738, -0.0727811733098497, 2.53634673796914, 1.37815522427443, -0.338320769942518,
                  0.462387777762292, 0.239598554061257, 0.0986979012610507, 0.363786969611213, 0.0907941719789316,
                  -0.551599533260813, -0.617800855762348, -0.991389847235408, -0.311169353699879, 1.46817697209427,
                  -0.470400525259478, 0.207971241929242, 0.0257905801985591, 0.403992960255733, 0.251412098239705,
                  -0.018306777944153, 0.277837575558899, -0.110473910188767, 0.0669280749146731, 0.128539358273528,
                  -0.189114843888824, 0.133558376740387, -0.0210530534538215, 149.62]

# Create a Spark DataFrame for this single row
input_df = spark.createDataFrame([Row(**{f: v for f, v in zip(feature_cols, input_features)})])

# Assemble features into vector
input_vector = assembler.transform(input_df)

In [23]:
#Making Predictions
prediction = loaded_model.transform(input_vector)

# Get the prediction value
predicted_class = prediction.select("prediction").collect()[0]["prediction"]

if predicted_class == 0.0:
    print("Normal Transaction")
else:
    print("Fraud Transaction")

Normal Transaction


In [24]:
!git config --global user.email "k248030@nu.edu.pk "
!git config --global user.name "AsifK24-8030"

In [25]:
!git clone https://github.com/AsifK24-8030/fraud-detection-pyspark.git


Cloning into 'fraud-detection-pyspark'...
remote: Enumerating objects: 3, done.[K
remote: Counting objects: 100% (3/3), done.[K
remote: Total 3 (delta 0), reused 0 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (3/3), done.


In [28]:
!cp Final-Project.ipynb /content/fraud-detection-pyspark/


cp: cannot stat 'Final-Project.ipynb': No such file or directory
