# Credit Card Fraud Detection

**Data**: The dataset contains credit card transactions made by European cardholders in September 2013.
It includes 492 frauds out of 284,807 transactions that occurred over two days. The dataset is highly imbalanced: the positive class (frauds) accounts for just 0.172 percent of all transactions. All input variables are numerical. Features V1, V2, ..., V28 are the principal components obtained with PCA, and the only features not transformed with PCA are 'Time' and 'Amount'. 'Time' holds the seconds elapsed between each transaction and the first transaction in the dataset, and 'Amount' is the transaction amount. The feature 'Class' is 1 for a fraud and 0 otherwise. The dataset is from [Kaggle - credit card fraud detection](https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud).
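The counts above explain why plain accuracy is a poor yardstick for this problem. A quick calculation with them (plain Python, independent of the notebook) shows that a trivial model that labels every transaction as non-fraud would reach about 99.83% accuracy while catching no fraud at all.

```python
# Class counts quoted in the dataset description
n_total = 284_807
n_fraud = 492
n_legit = n_total - n_fraud

fraud_pct = 100 * n_fraud / n_total        # share of the positive class, in percent
imbalance_ratio = n_legit / n_fraud        # legitimate transactions per fraud

# A trivial classifier that always predicts "not fraud" (0)
always_zero_accuracy = n_legit / n_total
always_zero_recall = 0 / n_fraud           # it never flags a fraud

print(f"fraud share: {fraud_pct:.3f}%")
print(f"imbalance ratio: {imbalance_ratio:.1f} : 1")
print(f"always-0 accuracy: {always_zero_accuracy:.4%}, recall: {always_zero_recall}")
```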
             
**Goal**: The goal of this approach is to maintain or improve classifier performance on highly imbalanced, large datasets by using Big Data techniques.

**Approach**: Local 

**Steps**:

1. Divide the data into clusters
2. Repartition the data based on the clusters
3. Apply SMOTE+ENN on each partition using MapReduce
4. Reduce the resampled partitions and convert them to a Spark DataFrame
5. Fit a classifier
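The steps above follow a split-apply-combine (map-reduce) pattern. The sketch below mimics that data flow on plain Python lists so it can be followed without a cluster: rows are grouped by a precomputed cluster id, a per-group function plays the role of the `mapPartitions` step, and `functools.reduce` concatenates the results like the final `reduceByKey`. The toy rows and the `resample` function are hypothetical; `resample` is a placeholder that only duplicates minority rows, not SMOTE+ENN.

```python
from functools import reduce
from collections import defaultdict

# Toy rows: (cluster_id, features, label); in the notebook the ids come from k-means
rows = [
    (0, (0.1, 0.2), 0), (0, (0.2, 0.1), 0), (0, (0.3, 0.3), 1),
    (1, (5.0, 5.1), 0), (1, (5.2, 4.9), 0), (1, (5.1, 5.0), 0),
]

def partition_by_cluster(rows):
    """Step 2: put every row of a cluster into the same 'partition'."""
    parts = defaultdict(list)
    for cluster_id, x, y in rows:
        parts[cluster_id].append((x, y))
    return [parts[k] for k in sorted(parts)]

def resample(partition):
    """Step 3 placeholder (hypothetical): duplicate minority rows when a
    partition contains both classes; SMOTE+ENN would go here instead."""
    labels = [y for _, y in partition]
    if 0 < sum(labels) < len(labels):
        minority = [(x, y) for x, y in partition if y == 1]
        return partition + minority
    return partition

partitions = partition_by_cluster(rows)              # step 2
mapped = [resample(p) for p in partitions]           # step 3 ("map")
combined = reduce(lambda a, b: a + b, mapped, [])    # step 4 ("reduce")
print(len(combined), sum(y for _, y in combined))
```

Partitions without any minority rows pass through unchanged, which mirrors how the notebook leaves single-class partitions alone.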

In [0]:
sqlContext.clearCache()

## Import required packages and libraries

In [0]:
# Import all the required libraries
from pyspark.sql import functions as F
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler


## Read and understand the data

In [0]:
# Load the dataset which is in Comma-Separated Value (CSV) format.
df = spark.read.option("header",True).csv("/mnt/sjjhkkdf/creditcard.csv")
# Drop 'Time'; it is not used as a feature in this notebook
df = df.drop("Time")
  
# Cache DataFrame so that we only read it from disk once
df.cache()

#Check the name, datatype and values of columns
df.take(1)

In [0]:
df.count()

#### Data description
**Attribute information**:
```
The features V1, V2,... V28 are derived from PCA transformation.

'Time'- The seconds elapsed between each transaction and the first transaction in the dataset 

'Amount'- The transaction Amount 

'Class'- a value of 1 when there is a fraud and 0 when there isn't.

```
**The target variable is the class of the transaction i.e. fraud (1) or not (0).**

## Data preparation

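Because the CSV is read without schema inference, every column is loaded as a string. Cast all feature columns to float and 'Class' to integer ('Time' was already dropped when loading).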
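Text values compare lexicographically rather than numerically, so leaving the columns as strings can produce silent errors. The standard-library snippet below (independent of Spark, with a made-up three-row CSV) shows the kind of mistake the cast avoids.

```python
import csv
import io

# A tiny CSV in the same shape as the dataset: numeric values stored as text
raw = io.StringIO("Amount,Class\n9.99,0\n100.0,1\n25.5,0\n")
records = list(csv.DictReader(raw))

as_text = [r["Amount"] for r in records]          # values are str, e.g. "9.99"
as_float = [float(r["Amount"]) for r in records]  # explicit cast, like .cast('float')

print(max(as_text))   # lexicographic maximum of the strings: 9.99
print(max(as_float))  # numeric maximum: 100.0
```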

In [0]:
# Get the names of all columns except the target 'Class' ('Time' was dropped on load)
targets = [c for c in df.schema.names if c != "Class"]
for c in targets:
  df = df.withColumn(c, df['`{}`'.format(c)].cast('float'))

# Cast the target 'Class' to integer
df = df.withColumn("Class", df["Class"].cast('integer'))

# Check the datatype of columns
df.printSchema()

# Visualize the data
# display(df)

### Data Visualization

#### Histogram of classes
Number of fraudulent and non-fraudulent transactions in the dataset

In [0]:
import matplotlib.pyplot as plt
df_count=df.withColumn("Class",df.Class.cast('string')).groupby('Class').count().take(2)
print(df_count)
(x_values, y_values) = zip(*df_count)
plt.bar(x_values, y_values)
plt.title('Number of non-fraudulent and fraudulent transactions')
plt.xlabel('Transactions class')
plt.ylabel('Frequency')
plt.show()

In [0]:
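# Random undersampling: keep all frauds and a ~30% sample of the non-frauds
df_min = df.filter(df.Class == 1)
df_new = df.filter(df.Class == 0).sample(0.3)

un = df_min.union(df_new)

# un.filter(df_new.Class == 0).count()

# df_train, df_test = un.randomSplit([0.7,0.3], seed=1)

# Note: the train/test split below uses the full dataset `df`, not the undersampled `un`
df_train, df_test = df.randomSplit([0.7,0.3], seed=1)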
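Random undersampling, as built with `sample(0.3)` above, keeps every fraud and a random fraction of the legitimate rows. The plain-Python simulation below shows its effect on the class balance using the dataset totals; the 30% fraction matches the cell above, while the seed is an arbitrary choice for reproducibility. Like sampling without replacement in Spark, each row is kept independently with the given probability, so the kept count is only approximately 30%.

```python
import random

# Class counts from the dataset description
n_fraud, n_legit = 492, 284_315
fraction = 0.3  # same fraction as sample(0.3) above

rng = random.Random(1)  # arbitrary seed, for reproducibility only
# Keep each legitimate row independently with probability `fraction`
kept_legit = sum(1 for _ in range(n_legit) if rng.random() < fraction)

before = n_fraud / (n_fraud + n_legit)
after = n_fraud / (n_fraud + kept_legit)
print(f"kept {kept_legit} legitimate rows")
print(f"fraud share: {before:.3%} before, {after:.3%} after undersampling")
```

Even after discarding about 70% of the majority class, frauds remain well under 1% of the rows, which is why the notebook also resamples with SMOTE+ENN.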

### Create Vector Assembler

In [0]:
# Note: there are two different KMeans APIs, one for DataFrames (pyspark.ml) and one for RDDs (pyspark.mllib)
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.clustering.KMeans.html

# Combine the feature columns V1..V28 and Amount into a single vector column
assembler = VectorAssembler(
  inputCols=["V{}".format(i) for i in range(1, 29)] + ["Amount"],
  outputCol="features"
)

output = assembler.transform(df_train)
print("Assembled predictor columns to vector column 'features'")
output.select("features", "Class").show(truncate=True)


### Clustering
Cluster the training data with k-means (k = 6)

In [0]:
# Start timing clustering, partitioning and SMOTE+ENN
import time
start_time = time.time()

# Apply KMeans clustering with a fixed seed for reproducibility
kmeans = KMeans().setK(6).setSeed(1)\
        .setFeaturesCol("features")\
        .setPredictionCol("Cluster_no")
kmeansModel1 = kmeans.fit(output)        
df_clusters1=kmeansModel1.transform(output)
# df_clusters1.show(10)
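Spark ML's `KMeans` runs Lloyd-style iterations (with k-means|| initialisation by default): assign each point to its nearest centre, then move each centre to the mean of its assigned points. The pure-Python sketch below shows only that core loop on toy 2-D points, with naive initialisation from the first k points; it illustrates the idea and is not a reproduction of Spark's implementation.

```python
import math

def kmeans(points, k, iterations=10):
    """Minimal Lloyd's algorithm; initial centres are the first k points."""
    centres = [tuple(p) for p in points[:k]]
    assignment = [0] * len(points)
    for _ in range(iterations):
        # Assignment step: index of the nearest centre (Euclidean distance)
        assignment = [
            min(range(k), key=lambda c: math.dist(p, centres[c])) for p in points
        ]
        # Update step: move each centre to the mean of its assigned points
        for c in range(k):
            members = [p for p, a in zip(points, assignment) if a == c]
            if members:  # keep the old centre if a cluster becomes empty
                centres[c] = tuple(sum(dim) / len(members) for dim in zip(*members))
    return centres, assignment

points = [(0.0, 0.0), (10.0, 10.0), (0.5, 0.2), (9.8, 10.1), (0.1, 0.4), (10.2, 9.9)]
centres, assignment = kmeans(points, k=2)
print(assignment)
print(centres)
```

Because distances are Euclidean, features with large ranges dominate the assignment; this is worth keeping in mind for unscaled columns such as 'Amount'.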

In [0]:
# drop 'features' column 
df_clusters1 = df_clusters1.drop("features")

#### Get the required number of partitions

In [0]:
# The number of partitions equals the number of distinct clusters found by KMeans
required_partitions = df_clusters1.select('Cluster_no').distinct().count()
print(required_partitions)

#### Repartition the data based on the clusters

In [0]:
def partitioner(key):
  # The key is the cluster number as a string; use the whole value
  # (int(key[0]) would send cluster 10 to the same partition as cluster 1)
  return int(key)


# Key every row by its cluster number so all rows of a cluster land in the same partition
df_clusters_partition=df_clusters1.withColumn("Cluster_no",df_clusters1.Cluster_no.cast('string'))
df_clusters_partition=df_clusters_partition.repartition(required_partitions)
df_clusters_partition=df_clusters_partition.rdd.map(lambda x: (x["Cluster_no"],x)).partitionBy(required_partitions,partitioner).map(lambda x: x[1])
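`RDD.partitionBy(numPartitions, partitionFunc)` places each key-value pair in partition `partitionFunc(key) % numPartitions`. Since the keys here are cluster numbers cast to strings, a partitioner written as `int(x[0])` reads only the first character of the key. The stdlib sketch below reproduces that arithmetic without Spark; the 12-cluster setting is hypothetical. It shows that the first-character version is harmless for at most 10 clusters (this notebook uses 6) but merges clusters once ids reach two digits.

```python
def first_char_partitioner(key: str) -> int:
    return int(key[0])          # only the first character of the key

def whole_key_partitioner(key: str) -> int:
    return int(key)             # the full cluster number

num_partitions = 12             # hypothetical: KMeans with k=12
keys = [str(c) for c in range(num_partitions)]  # cluster ids cast to string

def assign(partitioner):
    # Mirrors partitionBy: partition index = partitionFunc(key) % numPartitions
    return {k: partitioner(k) % num_partitions for k in keys}

first_char = assign(first_char_partitioner)
whole_key = assign(whole_key_partitioner)
print(first_char["1"], first_char["10"], first_char["11"])  # clusters 1, 10, 11 collide
print(len(set(first_char.values())), len(set(whole_key.values())))
```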

In [0]:
df_clusters_partition.getNumPartitions()

#### Convert the RDD to a DataFrame

In [0]:
df_partition=df_clusters_partition.toDF()

In [0]:
df_partition.rdd.getNumPartitions()

In [0]:
# df_partition.select(spark_partition_id().alias("partitionId")).groupBy("partitionid").count().show()

In [0]:
# countInClusters=df_clusters1.groupBy("Cluster_no").count()

In [0]:
# countInClusters.sort(countInClusters.Cluster_no.desc()).show(400)

In [0]:
df_partition.cache()

Get the column names (excluding 'Cluster_no'); they define the columns of the pandas DataFrame built for each partition

In [0]:
column_names=df_partition.columns
column_names.remove("Cluster_no")

## SMOTE+ENN

### Apply SMOTEENN on each partition

In [0]:

def toPandas_partition(instances):
    """Convert an iterator of Rows (one partition) into a pandas DataFrame."""
    import pandas as pd
    # DataFrame.append was removed in pandas 2.0, so collect the rows first
    rows = [instance.asDict() for instance in instances]  # each instance is a Row
    return pd.DataFrame(rows, columns=column_names)       # column_names is the global defined above

def self_training(partition):
    """Apply SMOTE+ENN to one partition (one cluster); return a list holding one pandas DataFrame."""
    from imblearn.combine import SMOTEENN

    lab = toPandas_partition(partition)

    X = lab.loc[:, 'V1':'Amount']
    Y = lab.loc[:, "Class"].astype('int')

    # Ratio of frauds to non-frauds in this partition (0 if it has no non-frauds)
    n_fraud = int(Y.sum())
    if len(Y) != n_fraud:
        ratio = n_fraud / (len(Y) - n_fraud)
    else:
        ratio = 0

    # Resample only where fraud is the minority class and there are more than
    # 5 frauds (SMOTE's default k_neighbors=5 needs at least 6 minority samples)
    if 0 < ratio < 1 and n_fraud > 5:
        sm2 = SMOTEENN()
        X_res, y_res = sm2.fit_resample(X, Y)
        X_res = X_res.join(y_res)
        return [X_res]
    else:
        return [lab]
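`SMOTEENN` from imbalanced-learn chains two steps: SMOTE creates synthetic minority points by interpolating between a minority sample and one of its k nearest minority neighbours, and Edited Nearest Neighbours (ENN) then removes samples whose neighbours disagree with their label. The pure-Python sketch below shows both ideas on toy 2-D data. It is a simplified illustration, not imbalanced-learn's implementation: the helper names, the neighbour counts and the majority-vote ENN rule (imbalanced-learn's default is stricter) are choices made for this example. It also shows why the cell above requires more than five frauds per partition: with k=5, SMOTE needs at least six minority samples.

```python
import math
import random

def neighbour_indices(i, points, k):
    """Indices of the k points nearest to points[i], excluding i itself."""
    order = sorted((j for j in range(len(points)) if j != i),
                   key=lambda j: math.dist(points[i], points[j]))
    return order[:k]

def smote(minority, n_new, k=5, seed=0):
    """Create n_new synthetic points on segments between minority neighbours."""
    if len(minority) <= k:
        raise ValueError("SMOTE needs more than k minority samples")
    rng = random.Random(seed)
    synthetic = []
    for _ in range(n_new):
        i = rng.randrange(len(minority))
        j = rng.choice(neighbour_indices(i, minority, k))
        gap = rng.random()  # position along the segment minority[i] -> minority[j]
        synthetic.append(tuple(a + gap * (b - a) for a, b in zip(minority[i], minority[j])))
    return synthetic

def enn(points, labels, k=3):
    """Keep a sample only if most of its k nearest neighbours share its label."""
    kept = [
        i for i in range(len(points))
        if sum(labels[j] == labels[i] for j in neighbour_indices(i, points, k)) * 2 > k
    ]
    return [points[i] for i in kept], [labels[i] for i in kept]

# Toy data: a 5x5 majority grid, one noisy majority point, six minority points
grid = [(float(a), float(b)) for a in range(5) for b in range(5)]
noise = [(6.6, 6.6)]  # a majority point sitting inside the minority region
majority = grid + noise
minority = [(6.0, 6.0), (6.5, 6.2), (6.2, 6.8), (7.0, 6.5), (6.8, 7.1), (7.2, 7.0)]

synthetic = smote(minority, n_new=10, k=5)
X = majority + minority + synthetic
y = [0] * len(majority) + [1] * (len(minority) + len(synthetic))
X_res, y_res = enn(X, y, k=3)
print(len(X), "->", len(X_res), "samples;", sum(y_res), "minority kept")
```

Here ENN removes only the noisy majority point, whose neighbours are all minority samples.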

In [0]:
from pyspark.sql.functions import col
df_partition=df_partition.drop(col("Cluster_no"))

In [0]:
df_partition.count()

In [0]:
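def combine(x, y):
  # Concatenate two pandas DataFrames (DataFrame.append was removed in pandas 2.0)
  import pandas as pd
  return pd.concat([x, y], ignore_index=True)

# Map: resample each partition; Reduce: concatenate all results under a single key
df_final = df_partition.rdd.mapPartitions(self_training).map(lambda x: (1, x)).reduceByKey(combine)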

In [0]:
final=df_final.take(1)[0][1]

In [0]:
final

Check the class distribution after SMOTE+ENN

In [0]:
final['Class'].value_counts()

### Convert the pandas DataFrame to a Spark DataFrame

In [0]:
# Create a PySpark DataFrame from the pandas DataFrame
# (the notebook's SparkSession `spark` replaces the deprecated SQLContext)
finalDF = spark.createDataFrame(final)

In [0]:
end_time=time.time()

## Training

### Classifier

In [0]:
from pyspark.ml.classification import RandomForestClassifier
classifier = RandomForestClassifier(featuresCol = 'features', labelCol = 'Class')

### Create Pipeline

In [0]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler, classifier])

pipeline_model = pipeline.fit(finalDF)

In [0]:
pred = pipeline_model.transform(df_test)

## Evaluation

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

binary_evaluator = BinaryClassificationEvaluator(labelCol="Class")
# Time spent on clustering, partitioning and SMOTE+ENN (start_time to end_time)
total_time = end_time - start_time
print("Test Area Under ROC: " + str(binary_evaluator.evaluate(pred, {binary_evaluator.metricName: "areaUnderROC"})))
print("Test Area Under PR: " + str(binary_evaluator.evaluate(pred, {binary_evaluator.metricName: "areaUnderPR"})))
print("----------------------------------------------------------------")

# Accuracy is dominated by the majority (non-fraud) class on this data
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="Class", predictionCol="prediction", metricName="accuracy")
print("Accuracy: " + str(evaluator_accuracy.evaluate(pred)))
print("----------------------------------------------------------------")

# precisionByLabel/recallByLabel default to metricLabel=0.0 (non-fraud); report the fraud class (1)
evaluator_precision = MulticlassClassificationEvaluator(labelCol="Class", predictionCol="prediction", metricName="precisionByLabel", metricLabel=1.0)
print("Precision (fraud): " + str(evaluator_precision.evaluate(pred)))
print("----------------------------------------------------------------")

evaluator_recall = MulticlassClassificationEvaluator(labelCol="Class", predictionCol="prediction", metricName="recallByLabel", metricLabel=1.0)
print("Recall (fraud): " + str(evaluator_recall.evaluate(pred)))
print("----------------------------------------------------------------")

# "f1" is a weighted average over both classes; fMeasureByLabel gives F1 for the fraud class
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="Class", predictionCol="prediction", metricName="fMeasureByLabel", metricLabel=1.0)
print("F1 (fraud): " + str(evaluator_f1.evaluate(pred)))
print("----------------------------------------------------------------")

print("Total time taken: ", total_time)
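In Spark's `MulticlassClassificationEvaluator`, `precisionByLabel` and `recallByLabel` report the metric for the label given by `metricLabel`, which defaults to 0.0 (the non-fraud class), and `metricName="f1"` is a support-weighted average over both classes that the majority class dominates. The plain-Python sketch below computes per-label precision, recall and F1 from (true, predicted) pairs on made-up toy predictions (990 legitimate transactions and 10 frauds, of which 4 are caught with 2 false alarms) to show how far apart the two labels' numbers can be.

```python
def per_label_metrics(pairs, label):
    """Precision, recall and F1 for one label from (true, predicted) pairs."""
    tp = sum(1 for t, p in pairs if t == label and p == label)
    fp = sum(1 for t, p in pairs if t != label and p == label)
    fn = sum(1 for t, p in pairs if t == label and p != label)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1

# Toy predictions (made up): (true label, predicted label)
pairs = [(0, 0)] * 988 + [(0, 1)] * 2 + [(1, 1)] * 4 + [(1, 0)] * 6

accuracy = sum(t == p for t, p in pairs) / len(pairs)
print("accuracy:", accuracy)
for label in (0, 1):
    print(label, per_label_metrics(pairs, label))
```

Accuracy and the non-fraud metrics all exceed 0.99 here, while only 40% of the frauds are found; the fraud-class numbers are the ones that reflect the goal of this notebook.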