In [None]:
import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql.types import BooleanType, DateType, FloatType, IntegerType, LongType

In [None]:
# Create (or reuse) the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName('Fraud Detector')
    .getOrCreate()
)

In [None]:
# Read the cleaned train/test CSVs (first row is the header). No schema is
# supplied here; the numeric columns are cast explicitly in a later cell.
# `os` is provided by the notebook's import cell.
abspath = "../"
train_csv = os.path.join(abspath, "data/clean_fraudTrain.csv")
test_csv = os.path.join(abspath, "data/clean_fraudTest.csv")

df_train = spark.read.csv(train_csv, header=True)
df_test = spark.read.csv(test_csv, header=True)

In [None]:
# Preview the first five training rows as a pandas DataFrame (rich display)
df_train.limit(5).toPandas()

In [None]:
# Show the column names and types of the training DataFrame as loaded
df_train.printSchema()

In [None]:
def cast_df(df):
    """Cast the columns used for modelling to numeric Spark types.

    Each listed column is replaced in turn (same order as the table below)
    by its value cast to the paired type; all other columns are untouched.

    Args:
        df: Spark DataFrame containing every column named in the table.

    Returns:
        A new DataFrame with the listed columns cast.
    """
    column_types = [
        ("credit_card_num", LongType()),
        ("amount", FloatType()),
        ("lat", FloatType()),
        ("long", FloatType()),
        ("city_pop", IntegerType()),
        ("merch_lat", FloatType()),
        ("merch_long", FloatType()),
        ("is_fraud", IntegerType()),
        ("age", IntegerType()),
    ]
    for column_name, spark_type in column_types:
        df = df.withColumn(column_name, df[column_name].cast(spark_type))
    return df

In [None]:
# Apply the same numeric casts to both splits
df_train, df_test = (cast_df(split) for split in (df_train, df_test))

In [None]:
# Encode Categorical Variables
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml import Pipeline

cat_col = ['shop', 'category', 'gender', 'job']

# handleInvalid="keep" maps categories that appear only in the test set to an
# extra "unknown" index instead of raising when the test set is transformed.
# NOTE(review): this adds one category per column; tree learners need
# maxBins >= the largest category count - confirm against the classifier cell.
list_string_indexer = [
    StringIndexer(inputCol=c, outputCol=c + "_indexed", handleInvalid="keep")
    for c in cat_col
]
ppl = Pipeline(stages=list_string_indexer)

# Fit the string -> index mapping on the training data ONLY and reuse it for
# the test data. Re-fitting on the test set would derive the indices from the
# test set's own values, so the same index could stand for different
# categories in the two splits and the model would see misaligned features.
indexer_model = ppl.fit(df_train)

df_train_indexed = indexer_model.transform(df_train).drop(*cat_col)
df_test_indexed = indexer_model.transform(df_test).drop(*cat_col)

In [None]:
# Preview five training rows after the categorical columns were indexed
df_train_indexed.limit(5).toPandas()

In [None]:
from pyspark.ml.feature import VectorAssembler

# Columns packed into the single 'features' vector consumed by the classifier
numericCols = [
    'credit_card_num', 'amount', 'lat', 'long', 'city_pop',
    'merch_lat', 'merch_long', 'age',
    'shop_indexed', 'category_indexed', 'gender_indexed', 'job_indexed',
]
assembler = VectorAssembler(inputCols=numericCols, outputCol='features')

# This cell overwrites its own inputs, so on a second run the frames already
# carry a 'features' column and the assembler rejects the duplicate output
# column. Dropping it first (a no-op on the first run) keeps the cell
# re-runnable.
df_train_indexed = assembler.transform(df_train_indexed.drop('features'))
df_test_indexed = assembler.transform(df_test_indexed.drop('features'))

In [None]:
# Preview five training rows, now including the assembled 'features' vector
df_train_indexed.limit(5).toPandas()

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# An explicit seed makes the trained forest (and every metric derived from it)
# reproducible across kernel restarts.
# NOTE(review): tree learners require maxBins to be at least the number of
# distinct values of every categorical (indexed) feature. If 'shop' or 'job'
# has more than 32 categories, fit() will fail - confirm and raise maxBins.
rf = RandomForestClassifier(
    labelCol='is_fraud',
    featuresCol='features',
    maxBins=32,
    numTrees=5,
    seed=42,
)
model = rf.fit(df_train_indexed)

In [None]:
# Score the test set with the fitted model
predictions = model.transform(df_test_indexed)

In [None]:
# Preview the first ten scored test rows
predictions.limit(10).toPandas()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the predicted class against the 'is_fraud' label on the test set
evaluator = MulticlassClassificationEvaluator(labelCol='is_fraud', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

# .format() is required for the placeholders to be substituted; passing the
# value as a second print() argument would print the raw template instead.
print("Accuracy: {0:.2f}".format(accuracy))
print("Test Error: {0:.2f}".format(1.0 - accuracy))

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

# MulticlassMetrics consumes an RDD of (prediction, label) pairs, so keep
# exactly those two columns, in that order, with the integer label cast to a
# float. No sorting is needed: the metrics do not depend on row order.
preds_and_labels = predictions.select(
    'prediction',
    F.col('is_fraud').cast(FloatType()).alias('is_fraud'),
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

In [None]:
# Confusion Matrix, converted to an array for plotting.
# NOTE(review): per the Spark docs, rows should be actual classes and columns
# predicted classes, ordered by class label ascending - confirm.
conf_matrix = metrics.confusionMatrix().toArray()

In [None]:
import seaborn as sns

# fmt='.0f' prints each cell as a whole number; seaborn's default annotation
# format abbreviates large counts to scientific notation (e.g. 5.5e+05).
ax = sns.heatmap(conf_matrix, annot=True, fmt='.0f', cmap='Blues')
ax.set_title('Confusion Matrix')
ax.set_xlabel('\nPredicted Values')
ax.set_ylabel('Actual Values')

# Tick labels - must follow the class order of the matrix (0 first, then 1)
ax.xaxis.set_ticklabels(['False', 'True'])
ax.yaxis.set_ticklabels(['False', 'True'])

plt.show()

In [None]:
# Decode labels: turn the "<name>_indexed" columns back into their original
# string categories and drop the model-only columns.
list_index_string = [IndexToString(inputCol=c + "_indexed", outputCol=c) for c in cat_col]
ppl = Pipeline(stages=list_index_string)

indexed_cols = [c + "_indexed" for c in cat_col]

df_train_decoded = (
    ppl.fit(df_train_indexed)
    .transform(df_train_indexed)
    .drop(*indexed_cols, 'features')
)
# The scored test set gets its own name; the error-analysis cells below read
# df_test_decoded, and the training frame above must not be overwritten.
df_test_decoded = (
    ppl.fit(predictions)
    .transform(predictions)
    .drop(*indexed_cols, 'features', 'rawPrediction', 'probability')
)

In [None]:
# All decoded test rows where the label and the prediction disagree
df_test_decoded.filter(df_test_decoded.is_fraud != df_test_decoded.prediction).toPandas()

In [None]:
# False negatives: labelled fraud but predicted as not fraud.
# Column conditions are combined with `&`; each comparison needs its own
# parentheses because `&` binds tighter than `==`.
df_test_decoded.filter((df_test_decoded.is_fraud == 1) & (df_test_decoded.prediction == 0)).toPandas()

In [None]:
# False positives: labelled not fraud but predicted as fraud.
# Column conditions are combined with `&`; each comparison needs its own
# parentheses because `&` binds tighter than `==`.
df_test_decoded.filter((df_test_decoded.is_fraud == 0) & (df_test_decoded.prediction == 1)).toPandas()

## Summary

It seems like we created a good model here. However, a few notes:

* Accuracy is not a good metric for imbalanced data.
* Our model could still fail in production! The training data might come from the wrong season or simply be too old, so the model could struggle when it meets new patterns in production. Monitoring the ongoing performance of your model is therefore extremely important.
* Does our model replace data scientists? No, it empowers them. Instead of building everything from scratch, data scientists can focus on finding better features. It also takes expertise to evaluate a model properly.