In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark import SparkContext
sc =SparkContext()

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression

In [2]:
df = sqlContext.read.format('csv').options(header='true', inferSchema='true').load('frauddetectionsmall.csv')

In [3]:
display(df)

DataFrame[step: int, type: string, amount: double, nameOrig: string, oldbalanceOrg: double, newbalanceOrig: double, nameDest: string, oldbalanceDest: double, newbalanceDest: double, isFraud: int, isFlaggedFraud: int]

In [4]:
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [5]:
#select data column
df2 = df.select("type", "amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest", (col("isFraud").cast("Int").alias("label")))
display(df2)

DataFrame[type: string, amount: double, oldbalanceOrg: double, newbalanceOrig: double, oldbalanceDest: double, newbalanceDest: double, label: int]

In [6]:
#Split the data
splits = df2.randomSplit([0.7, 0.3])
train = splits[0]

In [7]:
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()

In [8]:
print ("Training Rows:", train_rows, " Testing Rows:", test_rows)
train.show(5)
test.show(5)

Training Rows: 7126  Testing Rows: 3074
+-------+-------+-------------+--------------+--------------+--------------+-----+
|   type| amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|label|
+-------+-------+-------------+--------------+--------------+--------------+-----+
|CASH_IN| 484.57|   5422437.76|    5422922.33|    5638778.53|    5579568.65|    0|
|CASH_IN| 863.08|   9290756.54|    9291619.62|       5577.88|        4714.8|    0|
|CASH_IN|1076.27|   3538789.28|    3539865.55|      22774.25|      23539.55|    0|
|CASH_IN| 1252.3|    2843880.9|     2845133.2|     145455.99|      144203.7|    0|
|CASH_IN|1332.59|   2533796.06|    2535128.65|        8693.0|       7360.41|    0|
+-------+-------+-------------+--------------+--------------+--------------+-----+
only showing top 5 rows

+-------+-------+-------------+--------------+--------------+--------------+---------+
|   type| amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|trueLabel|
+-------+-----

In [9]:
test.printSchema()

root
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- trueLabel: integer (nullable = true)



In [10]:
#defining Pipeline whihc provides a simple construction, tuning and testing for ML workflows.
strIdx = StringIndexer(inputCol = "type", outputCol = "typeCat")
labelIdx = StringIndexer(inputCol = "label", outputCol = "idxLabel")

In [11]:
# number is meaningful so that it should be number features
catVect = VectorAssembler(inputCols = ["typeCat"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ["amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest"], outputCol="numFeatures")

In [12]:
# number vector is normalized
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")

In [13]:
cl = []
pipeline = []


In [14]:
cl.insert(0, DecisionTreeClassifier(labelCol="idxLabel", featuresCol="features"))
cl.insert(1, RandomForestClassifier(labelCol="idxLabel", featuresCol="features"))
cl.insert(2, LogisticRegression(labelCol="idxLabel", featuresCol="features"))

In [15]:
# Pipeline process the series of transformation above, which is 7 transformation
for i in range(3):
    pipeline.insert(i, Pipeline(stages=[strIdx, labelIdx, catVect, catIdx, numVect, minMax, featVect, cl[i]]))
    #piplineModel = pipeline.fit(train)
print ("Pipeline complete!")

Pipeline complete!


In [16]:
#Train Validation Split
#we will used a train validation split instead of cross validation for every model because it takes much less time to train the model with the train validation split.
model = []

#When we will be using the whole dataset, we will use the TrainValidationSplit instead of the CrossValidator!
paramGrid = (ParamGridBuilder().addGrid(cl[0].impurity, ("gini", "entropy")).addGrid(cl[0].maxDepth, [5, 10, 20]).addGrid(cl[0].maxBins, [5, 10, 20]).build())
cv = CrossValidator(estimator=pipeline[0], evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, numFolds=5)
#cv = TrainValidationSplit(estimator=pipeline[0], evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, trainRatio=0.8)
model.insert(0, cv.fit(train))
print ("Model 1 completed")


Model 1 completed


In [17]:
paramGrid2 = (ParamGridBuilder().addGrid(cl[1].impurity, ("gini", "entropy")).addGrid(cl[1].maxDepth, [5, 10, 20]).addGrid(cl[1].maxBins, [5, 10, 20]).build())
cv2 = CrossValidator(estimator=pipeline[1], evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid2, numFolds=5)
#cv2 = TrainValidationSplit(estimator=pipeline[1], evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid2, trainRatio=0.8)
model.insert(1, cv2.fit(train))
print ("Model 2 completed")


Model 2 completed


In [18]:
paramGrid3 = (ParamGridBuilder().addGrid(cl[2].regParam, [0.01, 0.5, 2.0]).addGrid(cl[2].threshold, [0.30, 0.35, 0.5]).addGrid(cl[2].maxIter, [1, 5]).addGrid(cl[2].elasticNetParam, [0.0, 0.5, 1]).build())
cv3 = CrossValidator(estimator=pipeline[2], evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid3, numFolds=5)
#cv3 = TrainValidationSplit(estimator=pipeline[2], evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, trainRatio=0.8)
model.insert(2, cv3.fit(train))
print ("Model 3 completed")

Model 3 completed


In [19]:
#Test the model
prediction = [] 
predicted = []
for i in range(3):
  prediction.insert(i, model[i].transform(test))
  predicted.insert(i, prediction[i].select("features", "prediction", "probability", "trueLabel"))
  predicted[i].show(30)

+--------------------+----------+-----------+---------+
|            features|prediction|probability|trueLabel|
+--------------------+----------+-----------+---------+
|[1.0,2.6839006414...|       0.0|  [1.0,0.0]|        0|
|[1.0,7.8092018663...|       0.0|  [1.0,0.0]|        0|
|[1.0,8.3100019860...|       0.0|  [1.0,0.0]|        0|
|[1.0,9.0937021733...|       0.0|  [1.0,0.0]|        0|
|[1.0,1.2693803033...|       0.0|  [1.0,0.0]|        0|
|[1.0,1.6969904055...|       0.0|  [1.0,0.0]|        0|
|[1.0,3.4123508155...|       0.0|  [1.0,0.0]|        0|
|[1.0,4.5162510793...|       0.0|  [1.0,0.0]|        0|
|[1.0,5.2616512575...|       0.0|  [1.0,0.0]|        0|
|[1.0,5.7945213848...|       0.0|  [1.0,0.0]|        0|
|[1.0,6.2848915020...|       0.0|  [1.0,0.0]|        0|
|[1.0,9.0027421516...|       0.0|  [1.0,0.0]|        0|
|[1.0,9.2562422122...|       0.0|  [1.0,0.0]|        0|
|[1.0,9.4654822622...|       0.0|  [1.0,0.0]|        0|
|[1.0,9.5750622884...|       0.0|  [1.0,0.0]|   

In [20]:
#from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol="trueLabel", rawPredictionCol="prediction")
for i in range(3):
    #evaluator = MulticlassClassificationEvaluator(
    #labelCol="trueLabel", predictionCol="prediction", metricName="weightedRecall")
    areUPR = evaluator.evaluate(predicted[i], {evaluator.metricName: "areaUnderPR"})
    areUROC = evaluator.evaluate(predicted[i], {evaluator.metricName: "areaUnderROC"})
    print("AreaUnderPR = %g " % (areUPR))
    
    print("AreaUnderROC = %g " % (areUROC))

    tp = float(predicted[i].filter("prediction == 1.0 AND truelabel == 1").count())
    fp = float(predicted[i].filter("prediction == 1.0 AND truelabel == 0").count())
    tn = float(predicted[i].filter("prediction == 0.0 AND truelabel == 0").count())
    fn = float(predicted[i].filter("prediction == 0.0 AND truelabel == 1").count())

    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    print("Precision = %g " % (precision))
    print("Recall = %g " % (recall))

    metrics = sqlContext.createDataFrame([
    ("TP", tp),
    ("FP", fp),
    ("TN", tn),
    ("FN", fn),
    ("Precision", tp / (tp + fp)),
    ("Recall", tp / (tp + fn))],["metric", "value"])
    metrics.show()

AreaUnderPR = 0.809319 
AreaUnderROC = 0.807692 
Precision = 1 
Recall = 0.615385 
+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|              16.0|
|       FP|               0.0|
|       TN|            3048.0|
|       FN|              10.0|
|Precision|               1.0|
|   Recall|0.6153846153846154|
+---------+------------------+

AreaUnderPR = 0.828387 
AreaUnderROC = 0.826923 
Precision = 1 
Recall = 0.653846 
+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|              17.0|
|       FP|               0.0|
|       TN|            3048.0|
|       FN|               9.0|
|Precision|               1.0|
|   Recall|0.6538461538461539|
+---------+------------------+

AreaUnderPR = 0.523297 
AreaUnderROC = 0.519231 
Precision = 1 
Recall = 0.0384615 
+---------+--------------------+
|   metric|               value|
+---------+--------------------+
|       TP|                 1