In [1]:
# Libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.types import BooleanType,DateType,FloatType,IntegerType,LongType

In [2]:
# Spark Session
spark = SparkSession.builder.appName('Fraud-Detector').getOrCreate()

22/01/04 20:12:06 WARN Utils: Your hostname, francesc-Inspiron-5570 resolves to a loopback address: 127.0.1.1; using 192.168.1.141 instead (on interface wlp3s0)
22/01/04 20:12:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/01/04 20:12:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Read datasets
df_train = spark.read.csv('data/clean_fraudTrain.csv', header=True)
df_test = spark.read.csv('data/clean_fraudTest.csv', header=True)

In [4]:
# Show Pretty Data
df_train.limit(5).toPandas()

Unnamed: 0,timestamp,credit_card_num,shop,category,amount,gender,lat,long,city_pop,job,merch_lat,merch_long,is_fraud,age
0,2019-01-01 00:00:18,2703186189652095,"fraud_Rippin, Kub and Mann",misc_net,4.97,F,36.0788,-81.1781,3495,"Psychologist, counselling",36.011293,-82.048315,0,31
1,2019-01-01 00:00:44,630423337322,"fraud_Heller, Gutmann and Zieme",grocery_pos,107.23,F,48.8878,-118.2105,149,Special educational needs teacher,49.159047,-118.186462,0,41
2,2019-01-01 00:00:51,38859492057661,fraud_Lind-Buckridge,entertainment,220.11,M,42.1808,-112.262,4154,Nature conservation officer,43.150704,-112.154481,0,57
3,2019-01-01 00:01:16,3534093764340240,"fraud_Kutch, Hermiston and Farrell",gas_transport,45.0,M,46.2306,-112.1138,1939,Patent attorney,47.034331,-112.561071,0,52
4,2019-01-01 00:03:06,375534208663984,fraud_Keeling-Crist,misc_pos,41.96,M,38.4207,-79.4629,99,Dance movement psychotherapist,38.674999,-78.632459,0,33


In [5]:
# Cast Data
df_train.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- credit_card_num: string (nullable = true)
 |-- shop: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- city_pop: string (nullable = true)
 |-- job: string (nullable = true)
 |-- merch_lat: string (nullable = true)
 |-- merch_long: string (nullable = true)
 |-- is_fraud: string (nullable = true)
 |-- age: string (nullable = true)



In [6]:
def cast_df(df):
    df = df.drop("timestamp")
    df = df.withColumn("credit_card_num",df.credit_card_num.cast(LongType()))
    df = df.withColumn("amount",df.amount.cast(FloatType()))
    df = df.withColumn("lat",df.lat.cast(FloatType()))
    df = df.withColumn("long",df.long.cast(FloatType()))
    df = df.withColumn("city_pop",df.city_pop.cast(IntegerType()))
    df = df.withColumn("merch_lat",df.merch_lat.cast(FloatType()))
    df = df.withColumn("merch_long",df.merch_long.cast(FloatType()))
    df = df.withColumn("is_fraud",df.is_fraud.cast(IntegerType()))
    df = df.withColumn("age",df.age.cast(IntegerType()))
    return df

In [7]:
def drop_cols(df,cols):
    for i in cols:
        df = df.drop(i)
    return df
        

In [8]:
# Cast
df_train = cast_df(df_train)
df_test = cast_df(df_test)

In [9]:
# Encode Categorical Variables
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml import Pipeline

cat_col = ['shop','category','gender','job']

stage_string = [StringIndexer(inputCol= c, outputCol= c+"_indexed") for c in cat_col]
ppl = Pipeline(stages= stage_string)

df_train_indexed = drop_cols(ppl.fit(df_train).transform(df_train),cat_col)
df_test_indexed = drop_cols(ppl.fit(df_test).transform(df_test),cat_col)

                                                                                

In [10]:
df_train_indexed.limit(5).toPandas()

Unnamed: 0,credit_card_num,amount,lat,long,city_pop,merch_lat,merch_long,is_fraud,age,shop_indexed,category_indexed,gender_indexed,job_indexed
0,2703186189652095,4.97,36.0788,-81.178101,3495,36.011292,-82.048317,0,31,571.0,11.0,0.0,141.0
1,630423337322,107.230003,48.887798,-118.210503,149,49.159046,-118.186462,0,41,81.0,1.0,0.0,61.0
2,38859492057661,220.110001,42.180801,-112.262001,4154,43.150703,-112.15448,0,57,319.0,6.0,1.0,457.0
3,3534093764340240,45.0,46.230598,-112.1138,1939,47.034332,-112.561073,0,52,40.0,0.0,1.0,232.0
4,375534208663984,41.959999,38.4207,-79.462898,99,38.674999,-78.632462,0,33,519.0,10.0,1.0,297.0


In [11]:
from pyspark.ml.feature import VectorAssembler

numericCols = ['credit_card_num', 'amount', 'lat', 'long','city_pop','merch_lat','merch_long','age']
assembler = VectorAssembler(inputCols=numericCols, outputCol="features")

df_train_indexed = assembler.transform(df_train_indexed)
df_test_indexed = assembler.transform(df_test_indexed)

In [12]:
df_test_indexed.limit(5).toPandas()

Unnamed: 0,credit_card_num,amount,lat,long,city_pop,merch_lat,merch_long,is_fraud,age,shop_indexed,category_indexed,gender_indexed,job_indexed,features
0,2291163933867244,2.86,33.9659,-80.935501,333497,33.986389,-81.200714,0,52,469.0,7.0,1.0,40.0,"[2291163933867244.0, 2.859999895095825, 33.965..."
1,3573030041201292,29.84,40.320702,-110.435997,302,39.450497,-109.960434,0,30,386.0,7.0,0.0,33.0,"[3573030041201292.0, 29.84000015258789, 40.320..."
2,3598215285024754,41.279999,40.672901,-73.536499,34496,40.495811,-74.196114,0,50,440.0,9.0,0.0,28.0,"[3598215285024754.0, 41.279998779296875, 40.67..."
3,3591919803438423,60.049999,28.5697,-80.819099,54767,28.812399,-80.883064,0,33,500.0,10.0,1.0,328.0,"[3591919803438423.0, 60.04999923706055, 28.569..."
4,3526826139003047,3.19,44.252899,-85.016998,1126,44.959148,-85.884735,0,65,641.0,13.0,1.0,258.0,"[3526826139003047.0, 3.190000057220459, 44.252..."


In [13]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="is_fraud", featuresCol="features", numTrees=1)
predictions = rf.fit(df_train_indexed).transform(df_test_indexed)

                                                                                

In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)

print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))



Accuracy = 0.994666553210756
Test Error = 0.005333446789243967




In [18]:
# Confusion Matrix

from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

preds_and_labels = predictions.select(['prediction','is_fraud']).withColumn('is_fraud', F.col('is_fraud').cast(FloatType())).orderBy('prediction')
preds_and_labels = preds_and_labels.select(['prediction','is_fraud'])
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())



[[5.53367e+05 2.07000e+02]
 [1.98400e+03 1.61000e+02]]


                                                                                

In [15]:
# Decode Labels
index_string = [IndexToString(inputCol= c+"_indexed", outputCol=c ) for c in cat_col]
ppl = Pipeline(stages= index_string)

df_train_decoded = ppl.fit(df_train_indexed).transform(df_train_indexed)
df_test_decoded = ppl.fit(df_test_indexed).transform(df_test_indexed)