In [1]:
# Source file: /FileStore/tables/creditcard.csv
# Load the dataset: the first row is used as the header and Spark infers
# each column's type from the file contents.
data = spark.read.csv(
    "/FileStore/tables/creditcard.csv",
    header=True,
    inferSchema=True,
)

In [2]:
# Derive an "Hour" column from the "Time" column (seconds): divide by 3600,
# round to a whole number of hours, then take the result modulo 24.
from pyspark.sql.types import StructType,DoubleType,IntegerType
# Caution: this import shadows Python's built-in `round` for the rest of the notebook.
from pyspark.sql.functions import round

# NOTE(review): `round` yields the NEAREST hour (e.g. 1800 s -> 1), not the
# floor; confirm that nearest-hour bucketing is what is intended.
elapsed_hours = data["Time"].cast(DoubleType()) / 3600
data = data.withColumn("Hour", round(elapsed_hours, 0) % 24)

In [3]:
import matplotlib.pyplot as plt
#import seaborn as sn
import pandas as pd
import numpy as np

In [4]:
#Correlation matrix
#sn.set(style="white", font_scale=2)

#Compute the correlation matrix
#corr = data.select('Hour','V1','V2','V3','V4','V5','V6','V7','V8','V9','V10','V11','V12','V13','V14','V15','V16','V17','V18','V19','V20','V21','V22','V23','V24','V25','V26','V27','V28', 'Amount' ).toPandas().corr()

# Generate a mask for the upper triangle
#mask = np.zeros_like(corr, dtype=np.bool)
#mask[np.triu_indices_from(mask)] = True

#Set up the matplotlib figure
#f, ax = plt.subplots(figsize=(18,15))
#f.suptitle("Correlation Matrix", fontsize = 40)

# Generate a custom diverging colormap
#cmap = sn.diverging_palette(220,10, as_cmap= True)

#Draw the heatmap with the mask and correct aspect ratio
#sn.heatmap(corr, mask=mask, cmap=cmap, vmax=0.3, center=0, square=True, linewidths=0.5,cbar_kws={"shrink": 0.5})

#display(f)

In [5]:
#Splitting data into train and test sets (weights 0.7 / 0.3; the resulting
# sizes are approximate because rows are assigned randomly).
# A fixed seed makes the split reproducible: without it every run trains and
# evaluates on different rows, so metrics are not comparable between runs.
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

In [6]:
#Taking care of the imbalanced set
from imblearn.over_sampling import SMOTE, ADASYN


# Bring the training split to the driver as a pandas DataFrame.
# Column order matters downstream: Hour, V1..V28, Amount (30 features), then Class.
selected_cols = ['Hour'] + ['V%d' % i for i in range(1, 29)] + ['Amount', 'Class']
train_data_p = train_data.select(*selected_cols).toPandas()

#Undersampling to balance the dataset
#fraud_indices = np.array(train_data_p[train_data_p.Class == 1].index)
#number_records_fraud = len(fraud_indices)

#normal_indices = train_data_p[train_data_p.Class == 0].index

#random_normal_indices = np.random.choice(normal_indices, number_records_fraud, replace=False)
#random_normal_indices = np.array(random_normal_indices)

#under_sample_indices = np.concatenate([fraud_indices,random_normal_indices])

#under_sample_train_data = train_data_p.iloc[under_sample_indices,:]


# Positional split of the pandas frame: columns 0..29 are the features
# (Hour, V1..V28, Amount) and column 30 is the Class target.
y_train = train_data_p.iloc[:, 30:31]
X_train = train_data_p.iloc[:, 0:30]


#Oversampling to balance dataset using SMOTE
# `random_state` is fixed so the synthetic samples are reproducible between runs.
# NOTE(review): `kind=` and `fit_sample` belong to older imbalanced-learn
# releases; newer ones expose SVMSMOTE / fit_resample instead. Confirm the
# installed version before upgrading the library.
X_train_resample_smote, y_train_resample_smote = SMOTE(kind='svm', random_state=42).fit_sample(X_train,y_train)

# Re-label the resampled data with ORDERED column names. These must be
# lists (not set literals): a set has no defined iteration order, so names
# could be attached to the wrong feature columns.
X_train_resample_smote = pd.DataFrame(data = X_train_resample_smote, columns=list(X_train.columns))
y_train_resample_smote = pd.DataFrame(data = y_train_resample_smote, columns = ['Class'])

# Features and target side by side, target last (same layout as train_data_p).
over_sample_train_data_smote = pd.concat([X_train_resample_smote, y_train_resample_smote], axis=1)

#Oversampling to balance dataset using ADASYN

#X_train_resample_adasyn, y_train_resample_adasyn = ADASYN().fit_sample(X_train,y_train)

#X_train_resample_adasyn = pd.DataFrame(data = X_train_resample_adasyn, columns={'V1','V2','V3','V4','V5','V6','V7','V8','V9','V10','V11','V12','V13','V14','V15','V16','V17','V18','V19','V20','V21','V22','V23','V24','V25','V26','V27','V28', 'Amount'})
#y_train_resample_adasyn = pd.DataFrame(data = y_train_resample_adasyn, columns = {'Class'})

#over_sample_train_data_adasyn = pd.concat([X_train_resample_adasyn, y_train_resample_adasyn], axis=1)

#Defining schema of the event (transaction)
# Thirty double-typed feature fields (Hour, V1..V28, Amount) in that order,
# followed by an integer "label" field.
schema = StructType()
for field_name in ['Hour'] + ['V' + str(i) for i in range(1, 29)] + ['Amount']:
    schema.add(field_name, DoubleType())
schema.add("label", IntegerType())

#under_sample_train_data = spark.createDataFrame(under_sample_train_data,schema=schema)

# Turn the oversampled pandas frame back into a Spark DataFrame using the
# schema above. The variable name is rebound from the pandas frame to the
# Spark DataFrame.
over_sample_train_data_smote = spark.createDataFrame(
    over_sample_train_data_smote,
    schema=schema,
)

#over_sample_train_data_adasyn = spark.createDataFrame(over_sample_train_data_adasyn,schema=schema)

In [7]:
#Creating the features vector
from pyspark.ml.feature import VectorAssembler

# Input columns, in the order they are packed into the vector: Hour, V1..V28, Amount.
columns = ['Hour'] + ['V{}'.format(n) for n in range(1, 29)] + ['Amount']

assembler = VectorAssembler(outputCol='features', inputCols=columns)

# Assemble the oversampled training rows and keep only what the model needs.
# `train_data` is rebound here from the raw split to the model-ready frame.
assembled_train = assembler.transform(over_sample_train_data_smote)
train_data = assembled_train.select('features', 'label')

#test_data = assembler.transform(test_data.withColumnRenamed('Class', 'label')).select('features', 'label')


In [8]:
# Prepare the held-out split the same way as the training data: rename the
# raw `Class` column to `label`, assemble the feature vector, and keep only
# the two columns the model needs. `test_data` is rebound to the result.
test_data_labeled = test_data.withColumnRenamed('Class', 'label')
test_data = assembler.transform(test_data_labeled).select('features', 'label')

In [9]:
#from pyspark.ml import Pipeline
#from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
#from pyspark.ml.evaluation import BinaryClassificationEvaluator
#from pyspark.ml.classification import RandomForestClassifier

#rfc =  RandomForestClassifier(labelCol = 'Class', featuresCol = 'features', maxDepth = 21, numTrees = 42)
#rfc =  RandomForestClassifier(labelCol = 'label', featuresCol = 'features')

#Pipeline
#pipeline = Pipeline(stages = [assembler, rfc])

#Grid
#grid = ParamGridBuilder() \
#      .addGrid(rfc.maxDepth, [20,21]) \
#      .addGrid(rfc.numTrees, [39,40]) \
#      .build()

#Evaluator - AUC by default
#ev = BinaryClassificationEvaluator()

#K-fold Cross validation
#cv = CrossValidator(estimator = pipeline, \
#                   estimatorParamMaps = grid, \
#                   evaluator = ev, \
#                   numFolds = 10)

#rfc_model = cv.fit(over_sample_train_data_smote)

#rfc_model.bestModel.stages[-1].extractParamMap()

In [10]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest with 40 trees of depth up to 21. The seed is fixed because
# forest training is randomized; without it each run produces a different
# model and different metrics.
rfc =  RandomForestClassifier(labelCol = 'label', featuresCol = 'features', maxDepth = 21, numTrees = 40, seed = 42)

# Fit on the oversampled training features.
rfc_model = rfc.fit(train_data)

# Score the held-out test set; the result keeps the input columns and adds
# the model's output columns (including `prediction`).
rfc_preds = rfc_model.transform(test_data)

In [11]:
# Plotting the confusion matrix
from sklearn.metrics import confusion_matrix

# Collect label and prediction from the SAME scored DataFrame in a single
# pass. Pulling them from two separate DataFrames relies on both Spark jobs
# returning rows in the same order, which is not guaranteed.
scored = rfc_preds.select("label", "prediction").toPandas()

y_true = scored[["label"]]
# The `prediction` column is cast to int so it has the same dtype as the
# integer labels.
y_pred = scored[["prediction"]].astype(int)

# `labels=[0, 1]` pins the matrix to 2x2 (rows = true class, columns =
# predicted class) even if one class is absent from the test split, so the
# [row][col] indexing used for the metrics stays valid.
cnf_matrix = confusion_matrix(y_true, y_pred, labels=[0, 1])
cnf_matrix

In [12]:
# Recall, precision and F1 for the positive class (1), read from the 2x2
# confusion matrix (rows = true class, columns = predicted class).
true_pos = cnf_matrix[1][1]
false_neg = cnf_matrix[1][0]
false_pos = cnf_matrix[0][1]

# Each ratio falls back to 0.0 when its denominator is zero (no actual
# positives, no predicted positives, or both scores zero) instead of
# producing NaN or a divide warning.
actual_pos = true_pos + false_neg
recall = true_pos / actual_pos if actual_pos else 0.0
print("Recall:", recall)

predicted_pos = true_pos + false_pos
precision = true_pos / predicted_pos if predicted_pos else 0.0
print("Precision:", precision)

score_sum = recall + precision
print("F1:", 2*(recall*precision)/score_sum if score_sum else 0.0)

In [13]:
# Persist the fitted random-forest model, replacing anything already saved
# at the target path.
model_writer = rfc_model.write().overwrite()
model_writer.save("/tmp/spark-random-forest-model")