In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [14]:
from pyspark.sql import functions as f
from pyspark.sql.functions import UserDefinedFunction
from datetime import date, datetime, timedelta
from pyspark.sql.functions import col, mean, stddev, log
from pyspark.sql.types import *

In [4]:
from pyspark.sql import SparkSession
APP_NAME = "CreditCardFraudDetection"
spark = SparkSession.builder.appName(APP_NAME).getOrCreate()
spark

In [7]:
data_path = "C:/Users/Sandeep Reddy/Downloads/archive/"  
data_files = "creditcard_*.csv"
df = spark.read.csv(data_path + data_files, header=True, inferSchema=True)
print("Data Loaded Successfully")
df.printSchema()

Data Loaded Successfully
root
 |-- Time: double (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true

In [8]:
print(f"Rows: {df.count()}, Columns: {len(df.columns)}")
df.describe().show()

Rows: 284807, Columns: 31
+-------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|summary|             Time|                  V1|                  V2|                  V3|                  V4|                  V5|                  V6|                  V7|                  V8|                  V9|                 V10|                 V11|                 V12|                 V13|                 V

In [9]:
df = df.fillna(0).withColumnRenamed("Class", "Fraudulent")


In [10]:
df

DataFrame[Time: double, V1: double, V2: double, V3: double, V4: double, V5: double, V6: double, V7: double, V8: double, V9: double, V10: double, V11: double, V12: double, V13: double, V14: double, V15: double, V16: double, V17: double, V18: double, V19: double, V20: double, V21: double, V22: double, V23: double, V24: double, V25: double, V26: double, V27: double, V28: double, Amount: double, Fraudulent: int]

In [15]:
stats = df.select(mean("Amount").alias("mean"), stddev("Amount").alias("std")).collect()
mean_value, std_value = stats[0]["mean"], stats[0]["std"]

In [16]:
df = df.withColumn("NormalizedAmount", (col("Amount") - mean_value) / std_value)
df = df.withColumn("AmountLog", log(col("Amount") + 1))
print("Data Transformation Completed")

Data Transformation Completed


In [17]:
output_path = "D:/credit_card/cleaned_data.parquet"
df.write.parquet(output_path, mode="overwrite")
print(f"Data saved to {output_path}")

Data saved to D:/credit_card/cleaned_data.parquet


In [18]:
df.createOrReplaceTempView("credit_card")
avg_norm_fraud = spark.sql("SELECT AVG(NormalizedAmount) AS AvgNorm FROM credit_card WHERE Fraudulent = 1")
max_norm_non_fraud = spark.sql("SELECT MAX(NormalizedAmount) AS MaxNorm FROM credit_card WHERE Fraudulent = 0")
avg_log_amounts = spark.sql("SELECT AVG(AmountLog) AS AvgLog, Fraudulent FROM credit_card GROUP BY Fraudulent")

In [19]:
avg_norm_fraud.show()
max_norm_non_fraud.show()
avg_log_amounts.show()

+-------------------+
|            AvgNorm|
+-------------------+
|0.13538176514533437|
+-------------------+

+-----------------+
|          MaxNorm|
+-----------------+
|102.3620630045498|
+-----------------+

+------------------+----------+
|            AvgLog|Fraudulent|
+------------------+----------+
|2.8205955123701885|         1|
| 3.152761873849474|         0|
+------------------+----------+

