<a href="https://colab.research.google.com/github/Anusha-Borole/RaptorX_Credit-Card-Fraud-Detection/blob/main/Data_Engineer_Task_Anusha_Hitendra_Borole.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installing Libraries

In [30]:
!pip install pyspark



In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, stddev, log
import os

# Mounting Google Drive

In [32]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [33]:
import zipfile
import os

zip_path = '/content/drive/MyDrive/Data/creditcardtask.csv.zip'

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall("/content/dataset")

extracted_files = os.listdir("/content/dataset")
print("Extracted files:", extracted_files)

file_path = f"/content/dataset/{extracted_files[0]}"
print("Path to CSV file:", file_path)

Extracted files: ['creditcard.csv', 'creditcard_1.csv', 'creditcard_2.csv']
Path to CSV file: /content/dataset/creditcard.csv


# Duplicating files for bulk file handling

In [34]:
!cp {file_path} /content/dataset/creditcard_1.csv
!cp {file_path} /content/dataset/creditcard_2.csv

file_paths = [
    file_path,
    "/content/dataset/creditcard_1.csv",
    "/content/dataset/creditcard_2.csv"
]

print("File paths:", file_paths)

File paths: ['/content/dataset/creditcard.csv', '/content/dataset/creditcard_1.csv', '/content/dataset/creditcard_2.csv']


# Initializing Spark session

In [35]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CreditCardFraudDetection") \
    .getOrCreate()

df = spark.read.csv(file_paths, header=True, inferSchema=True)

df = df.dropDuplicates()

df.show(5)

+-----+------------------+------------------+------------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------------+-----------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+-------------------+------------------+------+-----+
| Time|                V1|                V2|                V3|                 V4|                V5|                V6|                 V7|                V8|                V9|               V10|               V11|               V12|                V13|              V14|              V15|                V16|               V17|               V18|               V19|               V20|               V21|               V22|               

# Checking the shape of the dataset

In [36]:
print(f"Number of rows: {df.count()}")
print(f"Number of columns: {len(df.columns)}")

Number of rows: 283726
Number of columns: 31


# Checking for null or missing values

In [37]:
from pyspark.sql.functions import col, sum

null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
print("Null values in each column:")
null_counts.show()

Null values in each column:
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+
|Time| V1| V2| V3| V4| V5| V6| V7| V8| V9|V10|V11|V12|V13|V14|V15|V16|V17|V18|V19|V20|V21|V22|V23|V24|V25|V26|V27|V28|Amount|Class|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+
|   0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|     0|    0|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+



# Descriptive statistics of the dataset

In [38]:
print("Descriptive Statistics:")
df.describe().show()

Descriptive Statistics:
+-------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|summary|              Time|                  V1|                  V2|                  V3|                  V4|                  V5|                  V6|                  V7|                  V8|                  V9|                 V10|                 V11|                 V12|                 V13|                 V

# Data Cleaning

In [39]:
df = df.na.fill(0)

df = df.withColumnRenamed("Class", "IsFraud")

df.show(5)

+-----+------------------+------------------+------------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------------+-----------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+-------------------+------------------+------+-------+
| Time|                V1|                V2|                V3|                 V4|                V5|                V6|                 V7|                V8|                V9|               V10|               V11|               V12|                V13|              V14|              V15|                V16|               V17|               V18|               V19|               V20|               V21|               V22|             

# Data Transformation

In [40]:
amount_mean = df.select(mean(col("Amount"))).collect()[0][0]
amount_stddev = df.select(stddev(col("Amount"))).collect()[0][0]

df = df.withColumn("NormalizedAmount", (col("Amount") - amount_mean) / amount_stddev)


# Mathematical Transformation

In [41]:
df = df.withColumn("AmountLog", log(col("Amount") + 1))

df.show(5)

+-----+------------------+------------------+------------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------------+-----------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+-------------------+------------------+------+-------+--------------------+------------------+
| Time|                V1|                V2|                V3|                 V4|                V5|                V6|                 V7|                V8|                V9|               V10|               V11|               V12|                V13|              V14|              V15|                V16|               V17|               V18|               V19|               V20|           

# File Conversion

In [42]:
output_path = '/content/drive/MyDrive/Data/creditcard_fraud_detection.parquet'
df.write.parquet(output_path, mode='overwrite')

print(f"Parquet file saved to: {output_path}")

Parquet file saved to: /content/drive/MyDrive/Data/creditcard_fraud_detection.parquet


In [43]:
df.createOrReplaceTempView("credit_card_fraud")

# **SQL Querying**

# Average normalized amount in fraudulent transactions

In [44]:
query1 = spark.sql("""
    SELECT AVG(NormalizedAmount) AS AvgNormalizedAmountFraud
    FROM credit_card_fraud
    WHERE IsFraud = 1
""")
print("Query 1: Average normalized amount in fraudulent transactions")
query1.show()

Query 1: Average normalized amount in fraudulent transactions
+------------------------+
|AvgNormalizedAmountFraud|
+------------------------+
|     0.14137081760981038|
+------------------------+



# Maximum normalized amount in non-fraudulent transactions

In [45]:
query2 = spark.sql("""
    SELECT MAX(NormalizedAmount) AS MaxNormalizedAmountNonFraud
    FROM credit_card_fraud
    WHERE IsFraud = 0
""")
print("Query 2: Maximum normalized amount in non-fraudulent transactions")
query2.show()

Query 2: Maximum normalized amount in non-fraudulent transactions
+---------------------------+
|MaxNormalizedAmountNonFraud|
+---------------------------+
|         102.24738365067253|
+---------------------------+



# Average of the "AmountLog" column for fraudulent and non-fraudulent transactions

In [46]:
query3 = spark.sql("""
    SELECT IsFraud, AVG(AmountLog) AS AvgAmountLog
    FROM credit_card_fraud
    GROUP BY IsFraud
""")
print("Query 3: Average of the 'AmountLog' column for fraudulent and non-fraudulent transactions")
query3.show()

Query 3: Average of the 'AmountLog' column for fraudulent and non-fraudulent transactions
+-------+------------------+
|IsFraud|      AvgAmountLog|
+-------+------------------+
|      1| 2.837549032419647|
|      0|3.1542885231954867|
+-------+------------------+



In [47]:
spark.stop()
print("Spark session stopped.")

Spark session stopped.
