# Fraud Detection

## Import Spark SQL and Spark ML Libraries

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression

## Load Source Data

The data for this project describes mobile money transactions. It is based on a sample of real transactions extracted from one month of financial logs of a mobile money service implemented in an African country. The original logs were provided by a multinational company that operates the mobile financial service, which currently runs in more than 14 countries around the world.

This synthetic dataset is scaled down to 1/4 of the size of the original dataset and was created just for Kaggle.
The data comes in .csv format.

In [6]:
from pyspark.sql import SparkSession

# @hidden_cell
# This function is used to setup the access of Spark to your Object Storage. The definition contains your credentials.
# You might want to remove those credentials before you share your notebook.
def set_hadoop_config_with_credentials_f63cbb38899d47179c49ed4a7cf03ccf(name):
    """Configure Hadoop so Spark can read from Bluemix Object Storage.

    All settings are written to the Hadoop configuration of the global
    SparkContext ``sc`` under the key prefix ``fs.swift.service.<name>``.

    name -- service name appended to ``fs.swift.service.`` to build the keys.
    """
    key_prefix = 'fs.swift.service.' + name
    hadoop_conf = sc._jsc.hadoopConfiguration()

    # NOTE(review): tenant, username and password are embedded in the source.
    # Remove or externalise them before sharing this notebook.
    string_settings = (
        ('.auth.url', 'https://identity.open.softlayer.com' + '/v3/auth/tokens'),
        ('.auth.endpoint.prefix', 'endpoints'),
        ('.tenant', '265dd6d8a99a4549a24ac9574846808d'),
        ('.username', '886a93bbc2564a539f02a62ed61e1a61'),
        ('.password', 'h8bjj]J[1DME7LnC'),
        ('.region', 'dallas'),
    )
    for suffix, value in string_settings:
        hadoop_conf.set(key_prefix + suffix, value)

    # Typed settings use the dedicated setters of the Hadoop configuration.
    hadoop_conf.setInt(key_prefix + '.http.port', 8080)
    hadoop_conf.setBoolean(key_prefix + '.public', False)

# you can choose any name
# The name becomes part of the Hadoop configuration key prefix
# ("fs.swift.service.<name>") written by the function above.
name = 'keystone'
set_hadoop_config_with_credentials_f63cbb38899d47179c49ed4a7cf03ccf(name)

# Reuse the running Spark session if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

# Disabled alternative: load the CSV directly from Swift object storage.
#df = spark.read\
  #.format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  #.option('header', 'true')\
  #.option("inferSchema", "true")\
  #.load('swift://ITBSProjectFraudDetection.' + name + '/frauddetectionsmall.csv')
# Active loader: read every row of the "frauddetection" table.
# NOTE(review): sqlContext is not defined in this file; presumably it is
# provided by the notebook environment -- confirm.
df = sqlContext.sql("SELECT * FROM frauddetection")
# Peek at the first five rows and at the column types.
df.take(5)
df.dtypes


In [7]:

# Disabled cell: the triple quotes turn this code into an unused string
# literal, so it never runs. If enabled, it would load
# frauddetectionsmall.csv from Swift object storage with a header row and
# schema inference, then fetch the first five rows.
'''df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .option("inferSchema", "true")\
  .load('swift://ITBSProjectFraudDetection.' + name + '/frauddetectionsmall.csv')
df.take(5)'''


In [8]:
# Disabled exploratory-plot cell: everything below sits inside a
# triple-quoted string literal and is therefore not executed.
# If enabled, it would convert df to a pandas DataFrame and draw bar charts of
# the transaction types (all rows, isFraud >= 1, isFraud < 1) followed by box
# plots of the destination balances, origin balances and amount.
# NOTE(review): all three box plots filter with cond2 (isFraud < 1) while
# grouping by isFraud -- confirm whether fraud rows were meant to be included.
'''%matplotlib inline
import pandas as pd
import matplotlib.pyplot as plt

fraud = df.toPandas()
f, ax = plt.subplots(1, 1, figsize=(8, 8))
fraud.type.value_counts().plot(kind='bar', title="Transaction type", ax=ax, figsize=(8,8))
plt.show()
plt.figure(0)
cond = (fraud['isFraud'] >= 1)
taf = fraud[cond].type.value_counts().plot(kind='bar',  title="Fraud transactions grouped by type")
plt.show(taf)
plt.figure(1)
cond2 = (fraud['isFraud'] < 1)
taf2 = fraud[cond2].type.value_counts().plot(kind='bar',  title="No fraud transactions grouped by type")
plt.show(taf2)
#fraud['type'] = fraud['type'].apply(convert)
#fraud1 = normalize(fraud)
plt.figure(2)
medianprops = dict(linestyle='-', linewidth=2, color='blue')
bx1 = fraud[cond2].boxplot(column=['oldbalanceDest', 'newbalanceDest'], by='isFraud', medianprops=medianprops)
bx2 = fraud[cond2].boxplot(column=['oldbalanceOrg', 'newbalanceOrig'], by='isFraud', medianprops=medianprops)
bx3 = fraud[cond2].boxplot(column=['amount'], by='isFraud', medianprops=medianprops)
#bx4 = fraud[cond].boxplot(column=['type'], by='isFraud', medianprops=medianprops)
plt.show(bx1)
plt.show(bx2)
plt.show(bx3)
#plt.figure(3)
#hist1 = fraud[cond2].plot.hist(by='isFraud', stacked=True, bins=20)'''

## Scatter Plots
Next, we want to interpret trends in our data using scatter plots.

In [10]:
# Disabled scatter-plot cell: the code is wrapped in a triple-quoted string
# literal and is not executed. If enabled, it would draw pairwise scatter
# plots of the balance columns and of each numeric column against isFraud.
# NOTE(review): sc1 and sc2 are assigned twice, so plt.show(sc1)/plt.show(sc2)
# receive only the second pair of plots -- confirm this is intended.
'''plt.figure(4)
sc1 = fraud.plot.scatter(x='oldbalanceDest', y='newbalanceDest')
sc2 = fraud.plot.scatter(x='oldbalanceOrg', y='newbalanceOrig')
sc1 = fraud.plot.scatter(x='oldbalanceDest', y='oldbalanceOrg')
sc2 = fraud.plot.scatter(x='oldbalanceOrg', y='newbalanceDest')
sc3 = fraud.plot.scatter(x='amount', y='isFraud')
sc4 = fraud.plot.scatter(x='oldbalanceDest', y='isFraud')
sc5 = fraud.plot.scatter(x='newbalanceDest', y='isFraud')
sc6 = fraud.plot.scatter(x='oldbalanceOrg', y='isFraud')
sc7 = fraud.plot.scatter(x='newbalanceOrig', y='isFraud')
plt.show(sc1)
plt.show(sc2)
plt.show(sc3)
plt.show(sc4)
plt.show(sc5)
plt.show(sc6)
plt.show(sc7)'''

## Select the columns
In the next step, we will drop the columns that are useless for our model.

In [12]:
# Keep only the model inputs and expose isFraud as an integer "label" column.
df2 = df.select(
    "type",
    "amount",
    "oldbalanceOrg",
    "newbalanceOrig",
    "oldbalanceDest",
    "newbalanceDest",
    col("isFraud").cast("Int").alias("label"),
)
# Preview the first five rows of the reduced frame.
df2.take(5)

## Split the data
In the next step, we split the data into a training set and a test set.

In [14]:
# Randomly split the prepared data with weights 0.7 (training) and 0.3 (test).
splits = df2.randomSplit([0.7, 0.3])
train = splits[0]
# The evaluation cell reads the ground truth from "trueLabel", so the label
# column is renamed on the test side.
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
# Formatted explicitly so the line prints identically on Python 2 and 3
# (the former ``print a, b`` statement is a SyntaxError on Python 3).
print("Training Rows: %s  Testing Rows: %s" % (train_rows, test_rows))
train.show(5)
test.show(5)

## Define a pipeline and train the model
We need to prepare the features.

In [16]:
# --- Feature-preparation stages, shared by every pipeline -------------------
# Index the categorical transaction type and the label.
strIdx = StringIndexer(inputCol="type", outputCol="typeCat")
labelIdx = StringIndexer(inputCol="label", outputCol="idxLabel")
# Wrap the indexed type in a vector and mark it as a categorical feature.
catVect = VectorAssembler(inputCols=["typeCat"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol="idxCatFeatures")
# Assemble the numeric columns and rescale them with MinMaxScaler.
numVect = VectorAssembler(
    inputCols=["amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest"],
    outputCol="numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol="normFeatures")
# Combine categorical and normalised numeric features into one vector.
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")

# Classifiers to compare; later cells rely on this order
# (0 = decision tree, 1 = random forest, 2 = logistic regression).
cl = [
    DecisionTreeClassifier(labelCol="idxLabel", featuresCol="features"),
    RandomForestClassifier(labelCol="idxLabel", featuresCol="features"),
    LogisticRegression(labelCol="idxLabel", featuresCol="features"),
]

# One pipeline per classifier: the seven feature-preparation stages above
# followed by the classifier itself (eight stages in total).
pipeline = [
    Pipeline(stages=[strIdx, labelIdx, catVect, catIdx, numVect, minMax, featVect, classifier])
    for classifier in cl
]
# print() with a single string behaves identically on Python 2 and 3.
print("Pipeline complete!")

In [17]:
# Tune each pipeline with a train/validation split (80% train, 20% validation)
# and collect the fitted models in the same order as ``cl`` / ``pipeline``.
model = []

# Model 1: decision tree -- impurity measure, tree depth and bin count.
paramGrid = (ParamGridBuilder()
             .addGrid(cl[0].impurity, ["gini", "entropy"])
             .addGrid(cl[0].maxDepth, [5, 10, 20])
             .addGrid(cl[0].maxBins, [5, 10, 20])
             .build())
cv = TrainValidationSplit(estimator=pipeline[0], evaluator=BinaryClassificationEvaluator(),
                          estimatorParamMaps=paramGrid, trainRatio=0.8)
model.append(cv.fit(train))
print("Model 1 completed")

# Model 2: random forest -- same hyper-parameters as the decision tree.
paramGrid2 = (ParamGridBuilder()
              .addGrid(cl[1].impurity, ["gini", "entropy"])
              .addGrid(cl[1].maxDepth, [5, 10, 20])
              .addGrid(cl[1].maxBins, [5, 10, 20])
              .build())
cv2 = TrainValidationSplit(estimator=pipeline[1], evaluator=BinaryClassificationEvaluator(),
                           estimatorParamMaps=paramGrid2, trainRatio=0.8)
model.append(cv2.fit(train))
print("Model 2 completed")

# Model 3: logistic regression -- regularisation, threshold, iterations and
# elastic-net mixing (values kept as floats to match the double-typed param).
paramGrid3 = (ParamGridBuilder()
              .addGrid(cl[2].regParam, [0.01, 0.5, 2.0])
              .addGrid(cl[2].threshold, [0.30, 0.35, 0.5])
              .addGrid(cl[2].maxIter, [1, 5])
              .addGrid(cl[2].elasticNetParam, [0.0, 0.5, 1.0])
              .build())
# Fix: this split previously received ``paramGrid`` (the decision-tree grid),
# so the logistic-regression grid above was never searched.
cv3 = TrainValidationSplit(estimator=pipeline[2], evaluator=BinaryClassificationEvaluator(),
                           estimatorParamMaps=paramGrid3, trainRatio=0.8)
model.append(cv3.fit(train))
print("Model 3 completed")

## Test the model
We transform the test dataframe to generate label predictions.

In [19]:
'''predictions = model.transform(test)
predicted = predictions.select("features", "prediction", "probability", "trueLabel")
predicted.show(100, truncate=False)
for row in predicted.collect():
    print row'''
# Score the test set with each of the three fitted models; prediction[k] holds
# the full transformed frame for model k.
prediction = [model[k].transform(test) for k in range(3)]
# Keep only the columns used for inspection and evaluation.
predicted = [
    prediction[k].select("features", "prediction", "probability", "trueLabel")
    for k in range(3)
]
# Show the first 30 scored rows of every model, in model order.
for scored in predicted:
    scored.show(30)

## Evaluation
In the next step, we evaluate the models.

In [21]:
#from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluate every model on its scored test rows.
# The class-probability vector is used as the score column so the ROC and PR
# curves are swept over thresholds. Feeding the hard 0/1 "prediction" column
# (as before) reduces each curve to a single operating point.
evaluator = BinaryClassificationEvaluator(
    labelCol="trueLabel", rawPredictionCol="probability")


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


for i in range(3):
    # Threshold-independent metrics.
    areUPR = evaluator.evaluate(predicted[i], {evaluator.metricName: "areaUnderPR"})
    areUROC = evaluator.evaluate(predicted[i], {evaluator.metricName: "areaUnderROC"})
    print("AreaUnderPR = %g " % (areUPR))
    print("AreaUnderROC = %g " % (areUROC))

    # Confusion-matrix counts from the hard predictions (floats so that the
    # ratios below use true division on Python 2 as well).
    tp = float(predicted[i].filter("prediction == 1.0 AND trueLabel == 1").count())
    fp = float(predicted[i].filter("prediction == 1.0 AND trueLabel == 0").count())
    tn = float(predicted[i].filter("prediction == 0.0 AND trueLabel == 0").count())
    fn = float(predicted[i].filter("prediction == 0.0 AND trueLabel == 1").count())

    # Guarded: a model that predicts no positives (tp + fp == 0) or a test set
    # without positives (tp + fn == 0) would otherwise raise ZeroDivisionError.
    precision = _safe_ratio(tp, tp + fp)
    recall = _safe_ratio(tp, tp + fn)
    print("Precision = %g " % (precision))
    print("Recall = %g " % (recall))

    # Present the counts and derived ratios as a small two-column table.
    metrics = sqlContext.createDataFrame([
        ("TP", tp),
        ("FP", fp),
        ("TN", tn),
        ("FN", fn),
        ("Precision", precision),
        ("Recall", recall)], ["metric", "value"])
    metrics.show()
