# SparK MLib - Credit Card Fraud Classification

In [0]:
#Let's mount Google Drive So We can Retrieve the Data
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
#There is One File of Creit Card Data. Source  Kaggle.com
!ls "/content/gdrive/My Drive/CC Data/"

creditcardfraud.zip


In [0]:
#Unzip the files to reveal creditcard.csv
#!rm -r *
!unzip -qq "/content/gdrive/My Drive/CC Data/creditcardfraud.zip"

replace creditcard.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y


In [0]:
!ls

creditcard.csv	sample_data		   spark-2.4.3-bin-hadoop2.7.tgz
gdrive		spark-2.4.3-bin-hadoop2.7


# **Install and Load Up Spark**

In [0]:
#Install Latest Version of Spark As of Current Data. 2.4.3

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
df = spark.read.format("csv").option("inferSchema", True).option("header", True).load('creditcard.csv').withColumnRenamed('Class', 'label')
df.show(5)

+----+------------------+-------------------+----------------+------------------+-------------------+-------------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+--------------------+-------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+------+-----+
|Time|                V1|                 V2|              V3|                V4|                 V5|                 V6|                 V7|                V8|                V9|                V10|               V11|               V12|               V13|               V14|               V15|               V16|               V17|                V18|               V19|                V20|                 V21|                V22|     

In [0]:
df.printSchema()

root
 |-- Time: decimal(10,0) (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double

Examining the table above. This data is a collection of PCA features along with an amount total and a class total.

In [0]:
# total amount of records is 284807
df.count()

284807

In [0]:
df.describe().show()

+-------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|summary|              Time|                  V1|                  V2|                  V3|                  V4|                  V5|                  V6|                  V7|                  V8|                  V9|                 V10|                 V11|                 V12|                 V13|                 V14|                 V15|

In [0]:
#check missing values for each column
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(column), column)).alias('MV') for column in df.columns]).show()

+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV| MV|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+



In [0]:
#Lets Check the distribution of the class feature. Note  0 = No Fraud, while 1 = Fraud
df.groupby('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1|   492|
|    0|284315|
+-----+------+



###Unbalanced Class Problem
As you can see above, the fraud class (1) is only 492 records out to the 284315. This is a class unbalanced class situation.

In [0]:
#Split the data train vs test

weights = [.8, .2]  #80% Train, 20% Test given the unbalanced nature of the data
seed = 42
dfTrain, dfTest = df.randomSplit(weights, seed)
dfTrain.cache(), dfTest.cache()  #IMPORTANT TO CACHE!!!!
dfTrain.count(), dfTest.count()

(228135, 56672)

# Running A Model On the Unbalanced Data

In [0]:
#Logistic Regression Model
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression().setLabelCol("label")
paramMap = {lr.maxIter: 10, lr.regParam: .1, lr.elasticNetParam: 0.01}

In [0]:
#All our features are numberical, we can use VectorAssembler right away to create a feature vector that Spark needs

from pyspark.ml.feature import VectorAssembler

#assemble our features
IntFeat = df.columns[1:31]
assembler = VectorAssembler(inputCols= IntFeat, outputCol = "features")

In [0]:
#Define a pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler] + [lr])
model_Train = pipeline.fit(dfTrain, paramMap)

In [0]:
#run the model on the Test Data (holdout)

output = model_Train.transform(dfTest).select("features","label", "prediction", "rawPrediction", "probability")
prediction = output.select("label", "prediction", "rawPrediction", )

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
metric = evaluator.evaluate(prediction)
print("Accuracy = %s" % metric)

Accuracy = 0.9987648221343873


In [0]:
#Keep in mind, given the unbalanced class, a high accuracy means the model is good at predicting the non fraud class. But what about the fraud class?
#Run a confusion matrix

In [0]:
#Metrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics
predRDD = prediction.select('label', 'prediction').withColumn("label", prediction.label.cast('float')).rdd
metrics = BinaryClassificationMetrics(predRDD)

print("Summary Stats:")
print("Area Under Presion Recall = %s" %  metrics.areaUnderPR)
print("Area Under ROC = %s" % metrics.areaUnderROC)

Summary Stats:
Area Under Presion Recall = 0.34579439252336447
Area Under ROC = 0.9993820075924781


In [0]:
#The above scores for accuracy and Area Under ROC look good, however, when we look at Area Under PR, the results are quite poor.
#This is not a suprise when your classes are heavily unbalanced. Looking at the actual Class vs predected class.
#This model only classifies only 30% of true fraud cases. We can do better.

prediction.crosstab("label", 'prediction').show()

+----------------+-----+---+
|label_prediction|  0.0|1.0|
+----------------+-----+---+
|               1|   70| 37|
|               0|56565|  0|
+----------------+-----+---+



# Running A Model On a More Balanced Dataset

###What to do about Unbalanced Class problem?
Generall, there are a number of things you can do to help aleviate the problem such trying different algorithms, collecting more data, sub-sampling the majority class, over-sampling the minority class, etc... In this example, we are going to over-weight the loss function towards the minority class. This should provide a more robust model. The function below was inspired by the following post...

https://stackoverflow.com/questions/33372838/dealing-with-unbalanced-datasets-in-spark-mllib

In [0]:
#Lets increase the balance among classes

#from pyspark.sql.functions import udf

def balanceDataset(DataFrame, String = "label"):
  datasetSize= DataFrame.count()                                       
  positives = DataFrame.filter(dfTrain.label == 1).count()
  balancingRatio = positives/datasetSize
  weighteddfTrain = DataFrame.withColumn("classWeightCol", (when(col("label") == 1, 1 - balancingRatio).otherwise(balancingRatio)))
  return weighteddfTrain
  
#spark.udf.register("BalanceDataset", balanceDataset)  

In [0]:
from pyspark.sql.functions import isnan, when, count, col

weightedDatasetdf= balanceDataset(dfTrain)

In [0]:
#Logistic Regression Model
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression().setWeightCol('classWeightCol').setLabelCol("label")
#paramMap = {lr.maxIter: 10, lr.regParam: .1, lr.elasticNetParam: 0.01}
paramMap = {lr.maxIter: 5, lr.regParam: .05, lr.elasticNetParam: 0.05}

In [0]:
#All our features are numberical, we can use VectorAssembler right away to create a feature vector that Spark needs

from pyspark.ml.feature import VectorAssembler

#assemble our features
IntFeat = weightedDatasetdf.columns[1:30]
assembler = VectorAssembler(inputCols= IntFeat, outputCol = "features")

In [0]:
#Define a pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler] + [lr])
model_Train = pipeline.fit(weightedDatasetdf, paramMap)

In [0]:
#run the model on the Test Data (holdout)
weightedDatasetdftest= balanceDataset(dfTest)

output = model_Train.transform(weightedDatasetdftest).select("features","label", "prediction", "rawPrediction", "probability")
prediction = output.select("label", "prediction")

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
metric = evaluator.evaluate(prediction)
print("Accuracy = %s" % metric)

Accuracy = 0.9946710897797855


In [0]:
#Metrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics
predRDD = prediction.select('label', 'prediction').withColumn("label", prediction.label.cast('float')).rdd
metrics = BinaryClassificationMetrics(predRDD)

print("Summary Stats:")
print("Area Under Presion Recall = %s" %  metrics.areaUnderPR)
print("Area Under ROC = %s" % metrics.areaUnderROC)

Summary Stats:
Area Under Presion Recall = 0.5367959175490737
Area Under ROC = 0.6212388001770046


In [0]:
prediction.crosstab("label", 'prediction').show()

+----------------+-----+---+
|label_prediction|  0.0|1.0|
+----------------+-----+---+
|               1|   15| 92|
|               0|56278|287|
+----------------+-----+---+



Using weighted col in the logisitc regression model significantly improve our accuracy with respect to the fraud class!! Area under the jumps to .61 from .35. All good results. Can we improve by using cross validation and a grid search for hyperparamters.

# Adding Cross-Validation and Grid Search for Hyperparameters

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from  pyspark.ml.evaluation import BinaryClassificationEvaluator

#assemble our features

cvevaluator = BinaryClassificationEvaluator(metricName = 'areaUnderPR')  #can select metric to evaluate
cvparamGrid = ParamGridBuilder().addGrid(cvlr.maxIter, [2]).addGrid(cvlr.regParam, [.001, .05, .1, .2])\
.addGrid(cvlr.elasticNetParam, [.0001, .01, .05, .1]).build()

numFolds=2

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=cvparamGrid,
    evaluator=cvevaluator,
    numFolds=numFolds)

cvModel = crossval.fit(weightedDatasetdf)

#cvPrediction = cvModel.transform(weightedDatasetdftest).select("label", "prediction")
cvPrediction = cvModel.transform(weightedDatasetdftest).select("label", "prediction")

#Note: Automatically selects best model, if you would like to see bestmodel then:
bestModel = cvModel.bestModel

In [0]:
cvPrediction = output.select("label", "prediction")

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
metric = evaluator.evaluate(cvPrediction)
print("Accuracy = %s" % metric)

Accuracy = 0.9946710897797855


In [0]:
#Metrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics
predRDD = cvPrediction.select('label', 'prediction').withColumn("label", cvPrediction.label.cast('float')).rdd
metrics = BinaryClassificationMetrics(predRDD)

print("Summary Stats:")
print("Area Under Presion Recall = %s" %  metrics.areaUnderPR)
print("Area Under ROC = %s" % metrics.areaUnderROC)

Summary Stats:
Area Under Presion Recall = 0.5367959175490737
Area Under ROC = 0.6212388001770046


In [0]:
pcvPrediction.crosstab("label", 'prediction').show()

+----------------+-----+---+
|label_prediction|  0.0|1.0|
+----------------+-----+---+
|               1|   15| 92|
|               0|56278|287|
+----------------+-----+---+



In [0]:
SbestModel = bestModel.stages[1]
bestParams = SbestModel.extractParamMap()
bestParams

{Param(parent='LogisticRegression_f92809a0413e', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_f92809a0413e', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.1, Param(parent='LogisticRegression_f92809a0413e', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_f92809a0413e', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_f92809a0413e', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_f92809a0413e', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_f92809a0413e', name='maxIter', doc='maximum number of iterations (>= 0)'): 2, Par

In [0]:
cvmodel.params

NameError: ignored