## Import

In [0]:
#importing all the necessary packages

%matplotlib inline 
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql import Row
import numpy as np
import pandas as pd
import seaborn as sns
import re
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import NaiveBayes 
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import PipelineModel
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.tuning import TrainValidationSplitModel

## Load Dataset

In [0]:
# Load the train and test CSV files (transaction and identity tables) from the Azure blob storage container.
# Each file is read with its first line treated as the header.

def _load_csv(path):
    """Read one header-bearing CSV file at ``path`` into a Spark DataFrame."""
    return spark.read.format('csv').option("header", 'true').load(path)

# Train dataset
train_transaction = _load_csv("abfss://team21@sauondbrwebigdatalrs1.dfs.core.windows.net/train_transaction.csv")
train_identity = _load_csv("abfss://team21@sauondbrwebigdatalrs1.dfs.core.windows.net/train_identity.csv")

# Test dataset
test_transaction = _load_csv("abfss://team21@sauondbrwebigdatalrs1.dfs.core.windows.net/test_transaction.csv")
test_identity = _load_csv("abfss://team21@sauondbrwebigdatalrs1.dfs.core.windows.net/test_identity.csv")

## Merging Datasets

In [0]:
# Attach the identity columns to each transaction table with a left outer join on TransactionID,
# so that transactions without an identity record are kept.

train_df = train_transaction.join(train_identity, on="TransactionID", how="left")
test_df = test_transaction.join(test_identity, on="TransactionID", how="left")

### The following columns and their descriptions are taken straight from Kaggle:
Reference Link: https://www.kaggle.com/competitions/ieee-fraud-detection/discussion/101203

    TransactionDT: timedelta from a given reference datetime (not an actual timestamp)
    TransactionAmt: transaction payment amount in USD
    ProductCD: product code, the product for each transaction
    card1 - card6: payment card information, such as card type, card category, issue bank, country, etc.
    addr: address
    dist: distance
    P_ and (R__) emaildomain: purchaser and recipient email domain
    C1-C14: counting, such as how many addresses are found to be associated with the payment card, etc. The actual meaning is masked.
    D1-D15: timedelta, such as days between previous transaction, etc.
    M1-M9: match, such as names on card and address, etc.
    Vxxx: Vesta engineered rich features, including ranking, counting, and other entity relations.
    id_1 - id_38: identity information parameters

In [0]:
#Preview the first 5 rows of the merged train and test datasets (converted to pandas for printing)

for preview_df in (train_df, test_df):
    print(preview_df.limit(5).toPandas())

  TransactionID isFraud  ... DeviceType                     DeviceInfo
0       2987004       0  ...     mobile  SAMSUNG SM-G892A Build/NRD90M
1       3100441       0  ...     mobile                     iOS Device
2       3100442       0  ...     mobile                     iOS Device
3       3100443       0  ...    desktop                    Trident/7.0
4       3217438       0  ...       None                           None

[5 rows x 434 columns]
  TransactionID TransactionDT TransactionAmt  ... id-38 DeviceType DeviceInfo
0       3779617      22364583           49.0  ...  None       None       None
1       3894291      26465823           54.5  ...  None       None       None
2       3894292      26466013          226.0  ...  None       None       None
3       4005646      30142294        1178.66  ...  None       None       None
4       4005648      30142325           59.0  ...  None       None       None

[5 rows x 433 columns]


## Basic Preprocessing

In [0]:
#Row counts of the merged train and test datasets, train first

for counted_df in (train_df, test_df):
    print(counted_df.count())

590540
506691


In [0]:
#Schemas (column names and types) of the train and test datasets, train first

for described_df in (train_df, test_df):
    described_df.printSchema()

In [0]:
#The identity columns are spelled "id-XX" in test_df but "id_XX" in train_df; rename the test columns to match.

renamed_test_columns = [name.replace('id-', 'id_') for name in test_df.columns]
test_df = test_df.toDF(*renamed_test_columns)

In [0]:
#Remove fully duplicated rows from train and test, then mark the de-duplicated datasets for caching

train_df = train_df.dropDuplicates().cache()
test_df = test_df.dropDuplicates().cache()

In [0]:
# Row count of the de-duplicated train dataset.
# NOTE(review): presumably also run to trigger the caching requested in the previous cell — confirm.
train_df.count()

In [0]:
# Row count of the de-duplicated test dataset.
# NOTE(review): presumably also run to trigger the caching requested for test_df — confirm.
test_df.count()

In [0]:
#Reference Link: https://medium.com/@mr.priyankmishra/a-realistic-approach-to-ieee-cis-fraud-detection-25faea54137
#Calculating missing values in each dataset: one output row holding, per column, the number of null entries.

missing_data_train = train_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in train_df.columns])
missing_data_test = test_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in test_df.columns])

#Calculating the fraction of missing values in each dataset.
#The denominators are the actual row counts of the current DataFrames rather than hard-coded numbers,
#so the fractions stay correct if the input files or the de-duplication result change.

train_row_count = train_df.count()
test_row_count = test_df.count()

missing_value_percentage_train = missing_data_train.select([(F.col(c)/train_row_count).alias(c) for c in train_df.columns]).collect()
missing_value_percentage_test = missing_data_test.select([(F.col(c)/test_row_count).alias(c) for c in test_df.columns]).collect()

In [0]:
#Show the per-column null counts of each dataset in vertical layout, without truncating values

missing_data_train.show(n=2, truncate=False, vertical=True)
missing_data_test.show(n=2, truncate=False, vertical=True)

In [0]:
#Printing the fraction of missing values (null count divided by row count) of each column in each dataset.
#Each variable is the list returned by collect(): a single Row with one field per column.

print(missing_value_percentage_train)
print("\n",missing_value_percentage_test)

In [0]:
#Approximate number of distinct values of every column in each dataset.
#approx_count_distinct is used instead of an exact count because it is applied to every column at once.

def _approx_distinct_per_column(df):
    """Collect one Row holding the approximate distinct count of each column of ``df``."""
    distinct_exprs = [F.approx_count_distinct(F.col(c)).alias(c) for c in df.columns]
    return df.select(distinct_exprs).collect()

distinct_values_train = _approx_distinct_per_column(train_df)
distinct_values_test = _approx_distinct_per_column(test_df)

In [0]:
#Printing the approximate number of distinct values of each column in each dataset.
#Each variable is the list returned by collect(): a single Row with one field per column.

print(distinct_values_train)
print("\n",distinct_values_test)

In [0]:
#Showing the class imbalance through a bar chart of the label counts

#groupBy gives no ordering guarantee, so the rows are sorted by label before collecting:
#the bar colours and the text offsets below assume index 0 is the non-fraud class and index 1 the fraud class.
isFraud_lst = train_df.groupBy('isFraud').count().orderBy('isFraud').collect()
(x_values, y_values) = zip(*isFraud_lst)
plt.bar(x_values, y_values, color = ['red','green'])
plt.title('Distribution of Class Labels')
plt.xlabel('isFraud')
plt.ylabel('Count')
#Annotate each bar with its count: just inside the tall first bar, just above the short second bar
plt.text(0,y_values[0]-30000,y_values[0],ha = 'center')
plt.text(1,y_values[1]+8000,y_values[1],ha = 'center')
plt.show()

In [0]:
#Reference Link: https://medium.com/@mr.priyankmishra/a-realistic-approach-to-ieee-cis-fraud-detection-25faea54137
#Keeping the categorical and the numeric feature names in two lists. The list of categorical columns is provided by Kaggle.

catf = ['ProductCD'] + ['card'+str(i) for i in range(1,7)] \
     + ['addr1', 'addr2', 'P_emaildomain', 'R_emaildomain'] \
     + ['M'+str(i) for i in range(1,10)] \
     + ['DeviceType', 'DeviceInfo']
catf.extend('id_'+str(i) for i in range(12,39))
print(catf)

#Every remaining column except the label is treated as numeric (list comprehension separates numerical from categorical)
numf = [feature for feature in train_df.columns if not (feature in catf or feature == 'isFraud')]
print(numf)
numf_plus_label = numf + ['isFraud']

In [0]:
#Casting the numeric feature columns (and, for train, the isFraud label) to a floating-point type.
# - "double" is used instead of "float": a 32-bit float represents integers exactly only up to 2**24 (16,777,216),
#   while TransactionDT values such as 30142294 appear in the test preview above.
# - Columns are cast in place, keeping the existing column order, because later cells map statistics collected
#   before this cast back to column names by position (train_df.columns[i]).

train_df = train_df.select(*(F.col(c).cast("double").alias(c) if c in numf_plus_label else F.col(c) for c in train_df.columns))
test_df = test_df.select(*(F.col(c).cast("double").alias(c) if c in numf else F.col(c) for c in test_df.columns))

In [0]:
#Looking at correlation plot of C columns.
#First using regex to search for the column names that are of the form Cxx.
#Using vector assembler to prepare the data for correlation.
#heatmap is used to plot the data.

#Raw, anchored pattern: only names that are exactly "C" followed by digits are selected
C_columns = [ele for ele in train_df.columns if re.search(r'^C\d+$', ele)]

C_columns_df = train_df.select(C_columns)
C_columns_assembler = VectorAssembler(inputCols=C_columns_df.columns,outputCol="C_columns_vectors")
C_columns_assembler_ouput = C_columns_assembler.setHandleInvalid("keep").transform(C_columns_df)
C_columns_matrix = Correlation.corr(C_columns_assembler_ouput, "C_columns_vectors")

print("Pearson correlation matrix:\n" + str((C_columns_matrix.head()[0])))

#The tick labels are taken from the columns that were actually assembled, so there is exactly one label per
#matrix row/column (a hard-coded list that also named 'isFraud' had one label more than the matrix has columns).
x = list(C_columns) # labels for x-axis
y = list(C_columns) # labels for y-axis

plt.figure(figsize=(16, 7))
sns.heatmap(C_columns_matrix.head()[0].toArray(),annot=True, xticklabels=x, yticklabels=y,cmap="YlGnBu", vmin=-1, vmax=1)


In [0]:
#Correlation plot of the numeric identity columns.
#A regex picks the column names id_01 ... id_11, a vector assembler packs them into one vector column
#(rows with invalid values are skipped), and the Pearson correlation matrix is drawn as a heatmap.

ID_columns = [ele for ele in train_df.columns if re.search(r'^id_0[1-9]$|^id_1[0-1]$',ele)]

ID_columns_df = train_df.select(ID_columns)
ID_columns_assembler = VectorAssembler(inputCols=ID_columns_df.columns,outputCol="ID_columns_vectors").setHandleInvalid("skip")
ID_columns_assembler_ouput = ID_columns_assembler.transform(ID_columns_df)
ID_columns_matrix = Correlation.corr(ID_columns_assembler_ouput, "ID_columns_vectors")

print("Pearson correlation matrix:\n" + str((ID_columns_matrix.head()[0])))

x = ['id_%02d' % i for i in range(1, 12)] # labels for x-axis: id_01 ... id_11
y = list(x) # labels for y-axis

plt.figure(figsize=(16, 7))
sns.heatmap(
    ID_columns_matrix.head()[0].toArray(),
    annot=True,
    xticklabels=x,
    yticklabels=y,
    cmap="YlGnBu",
    vmin=-1,
    vmax=1,
)

In [0]:
#Correlation plot of the D columns.
#A regex picks the column names of the form Dxx, a vector assembler packs them into one vector column,
#and the Pearson correlation matrix is drawn as a heatmap.
#NOTE(review): with handleInvalid "keep", null D values presumably end up as NaN inside the vectors and may
#turn the affected correlations into NaN — confirm the plotted matrix is meaningful.

D_columns = [ele for ele in train_df.columns if re.search(r'D\d+',ele)]

D_columns_df = train_df.select(D_columns)
D_columns_assembler = VectorAssembler(inputCols=D_columns_df.columns,outputCol="D_columns_vectors").setHandleInvalid("keep")
D_columns_assembler_ouput = D_columns_assembler.transform(D_columns_df)
D_columns_matrix = Correlation.corr(D_columns_assembler_ouput, "D_columns_vectors")

print("Pearson correlation matrix:\n" + str((D_columns_matrix.head()[0])))

x = ['D' + str(i) for i in range(1, 16)] # labels for x-axis: D1 ... D15
y = list(x) # labels for y-axis

plt.figure(figsize=(16, 7))
sns.heatmap(
    D_columns_matrix.head()[0].toArray(),
    annot=True,
    xticklabels=x,
    yticklabels=y,
    cmap="YlGnBu",
    vmin=-1,
    vmax=1,
)

In [0]:
#Looking at correlation plots of the V columns.
#First, all V columns that share the same number of missing values are put together in a 2-D list.
#Then a correlation heatmap is plotted per group after using a vector assembler.
#Within each group, sub-groups are formed from columns whose Pearson coefficient exceeds the cutoff
#used in makegroupings (0.75).

missing_data_train_lst = missing_data_train.select(["V"+str(i) for i in range(1,340)]).collect()

#Map each distinct null count to the (1-based) indices of the V columns that have that count
unique_val = {}
for v_index, null_count in enumerate(missing_data_train_lst[0], start=1):
  unique_val.setdefault(null_count, []).append(v_index)

#Turn the index groups into groups of column names
V_superset_columns = [["V"+str(i) for i in ele] for ele in unique_val.values()]

#Accumulates the sub-groups found by makegroupings across all calls: {group head column: [member columns]}
makegroupings_dict ={}
def makegroupings(V_subset_columns_matrix, V_subset_columns, threshold=0.75):
  """Group columns whose correlation with a group head exceeds ``threshold``.

  Rows are scanned in order. Each row index that has not yet been placed in a group becomes a
  group head; every column j whose correlation with the head is above ``threshold`` joins the
  head's group and is not considered as a head later.

  Args:
    V_subset_columns_matrix: square correlation matrix (indexable as matrix[i][j]).
    V_subset_columns: column names, in the same order as the matrix rows/columns.
    threshold: correlation above which two columns are grouped (default 0.75).

  Returns:
    The module-level ``makegroupings_dict``, updated in place with the new groups.
  """
  alreadycomputed = set()
  for i in range(len(V_subset_columns_matrix)):
    if i in alreadycomputed:
      continue
    group = makegroupings_dict[V_subset_columns[i]] = []
    for j in range(len(V_subset_columns_matrix[i])):
      # A head always belongs to its own group, even when its self-correlation is NaN
      # (e.g. a constant column); otherwise the group would be left empty.
      if j == i or V_subset_columns_matrix[i][j] > threshold:
        group.append(V_subset_columns[j])
        alreadycomputed.add(j)
  return makegroupings_dict

#For every group of V columns sharing the same null count: compute the Pearson correlation matrix
#(rows with invalid values are skipped), plot it, and record the correlated sub-groups.
for V_subset_columns in V_superset_columns:
  V_subset_columns_df = train_df.select(V_subset_columns)
  V_subset_columns_assembler = VectorAssembler(inputCols=V_subset_columns_df.columns,outputCol="V_subset_columns_vectors")
  V_subset_columns_assembler.setHandleInvalid("skip")
  V_subset_columns_assembler_ouput = V_subset_columns_assembler.transform(V_subset_columns_df)
  V_subset_columns_matrix = Correlation.corr(V_subset_columns_assembler_ouput, "V_subset_columns_vectors").head()[0].toArray()

  plt.figure(figsize=(20, 20))
  sns.heatmap(
    V_subset_columns_matrix,
    annot=True,
    xticklabels=V_subset_columns,
    yticklabels=V_subset_columns,
    cmap="YlGnBu",
    vmin=-1,
    vmax=1,
  )
  plt.show()
  makegroupings(V_subset_columns_matrix,V_subset_columns)
  


In [0]:
#Replacing null values in categorical columns with 'missing' and in numerical columns with -999

#TransactionID is an identifier, not a feature. It is filtered out (instead of list.remove) so that
#re-running this cell does not raise ValueError once the name is already gone.
numf = [c for c in numf if c != 'TransactionID']
train_df = train_df.na.fill("missing",catf).na.fill(-999,numf)
test_df  = test_df.na.fill("missing",catf).na.fill(-999,numf)

## ML PART 1: RUNNING ON UNPROCESSED DATA

In [0]:
#Pipeline for ML Part 1 is saved and can be loaded as follows (If this cell is used then no need to use the next cell):
#pipelineModel = PipelineModel.load("abfss://team21@sauondbrwebigdatalrs1.dfs.core.windows.net/model")

In [0]:
#Feature pipeline: string indexer -> one hot encoding (per categorical column) -> vector assembler, fitted on train

stages = []
for catcol in catf:
    indexed_col = catcol + 'Index'
    encoded_col = catcol + "Vec"
    stringIndexer = StringIndexer(inputCol=catcol, outputCol=indexed_col, handleInvalid='keep')
    encoder = OneHotEncoder(inputCols=[indexed_col], outputCols=[encoded_col], handleInvalid='keep')
    stages.extend([stringIndexer, encoder])

#The final feature vector is made of the encoded categorical columns followed by the numeric columns
assemblerInputs = [c + "Vec" for c in catf] + numf
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features", handleInvalid='keep')
stages.append(assembler)

pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(train_df)


In [0]:
#Apply the pipeline model (fitted on train) to both datasets, keeping the "features" vector plus the original columns.

selectedCols = ['features'] + train_df.columns
train_df_base = pipelineModel.transform(train_df).select(selectedCols)

selectedCols = ['features'] + test_df.columns
test_df_base = pipelineModel.transform(test_df).select(selectedCols)

In [0]:
#To extract second element from each row in the probability column

def ith_(v, i):
    """Return element ``i`` of the indexable ``v`` as a Python float, or None if it cannot be extracted.

    Args:
        v: indexable value (applied below to the "probability" column of model predictions); may be None.
        i: position of the element to extract.

    Returns:
        float(v[i]), or None when v is None / not indexable (TypeError), i is out of range
        (IndexError) or the element cannot be converted to float (ValueError).
    """
    try:
        return float(v[i])
    except (IndexError, TypeError, ValueError):
        # Best-effort extraction: a bad row yields a null instead of failing the whole job.
        return None

# Wrap ith_ as a Spark UDF returning DoubleType; it is called below as ith("probability", lit(1)).
ith = udf(ith_, DoubleType())

In [0]:
#Logistic Regression model: fit on the train features, then score the test dataset

lr = LogisticRegression(maxIter=10, labelCol='isFraud', featuresCol='features')
lr_training_input = train_df_base.select('isFraud','features')
lrModel = lr.fit(lr_training_input)

predictions = lrModel.transform(test_df_base)
#Keep the transaction id and element 1 of the probability vector
probability_col_lr = predictions.select("TransactionID", ith("probability", lit(1)))
pd_lr = probability_col_lr.toPandas()

In [0]:
#Display the TransactionID and the extracted probability (element 1 of the "probability" vector) from the logistic regression predictions

display(pd_lr)

In [0]:
#Gradient Boosted Trees: fit on the train features, then score the test dataset

gbt = GBTClassifier(maxIter=5, featuresCol="features", labelCol="isFraud")
gbt_training_input = train_df_base.select('isFraud','features')
gbtModel = gbt.fit(gbt_training_input)

predictions = gbtModel.transform(test_df_base)
#Keep the transaction id and element 1 of the probability vector
probability_col_gbt = predictions.select("TransactionID", ith("probability", lit(1)))
pd_gbt = probability_col_gbt.toPandas()

In [0]:
#Display the TransactionID and the extracted probability (element 1 of the "probability" vector) from the gradient boosted trees predictions
display(pd_gbt)

## Dropping Features - Pre ML Part 2

In [0]:
#Reference Link: https://medium.com/@mr.priyankmishra/a-realistic-approach-to-ieee-cis-fraud-detection-25faea54137
#Finding columns with more than 90 percent missing values.
#The column names are read from the collected Row itself (asDict keeps the field order) rather than from
#train_df.columns / test_df.columns by position: the DataFrames have been re-selected since the statistics
#were collected, so their current column order need not match the order of the Row fields.

filtered_columns_train = [
  name
  for name, missing_fraction in missing_value_percentage_train[0].asDict().items()
  if missing_fraction > 0.9
]

filtered_columns_test = [
  name
  for name, missing_fraction in missing_value_percentage_test[0].asDict().items()
  if missing_fraction > 0.9
]

In [0]:
#Reference Link: https://medium.com/@mr.priyankmishra/a-realistic-approach-to-ieee-cis-fraud-detection-25faea54137
#Finding columns with only a single distinct value.
#The column names are read from the collected Row itself (asDict keeps the field order) rather than from
#train_df.columns / test_df.columns by position: the DataFrames have been re-selected since the statistics
#were collected, so their current column order need not match the order of the Row fields.

filtered_columns_distinct_train = [
  name
  for name, distinct_count in distinct_values_train[0].asDict().items()
  if distinct_count == 1
]

filtered_columns_distinct_test = [
  name
  for name, distinct_count in distinct_values_test[0].asDict().items()
  if distinct_count == 1
]

In [0]:
#Once the sub groups are made after plotting the correlation plots, a representative is chosen from each sub group.
#This representative has the highest (approximate) number of distinct values in its subgroup;
#on a tie the first such column of the subgroup wins.
#The rest of the V columns are dropped.

V_columns_names = ["V"+str(i) for i in range(1,340)]
V_columns_distinct_count = train_df.select([F.approx_count_distinct(F.col(c)).alias(c) for c in V_columns_names]).collect()

#Column name -> approximate distinct count (the collected Row has one value per V column, in the same order)
V_distinct_count_dict = dict(zip(V_columns_names, V_columns_distinct_count[0]))

V_keep = []
for value in makegroupings_dict.values():
  #An empty sub group has no representative; skipping it avoids re-appending the previous
  #group's representative (or failing on an unbound name for the very first group).
  if not value:
    continue
  V_keep.append(max(value, key=lambda ele: V_distinct_count_dict[ele]))

#Set membership keeps the filter linear in the number of V columns
V_keep_lookup = set(V_keep)
V_drop = [c for c in V_columns_names if c not in V_keep_lookup]



In [0]:
#Reference Link: https://medium.com/@mr.priyankmishra/a-realistic-approach-to-ieee-cis-fraud-detection-25faea54137
#Merging every list of columns to be dropped into one list; going through a set removes repeated names.

columns_to_drop = list(set().union(
    V_drop,
    filtered_columns_train,
    filtered_columns_test,
    filtered_columns_distinct_train,
    filtered_columns_distinct_test,
))
print("The number of columns that will be dropped are: ",len(columns_to_drop))
print("\nThe columns that will be dropped are:\n",columns_to_drop)

In [0]:
#Dropping the selected columns from both datasets.
#The column names are passed as plain strings: DataFrame.drop accepts any number of names in every
#PySpark version, whereas several Column objects are rejected by older versions.

train_df = train_df.drop(*columns_to_drop)
test_df = test_df.drop(*columns_to_drop)

In [0]:
#Reference Link: https://medium.com/@mr.priyankmishra/a-realistic-approach-to-ieee-cis-fraud-detection-25faea54137
#Rebuilding the lists of categorical and numerical features, as a lot of features have been dropped.

catf = ['ProductCD'] + ['card'+str(i) for i in range(1,7)] \
     + ['addr1', 'addr2', 'P_emaildomain', 'R_emaildomain'] \
     + ['M'+str(i) for i in range(1,10)] \
     + ['DeviceType', 'DeviceInfo']
catf.extend('id_'+str(i) for i in range(12,39))

#Keep only the categorical names that survived the drop; every other column except the label
#and the TransactionID identifier is numeric
remaining_columns = train_df.columns
catf = [f for f in catf if f in remaining_columns]
numf = [f for f in remaining_columns if not (f in catf or f == 'isFraud')]
numf.remove("TransactionID")
print(catf)
print(numf)
numf_plus_label = numf + ['isFraud']

## ML PART 2: RUNNING ON PREPROCESSED DATA

In [0]:
#Feature pipeline for the reduced column set: string indexer -> one hot encoding -> vector assembler, fitted on train

stages2 = []
for catcol in catf:
    indexed_col = catcol + 'Index'
    encoded_col = catcol + "Vec"
    stringIndexer2 = StringIndexer(inputCol=catcol, outputCol=indexed_col, handleInvalid='keep')
    encoder2 = OneHotEncoder(inputCols=[indexed_col], outputCols=[encoded_col], handleInvalid='keep')
    stages2.extend([stringIndexer2, encoder2])

#The final feature vector is made of the encoded categorical columns followed by the numeric columns
assemblerInputs = [c + "Vec" for c in catf] + numf
assembler2 = VectorAssembler(inputCols=assemblerInputs, outputCol="features", handleInvalid='keep')
stages2.append(assembler2)

pipeline2 = Pipeline(stages=stages2)
pipelineModel2 = pipeline2.fit(train_df)


In [0]:
#Apply the second pipeline model (fitted on train) to both datasets, keeping the "features" vector plus the original columns.

selectedCols = ['features'] + train_df.columns
train_df_preprocessed = pipelineModel2.transform(train_df).select(selectedCols)

selectedCols = ['features'] + test_df.columns
test_df_preprocessed = pipelineModel2.transform(test_df).select(selectedCols)

In [0]:
#To extract second element from each row in the probability column

def ith_(v, i):
    """Return element ``i`` of the indexable ``v`` as a Python float, or None if it cannot be extracted.

    Args:
        v: indexable value (applied below to the "probability" column of model predictions); may be None.
        i: position of the element to extract.

    Returns:
        float(v[i]), or None when v is None / not indexable (TypeError), i is out of range
        (IndexError) or the element cannot be converted to float (ValueError).
    """
    try:
        return float(v[i])
    except (IndexError, TypeError, ValueError):
        # Best-effort extraction: a bad row yields a null instead of failing the whole job.
        return None

# Wrap ith_ as a Spark UDF returning DoubleType; it is called below as ith("probability", lit(1)).
ith = udf(ith_, DoubleType())

In [0]:
#Logistic Regression on the reduced column set: fit on the train features, then score the test dataset

lr2 = LogisticRegression(maxIter=10, labelCol='isFraud', featuresCol='features')
lr2_training_input = train_df_preprocessed.select('isFraud','features')
lrModel2 = lr2.fit(lr2_training_input)

predictions = lrModel2.transform(test_df_preprocessed)
#Keep the transaction id and element 1 of the probability vector
probability_col_lr2 = predictions.select("TransactionID", ith("probability", lit(1)))

pd_lr2 = probability_col_lr2.toPandas()

In [0]:
#Display the TransactionID and the extracted probability (element 1 of the "probability" vector) from the second logistic regression model
display(pd_lr2)

In [0]:
#Gradient Boosted Trees on the reduced column set: fit on the train features, then score the test dataset

gbt2 = GBTClassifier(maxIter=5, featuresCol="features", labelCol="isFraud")
gbt2_training_input = train_df_preprocessed.select('isFraud','features')
gbtModel2 = gbt2.fit(gbt2_training_input)

predictions = gbtModel2.transform(test_df_preprocessed)
#Keep the transaction id and element 1 of the probability vector
probability_col_gbt2 = predictions.select("TransactionID", ith("probability", lit(1)))

pd_gbt2 = probability_col_gbt2.toPandas()

In [0]:
#Display the TransactionID and the extracted probability (element 1 of the "probability" vector) from the second gradient boosted trees model
display(pd_gbt2)

## Adding Features - Pre ML Part 3

In [0]:
#Replacing "missing" and -999 with None for this step.
#We are doing this because we will be engineering features from existing columns in the next steps and we want to have None in those columns wherever applicable.
#Each sentinel is only looked for in the columns it was written to ("missing" in the categorical columns of catf,
#-999 in the numeric columns of numf). This avoids comparing numeric columns with a string and string columns
#with a number, which forces implicit casts (an error under ANSI SQL mode) and could null a genuine "-999" category.

def _restore_nulls(df):
    """Return ``df`` with the imputation sentinels turned back into nulls, column order unchanged."""
    def _restored(c):
        if c in catf:
            return F.when(F.col(c) == "missing", None).otherwise(F.col(c)).alias(c)
        if c in numf:
            return F.when(F.col(c) == -999, None).otherwise(F.col(c)).alias(c)
        # Columns that were never imputed (e.g. TransactionID, isFraud) pass through untouched
        return F.col(c)
    return df.select([_restored(c) for c in df.columns])

train_df = _restore_nulls(train_df)

test_df = _restore_nulls(test_df)

In [0]:
#Reference Link: https://medium.com/@mr.priyankmishra/a-realistic-approach-to-ieee-cis-fraud-detection-25faea54137
#Reference Link: https://sparkbyexamples.com/pyspark/pyspark-add-new-column-to-dataframe/#add-column-based-on-condition

#Columns whose values are concatenated, in this order, into the uid (unique identifier) column
uid_source_columns = ["card1", "card2", "card3", "card4", "card5", "card6", "addr1", "P_emaildomain"]

def _with_engineered_features(df):
    """Return ``df`` with the New_Transaction_Hour and uid columns appended.

    New_Transaction_Hour is derived from the existing TransactionDT column: floor(TransactionDT / 3600) % 24.
    uid is the plain concatenation of the columns listed in uid_source_columns.
    NOTE(review): the parts are joined without a separator, and concat presumably yields null as soon as
    one part is null — confirm both are intended.
    """
    transaction_hour = F.floor(F.col('TransactionDT')/3600)%24
    with_hour = df.withColumn("New_Transaction_Hour", transaction_hour)
    return with_hour.withColumn("uid", F.concat(*[F.col(part) for part in uid_source_columns]))

train_df = _with_engineered_features(train_df)

test_df = _with_engineered_features(test_df)


In [0]:
#Reference Link: https://medium.com/@mr.priyankmishra/a-realistic-approach-to-ieee-cis-fraud-detection-25faea54137
#Rebuilding the lists of categorical and numerical features so that they include the engineered columns
#(uid is categorical).

catf = ['ProductCD'] + ['card'+str(i) for i in range(1,7)] \
     + ['addr1', 'addr2', 'P_emaildomain', 'R_emaildomain'] \
     + ['M'+str(i) for i in range(1,10)] \
     + ['DeviceType', 'DeviceInfo', 'uid']
catf.extend('id_'+str(i) for i in range(12,39))

#Keep only the categorical names present in train_df; every other column except the label
#and the TransactionID identifier is numeric
current_columns = train_df.columns
catf = [f for f in catf if f in current_columns]
numf = [f for f in current_columns if not (f in catf or f == 'isFraud')]
numf.remove("TransactionID")
print(catf)
print(numf)
numf_plus_label = numf + ['isFraud']

In [0]:
#Feature engineering is done, so the nulls are imputed again:
#'missing' for the categorical columns and -999 for the numerical columns

train_df = train_df.fillna("missing", subset=catf).fillna(-999, subset=numf)
test_df = test_df.fillna("missing", subset=catf).fillna(-999, subset=numf)

In [0]:
# Preview the first 5 train rows (as a pandas frame) after feature engineering and re-imputation.
train_df.limit(5).toPandas()

In [0]:
# Unpersist the current train/test DataFrames before they are cached again in the next cell.
# NOTE(review): train_df/test_df have been re-assigned several times since the .cache() call made after
# de-duplication, so this may not release that earlier cached data — confirm.
train_df = train_df.unpersist()
test_df = test_df.unpersist()

In [0]:
# Mark the feature-engineered, re-imputed train/test datasets for caching.
train_df = train_df.cache()
test_df = test_df.cache()

In [0]:
# Row count of the train dataset.
# NOTE(review): presumably run to trigger the caching requested in the previous cell — confirm.
train_df.count()

In [0]:
# Row count of the test dataset.
# NOTE(review): presumably run to trigger the caching requested for test_df — confirm.
test_df.count()

## ML PART 3: RUNNING AFTER FEATURE ENGINEERING

In [0]:
#Feature pipeline including the engineered columns: string indexer -> one hot encoding -> vector assembler, fitted on train

stages3 = []
for catcol in catf:
    indexed_col = catcol + 'Index'
    encoded_col = catcol + "Vec"
    stringIndexer3 = StringIndexer(inputCol=catcol, outputCol=indexed_col, handleInvalid='keep')
    encoder3 = OneHotEncoder(inputCols=[indexed_col], outputCols=[encoded_col], handleInvalid='keep')
    stages3.extend([stringIndexer3, encoder3])

#The final feature vector is made of the encoded categorical columns followed by the numeric columns
assemblerInputs = [c + "Vec" for c in catf] + numf
assembler3 = VectorAssembler(inputCols=assemblerInputs, outputCol="features", handleInvalid='keep')
stages3.append(assembler3)

pipeline3 = Pipeline(stages=stages3)
pipelineModel3 = pipeline3.fit(train_df)


In [0]:
#Apply the third pipeline model (fitted on train) to both datasets, keeping the "features" vector plus the original columns.

selectedCols = ['features'] + train_df.columns
train_df_feature = pipelineModel3.transform(train_df).select(selectedCols)

selectedCols = ['features'] + test_df.columns
test_df_feature = pipelineModel3.transform(test_df).select(selectedCols)


In [0]:
#To extract second element from each row in the probability column

def ith_(v, i):
    """Return element ``i`` of the indexable ``v`` as a Python float, or None if it cannot be extracted.

    Args:
        v: indexable value (applied below to the "probability" column of model predictions); may be None.
        i: position of the element to extract.

    Returns:
        float(v[i]), or None when v is None / not indexable (TypeError), i is out of range
        (IndexError) or the element cannot be converted to float (ValueError).
    """
    try:
        return float(v[i])
    except (IndexError, TypeError, ValueError):
        # Best-effort extraction: a bad row yields a null instead of failing the whole job.
        return None

# Wrap ith_ as a Spark UDF returning DoubleType; it is called below as ith("probability", lit(1)).
ith = udf(ith_, DoubleType())

In [0]:
#Logistic Regression after feature engineering: fit on the train features, then score the test dataset

lr3 = LogisticRegression(maxIter=10, labelCol='isFraud', featuresCol='features')
lr3_training_input = train_df_feature.select("isFraud","features")
lrModel3 = lr3.fit(lr3_training_input)

predictions = lrModel3.transform(test_df_feature)
#Keep the transaction id and element 1 of the probability vector
probability_col_lr3 = predictions.select("TransactionID", ith("probability", lit(1)))
pd_lr3 = probability_col_lr3.toPandas()


In [0]:
#Display the TransactionID and the extracted probability (element 1 of the "probability" vector) from the third logistic regression model
display(pd_lr3)

In [0]:
#Gradient Boosted Trees after feature engineering: fit on the train features, then score the test dataset

gbt3 = GBTClassifier(maxIter=5, featuresCol="features", labelCol="isFraud")
gbt3_training_input = train_df_feature.select("isFraud","features")
gbtModel3 = gbt3.fit(gbt3_training_input)

predictions = gbtModel3.transform(test_df_feature)
#Keep the transaction id and element 1 of the probability vector
probability_col_gbt3 = predictions.select("TransactionID", ith("probability", lit(1)))
pd_gbt3 = probability_col_gbt3.toPandas()

In [0]:
#Display the TransactionID and the extracted probability (element 1 of the "probability" vector) from the third gradient boosted trees model
display(pd_gbt3)

## Preparing Train Validation Dataset

In [0]:
#Reference Link: https://medium.com/@junwan01/oversampling-and-undersampling-with-pyspark-5dbc25cdf253
#Building an undersampled dataset with a stratified sample on the label:
#every isFraud = 1 row is kept (fraction 1.0) while only a 0.036 fraction of the isFraud = 0 rows is kept.
#These ratios are chosen so that both classes end up with a similar number of rows in train_df_subset.

fractions = {1: 1.0, 0: 0.036}
train_df_subset = train_df_feature.stat.sampleBy("isFraud", fractions=fractions, seed=40)



In [0]:
#Caching the undersampled subset; it is the training input of both TrainValidationSplit fits below
train_df_subset = train_df_subset.cache()

## ML PART 4: Applying TrainValidationSplit to the Models

In [0]:
#Reference Link: https://spark.apache.org/docs/latest/ml-tuning.html
#Hyper-parameter search for logistic regression with TrainValidationSplit.
#An estimator, an evaluator and a parameter grid are handed to TrainValidationSplit, together with a fixed seed
#for reproducible results. The fitted model (best parameter combination) is then used to score the test dataset.

lr_tv = LogisticRegression(maxIter=10, labelCol='isFraud', featuresCol='features')
pipeline_tv = Pipeline(stages=[lr_tv])

#3 x 3 grid over the regularisation strength and the elastic-net mixing parameter
grid_builder = ParamGridBuilder()
grid_builder.addGrid(lr_tv.regParam, [0, 0.01, 0.1])
grid_builder.addGrid(lr_tv.elasticNetParam, [0, 0.3, 0.5])
paramGrid = grid_builder.build()

evaluator_tv = BinaryClassificationEvaluator(labelCol='isFraud')
trainval = TrainValidationSplit(
    estimator=pipeline_tv,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_tv,
    seed=-8807458389991577450,
)

trainvalModel = trainval.fit(train_df_subset.select('features','isFraud'))

predictions = trainvalModel.transform(test_df_feature)
#Keep the transaction id and element 1 of the probability vector
probability_col_lr_tv = predictions.select('TransactionID', ith("probability", lit(1)))

pd_lr_tv = probability_col_lr_tv.toPandas()

In [0]:
#Display the TransactionID and the extracted probability (element 1 of the "probability" vector) from the tuned logistic regression model
display(pd_lr_tv)

In [0]:
#Reference Link: https://spark.apache.org/docs/latest/ml-tuning.html
#Hyper-parameter search for the gradient boosted trees with TrainValidationSplit.
#An estimator, an evaluator and a parameter grid are handed to TrainValidationSplit, together with a fixed seed
#for reproducible results. The fitted model (best parameter combination) is then used to score the test dataset.

gbt_tv = GBTClassifier(maxIter=5, labelCol='isFraud', featuresCol='features')
pipeline_tv = Pipeline(stages=[gbt_tv])

#Only the tree depth is searched
grid_builder = ParamGridBuilder()
grid_builder.addGrid(gbt_tv.maxDepth, [3, 5, 7])
paramGrid = grid_builder.build()

evaluator_tv = BinaryClassificationEvaluator(labelCol='isFraud')
trainval = TrainValidationSplit(
    estimator=pipeline_tv,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_tv,
    seed=-8807458389991577450,
)

trainvalModel = trainval.fit(train_df_subset.select('features','isFraud'))

predictions = trainvalModel.transform(test_df_feature)
#Keep the transaction id and element 1 of the probability vector
probability_col_gbt_tv = predictions.select('TransactionID', ith("probability", lit(1)))

pd_gbt_tv = probability_col_gbt_tv.toPandas()

In [0]:
#Display the TransactionID and the extracted probability (element 1 of the "probability" vector) from the tuned gradient boosted trees model
display(pd_gbt_tv)