# Credit Card Fraud Detection using PySpark + MLlib

## Using supervised learning, we will explore a sample of a transactions dataset to detect fraudulent credit card transactions.

### Meta
Name: Ansh Sikka

Dataset Source: Kaggle

Dataset Link: (https://www.kaggle.com/ealaxi/paysim1)

Date: 08/29/2021

In [0]:
%sh

curl -O 'https://fraud-detection-ansh-sikka.s3.us-east-2.amazonaws.com/fraud_transactions.csv'


In [0]:
%fs ls "file:/databricks/driver"


path,name,size
file:/databricks/driver/conf/,conf/,4096
file:/databricks/driver/preload_class.lst,preload_class.lst,813069
file:/databricks/driver/metastore_db/,metastore_db/,4096
file:/databricks/driver/ganglia/,ganglia/,4096
file:/databricks/driver/fraud_transactions.csv,fraud_transactions.csv,493534783
file:/databricks/driver/eventlogs/,eventlogs/,4096
file:/databricks/driver/logs/,logs/,4096


In [0]:
import pandas as pd
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [0]:
# Load data
# define path to file
path = 'file:/databricks/driver/fraud_transactions.csv'

# load data using sqlContext
df = sqlContext.read.format("csv")\
      .option("header", "true")\
      .option("inferSchema", "true")\
      .load(path)\
      .limit(5000)

# display in table format
display(df)


In [0]:
# Show number of flagged transactions
df.filter(df.isFlaggedFraud == 1).show()

### As we can see above, none of these transactions were flagged as fraud by the `isFlaggedFraud` column! Let's explore different supervised learning algorithms to detect these anomalies in credit card transactions.

### Now that we have the data loaded, let's grab the column names that hold categorical data and put them into a list.

In [0]:
from pyspark.ml.feature import OneHotEncoder

# stages in pipeline
stages = []
numericColumns = ["step", "amount", "oldbalanceOrg", "newbalanceOrig", "newbalanceDest"]
categoricalColumns = ["type"]

for categoricalCol in categoricalColumns:
  stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
  encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
  stages+=[stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol="isFraud", outputCol="label")
stages+=[label_stringIdx]
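
To make the two encoding stages concrete, here is a minimal pure-Python sketch of what `StringIndexer` and `OneHotEncoder` produce, assuming their default settings (indices assigned by descending frequency, last category dropped from the one-hot vector). The helpers `string_index` and `one_hot` and the toy `types` list are hypothetical and only illustrate the idea. Spark stores the result as a sparse vector rather than a Python list.

```python
from collections import Counter

def string_index(values):
    """Map each label to an index, most frequent label first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: idx for idx, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Return a one-hot list; with drop_last the final category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

# Hypothetical sample of the "type" column
types = ["PAYMENT", "PAYMENT", "PAYMENT", "CASH_OUT", "CASH_OUT", "TRANSFER"]

mapping = string_index(types)
encoded = {label: one_hot(idx, len(mapping)) for label, idx in mapping.items()}

for label, vec in encoded.items():
    print(label, mapping[label], vec)
```

Dropping the last category keeps the encoded columns from being perfectly collinear, which matters for linear models such as logistic regression.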



### Let's now put all the feature columns into a single vector column.

In [0]:
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericColumns
assembler = VectorAssembler(inputCols = assemblerInputs, outputCol="features")
stages+=[assembler]
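
Conceptually, the assembler just concatenates the listed columns, in order, into one flat feature vector per row. The sketch below shows that idea in plain Python. The `assemble` helper and the values in `row` are hypothetical and are not taken from the dataset.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # Vector-valued column (e.g. a one-hot encoding): append every element
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features

# Hypothetical row after the indexing and encoding stages
row = {
    "typeclassVec": [0.0, 1.0],
    "step": 1,
    "amount": 181.0,
    "oldbalanceOrg": 181.0,
    "newbalanceOrig": 0.0,
    "newbalanceDest": 0.0,
}
input_cols = ["typeclassVec", "step", "amount",
              "oldbalanceOrg", "newbalanceOrig", "newbalanceDest"]

features = assemble(row, input_cols)
print(features)
```

The order of `input_cols` determines the position of each feature, which is worth remembering when interpreting model coefficients later.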

### Let's now run the stages as a pipeline. We can put all the feature transformations under a single call.

In [0]:
from pyspark.ml.classification import LogisticRegression
cols = df.columns

partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(df)
preppedDataDF = pipelineModel.transform(df)
preppedDataDF.show()

### Let's extract the columns that are needed for the training and testing.

In [0]:
# take the vector columns and the original columns
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)


### Split the training and testing data: 30% Testing, 70% Training

In [0]:
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print("Training data points: " + str(trainingData.count()))
print("Testing data points: " + str(testData.count()))
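
The split sizes printed above will be close to 70/30 but usually not exact, because each row is assigned to a split at random rather than by counting. A minimal sketch of that behavior in plain Python, assuming per-row random assignment against normalized weights (the `random_split` helper is hypothetical, and Python's random generator will not reproduce the rows Spark picks for the same seed):

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by drawing a uniform number per row."""
    total = sum(weights)
    # Cumulative normalized boundaries, e.g. roughly [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any draw left over from floating-point rounding
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(5000))
train, test = random_split(rows, [0.7, 0.3], seed=100)
print("Training data points:", len(train))
print("Testing data points:", len(test))
```

Fixing the seed makes the split reproducible, so the three models below are compared on the same test rows.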

### Let's use logistic regression. The model estimates the probability of fraud and thresholds it to output 0 or 1, which makes it a good fit for binary classification.

In [0]:
# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
 
# Train model with Training Data
lrModel = lr.fit(trainingData)
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)
# View model's predictions and probabilities of each prediction class
# You can select any columns in the above schema to view as well
selected = predictions.select("label", "prediction", "probability", "type", "amount")

display(selected.filter(selected.label==1))

label,prediction,probability,type,amount
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9853097690269782, 0.0146902309730218))",CASH_OUT,181.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9999999223379958, 7.766200416092772E-8))",CASH_OUT,416001.33
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9809994500639945, 0.01900054993600553))",TRANSFER,181.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9808041576519347, 0.019195842348065262))",TRANSFER,2806.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8661028218985519, 0.13389717810144808))",TRANSFER,13707.11
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7766871229648317, 0.22331287703516833))",TRANSFER,169941.73
1.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(6.02800238320705E-5, 0.9999397199761679))",TRANSFER,2930418.44
1.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(8.710106369412149E-17, 0.9999999999999999))",TRANSFER,10000000.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9843955766478124, 0.01560442335218759))",CASH_OUT,20128.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9833471141215638, 0.016652885878436163))",CASH_OUT,35063.63


## Here's a start. At first, none of them were flagged as fraud. Even though the number of transactions we flagged as fraud is low (2 of the 10 shown above), we now have a better chance of detecting fraud!
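
One reason so few rows are flagged is the decision threshold: a row is predicted as fraud only when its fraud probability exceeds 0.5. The sketch below reuses the fraud-class probabilities from the output table above and counts how many of these ten fraudulent rows would be flagged at lower thresholds. The `count_flagged` helper is hypothetical. PySpark's `LogisticRegression` has a `threshold` parameter that could be used for the same purpose.

```python
# Fraud-class probabilities (second entry of each probability vector) from the output above
fraud_probabilities = [
    0.0146902309730218,
    7.766200416092772e-8,
    0.01900054993600553,
    0.019195842348065262,
    0.13389717810144808,
    0.22331287703516833,
    0.9999397199761679,
    0.9999999999999999,
    0.01560442335218759,
    0.016652885878436163,
]

def count_flagged(probabilities, threshold):
    """Count rows whose fraud probability is above the decision threshold."""
    return sum(1 for p in probabilities if p > threshold)

for threshold in (0.5, 0.2, 0.1):
    print("threshold", threshold, "flags", count_flagged(fraud_probabilities, threshold), "of 10")
```

Lowering the threshold from 0.5 to 0.1 raises the count from 2 to 4 on these rows. It would also flag more legitimate transactions, which this table does not show, so the threshold should be chosen with both kinds of error in mind.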

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

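# Create both evaluators
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
# Note: rawPredictionCol points at the hard 0/1 "prediction" column, so the AUC below is
# computed from thresholded predictions rather than from the model's raw scores
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName='areaUnderROC')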

predictionAndTarget = predictions.select("label", "prediction")

acc = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "accuracy"})
f1 = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "f1"})
weightedPrecision = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedPrecision"})
weightedRecall = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedRecall"})
auc = evaluator.evaluate(predictionAndTarget)

print("Accuracy: " + str(acc))
print("F1 Score: " + str(f1))
print("Weighted Precision: " + str(weightedPrecision))
print("Weighted Recall: " + str(weightedRecall))
print("Area Under Curve: " + str(auc))
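
A note on the AUC printed here: the binary evaluator is given the 0/1 `prediction` column instead of a continuous score. With only two possible score values, the area under the ROC curve works out to the average of the true positive rate and the true negative rate. The sketch below checks that on a hypothetical test set. The labels, predictions, and the `pairwise_auc` helper are made up for illustration and assume the standard definition of AUC.

```python
def pairwise_auc(labels, scores):
    """AUC as the chance that a random positive outscores a random negative (ties count 0.5)."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical test set: 10 fraud rows (2 caught) and 90 legitimate rows (1 false alarm)
labels = [1] * 10 + [0] * 90
hard_predictions = [1, 1] + [0] * 8 + [1] + [0] * 89

true_positive_rate = 2 / 10
true_negative_rate = 89 / 90

auc_from_hard = pairwise_auc(labels, hard_predictions)
balanced_accuracy = (true_positive_rate + true_negative_rate) / 2

print("AUC from hard predictions:", round(auc_from_hard, 4))
print("Average of TPR and TNR:   ", round(balanced_accuracy, 4))
```

Both numbers agree, so an AUC near 0.5 in this notebook mostly reflects a low true positive rate. Evaluating on the `rawPrediction` or `probability` column instead would measure how well the model ranks transactions across all thresholds.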


### Woah! Why are these metrics so high? Here's an explanation (from the Spark MLlib documentation):
While there are many different types of classification algorithms, the evaluation of classification models all share similar principles. In a supervised classification problem, there exists a true output and a model-generated predicted output for each data point. For this reason, the results for each data point can be assigned to one of four categories:

- True Positive (TP) - label is positive and prediction is also positive
- True Negative (TN) - label is negative and prediction is also negative
- False Positive (FP) - label is negative but prediction is positive
- False Negative (FN) - label is positive but prediction is negative

These four numbers are the building blocks for most classifier evaluation metrics. A fundamental point when considering classifier evaluation is that pure accuracy (i.e. was the prediction correct or incorrect) is not generally a good metric. The reason for this is because a dataset may be highly unbalanced. For example, if a model is designed to predict fraud from a dataset where 95% of the data points are not fraud and 5% of the data points are fraud, then a naive classifier that predicts not fraud, regardless of input, will be 95% accurate. For this reason, metrics like precision and recall are typically used because they take into account the type of error. In most applications there is some desired balance between precision and recall, which can be captured by combining the two into a single metric, called the F-measure.

However, we should take a look at the Area Under Curve (AUC). AUC ranges from 0 to 1: a value of 0.5 means the classifier does no better than random guessing, and a value of 1 means it separates fraud from non-fraud perfectly. Since 0.56 is much closer to 0.5 than to 1, the model is poor at identifying fraud.
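
The 95% / 5% example from the explanation can be worked through directly. The sketch below builds that hypothetical dataset, applies a naive classifier that always predicts "not fraud", and computes the metrics from the four confusion counts. The helper names are illustrative. The weighted recall is included because it shows why the weighted metrics printed earlier look so good.

```python
def confusion_counts(labels, predictions):
    """Return (TP, TN, FP, FN) for binary labels where 1 = fraud."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)
    tn = sum(1 for y, p in pairs if y == 0 and p == 0)
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)
    return tp, tn, fp, fn

def safe_divide(numerator, denominator):
    """Divide, returning 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0

# Hypothetical dataset: 5 fraud rows, 95 legitimate rows
labels = [1] * 5 + [0] * 95
# Naive classifier: always predict "not fraud"
naive_predictions = [0] * 100

tp, tn, fp, fn = confusion_counts(labels, naive_predictions)
accuracy = (tp + tn) / len(labels)
fraud_precision = safe_divide(tp, tp + fp)
fraud_recall = safe_divide(tp, tp + fn)
fraud_f1 = safe_divide(2 * fraud_precision * fraud_recall, fraud_precision + fraud_recall)

# Weighted recall averages per-class recall by class size, so the majority class dominates
non_fraud_recall = safe_divide(tn, tn + fp)
weighted_recall = (95 * non_fraud_recall + 5 * fraud_recall) / 100

print("Accuracy:", accuracy)
print("Fraud precision:", fraud_precision)
print("Fraud recall:", fraud_recall)
print("Fraud F1:", fraud_f1)
print("Weighted recall:", weighted_recall)
```

The naive classifier reaches 95% accuracy and 95% weighted recall while catching no fraud at all, which is why the fraud-class recall is the number to watch.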

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier
 
# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=10)
 
# Train model with Training Data
dtModel = dt.fit(trainingData)
predictions = dtModel.transform(testData)
selected = predictions.select("label", "prediction", "probability", "type", "amount")
display(selected.filter(selected.label==1))

label,prediction,probability,type,amount
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(1.0, 0.0))",CASH_OUT,181.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(1.0, 0.0))",CASH_OUT,416001.33
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(1.0, 0.0))",TRANSFER,181.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(1.0, 0.0))",TRANSFER,2806.0
1.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.0, 1.0))",TRANSFER,13707.11
1.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.0, 1.0))",TRANSFER,169941.73
1.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.0, 1.0))",TRANSFER,2930418.44
1.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.0, 1.0))",TRANSFER,10000000.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8571428571428571, 0.14285714285714285))",CASH_OUT,20128.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8571428571428571, 0.14285714285714285))",CASH_OUT,35063.63


### We can see that there are more accurate predictions this time around! Let's see if we can get a better model though.

In [0]:
# Extract Results
predictionAndTarget = predictions.select("label", "prediction")

acc = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "accuracy"})
f1 = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "f1"})
weightedPrecision = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedPrecision"})
weightedRecall = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedRecall"})
auc = evaluator.evaluate(predictionAndTarget)

print("Accuracy: " + str(acc))
print("F1 Score: " + str(f1))
print("Weighted Precision: " + str(weightedPrecision))
print("Weighted Recall: " + str(weightedRecall))
print("Area Under Curve: " + str(auc))

### These metrics are high again, and for the same reason as with logistic regression: the classes are highly imbalanced, so accuracy and the weighted metrics are dominated by the non-fraud transactions.

The Area Under Curve (AUC) is again the more telling number. Since 0.66 is still closer to 0.5 than to 1, the decision tree isn't great at identifying fraud, although it does better than logistic regression.

For the last part, let's try a Random Forest Classifier, which uses an ensemble (group) of decision trees to improve accuracy.

In [0]:
from pyspark.ml.classification import RandomForestClassifier
 
# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
 
# Train model with Training Data
rfModel = rf.fit(trainingData)
# Make predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(testData)
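
As a rough illustration of the ensemble idea, the sketch below averages the class probabilities produced by several trees for one transaction and predicts the class with the highest average. This is a simplified picture, not Spark's exact implementation. The `forest_predict` helper and the per-tree numbers are hypothetical.

```python
def forest_predict(tree_probabilities):
    """Average per-tree class probabilities and pick the most likely class."""
    num_trees = len(tree_probabilities)
    num_classes = len(tree_probabilities[0])
    averaged = [
        sum(tree[c] for tree in tree_probabilities) / num_trees
        for c in range(num_classes)
    ]
    prediction = max(range(num_classes), key=lambda c: averaged[c])
    return averaged, prediction

# Hypothetical outputs [P(not fraud), P(fraud)] from four trees for one transaction
tree_probabilities = [
    [0.9, 0.1],
    [0.2, 0.8],
    [0.6, 0.4],
    [0.7, 0.3],
]

averaged, prediction = forest_predict(tree_probabilities)
print("Averaged probabilities:", averaged)
print("Predicted class:", prediction)
```

Averaging smooths out the confident 0.0 and 1.0 probabilities that a single decision tree tends to produce, but with rare fraud cases it can also pull borderline rows back below the 0.5 threshold.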

In [0]:
selected = predictions.select("label", "prediction", "probability", "type", "amount")

display(selected.filter(selected.label==1))

label,prediction,probability,type,amount
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9401727201747614, 0.0598272798252386))",CASH_OUT,181.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9879492020127824, 0.012050797987217565))",CASH_OUT,416001.33
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9056222995843581, 0.09437770041564175))",TRANSFER,181.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9056222995843581, 0.09437770041564175))",TRANSFER,2806.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6515504989081433, 0.3484495010918567))",TRANSFER,13707.11
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.5517270886711276, 0.4482729113288724))",TRANSFER,169941.73
1.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.3206806421927158, 0.6793193578072843))",TRANSFER,2930418.44
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.5684079149199885, 0.4315920850800114))",TRANSFER,10000000.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9841258137549037, 0.015874186245096303))",CASH_OUT,20128.0
1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9841258137549037, 0.015874186245096303))",CASH_OUT,35063.63


In [0]:
# Extract Results
predictionAndTarget = predictions.select("label", "prediction")

acc = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "accuracy"})
f1 = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "f1"})
weightedPrecision = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedPrecision"})
weightedRecall = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedRecall"})
auc = evaluator.evaluate(predictionAndTarget)

print("Accuracy: " + str(acc))
print("F1 Score: " + str(f1))
print("Weighted Precision: " + str(weightedPrecision))
print("Weighted Recall: " + str(weightedRecall))
print("Area Under Curve: " + str(auc))

### This seems like the lowest AUC. The random forest classifier wasn't very successful.

### Now that we have looked at some supervised learning algorithms, we can conclude that, based on our selected features and our data, the decision tree was the best-performing algorithm, albeit still not performing well. However, the dataset's `isFlaggedFraud` column did not identify any of the transactions as fraud, so finding some fraudulent transactions shows that we made some progress!

# Future Considerations

## Unsupervised Learning

### With unsupervised learning, we won't be using any labels. The algorithm will analyze similar data points and detect possible outliers.
### We could try using K-Means clustering, but since we want to detect anomalies, a one-class support vector machine (SVM) is a better starting point. Because fraudulent transactions make up only a small fraction of the data, it would be a natural fit. Unfortunately, PySpark and MLlib don't include a one-class SVM.
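
To show what label-free outlier detection looks like in its simplest form, here is a small sketch that scores transaction amounts by how far they sit from the median, scaled by the median absolute deviation (MAD). This is not a one-class SVM. It is a much simpler stand-in that only looks at one feature. The `robust_z_scores` helper, the sample `amounts`, and the 3.5 cutoff are all assumptions chosen for illustration.

```python
from statistics import median

def robust_z_scores(values):
    """Score each value by its distance from the median in units of scaled MAD."""
    center = median(values)
    mad = median([abs(v - center) for v in values])
    if mad == 0:
        return [0.0 for _ in values]
    # 1.4826 makes the MAD comparable to a standard deviation for normal data
    scale = 1.4826 * mad
    return [(v - center) / scale for v in values]

# Hypothetical transaction amounts with one extreme value
amounts = [120.0, 95.5, 130.0, 110.0, 105.0, 99.0, 125.0, 10000000.0]

scores = robust_z_scores(amounts)
cutoff = 3.5  # assumed cutoff for calling a value an outlier
outliers = [a for a, s in zip(amounts, scores) if abs(s) > cutoff]
print("Outliers:", outliers)
```

No labels are used anywhere in this calculation, which is the key difference from the supervised models above. A real anomaly detector would combine several features rather than the amount alone.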

## Plotting
### We could plot the data and the model outputs to help guide further adjustments to the model's hyperparameters.

## Hyperparameter Tuning and Evaluation
### We could test each algorithm with different hyperparameters and choose the combination of algorithm and parameters that gives the best outcome.
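
A grid search is one simple way to do this: enumerate every combination of candidate values, score each one, and keep the best. The sketch below shows the loop in plain Python. The `grid_search` helper is hypothetical, and `toy_score` is a placeholder standing in for training a model and evaluating it on held-out data. In PySpark, `ParamGridBuilder` and `CrossValidator` from `pyspark.ml.tuning` would be the place to look for the real thing.

```python
from itertools import product

def grid_search(param_grid, score_fn):
    """Try every parameter combination and return the best one with its score."""
    names = list(param_grid)
    best_params, best_score = None, float("-inf")
    for combo in product(*(param_grid[name] for name in names)):
        params = dict(zip(names, combo))
        score = score_fn(params)
        if score > best_score:
            best_params, best_score = params, score
    return best_params, best_score

def toy_score(params):
    """Placeholder score that peaks at maxDepth=10 and maxBins=32 (not a real model)."""
    return -abs(params["maxDepth"] - 10) - abs(params["maxBins"] - 32) / 32

# Candidate values for two decision tree hyperparameters
param_grid = {"maxDepth": [5, 10, 15], "maxBins": [16, 32, 64]}

best_params, best_score = grid_search(param_grid, toy_score)
print("Best parameters:", best_params)
```

Given the class imbalance in this dataset, the score used to pick the winner should be something like fraud-class recall or AUC on model scores rather than plain accuracy.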