## ML Project - A Machine Learning Pipeline for Credit Card Fraud Detection using SparkML

## Project Objective
- Part 1: ETL
  - Load a CSV dataset
  - Remove duplicates, if any
  - Drop rows with null values, if any
  - Make transformations
  - Store the cleaned data in Parquet format
- Part 2: Machine learning pipeline creation
- Part 3: Model evaluation
- Part 4: Model persistence

## Datasets Used
Both the training and test files are available on Kaggle: https://www.kaggle.com/datasets/kartik2112/fraud-detection

In [10]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

In [122]:
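# Silence Python warnings: replace warnings.warn with a no-op and ignore all filters
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# Make the installed Spark importable from this kernel
import findspark
findspark.init()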

In [121]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

In [14]:
spark = SparkSession.builder.appName("Fraud Detection").getOrCreate()



In [15]:
spark

In [17]:
train = '../input/fraud-detection/fraudTrain.csv'
test = '../input/fraud-detection/fraudTest.csv'

In [19]:
# df holds fraudTest.csv, df1 holds fraudTrain.csv
df = spark.read.csv(test, header=True, inferSchema=True)
df1 = spark.read.csv(train, header=True, inferSchema=True)


In [20]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



In [35]:
df1.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



In [43]:
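# union() (unionAll() is an older alias) appends rows by column position;
# both files share the same column order, as the schemas above show
df2 = df.union(df1)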

In [44]:
df2.count()


1852394

In [38]:
df2 = df2.dropDuplicates()

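In [42]:
df2.count()

AttributeError: 'function' object has no attribute 'count'

Note: this cell ran as In [42], before `df2` was rebuilt by the union in In [43], so `df2` did not hold a DataFrame at that point. For the same reason, the deduplication in In [38] was never applied to the DataFrame that is written to Parquet below (no deduplication runs between In [43] and In [47]). Running the cells top to bottom applies it.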

In [45]:
df2 = df2.dropna()

In [46]:
df2.count()


1852394

In [47]:
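# mode("overwrite") lets the cell be re-run; the default mode fails if the path already exists
df2.write.mode("overwrite").parquet("fraud_detection.parquet")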


## ML Pipeline Creation

In [48]:
Fraud_data = spark.read.parquet("fraud_detection.parquet")

In [52]:
Fraud_data.count()  # evaluated but not displayed: only a cell's last expression is echoed
Fraud_data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



In [60]:
Fraud_data.groupBy('is_fraud').count().orderBy('count').show()



+--------+-------+
|is_fraud|  count|
+--------+-------+
|       1|   9651|
|       0|1842743|
+--------+-------+




In [74]:
from pyspark.sql.functions import year, month, dayofmonth, hour, minute
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

In [75]:
# Date and time features: year()/month()/dayofmonth()/hour()/minute() implicitly cast
# the string column trans_date_trans_time to a date/timestamp
Fraud_data = Fraud_data.withColumn("year", year("trans_date_trans_time")) \
                   .withColumn("month", month("trans_date_trans_time")) \
                   .withColumn("day", dayofmonth("trans_date_trans_time")) \
                   .withColumn("hour", hour("trans_date_trans_time")) \
                   .withColumn("minute", minute("trans_date_trans_time"))

# Categorical encoding: string -> index (unseen labels get an extra index) -> one-hot vector
categorical_cols = ["merchant", "category", "city", "state", "job"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid="keep") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=col+"_index", outputCol=col+"_encoded") for col in categorical_cols]

# Numeric columns used as-is (note: cc_num is a card identifier, not a measured quantity)
numeric_cols = ["cc_num", "amt"]
assembler_inputs = numeric_cols + [col + "_encoded" for col in categorical_cols] + ["year", "month", "day", "hour", "minute"]

# Fit the indexers and encoders, then apply them
pipeline = Pipeline(stages=indexers + encoders)
Fraud_data_transformed = pipeline.fit(Fraud_data).transform(Fraud_data)

# Combine numeric, one-hot and date/time columns into a single "features" vector
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
Fraud_data_transformed = assembler.transform(Fraud_data_transformed)



In [77]:
Fraud_data_transformed.select("features","is_fraud").show()

+--------------------+--------+
|            features|is_fraud|
+--------------------+--------+
|(2168,[0,1,586,70...|       0|
|(2168,[0,1,106,69...|       0|
|(2168,[0,1,367,70...|       0|
|(2168,[0,1,40,695...|       0|
|(2168,[0,1,522,70...|       0|
|(2168,[0,1,16,695...|       0|
|(2168,[0,1,612,70...|       0|
|(2168,[0,1,10,695...|       0|
|(2168,[0,1,516,70...|       0|
|(2168,[0,1,72,696...|       0|
|(2168,[0,1,114,69...|       0|
|(2168,[0,1,373,70...|       0|
|(2168,[0,1,74,696...|       0|
|(2168,[0,1,138,69...|       0|
|(2168,[0,1,148,69...|       0|
|(2168,[0,1,267,70...|       0|
|(2168,[0,1,550,70...|       0|
|(2168,[0,1,170,69...|       0|
|(2168,[0,1,441,70...|       0|
|(2168,[0,1,160,69...|       0|
+--------------------+--------+
only showing top 20 rows



In [78]:
(training_data, testing_data) = Fraud_data_transformed.randomSplit([0.7, 0.3], seed=42)

## Build and Train a Linear Regression Model

In [79]:
lr = LinearRegression(featuresCol="features", labelCol="is_fraud")
model = lr.fit(training_data)

24/04/15 22:23:21 WARN Instrumentation: [82497152] regParam is zero, which might cause numerical instability and overfitting.
24/04/15 22:23:54 WARN Instrumentation: [82497152] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.

## Model Evaluation

In [81]:
predictions = model.transform(testing_data)

In [83]:
from pyspark.ml.evaluation import RegressionEvaluator

In [84]:
evaluator = RegressionEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R Squared =", r2)



R Squared = 0.13790926951417593



In [85]:
evaluator = RegressionEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE =", rmse)



RMSE = 0.06692730987573403



In [86]:
evaluator = RegressionEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)
print("MAE =", mae)



MAE = 0.011706303494961764
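Because `is_fraud` is 0/1 and only about 0.52% of rows are fraud, RMSE and MAE are small even for a model that never flags fraud. Comparing against trivial baselines puts the numbers above in context. A worked check in plain Python using the class counts from the `groupBy('is_fraud')` output:

```python
import math

# Class counts from the is_fraud table (train + test combined)
n_fraud, n_legit = 9651, 1842743
p = n_fraud / (n_fraud + n_legit)  # fraud rate

# Baseline 1: always predict 0. Each fraud row has error 1, each legit row error 0.
mae_zero = p
rmse_zero = math.sqrt(p)

# Baseline 2: always predict the mean p (the reference point for R² = 0)
rmse_mean = math.sqrt(p * (1 - p))

print(f"fraud rate = {p:.5f}")
print(f"all-zero:  RMSE = {rmse_zero:.4f}, MAE = {mae_zero:.4f}")
print(f"mean-only: RMSE = {rmse_mean:.4f}")
```

This gives an all-zero RMSE of about 0.072 and MAE of about 0.0052, and a mean-only RMSE of about 0.072. The model's RMSE of 0.0669 is only slightly below these, and its MAE of 0.0117 is higher than that of always predicting 0. The baselines use the fraud rate of the full data while the model was scored on the 30% test split, so the comparison is approximate.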



## Save Model

In [94]:
# overwrite() replaces the "Fraud_detection" directory if it already exists; without it,
# save() fails with "java.io.IOException: Path Fraud_detection already exists"
model.write().overwrite().save("Fraud_detection")


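## Load Model

In [124]:
# This reuses the in-memory `model`. To score with the persisted copy instead, load it first:
# from pyspark.ml.regression import LinearRegressionModel
# model = LinearRegressionModel.load("Fraud_detection")
predictions = model.transform(testing_data)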

In [125]:
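# show() prints and returns None, so keep the DataFrame and display it separately
final_predictions = predictions.select("is_fraud", "prediction")
final_predictions.show()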


+--------+--------------------+
|is_fraud|          prediction|
+--------+--------------------+
|       0|0.011500296314609582|
|       0|-0.00144851842189...|
|       0|-0.00409370417275...|
|       0|0.016197039165894322|
|       0|0.010898181726431089|
|       0|-0.00412693105196...|
|       0|0.002410922614015...|
|       0|0.001959099967132838|
|       0|6.907279286942902E-4|
|       0|-0.00871189661293...|
|       0|-0.00217304451336...|
|       0|0.014340486372858896|
|       0|0.006532762696502381|
|       0|0.014104133298301846|
|       0|0.007082016321068...|
|       0|-0.00439931865046872|
|       0| 0.00815343044769401|
|       0|0.028880342526386604|
|       0|0.003733907043943052|
|       0| 0.01442815614741444|
+--------+--------------------+
only showing top 20 rows


