<a href="https://colab.research.google.com/github/california-19/Fraud_Detection/blob/main/Fraud_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Fraud Detection on Credit Card Transactions

This code is to detect fraudulent transactions on credit card purchases. The dataset is availabe at Kaggle.com with the name "creditcard.csv". A PySpark session is used to run this code. I used Logistic Regression, Naive Bayes, Gradient Boost and Random Forest classifiers on the same dataset. I got the best accuracy rate with the Random Forest classifier.

In [5]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# install findspark using pip
!pip install -q findspark

In [9]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.master('Fraud Detection').getOrCreate()

In [13]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier

from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
# Here we imported our data from our Google drive. You may need to import data in another way.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [8]:
import pandas as pd
path = "/content/drive/My Drive/Colab Notebooks/Projects/PySpark Mastercard/creditcard.csv"

In [None]:
# This is how to read a dataframe, not an RDD
df_cc = spark.read.csv(path, header=True, inferSchema=True)
df_cc.printSchema()

In [None]:
df_cc.show()

+----+------------------+-------------------+------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------+-----+
|Time|                V1|                 V2|                V3|                 V4|                 V5|                 V6|                  V7|                 V8|                V9|                V10|               V11|               V12|                V13|                V14|                V15|                V16|                 V17|                V18|                V19|                V20|                 V

In [None]:
type(df_cc)

pyspark.sql.dataframe.DataFrame

In [None]:
df_cc.columns

['Time',
 'V1',
 'V2',
 'V3',
 'V4',
 'V5',
 'V6',
 'V7',
 'V8',
 'V9',
 'V10',
 'V11',
 'V12',
 'V13',
 'V14',
 'V15',
 'V16',
 'V17',
 'V18',
 'V19',
 'V20',
 'V21',
 'V22',
 'V23',
 'V24',
 'V25',
 'V26',
 'V27',
 'V28',
 'Amount',
 'Class']

In [None]:
from pyspark.sql.functions import col, count, isnan, when

In [None]:
df_cc = df_cc.select(*(col(c).cast('float').alias(c) for c in df_cc.columns))

In [None]:
# I want to check if I have any null values
df_cc.select([count(when(col(c).isNull(),c)).alias(c) for c in df_cc.columns]).show()

+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+
|Time| V1| V2| V3| V4| V5| V6| V7| V8| V9|V10|V11|V12|V13|V14|V15|V16|V17|V18|V19|V20|V21|V22|V23|V24|V25|V26|V27|V28|Amount|Class|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+
|   0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|     0|    0|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+



Machine learning algorithms in Spark require feature values to be in a vector. Therefore, we will use *VectorAssembler* to turn all feature values into vectors.

In [None]:
cols = df_cc.columns
cols.remove('Time')
cols.remove('Class')

# We specify the object from the VectorAssembler class.
assembler = VectorAssembler(inputCols=cols, outputCol='features')

# Now we transform the data into vectors
data = assembler.transform(df_cc)

data.select('features', 'Class').show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                                                                                                                                                                                                                                                                              

Normally, you have to normalize (or scale) your data. But my dataset that I got from internet is already scaled. Therefore, I don't need to do any scaling.

In [None]:
data.columns

['Time',
 'V1',
 'V2',
 'V3',
 'V4',
 'V5',
 'V6',
 'V7',
 'V8',
 'V9',
 'V10',
 'V11',
 'V12',
 'V13',
 'V14',
 'V15',
 'V16',
 'V17',
 'V18',
 'V19',
 'V20',
 'V21',
 'V22',
 'V23',
 'V24',
 'V25',
 'V26',
 'V27',
 'V28',
 'Amount',
 'Class',
 'features']

In [None]:
data_new = data.select('features', 'Class')
data_new.show(5)

+--------------------+-----+
|            features|Class|
+--------------------+-----+
|[-1.3598071336746...|  0.0|
|[1.19185709953308...|  0.0|
|[-1.3583540916442...|  0.0|
|[-0.9662716984748...|  0.0|
|[-1.1582330465316...|  0.0|
+--------------------+-----+
only showing top 5 rows



In [None]:
train, test = data_new.randomSplit([0.7,0.3])
train.show(5)
test.show(5)

+--------------------+-----+
|            features|Class|
+--------------------+-----+
|[-56.407508850097...|  0.0|
|[-36.802318572998...|  0.0|
|[-36.510581970214...|  0.0|
|[-34.591213226318...|  0.0|
|[-33.404083251953...|  0.0|
+--------------------+-----+
only showing top 5 rows

+--------------------+-----+
|            features|Class|
+--------------------+-----+
|[-34.148235321044...|  0.0|
|[-33.017173767089...|  0.0|
|[-28.524267196655...|  1.0|
|[-27.848180770874...|  1.0|
|[-27.670568466186...|  0.0|
+--------------------+-----+
only showing top 5 rows



## Logistic Regression
The target variable is a binomial variable. Therefore, we use logistic regression.

In [None]:
logReg = LogisticRegression(labelCol='Class', featuresCol='features', maxIter=40)
model = logReg.fit(train)

In [None]:
predicted_test = model.transform(test)

In [None]:
predicted_test.select('Class', 'prediction').show(10)

+-----+----------+
|Class|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  1.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 10 rows



I want to compute the ROC score but this is not available for dataframes in Spark. Therefore, we convert the dataframe into RDD and compute the ROC score in RDD format.

In [None]:
# Convert the dataframe into RDD
predictionAndLabels = predicted_test.select('Class', 'prediction').rdd

In [None]:
predictionAndLabels.collect()

[Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=1.0, prediction=0.0),
 Row(Class=1.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=1.0, prediction=1.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=1.0, prediction=1.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=1.0, prediction=1.0),
 Row(Class=1.0, prediction=1.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class=0.0, prediction=0.0),
 Row(Class

In [None]:
metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under the ROC curve, or AUC
print('Area under ROC = %s' % metrics.areaUnderROC)

Area under ROC = 0.9365832369856205


In [None]:
# I would like to compute the accuracy here
evaluator = MulticlassClassificationEvaluator(labelCol='Class', predictionCol='prediction', metricName='accuracy')
accuracy_LR = evaluator.evaluate(predicted_test)
print('Accuracy = ', accuracy_LR)

Accuracy =  0.9992289359315856


## Naive Bayes
The Naive Bayes algorithm in Spark requires that no features are negative. Therefore, we will scale this data fro the Naive Bayes algorithm. But the problem with the StandardScaler method is it does not guarantee positive values. We will be using MinMaxScaler which scales the data between 0 and 1.

In [None]:
from pyspark.ml.feature import MinMaxScaler

In [None]:
minmax_scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')

In [None]:
data_new = minmax_scaler.fit(data_new).transform(data_new)

In [None]:
data_new.show(5)

+--------------------+-----+--------------------+
|            features|Class|     scaled_features|
+--------------------+-----+--------------------+
|[-1.3598071336746...|  0.0|[0.93519233536847...|
|[1.19185709953308...|  0.0|[0.97854195322577...|
|[-1.3583540916442...|  0.0|[0.93521702075455...|
|[-0.9662716984748...|  0.0|[0.94187801544628...|
|[-1.1582330465316...|  0.0|[0.93861682967844...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [None]:
naive_bayes = NaiveBayes(featuresCol='scaled_features', labelCol='Class', smoothing=1.0)

In [None]:
train, test = data_new.randomSplit([0.7,0.3])

In [None]:
model = naive_bayes.fit(train)

In [None]:
predicted_test_nb = model.transform(test)

In [None]:
predicted_test_nb.show()

+--------------------+-----+--------------------+--------------------+--------------------+----------+
|            features|Class|     scaled_features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+----------+
|[-33.017173767089...|  0.0|[0.39737284953357...|[-51.385610536401...|[0.99809314895805...|       0.0|
|[-32.962810516357...|  0.0|[0.39829641389367...|[-44.521722234942...|[0.99713062866621...|       0.0|
|[-32.273468017578...|  0.0|[0.41000748994591...|[-44.530150418487...|[0.99709876952608...|       0.0|
|[-30.821435928344...|  0.0|[0.43467571837723...|[-54.779875272609...|[0.99792339686454...|       0.0|
|[-29.200328826904...|  1.0|[0.46221632205018...|[-44.651853130026...|[0.99693301615682...|       0.0|
|[-25.331010818481...|  0.0|[0.52795124706524...|[-50.584380937198...|[0.99827737953508...|       0.0|
|[-24.647977828979...|  0.0|[0.53955513236233...|[-49.693687059118...|[0.

In [None]:
predicted_test_nb.select('Class', 'prediction').show(10)

+-----+----------+
|Class|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 10 rows



In [None]:
predictionAndLabels_nb = predicted_test_nb.select('Class', 'prediction').rdd

In [None]:
evaluator_nb = MulticlassClassificationEvaluator(labelCol='Class', predictionCol='prediction', metricName='accuracy')
accuracy_NB = evaluator_nb.evaluate(predicted_test_nb)

In [None]:
print('Accuracy ', accuracy_NB)

Accuracy  0.9984299389550892


In [None]:
metrics = BinaryClassificationMetrics(predictionAndLabels_nb)

# Area under ROC curve
print('Area under ROC = %s' % metrics.areaUnderROC)

Area under ROC = 0.0


## Gradient Boosting Classifier

In [None]:
gradient_boost_class = GBTClassifier(labelCol='Class', featuresCol='features')

In [None]:
model = gradient_boost_class.fit(train)

In [None]:
predicted_test = model.transform(test)

In [None]:
predicted_test.show()

+--------------------+-----+--------------------+--------------------+--------------------+----------+
|            features|Class|     scaled_features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+----------+
|[-33.017173767089...|  0.0|[0.39737284953357...|[1.53242415629925...|[0.95541925914566...|       0.0|
|[-32.962810516357...|  0.0|[0.39829641389367...|[0.17072704647578...|[0.58454369620026...|       0.0|
|[-32.273468017578...|  0.0|[0.41000748994591...|[0.17072704647578...|[0.58454369620026...|       0.0|
|[-30.821435928344...|  0.0|[0.43467571837723...|[1.47528095696563...|[0.95029003934379...|       0.0|
|[-29.200328826904...|  1.0|[0.46221632205018...|[-1.3996470797585...|[0.05736233010791...|       1.0|
|[-25.331010818481...|  0.0|[0.52795124706524...|[1.53390141197665...|[0.95554493233952...|       0.0|
|[-24.647977828979...|  0.0|[0.53955513236233...|[1.50627961977426...|[0.

In [None]:
predicted_test.select('Class', 'prediction').show(10)

+-----+----------+
|Class|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 10 rows



In [None]:
predictionAndLabels_gb = predicted_test.select('Class', 'prediction').rdd

In [None]:
metrics = BinaryClassificationMetrics(predictionAndLabels_gb)

# Area under ROC curve
print('Area under ROC = %s' % metrics.areaUnderROC)

Area under ROC = 0.9238783635997955


In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol='Class', predictionCol='prediction', metricName='accuracy')
accuracy_GB = evaluator.evaluate(predicted_test)

In [None]:
print(accuracy_GB)

0.9993438550857089


## Random Forest Classifier

In [None]:
random_forest_classifier = RandomForestClassifier(labelCol='Class', featuresCol='features', numTrees=40)

In [None]:
model = random_forest_classifier.fit(train)

In [None]:
predicted_test = model.transform(test)

In [None]:
predicted_test.select('Class', 'prediction').show(10)

+-----+----------+
|Class|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 10 rows



In [None]:
predictionAndLabels = predicted_test.select('Class', 'prediction').rdd.map(lambda row: row[0:])

In [None]:
metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under ROC curve
print('Area under ROC = %s' % metrics.areaUnderROC)

Area under ROC = 0.9328186672979746


In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol='Class', predictionCol='prediction', metricName='accuracy')
accuracy_RF = evaluator.evaluate(predicted_test)

In [None]:
print(accuracy_RF)

0.9993907225795868


In [None]:
print('Accuracy LR', accuracy_LR)
print('Accuracy NB', accuracy_NB)
print('Accuracy GB', accuracy_GB)
print('Accuracy RF', accuracy_RF)

Accuracy LR 0.9992289359315856
Accuracy NB 0.9984299389550892
Accuracy GB 0.9993438550857089
Accuracy RF 0.9993907225795868
