# Credit card fraud detection using geographic data with PySpark

## Importing Necessary Libraries

In [1]:
# Import SparkSession for PySpark 
from pyspark.sql import SparkSession

# Import tools for data preprocessing
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Import ML libraries for classification
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Import tools for classification metric analysis
from sklearn.metrics import confusion_matrix, classification_report
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
# Create Spark session
spark = SparkSession.builder.appName("CreditCardFraudDetection").getOrCreate()

In [3]:
# Check session creation was successful
spark

<i>Datasets available in directory:</i> 
<br>'hdfs://lena/user/blive001/fraudTest.csv', and 
<br>'hdfs://lena/user/blive001/fraudTrain.csv'.

In [4]:
# Load datasets
test_data = spark.read.format("csv").option("inferSchema","True").option("header","true").load('hdfs://lena/user/blive001/fraudTest.csv')
train_data = spark.read.format("csv").option("inferSchema","True").option("header","true").load('hdfs://lena/user/blive001/fraudTrain.csv')

In [5]:
# Confirm datasets were loaded and check datatype
print('Data type of test dataset:', type(test_data))
print('Data type of train dataset:', type(train_data))

Data type of test dataset: <class 'pyspark.sql.dataframe.DataFrame'>
Data type of train dataset: <class 'pyspark.sql.dataframe.DataFrame'>


In [6]:
# Check basic meta data of test data
test_data.printSchema()
test_data.describe()

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



DataFrame[summary: string, _c0: string, trans_date_trans_time: string, cc_num: string, merchant: string, category: string, amt: string, first: string, last: string, gender: string, street: string, city: string, state: string, zip: string, lat: string, long: string, city_pop: string, job: string, dob: string, trans_num: string, unix_time: string, merch_lat: string, merch_long: string, is_fraud: string]

In [7]:
# Check basic meta data of train data
train_data.printSchema()
train_data.describe()

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



DataFrame[summary: string, _c0: string, trans_date_trans_time: string, cc_num: string, merchant: string, category: string, amt: string, first: string, last: string, gender: string, street: string, city: string, state: string, zip: string, lat: string, long: string, city_pop: string, job: string, dob: string, trans_num: string, unix_time: string, merch_lat: string, merch_long: string, is_fraud: string]

In [8]:
# View example of data to see what format string data is in etc. 
train_data.show(vertical=True, n=2)

-RECORD 0-------------------------------------
 _c0                   | 0                    
 trans_date_trans_time | 2019-01-01 00:00:18  
 cc_num                | 2703186189652095     
 merchant              | fraud_Rippin, Kub... 
 category              | misc_net             
 amt                   | 4.97                 
 first                 | Jennifer             
 last                  | Banks                
 gender                | F                    
 street                | 561 Perry Cove       
 city                  | Moravian Falls       
 state                 | NC                   
 zip                   | 28654                
 lat                   | 36.0788              
 long                  | -81.1781             
 city_pop              | 3495                 
 job                   | Psychologist, cou... 
 dob                   | 1988-03-09           
 trans_num             | 0b242abb623afc578... 
 unix_time             | 1325376018           
 merch_lat   

In [9]:
# NUmber of entries in test and train data
print('Number of values in train_data:', train_data.count())
print('Number of values in test_data: ', test_data.count())

Number of values in train_data: 1296675
Number of values in test_data:  555719


#### Merge train and test data sets for pre-processing 

The data needs to be in train/test datasets for classification but before that step, the data needs to be pre-processed. For the next step, I will combine the two sets into one new dataset for pre-processing. It will be resplit into a train and test set at a later point.

In [10]:
# Concatinate test and training data together
all_data = train_data.union(test_data)

In [11]:
# Confirm concatination was successful and check number of samples in full set
print('Number of values in all_data:', all_data.count())

Number of values in all_data: 1852394


#### Exploritory Data Analysis and Data Pre-processing

It's now time to take a closer look at the data to find information needed for the later classification phase. I am combining the EDA with the pre-processing of the data as I will be working with and manipulating the data as points of interest arrise.

In [12]:
# View names of all columns in DataFrame
all_data.columns

['_c0',
 'trans_date_trans_time',
 'cc_num',
 'merchant',
 'category',
 'amt',
 'first',
 'last',
 'gender',
 'street',
 'city',
 'state',
 'zip',
 'lat',
 'long',
 'city_pop',
 'job',
 'dob',
 'trans_num',
 'unix_time',
 'merch_lat',
 'merch_long',
 'is_fraud']

In [13]:
# Check number of unique values in main categorical features of interest
temp_cats = ['cc_num', 'merchant', 'category', 'city', 'state', 'merch_lat', 'merch_long']
for col in temp_cats:
    print('Number of unique values of ' + '\'' + str(col) + '\'' + ':', all_data.select(col).distinct().count())

Number of unique values of 'cc_num': 999
Number of unique values of 'merchant': 693
Number of unique values of 'category': 14
Number of unique values of 'city': 906
Number of unique values of 'state': 51
Number of unique values of 'merch_lat': 1754157
Number of unique values of 'merch_long': 1809753


I suspect the 'merchant' may be a key factor in determining if the data is frand or not. This is based on a logically assumption that fraudsters are likely to work in a certain area and potentially uses merchants who may cut corners or follow the rules less strictly. As such, I want to investigate the 'merchant' column.

As the total number of data points is 1852394 and the number of unique merchants is 693, I would like to get an idea of if this in rougly equally sread or stacked in several main merchants.

In [14]:
# Generate a total amount of times each merchant appears
all_data.groupBy('merchant').count().sort('merchant').show()

+--------------------+-----+
|            merchant|count|
+--------------------+-----+
| fraud_Abbott-Rogahn| 2647|
|fraud_Abbott-Steuber| 2529|
|fraud_Abernathy a...| 2513|
|   fraud_Abshire PLC| 2733|
|fraud_Adams, Kova...| 1354|
| fraud_Adams-Barrows| 2535|
|fraud_Altenwerth,...| 2755|
|fraud_Altenwerth-...| 3594|
| fraud_Ankunding LLC| 2782|
|fraud_Ankunding-C...| 1155|
|fraud_Armstrong, ...| 2649|
|      fraud_Auer LLC| 2674|
| fraud_Auer-Mosciski| 3487|
|     fraud_Auer-West| 2793|
|fraud_Bahringer G...| 2435|
|fraud_Bahringer, ...| 3552|
|fraud_Bahringer, ...| 2569|
|fraud_Bahringer, ...| 3313|
|fraud_Bahringer-L...| 1240|
|fraud_Bahringer-S...| 2558|
+--------------------+-----+
only showing top 20 rows



In [15]:
# Show summary stastics of 'merchant' columns (mean, std, max, min)
all_data.groupBy('merchant').count().agg({'count': 'mean'}).show()
all_data.groupBy('merchant').count().agg({'count': 'stddev'}).show()
all_data.groupBy('merchant').count().agg({'count': 'max'}).show()
all_data.groupBy('merchant').count().agg({'count': 'min'}).show()

+-----------------+
|       avg(count)|
+-----------------+
|2673.007215007215|
+-----------------+

+-----------------+
|    stddev(count)|
+-----------------+
|819.2381408667195|
+-----------------+

+----------+
|max(count)|
+----------+
|      6262|
+----------+

+----------+
|min(count)|
+----------+
|      1090|
+----------+



Having investigated the 'marchant' column, the results appear fine and the be reasonably well distibuted. There do appear to be some extreme outliers, but this may be the kind which indicate fraud or could simply be a merchant in a big city. I will now move on to checking there are no NaNs in the dataframe.

#### Check for NaNs

In [18]:
# Check for NaN values in dataframe
from pyspark.sql.functions import *
print("Number of NaN values for each column:")
all_data.select([count(when((col(i)=='') | col(i).isNull() | isnan(i), i)).alias(i) for i in all_data.columns]).show(vertical=True)

Number of NaN values for each column:
-RECORD 0--------------------
 _c0                   | 0   
 trans_date_trans_time | 0   
 cc_num                | 0   
 merchant              | 0   
 category              | 0   
 amt                   | 0   
 first                 | 0   
 last                  | 0   
 gender                | 0   
 street                | 0   
 city                  | 0   
 state                 | 0   
 zip                   | 0   
 lat                   | 0   
 long                  | 0   
 city_pop              | 0   
 job                   | 0   
 dob                   | 0   
 trans_num             | 0   
 unix_time             | 0   
 merch_lat             | 0   
 merch_long            | 0   
 is_fraud              | 0   



The dataset appears to be a clean daatset, free on NaNs. Next I will move on to checking the class balance.

#### Number on instances of each class

In [19]:
# Check count of each class
print('Number of samples for class 1:', all_data.select().where(all_data["is_fraud"]==1).count())
print('Number of samples for class 1:', all_data.select().where(all_data["is_fraud"]==0).count())

Number of samples for class 1: 9651
Number of samples for class 1: 1842743


As can be seen, the dataset is heavily imbalanced with around 99.5% class 0. This will likley cause issues in training where the classifier is able to simply guess 'not fraud' every time and achieve 99.5% accuracy. To avoid this issue, I will use oversampling of the fraud class (1) to increase instances of it. I am a little concerned with how far the dat will have to be stretched via oversampling. However, given the dataset is all synthetic data to start with, it may not be too diffifult for it to stretch the 1 class. 

Note: An alternative method would be undersampling, however this would involve throwing away around 99% of our data, which I do not think would benefit training. While it is possible some hybrid approach would work best in real life, as this is completely synthetic data, I will use oversampling as my main method to aviod losing data.

#### Rebalance the dataset with over sampling

In [20]:
# Find ratio of class 0 to class 1
non_fraud = all_data.filter(col("is_fraud") == 0)
fraud = all_data.filter(col("is_fraud") == 1)
class_ratio = int(non_fraud.count()/fraud.count())
print("Ratio of non-fraud to fraud values in dataset: " + str(class_ratio) + ":1")

Ratio of non-fraud to fraud values in dataset: 190:1


In [21]:
# Oversample the fraud (1) class to approximately equal that of the non-fraud class (0)

### Note: based on code of Jun Wan in article "Oversampling and Undersampling with PySpark"[7]
length = range(class_ratio)
# Create duplicates of non-fraud class
oversampled_data = fraud.withColumn("dummy", explode(array([lit(x) for x in length]))).drop('dummy')
# Create new df with a higher number of non-fraud samples
equal_data = non_fraud.unionAll(oversampled_data)

In [22]:
# Confirm the oversampling was successful
print('Number of samples for class 1:', equal_data.select().where(equal_data["is_fraud"]==1).count())
print('Number of samples for class 1:', equal_data.select().where(equal_data["is_fraud"]==0).count())

Number of samples for class 1: 1833690
Number of samples for class 1: 1842743


The classes have now been rebalanced at apporximately a 50:50 ratio. I can now move on to transforming the data.

#### Create 'day', 'week', and 'year' variables from 'trans_date_trans_time' column

It's possible fraud is tied to certain timings, public events etc. The 'trans_date_trans_time' column appears as a more linear way of looking at time as opposed to a periodical approach, which may be more relevant for pattern recognition. Therefore, I will be transforming the current 'trans_date_trans_time' column (which is a String) to something which is more usable by creating 'day' (day of week), 'week' (week of year), and 'year' columns thorugh an intermediary 'timestamp' column.

In [23]:
# Modifying 'trans_date_trans_time' to a timestamp 
mod_df = equal_data.withColumn("timestamp", to_timestamp(col("trans_date_trans_time"), 'yyyy-MM-dd HH:mm:ss'))

In [24]:
# Add 'day', 'week', 'year' columns from 'timestamp'
mod_df=mod_df.withColumn('day',dayofweek('timestamp')).withColumn('week',weekofyear(
    'timestamp')).withColumn('year',year('timestamp'))

#### Create age variable from 'dob'

Date of birth may be difficult for the classifier to interpret so an age variable will be created to replace it with.

In [25]:
# Add 'age' column from 'dob'
mod_df=mod_df.withColumn('age', round(datediff(current_date(),col("dob"))/365.25))

In [26]:
# Confirm the new columns have been made
mod_df.show(vertical=True, n=2)

-RECORD 0-------------------------------------
 _c0                   | 0                    
 trans_date_trans_time | 2019-01-01 00:00:18  
 cc_num                | 2703186189652095     
 merchant              | fraud_Rippin, Kub... 
 category              | misc_net             
 amt                   | 4.97                 
 first                 | Jennifer             
 last                  | Banks                
 gender                | F                    
 street                | 561 Perry Cove       
 city                  | Moravian Falls       
 state                 | NC                   
 zip                   | 28654                
 lat                   | 36.0788              
 long                  | -81.1781             
 city_pop              | 3495                 
 job                   | Psychologist, cou... 
 dob                   | 1988-03-09           
 trans_num             | 0b242abb623afc578... 
 unix_time             | 1325376018           
 merch_lat   

In [27]:
# Check type of new columns
mod_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable

#### Drop columns deemed unlikely to be useful

I will now drop columns which appear unlikely to add substantial benefit but may confuse the classifier during training. The index column, '_c0', can definitely be dropped as well as columns related to customer name and address (the latitude and longitude have been kept so city is not needed and may cause confusion). The temporary 'timestamp' column can also be dropped as well as 'dob', which is now not needed, and 'trans_num'.

In [28]:
# Dropping columns with no expected benefit to classification
to_drop = ['_c0', 'trans_date_trans_time', 'first', 'last', 'street', 'city', 'zip', 'trans_num', 'timestamp', 'dob']
mod_df_small = mod_df.drop(*to_drop)

In [29]:
# Check drop was successful
mod_df_small.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- age: double (nullable = true)



#### One hot encoding of categorical variables

I will now one-hot-encode the categorical variabels which I plan on using in the final model. They will be one-hot-encoded to ensure the classifier only views them as binary variables during training.

In [30]:
# Create list of variables to be onehotencoded
cols_categorical = ["category", "gender", "job", "state", "merchant"]
 
# Create stringIndexer for indexes
stringIndexer = StringIndexer(inputCols=cols_categorical, outputCols=[x + "_index" for x in cols_categorical])
# Store stringIndexer in temp df
temp_df = stringIndexer.fit(mod_df_small).transform(mod_df_small)
# Create encoder for onehotencoded categories
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "_OHE" for x in cols_categorical])
# Create new df with encoded variables added
encoded_df = encoder.fit(temp_df).transform(temp_df)

In [31]:
# View some of the orginal and _OHE columns to check the outcome was as expected
encoded_df.select('gender_OHE', 'gender_index','job_OHE','job_index','category_OHE', 'category_index').show()

+-------------+------------+-----------------+---------+---------------+--------------+
|   gender_OHE|gender_index|          job_OHE|job_index|   category_OHE|category_index|
+-------------+------------+-----------------+---------+---------------+--------------+
|(1,[0],[1.0])|         0.0|(496,[170],[1.0])|    170.0| (13,[4],[1.0])|           4.0|
|(1,[0],[1.0])|         0.0|(496,[104],[1.0])|    104.0| (13,[0],[1.0])|           0.0|
|    (1,[],[])|         1.0|(496,[371],[1.0])|    371.0| (13,[7],[1.0])|           7.0|
|    (1,[],[])|         1.0|(496,[204],[1.0])|    204.0| (13,[3],[1.0])|           3.0|
|    (1,[],[])|         1.0|(496,[291],[1.0])|    291.0| (13,[9],[1.0])|           9.0|
|(1,[0],[1.0])|         0.0|(496,[201],[1.0])|    201.0| (13,[3],[1.0])|           3.0|
|(1,[0],[1.0])|         0.0|(496,[154],[1.0])|    154.0|(13,[12],[1.0])|          12.0|
|    (1,[],[])|         1.0|(496,[330],[1.0])|    330.0| (13,[3],[1.0])|           3.0|
|(1,[0],[1.0])|         0.0|(496

In [32]:
# Check columns and dtypes are now in dataframe
encoded_df.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- merchant_index: double (nullable = false)
 |-- category_index: double (nullable = false)
 |-- gender_index: double (nullable = false)
 |-- state_index: double (nullable = false)
 |-- job_index: double (nullable = false)
 |-- category_OHE: vector (nullable = true)
 |-- state_OHE: vector (nullable = true)
 

In [33]:
# Dropping now irrelevant columns with superseded variations
to_drop = ['merchant_index', 'state_index', 'job_index', 'gender_index', 'category_index',
          'merchant', 'state', 'job', 'gender', 'category']
clean_encoded_df = encoded_df.drop(*to_drop)

In [34]:
# Check dropping was successful and what columns and dtypes are currently in dataframe
clean_encoded_df.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- amt: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- category_OHE: vector (nullable = true)
 |-- state_OHE: vector (nullable = true)
 |-- merchant_OHE: vector (nullable = true)
 |-- gender_OHE: vector (nullable = true)
 |-- job_OHE: vector (nullable = true)



#### Data normalization with MinMaxScaler

Numeric variables will now be normalized to avoid the classifier from potentially over focusing on columns which have a higher range than other columns.

In [35]:
# Create list of variables to be MinMaxScaled
cols_numeric = ["cc_num", "amt", "lat", "long", "city_pop", "unix_time", "merch_lat", "merch_long", 
                "day", "week", "year", "age"]

# Use VectorAssembler to create vector for each column
assembler = [VectorAssembler(inputCols = [i], outputCol = i + "_vec") for i in cols_numeric]
# Create MinMaxScaler for each column
minmaxscaler = [MinMaxScaler(inputCol = i + "_vec", outputCol = i + "_scaled") for i in cols_numeric]
# Create pipeline
pipeline = Pipeline(stages = assembler + minmaxscaler)
# Normalize data
normalized_data = pipeline.fit(clean_encoded_df).transform(clean_encoded_df)

In [36]:
# Check normalized columns were created as expected
normalized_data.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- amt: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- category_OHE: vector (nullable = true)
 |-- state_OHE: vector (nullable = true)
 |-- merchant_OHE: vector (nullable = true)
 |-- gender_OHE: vector (nullable = true)
 |-- job_OHE: vector (nullable = true)
 |-- cc_num_vec: vector (nullable = true)
 |-- amt_vec: vector (nullable = true)
 |-- lat_vec: vector (nullable = true)
 |-- long_vec: vector (nullable = true)
 |-- city_pop_vec: vector (nullable = true)
 |-- unix_time_vec: vector (nullable = true)
 |-- merch_lat_vec: vector (nullable = tru

In [37]:
# Drop now obsolete columns 
normalized_data = normalized_data.drop('age_vec', 'year_vec', 'week_vec', 'day_vec', 'merch_long_vec', 'merch_lat_vec', 
                                       'unix_time_vec','city_pop_vec', 'long_vec', 'lat_vec', 'amt_vec', 'cc_num_vec', 
                                       'cc_num', 'amt', 'lat', 'long', 'city_pop', 'unix_time', 'merch_lat', 'merch_long',
                                       'day', 'week', 'year', 'age')

In [38]:
# View final dataset to ensure it is as expected
normalized_data.printSchema()

root
 |-- is_fraud: integer (nullable = true)
 |-- category_OHE: vector (nullable = true)
 |-- state_OHE: vector (nullable = true)
 |-- merchant_OHE: vector (nullable = true)
 |-- gender_OHE: vector (nullable = true)
 |-- job_OHE: vector (nullable = true)
 |-- cc_num_scaled: vector (nullable = true)
 |-- amt_scaled: vector (nullable = true)
 |-- lat_scaled: vector (nullable = true)
 |-- long_scaled: vector (nullable = true)
 |-- city_pop_scaled: vector (nullable = true)
 |-- unix_time_scaled: vector (nullable = true)
 |-- merch_lat_scaled: vector (nullable = true)
 |-- merch_long_scaled: vector (nullable = true)
 |-- day_scaled: vector (nullable = true)
 |-- week_scaled: vector (nullable = true)
 |-- year_scaled: vector (nullable = true)
 |-- age_scaled: vector (nullable = true)



We now have our final dataset with the dessired columns to work with for model training consisting of a label colum 'is_fraud' and normalized/one-hot-encoded columns:
<br><i>is_fraud</i>
<br><i>category_OHE</i> 
<br><i>state_OHE</i>
<br><i>merchant_OHE</i> 
<br><i>gender_OHE</i> 
<br><i>job_OHE</i> 
<br><i>cc_num_scaled</i> 
<br><i>amt_scaled</i> 
<br><i>lat_scaled</i> 
<br><i>long_scaled</i> 
<br><i>city_pop_scaled</i> 
<br><i>unix_time_scaled</i> 
<br><i>merch_lat_scaled</i> 
<br><i>merch_long_scaled</i> 
<br><i>day_scaled</i> 
<br><i>week_scaled</i> 
<br><i>year_scaledr</i> 
<br><i>age_scaled</i> 

#### Split data into train and test sets

The data can now be re-split into a train and test set

In [39]:
# Split data into train and test sets
train, test = normalized_data.randomSplit([0.8, 0.2])

print('Number of values in train_data:', train.count())
print('Number of values in test_data: ', test.count())

Number of values in train_data: 2941039
Number of values in test_data:  735394


#### Combine all feature columns into single feature vector

The final step before classification in PySpark is to combine all feature data into a single vector. I will do this by creating a small function to create a list of input columns and run it through VectorAssembler.

In [40]:
# Create function to take all columns except class
def drop_class_from_array(data, unwanted):
    '''
    A function which return a new array of all columns in the input array other than one specified to drop.
    '''
    return [col for col in data if col != unwanted]

In [41]:
# Run drop_class_from_array to create list of input columns (all columns excpet class)
inputCols = train.columns
inputCols = drop_class_from_array(inputCols, 'is_fraud')

In [42]:
# Check result was successful from 'drop_class_from_array'
inputCols

['category_OHE',
 'state_OHE',
 'merchant_OHE',
 'gender_OHE',
 'job_OHE',
 'cc_num_scaled',
 'amt_scaled',
 'lat_scaled',
 'long_scaled',
 'city_pop_scaled',
 'unix_time_scaled',
 'merch_lat_scaled',
 'merch_long_scaled',
 'day_scaled',
 'week_scaled',
 'year_scaled',
 'age_scaled']

In [43]:
# Create VectorAssembler
assembler = VectorAssembler(inputCols = inputCols, outputCol = "features")

In [44]:
# Use assembler on train and test data
train_assembled = assembler.transform(train)
test_assembled = assembler.transform(test)

In [45]:
# Visually check results are as expected for train data
train_assembled.select(['features', 'is_fraud']).show(n=2)

+--------------------+--------+
|            features|is_fraud|
+--------------------+--------+
|(1264,[13,909,125...|       0|
|(1264,[13,984,125...|       0|
+--------------------+--------+
only showing top 2 rows



In [46]:
# Visually check results are as expected for test data
test_assembled.select(['features', 'is_fraud']).show(n=2)

+--------------------+--------+
|            features|is_fraud|
+--------------------+--------+
|(1264,[13,782,125...|       0|
|(1264,[13,810,125...|       0|
+--------------------+--------+
only showing top 2 rows



### Classifcation 

The data is now at a point where it can be used for training for classification. 

I wil create a series of classifers to train and predict on to compare results to find the most appropriate for this dataset. The classifier to be tester are taken from Spark MLlib. The ones to be used are:
<br><i>Naive Bayes classifier</i> 
<br><i>Logistic Regression classifier</i> 
<br><i>Decision Tree classifier</i> 
<br>and the ensemble:
<br><i>RandomForest Classifier</i> 

As the main focus of the task is to detect CCF cases (while also not misclassifying too many non-CCF cases) the main metrics of interest will be accuracy and recall. 

For the sake of completeness, and as it may be relevant in certain situations, precision and ROC metrics will also be calculated.

#### Create accuracy evaluators

In [47]:
# Set up accuracy evaluators for accuracy, recall, precision, and ROC
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", 
                                              metricName="accuracy") 
recall_evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", 
                                              metricName="recallByLabel") 
precision_evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", 
                                              metricName="precisionByLabel")
ROC_evaluator = BinaryClassificationEvaluator(labelCol="is_fraud", rawPredictionCol="prediction", metricName='areaUnderROC')

#### Naive Bayes Classifier

The first model to test will be a general industry standard in baseline<sup>[6]</sup>, the Naive Bayes model. I'm not expecting NB to perform extremely well on the data given it's assumption of data independence, but it will be a good first indicator to see how accurately the fraud cases can be predicted.

In [48]:
# Create Naive Bayes Classifier
NB = NaiveBayes(smoothing=1.0, modelType="multinomial", featuresCol='features', labelCol='is_fraud')

In [49]:
# Fit data to the model
NB_clf = NB.fit(train_assembled)

In [50]:
# Use model to predict class
predictions_df = NB_clf.transform(test_assembled)

In [51]:
# Calculate accuracy, recall, precision
nb_accuracy = accuracy_evaluator.evaluate(predictions_df)
print("Accuracy on test set:  ", nb_accuracy)
nb_recall = recall_evaluator.evaluate(predictions_df)
print("Recall on test set:    ", nb_recall)
nb_precision = precision_evaluator.evaluate(predictions_df)
print("Precision on test set: ", nb_precision)
ROC = ROC_evaluator.evaluate(predictions_df)
print("ROC on test set:       ", ROC)

Accuracy on test set:   0.7113166547456193
Recall on test set:     0.7147962093029571
Precision on test set:  0.7104900918213667
ROC on test set:        0.7113112239245307


In [52]:
# Create confusion matrix of results
y_true = predictions_df.select(['is_fraud']).collect()
y_pred = predictions_df.select(['prediction']).collect()

print(confusion_matrix(y_true, y_pred))

[[263238 105032]
 [107264 259860]]


In [53]:
# Generate pre-made classification report as a snaity check and to get weighted and f1-scores
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

           0       0.71      0.71      0.71    368270
           1       0.71      0.71      0.71    367124

    accuracy                           0.71    735394
   macro avg       0.71      0.71      0.71    735394
weighted avg       0.71      0.71      0.71    735394



NB has performed reasonably well on the dataset. It's achieved overall accuracy, precision, recall, and ROC all just over 71%. It has beaten our baseline of 50% accuracy and recall. It can be used as our standard baseline to beat moving forward.

We will now move on to LogisticRegression for comparison.

#### Logistic Regression Classifier

In [73]:
# Create LogisticRegression Classifier
LR = LogisticRegression(featuresCol = "features", predictionCol="prediction", labelCol='is_fraud')

In [74]:
# Fit data to the model
LR_clf = LR.fit(train_assembled)

In [75]:
# Use model to predict class
predictions_LR = LR_clf.transform(test_assembled)

In [80]:
# Calculate accuracy, recall, precision, and ROC
LR_accuracy = accuracy_evaluator.evaluate(predictions_LR)
print("Accuracy on test set:  ", LR_accuracy)
LR_recall = recall_evaluator.evaluate(predictions_LR)
print("Recall on test set:    ", LR_recall)
LR_precision = precision_evaluator.evaluate(predictions_LR)
print("Precision on test set: ", LR_precision)
ROC = ROC_evaluator.evaluate(predictions_LR)
print("ROC on test set:       ", ROC)

Accuracy on test set:   0.8485039485846764
Recall on test set:     0.8906614025274385
Precision on test set:  0.8220250208005453
ROC on test set:        0.8483894881778773


In [81]:
# Create confusion matrix of results
y_true = predictions_LR.select(['is_fraud']).collect()
y_pred = predictions_LR.select(['prediction']).collect()

print(confusion_matrix(y_true, y_pred))

[[328011  40267]
 [ 71017 295272]]


In [82]:
# Generate pre-made classification report as a snaity check and to get weighted and f1-scores
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

           0       0.82      0.89      0.85    368278
           1       0.88      0.81      0.84    366289

    accuracy                           0.85    734567
   macro avg       0.85      0.85      0.85    734567
weighted avg       0.85      0.85      0.85    734567



LogisticRegression has certainly performed better than the NB classifer. This isn't unexpected given the assumptions of NB. 

LogisticRegression has achieved a high accuracy of 85% and a very good recall of 89%, indicating it will be quite effective if used to detect CCF. It should be noted it does have a lower precision of 82%, although this is likely of less of interest in this case as we are move interested in our success in predicting actual fraud cases. 

#### Decision Tree Classifier

In [83]:
# Create Decision Tree Classifier
DT = DecisionTreeClassifier(featuresCol = "features", predictionCol="prediction", labelCol='is_fraud')

In [84]:
# Fit data to the model
DT_clf = DT.fit(train_assembled)

In [85]:
# Use model to predict class
predictions_DT = DT_clf.transform(test_assembled)

In [86]:
# Calculate accuracy, recall, precision
DT_accuracy = accuracy_evaluator.evaluate(predictions_DT)
print("Accuracy on test set:  ", DT_accuracy)
DT_recall = recall_evaluator.evaluate(predictions_DT)
print("Recall on test set:    ", DT_recall)
DT_precision = precision_evaluator.evaluate(predictions_DT)
print("Precision on test set: ", DT_precision)
ROC = ROC_evaluator.evaluate(predictions_DT)
print("ROC on test set:       ", ROC)

Accuracy on test set:   0.9286300636973891
Recall on test set:     0.9297568684526363
Precision on test set:  0.9280231782874303
ROC on test set:        0.9286270043444489


In [87]:
# Create confusion matrix of results
y_true = predictions_DT.select(['is_fraud']).collect()
y_pred = predictions_DT.select(['prediction']).collect()

print(confusion_matrix(y_true, y_pred))

[[342409  25869]
 [ 26557 339732]]


In [88]:
# Generate pre-made classification report as a snaity check and to get weighted and f1-scores
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

           0       0.93      0.93      0.93    368278
           1       0.93      0.93      0.93    366289

    accuracy                           0.93    734567
   macro avg       0.93      0.93      0.93    734567
weighted avg       0.93      0.93      0.93    734567



The Decision Tree classifier has achieved very good results with around 93% across the park in all metrics and overtakes LogisticRegression as our most successful classifier. It appears to be a good, well-balanced classifier on the dataset.

#### Random Forest Classifier

In [89]:
# Create Random Forest Classifier
RF = RandomForestClassifier(featuresCol = "features", predictionCol="prediction", labelCol='is_fraud')

In [90]:
# Fit data to the model
RF_clf = RF.fit(train_assembled)

In [91]:
# Use model to predict class
predictions_RF = RF_clf.transform(test_assembled)

In [92]:
# Calculate accuracy, recall, precision
RF_accuracy = accuracy_evaluator.evaluate(predictions_RF)
print("Accuracy on test set:  ", RF_accuracy)
RF_recall = recall_evaluator.evaluate(predictions_RF)
print("Recall on test set:    ", RF_recall)
RF_precision = precision_evaluator.evaluate(predictions_RF)
print("Precision on test set: ", RF_precision)
ROC = ROC_evaluator.evaluate(predictions_RF)
print("ROC on test set:       ", ROC)

Accuracy on test set:   0.8510646408019963
Recall on test set:     0.9625608915004426
Precision on test set:  0.7875717888048345
ROC on test set:        0.8507619207603909


In [93]:
# Create confusion matrix of results
y_true = predictions_RF.select(['is_fraud']).collect()
y_pred = predictions_RF.select(['prediction']).collect()

print(confusion_matrix(y_true, y_pred))

[[354490  13788]
 [ 95615 270674]]


In [94]:
# Generate pre-made classification report as a snaity check and to get weighted and f1-scores
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

           0       0.79      0.96      0.87    368278
           1       0.95      0.74      0.83    366289

    accuracy                           0.85    734567
   macro avg       0.87      0.85      0.85    734567
weighted avg       0.87      0.85      0.85    734567



The RandomForest Classifier also performed well although with less consistent results that the Decision Tree classifier. It achieved a respectable accuracy of 85% while scoring the highest of all models in recall with 96%. 

This makes the RandomForest Classifier our best performing model specifically at detecting cases of CCF.

### Results of the models

<br><i>Naive Bayes classifier:</i> Accuracy: 0.711, Recall: 0.714
<br><i>Logistic Regression classifier:</i> Accuracy: 0.849, Recall: 0.891
<br><i>Decision Tree classifier:</i> Accuracy: 0.929, Recall: 0.930
<br><i>Random Forest classifier:</i> Accuracy: 0.851, Recall: 0.963

#### Most successful model

Depending on the overall goal of what is trying to be achieved, there are two possible choices as best model. 

If the goal is overall accuracy, the <b>Decision Tree Classifier</b> performs very well across the board achieving good results (93%) in detecting both cases of CCF and non-CCF. 

If the goal is specifially CCF detection and false positives are acceptable, the <b>Random Forest Classifier</b> performs particularly well with a recall of 96% while still achieving good overall results with an accuracy of 85%.

#### Save most successful model for reuse

In [118]:
# Save Decision Tree Classifier as 'DT_clf'
DT_clf.write().overwrite().save('hdfs://lena/user/blive001/DT_clf')

In [119]:
# Save Random Forest Classifier as 'RF_clf'
RF_clf.write().overwrite().save('hdfs://lena/user/blive001/RF_clf')

#### End Spark Session

In [54]:
spark.stop()