# Project: Fraud Detection 

## 1. Overview

### PaySim simulates mobile money transactions based on a sample of real transactions extracted from one month of financial logs from a mobile money service operating in an African country. The original logs were provided by a multinational company that runs the mobile financial service in more than 14 countries worldwide. The objective of this project is to predict whether a transaction is fraudulent.

## 2. Preprocess the data

#### We'll use PySpark to preprocess the data.

In [144]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import monotonically_increasing_id, desc, row_number
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array
import functools
import seaborn as sb
from imblearn.over_sampling import SMOTE
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
from collections import Counter
from imblearn.over_sampling import RandomOverSampler
import matplotlib.pyplot as mpt
from pyspark.sql.types import IntegerType
from pyspark.mllib.stat import Statistics
from pyspark.ml.stat import Correlation
from pyspark.sql.window import Window
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType

#from sklearn.datasets import make_classification

In [3]:
# global variables

global df_bank, results 

In [4]:
# creation of the SparkSession

spark = SparkSession.builder.appName("FraudDetection").getOrCreate()
spark



In [5]:
# spark dataframe 

df = spark.read.csv('fraudDetection.csv', header=True)

                                                                                

#### Let's take a look at the first 10 rows of the data.

In [6]:
df.show(10)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [7]:
df.printSchema()

root
 |-- step: string (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)



#### There are 11 columns. Because the CSV was read without a schema, every column is typed as a string, even though some hold numerical values and others categorical ones. Let's count the number of records.

In [8]:
print("The total number of records is:", df.count())

The total number of records is: 6362620


                                                                                

#### The dataset contains more than six million transactions.

### 2.1 Feature Engineering

#### First, we'll derive a new variable from two existing columns.

In [9]:
### 2.1.1.- creation of a new variable: type2

df_type2 = df.withColumn("type2",f.concat(f.substring("nameOrig",1,1),f.substring("nameDest",1,1)))

In [10]:
df_type2.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|type2|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|   CM|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|   CM|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|   CC|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|   CC|
|   1| PAYMENT|11668.14|C2048537720|     

#### We've created a new column named "type2", which is composed of the first character of the column "nameOrig" followed by the first character of the column "nameDest".
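
The same derivation can be illustrated for a single row outside Spark. This is a minimal plain-Python sketch; the helper name `party_pair` is hypothetical and not part of the notebook. It mirrors the substring/concat expression above.

```python
def party_pair(name_orig: str, name_dest: str) -> str:
    """Concatenate the first character of each party identifier."""
    return name_orig[:1] + name_dest[:1]


# Identifiers taken from the sample rows shown above
print(party_pair("C1231006815", "M1979787155"))  # CM
print(party_pair("C1305486145", "C553264065"))   # CC
```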

In [11]:
### 2.1.2.1.- One Hot Encoding: column "type"

df_type2.show(3)

+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+
|step|    type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|type2|
+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+
|   1| PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|   CM|
|   1| PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|   CM|
|   1|TRANSFER|  181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|   CC|
+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+
only showing top 3 rows



#### We'll use Spark's machine learning library (Spark ML) for the encoding steps.

In [12]:
### StringIndexer Initialization
### column: type

indexer_type = StringIndexer(inputCol="type",outputCol="types_indexed")
indexerModel_type = indexer_type.fit(df_type2)


                                                                                

In [13]:
### Transform the DataFrame using the fitted StringIndexer model

indexed_df_type2 = indexerModel_type.transform(df_type2)
indexed_df_type2.show(10)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|type2|types_indexed|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|   CM|          1.0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|   CM|          1.0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|   CC|          3.0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|  

#### Here, each category of the "type" column has been mapped to a numeric index.
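
By default, StringIndexer orders labels by descending frequency, so the most common category receives index 0.0. The plain-Python sketch below imitates that ordering on a toy sample. The function name `frequency_desc_index` is hypothetical, the sample values are made up rather than taken from the dataset's real counts, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter


def frequency_desc_index(values):
    """Map each distinct label to an index, most frequent label first.

    Ties are broken alphabetically (an assumption made for this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Toy sample, not the real dataset frequencies
sample = ["CASH_OUT", "PAYMENT", "CASH_OUT", "TRANSFER", "PAYMENT", "CASH_OUT"]
mapping = frequency_desc_index(sample)
print(mapping)  # {'CASH_OUT': 0.0, 'PAYMENT': 1.0, 'TRANSFER': 2.0}
```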

In [14]:
### apply One-Hot-Encoding to the indexed column, that is, 
### "types_indexed"

encoder_type = OneHotEncoder(dropLast=False, inputCol="types_indexed", outputCol="types_onehot")
encoder_type_df = encoder_type.fit(indexed_df_type2).transform(indexed_df_type2)
encoder_type_df.show(truncate=False)


+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+
|step|type    |amount   |nameOrig   |oldbalanceOrg|newbalanceOrig|nameDest   |oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|type2|types_indexed|types_onehot |
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+
|1   |PAYMENT |9839.64  |C1231006815|170136.0     |160296.36     |M1979787155|0.0           |0.0           |0      |0             |CM   |1.0          |(5,[1],[1.0])|
|1   |PAYMENT |1864.28  |C1666544295|21249.0      |19384.72      |M2044282225|0.0           |0.0           |0      |0             |CM   |1.0          |(5,[1],[1.0])|
|1   |TRANSFER|181.0    |C1305486145|181.0        |0.0           |C553264065 |0.0           |0.0           |1      |0             |CC   |3.0          |(5,[3],[1.0])|
|1  

In [15]:
encoder_type_df.printSchema()

root
 |-- step: string (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)
 |-- type2: string (nullable = true)
 |-- types_indexed: double (nullable = false)
 |-- types_onehot: vector (nullable = true)



In [16]:
encoder_type_df_split = encoder_type_df.select('*',vector_to_array('types_onehot').alias('types_onehot_split'))
encoder_type_df_split.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+--------------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|type2|types_indexed| types_onehot|  types_onehot_split|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+--------------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|   CM|          1.0|(5,[1],[1.0])|[0.0, 1.0, 0.0, 0...|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|   CM|          1.0|(5,[1],[1.0])|[0.0, 1.0, 0.0, 0...|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553

In [17]:
### now, we'll split the "types_onehot_split" into five columns, one per category

num_categories = len(encoder_type_df_split.first()['types_onehot_split'])
cols_expanded = [(f.col('types_onehot_split')[i].alias(f"{indexerModel_type.labels[i]}")) for i in range(num_categories)]
type_df = encoder_type_df_split.select('*',*cols_expanded)


In [18]:
type_df.show(100)

+----+--------+----------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+--------------------+--------+-------+-------+--------+-----+
|step|    type|    amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|type2|types_indexed| types_onehot|  types_onehot_split|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT|
+----+--------+----------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+--------------------+--------+-------+-------+--------+-----+
|   1| PAYMENT|   9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|   CM|          1.0|(5,[1],[1.0])|[0.0, 1.0, 0.0, 0...|     0.0|    1.0|    0.0|     0.0|  0.0|
|   1| PAYMENT|   1864.28|C1666544295|      21249.0|      19384.72|M2044282225|         

#### We've applied One-Hot-Encoding to the column "type", resulting in five new columns:
+ CASH_OUT
+ CASH_IN
+ PAYMENT
+ TRANSFER 
+ DEBIT
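
The encoder output shown earlier, such as (5,[1],[1.0]), is a sparse vector: its size, the positions of the non-zero entries, and their values. The plain-Python sketch below expands such a triple into the dense form that ends up in the five columns. The helper name `sparse_to_dense` is hypothetical; the label order is copied from the column headers of the expanded table above.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse triple into a dense list."""
    dense = [0.0] * size
    for idx, val in zip(indices, values):
        dense[idx] = val
    return dense


# Label order as shown in the expanded column headers above
labels = ["CASH_OUT", "PAYMENT", "CASH_IN", "TRANSFER", "DEBIT"]

dense = sparse_to_dense(5, [1], [1.0])  # a PAYMENT row: (5,[1],[1.0])
print(dense)  # [0.0, 1.0, 0.0, 0.0, 0.0]
row = dict(zip(labels, dense))
print(row["PAYMENT"])  # 1.0
```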

#### Now, we'll apply the same procedure to the column "type2".

In [19]:
### 2.1.2.2.- One Hot Encoding: column "type2"

type_df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+--------------------+--------+-------+-------+--------+-----+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|type2|types_indexed| types_onehot|  types_onehot_split|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+--------------------+--------+-------+-------+--------+-----+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|   CM|          1.0|(5,[1],[1.0])|[0.0, 1.0, 0.0, 0...|     0.0|    1.0|    0.0|     0.0|  0.0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|    

In [20]:
### StringIndexer Initialization
### column: type2

indexer_type = StringIndexer(inputCol="type2",outputCol="types_indexed2")
indexerModel_type = indexer_type.fit(type_df)

                                                                                

In [21]:
### Transform the DataFrame using the fitted StringIndexer model

indexed_df_type = indexerModel_type.transform(type_df)
indexed_df_type.show(10)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+--------------------+--------+-------+-------+--------+-----+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|type2|types_indexed| types_onehot|  types_onehot_split|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT|types_indexed2|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+--------------------+--------+-------+-------+--------+-----+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|   CM|          1.0|(5,[1],[1.0])|[0.0, 1.0, 0.0, 0...|     0.0|    1.0|    0.0|     0.0|  0.0|           1.0|
|   1| PAYMENT| 1864.28|C1666544295|

In [22]:
### apply One-Hot-Encoding to the indexed column, that is, 
### "types_indexed2"

encoder_type2 = OneHotEncoder(dropLast=False, inputCol="types_indexed2", outputCol="types_onehot2")
encoder_type2_df = encoder_type2.fit(indexed_df_type).transform(indexed_df_type)
encoder_type2_df.show(truncate=False)

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+-------------------------+--------+-------+-------+--------+-----+--------------+-------------+
|step|type    |amount   |nameOrig   |oldbalanceOrg|newbalanceOrig|nameDest   |oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|type2|types_indexed|types_onehot |types_onehot_split       |CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT|types_indexed2|types_onehot2|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+-------------------------+--------+-------+-------+--------+-----+--------------+-------------+
|1   |PAYMENT |9839.64  |C1231006815|170136.0     |160296.36     |M1979787155|0.0           |0.0           |0      |0             |CM   |1.0          |(5,[1],[1.0])|[0.0, 1.0, 0.0, 0.0, 0.0]|0.0     |1.0    |0.0    |0

In [23]:
encoder_type2_df.printSchema()

root
 |-- step: string (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)
 |-- type2: string (nullable = true)
 |-- types_indexed: double (nullable = false)
 |-- types_onehot: vector (nullable = true)
 |-- types_onehot_split: array (nullable = false)
 |    |-- element: double (containsNull = false)
 |-- CASH_OUT: double (nullable = true)
 |-- PAYMENT: double (nullable = true)
 |-- CASH_IN: double (nullable = true)
 |-- TRANSFER: double (nullable = true)
 |-- DEBIT: double (nullable = true)
 |-- types_indexed2: double (nullable = false)
 |-- types_onehot2: vector (nullable = true)



In [24]:
encoder_type2_df_split = encoder_type2_df.select('*',vector_to_array('types_onehot2').alias('types_onehot_split2'))
encoder_type2_df_split.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+--------------------+--------+-------+-------+--------+-----+--------------+-------------+-------------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|type2|types_indexed| types_onehot|  types_onehot_split|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT|types_indexed2|types_onehot2|types_onehot_split2|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+--------------------+--------+-------+-------+--------+-----+--------------+-------------+-------------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|   CM|          1.0|(5,[1],[1.0])|[0.0, 1.0, 

In [25]:
### now, we'll split the "types_onehot_split2" into two columns, one per category

num_categories = len(encoder_type2_df_split.first()['types_onehot_split2'])
cols_expanded = [(f.col('types_onehot_split2')[i].alias(f"{indexerModel_type.labels[i]}")) for i in range(num_categories)]
encoder_type2_df_split = encoder_type2_df_split.select('*',*cols_expanded)

In [26]:
encoder_type2_df_split.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+--------------------+--------+-------+-------+--------+-----+--------------+-------------+-------------------+---+---+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|type2|types_indexed| types_onehot|  types_onehot_split|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT|types_indexed2|types_onehot2|types_onehot_split2| CC| CM|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-----+-------------+-------------+--------------------+--------+-------+-------+--------+-----+--------------+-------------+-------------------+---+---+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|   CM|          1.0|(

#### We've split the "type2" column into two columns using One-Hot-Encoding. Next, we'll remove some unnecessary columns. Let's first check all the columns.

In [27]:
encoder_type2_df_split.printSchema()

root
 |-- step: string (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)
 |-- type2: string (nullable = true)
 |-- types_indexed: double (nullable = false)
 |-- types_onehot: vector (nullable = true)
 |-- types_onehot_split: array (nullable = false)
 |    |-- element: double (containsNull = false)
 |-- CASH_OUT: double (nullable = true)
 |-- PAYMENT: double (nullable = true)
 |-- CASH_IN: double (nullable = true)
 |-- TRANSFER: double (nullable = true)
 |-- DEBIT: double (nullable = true)
 |-- types_indexed2: double (nullable = false)
 |-- types_onehot2: vector (nullable = true)
 |-- types_onehot_split2

#### Now, we'll drop the unnecessary columns:
+ nameOrig
+ nameDest
+ isFlaggedFraud
+ newbalanceDest
+ oldbalanceDest
+ oldbalanceOrg
+ newbalanceOrig 
+ types_indexed
+ types_onehot
+ types_onehot_split
+ types_indexed2
+ types_onehot2
+ types_onehot_split2
+ type
+ type2

In [29]:
df_bank = encoder_type2_df_split.drop("nameOrig","nameDest","isFlaggedFraud","newbalanceDest","oldbalanceDest",
                       "oldbalanceOrg","newbalanceOrig","type","types_indexed","types_onehot",
                       "types_onehot_split","type2","types_indexed2","types_onehot2","types_onehot_split2" )
df_bank.show(5)

+----+--------+-------+--------+-------+-------+--------+-----+---+---+
|step|  amount|isFraud|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT| CC| CM|
+----+--------+-------+--------+-------+-------+--------+-----+---+---+
|   1| 9839.64|      0|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|
|   1| 1864.28|      0|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|
|   1|   181.0|      1|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|
|   1|   181.0|      1|     1.0|    0.0|    0.0|     0.0|  0.0|1.0|0.0|
|   1|11668.14|      0|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|
+----+--------+-------+--------+-------+-------+--------+-----+---+---+
only showing top 5 rows



In [30]:
df_bank.count()

6362620

#### The number of records is unchanged, since only columns were dropped.

### 2.2 Data Cleaning

In [31]:
### 2.2.1.- Eliminate duplicates

num_all_rows = df_bank.count()
num_all_rows

6362620

In [32]:
# number of distinct rows (the duplicates are the difference with the total)
num_distinct_rows = df_bank.distinct().count()

In [33]:
print("The total number of duplicated rows is:", num_all_rows - num_distinct_rows)

The total number of duplicated rows is: 7597


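#### There are 7597 duplicated rows. Note that they are duplicates only with respect to the remaining columns: the account identifiers and balances were dropped earlier, so distinct transactions that share the same step, amount and type now look identical.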

In [34]:
df_bank = df_bank.dropDuplicates()

In [35]:
df_bank.count()

6355023

#### The duplicated records have been removed: 6355023 rows remain, 7597 fewer than before.
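
The duplicate count is simply the total number of rows minus the number of distinct rows, and it depends on which columns are kept. The plain-Python sketch below shows how dropping an identifying column can make different transactions look identical. All values are made up for illustration, and `count_duplicates` is a hypothetical helper.

```python
def count_duplicates(records):
    """Number of rows that would be removed by de-duplication."""
    return len(records) - len(set(records))


# (step, amount, nameOrig, type) -- made-up values for illustration
rows = [
    (1, 181.0, "C0000000001", "TRANSFER"),
    (1, 181.0, "C0000000002", "TRANSFER"),
    (1, 500.0, "C0000000003", "PAYMENT"),
]

# With the account identifier, every row is unique
full = count_duplicates(rows)

# Without it, the first two rows become indistinguishable
reduced = count_duplicates([(step, amount, tx_type) for step, amount, _, tx_type in rows])

print(full, reduced)  # 0 1
```

If the identifiers matter for the analysis, one option is to de-duplicate before dropping them.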

In [36]:
### 2.2.2.- Eliminate null values

# dropna() returns a new DataFrame, so the result must be assigned back
df_bank = df_bank.dropna()

In [37]:
df_bank.count()

6355023

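#### The row count is unchanged, which indicates that the dataset contains no null values, provided the result of dropna() is assigned back to df_bank (the method returns a new DataFrame rather than modifying it in place). Let's take a look at the "clean" dataset.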

In [38]:
df_bank.show(10)

+----+---------+-------+--------+-------+-------+--------+-----+---+---+
|step|   amount|isFraud|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT| CC| CM|
+----+---------+-------+--------+-------+-------+--------+-----+---+---+
|   1| 51121.05|      0|     1.0|    0.0|    0.0|     0.0|  0.0|1.0|0.0|
|   1|373068.26|      0|     1.0|    0.0|    0.0|     0.0|  0.0|1.0|0.0|
|   1|  2643.45|      0|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|
|   1| 95636.49|      0|     1.0|    0.0|    0.0|     0.0|  0.0|1.0|0.0|
|   1| 21898.97|      0|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|
|   1| 88987.11|      0|     1.0|    0.0|    0.0|     0.0|  0.0|1.0|0.0|
|   1|  3545.37|      0|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|
|   1|  1122.14|      0|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|
|   1|  2637.07|      0|     0.0|    0.0|    0.0|     0.0|  1.0|1.0|0.0|
|   1| 16265.77|      0|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|
+----+---------+-------+--------+-------+-------+--

                                                                                

## 3. Exploratory Data Analysis (EDA)

### 3.1 Visualization

#### The visualization is done with a helper function that leverages the histogram() method of PySpark RDDs.

In [39]:
# definition of the "histogram" function

def histogram(df, col, bins=10, xname=None, yname=None):
    
    '''
    Plot a histogram of column `col` of the Spark DataFrame `df`.
    `xname` and `yname` are optional axis labels; `xname` defaults to `col`.
    '''
    
    # Compute the histogram in Spark: returns (bucket edges, counts)
    edges, counts = df.select(col).rdd.flatMap(lambda x: x).histogram(bins)
    
    # Bars are centered on their x position, so use each bucket's midpoint
    width = edges[1] - edges[0]
    loc = [edges[0] + (i + 0.5) * width for i in range(len(counts))]
    
    # Draw the bar plot
    mpt.bar(loc, counts, width=width)
    mpt.xlabel(xname if xname is not None else col)
    mpt.ylabel(yname if yname is not None else 'count')
    mpt.show()
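
When an integer number of buckets is passed, the RDD histogram() method returns the bucket edges together with the counts, with one more edge than there are counts. A bar plot with center-aligned bars (matplotlib's default) needs the midpoint of each bucket as its x position. The plain-Python sketch below computes those midpoints; `bar_centers` is a hypothetical helper and the edges are made-up values.

```python
def bar_centers(edges):
    """Return the midpoint of each consecutive pair of bucket edges."""
    return [(left + right) / 2 for left, right in zip(edges, edges[1:])]


# Made-up edges for three equal-width buckets
edges = [0, 10, 20, 30]
centers = bar_centers(edges)
print(centers)  # [5.0, 15.0, 25.0]
```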

In [40]:
df_bank.printSchema()

root
 |-- step: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- CASH_OUT: double (nullable = true)
 |-- PAYMENT: double (nullable = true)
 |-- CASH_IN: double (nullable = true)
 |-- TRANSFER: double (nullable = true)
 |-- DEBIT: double (nullable = true)
 |-- CC: double (nullable = true)
 |-- CM: double (nullable = true)



#### Some features still need to be converted from strings to integers: "step", "amount" and "isFraud". Note that casting "amount" to an integer discards its decimal part; a double type would preserve it.

In [41]:
# convert string columns into integer columns

df_bank = df_bank.withColumn("step",df_bank["step"].cast(IntegerType()))

In [42]:
df_bank = df_bank.withColumn("amount",df_bank["amount"].cast(IntegerType()))

In [43]:
df_bank = df_bank.withColumn("isFraud",df_bank["isFraud"].cast(IntegerType()))

In [44]:
df_bank.printSchema()

root
 |-- step: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- CASH_OUT: double (nullable = true)
 |-- PAYMENT: double (nullable = true)
 |-- CASH_IN: double (nullable = true)
 |-- TRANSFER: double (nullable = true)
 |-- DEBIT: double (nullable = true)
 |-- CC: double (nullable = true)
 |-- CM: double (nullable = true)



#### "step", "amount" and "isFraud" are now integers and the one-hot columns are doubles, so every feature is numeric. We can therefore plot them with the histogram function, which is what we'll do next.
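
The effect of an integer conversion on "amount" can be checked on a few values from the sample rows. This is a plain-Python analogy rather than Spark code: it assumes the cast truncates toward zero, which is how Spark typically behaves when ANSI mode is off.

```python
# Amounts as strings, the way they were read from the CSV
amounts = ["9839.64", "1864.28", "181.0"]

as_int = [int(float(a)) for a in amounts]  # fractional part is discarded
as_float = [float(a) for a in amounts]     # fractional part is preserved

print(as_int)    # [9839, 1864, 181]
print(as_float)  # [9839.64, 1864.28, 181.0]
```

For histograms with wide buckets the difference is negligible, but a floating-point type may be preferable if the amounts are later used as model features.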

In [45]:
# histogram: "step"

histogram(df_bank, 'step', bins=15, yname='frequency')

KeyboardInterrupt: 

In [None]:
# histogram: "amount"

histogram(df_bank, 'amount', bins=15, yname='frequency')

In [None]:
# histogram: "DEBIT"

histogram(df_bank, 'DEBIT', bins=15, yname='frequency')


In [None]:
# histogram: "PAYMENT"

histogram(df_bank, 'PAYMENT', bins=15, yname='frequency')


In [None]:
# histogram: "CASH_OUT"

histogram(df_bank, 'CASH_OUT', bins=15, yname='frequency')


In [None]:
# histogram: "CASH_IN"

histogram(df_bank, 'CASH_IN', bins=15, yname='frequency')


In [None]:
# histogram: "TRANSFER"

histogram(df_bank, 'TRANSFER', bins=15, yname='frequency')


In [None]:
# histogram: "CC"

histogram(df_bank, 'CC', bins=15, yname='frequency')


In [None]:
# histogram: "CM"

histogram(df_bank, 'CM', bins=15, yname='frequency')

In [None]:
# histogram: "isFraud"

histogram(df_bank, 'isFraud', bins=15, yname='frequency')

#### Our label is "isFraud", and the previous plot shows that its two classes are heavily unbalanced. We need to balance the data by **oversampling** the minority class with *PySpark*. Before that, we'll write the dataset to a parquet file and read it back.

In [None]:
# we "write" a parquet file, that is, create a parquet file with the same data

##df_bank.write.parquet("/Users/alexangelbracho/Desktop/GitHub_projects/FraudDetection/Fraud-Detection-Project/fraudDetection.parquet")

In [46]:
# we read the parquet file

df_bank_par = spark.read.parquet("fraudDetection.parquet")

                                                                                

In [None]:
df_bank_par.printSchema()

In [None]:
df_bank_par.show(5)

### 3.2 Data Balancing

In [47]:
### oversampling with "PySpark"

minor_df = df_bank_par.filter(f.col("isFraud")==1)
major_df = df_bank_par.filter(f.col("isFraud")==0)

In [48]:
num_df_bank_par = df_bank_par.count()

                                                                                

In [49]:
num_df_bank_par

6355023

In [50]:
num_major_df = major_df.count()

                                                                                

In [51]:
num_major_df

6346920

In [52]:
ratio = int(major_df.count()/minor_df.count())

In [53]:
print("The ratio is:",ratio)

The ratio is: 783


In [54]:
a = range(ratio)

In [55]:
# let's duplicate the minority rows

oversampled_df = minor_df.withColumn("dummy",f.explode(f.array([f.lit(x) for x in a]))).drop("dummy")
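
#### The `explode` over an array of `ratio` literals emits one copy of each minority row per array element, so the minority class grows by a factor of `ratio`. The plain-Python sketch below reproduces the arithmetic with the counts printed in this notebook; the function name `replicate_rows` is illustrative and not part of PySpark.

```python
def replicate_rows(rows, times):
    """Return a list in which every row appears `times` times in a row."""
    return [row for row in rows for _ in range(times)]

total_rows = 6355023                    # df_bank_par.count()
major_rows = 6346920                    # major_df.count()
minor_rows = total_rows - major_rows    # fraud rows: 8103

ratio = int(major_rows / minor_rows)
print(ratio)                            # 783
print(minor_rows * ratio)               # 6344649 rows after oversampling

# a tiny illustration of the duplication itself
print(replicate_rows([("fraud", 17320)], 3))
```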

In [56]:
oversampled_df.show(5)

+----+------+-------+--------+-------+-------+--------+-----+---+---+
|step|amount|isFraud|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT| CC| CM|
+----+------+-------+--------+-------+-------+--------+-----+---+---+
|   6| 17320|      1|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|
|   6| 17320|      1|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|
|   6| 17320|      1|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|
|   6| 17320|      1|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|
|   6| 17320|      1|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|
+----+------+-------+--------+-------+-------+--------+-----+---+---+
only showing top 5 rows



In [57]:
oversampled_df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- CASH_OUT: double (nullable = true)
 |-- PAYMENT: double (nullable = true)
 |-- CASH_IN: double (nullable = true)
 |-- TRANSFER: double (nullable = true)
 |-- DEBIT: double (nullable = true)
 |-- CC: double (nullable = true)
 |-- CM: double (nullable = true)



In [58]:
# we drop the unnecessary columns in the "oversampled_df" dataframe

oversampled_df = oversampled_df.drop("step","amount","CASH_OUT","CASH_IN","PAYMENT","TRANSFER","DEBIT","CC","CM")

In [59]:
oversampled_df.printSchema()

root
 |-- isFraud: integer (nullable = true)



In [60]:
num_oversampled_df = oversampled_df.count()

In [61]:
num_oversampled_df

6344649

In [62]:
num_oversampled_df + num_major_df

12691569

#### The sum of "oversampled_df" and "major_df" (12,691,569 rows) is about twice the original number of samples. To keep the dataset at its original size, we reduce each of them to roughly half.
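
#### The target sizes used in the following cells can be worked out directly: keep half of the majority rows and fill the rest of the original row count with oversampled fraud rows. A short worked example with the notebook's counts (variable names are illustrative):

```python
total_rows = 6355023         # rows in the original dataframe
major_rows = 6346920         # non-fraud rows
oversampled_rows = 6344649   # fraud rows after duplication

limit_major = major_rows // 2                   # 3173460
limit_oversampled = total_rows - limit_major    # 3181563

# there must be enough oversampled rows to take this many
assert limit_oversampled <= oversampled_rows

fraud_share = limit_oversampled / (limit_major + limit_oversampled)
print(limit_major, limit_oversampled, round(fraud_share, 4))
```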

In [63]:
# now, we need to add an index column to the "oversampled_df" dataframe

oversampled_df = oversampled_df.withColumn("index",monotonically_increasing_id())
oversampled_df.show(5)

+-------+-----+
|isFraud|index|
+-------+-----+
|      1|    0|
|      1|    1|
|      1|    2|
|      1|    3|
|      1|    4|
+-------+-----+
only showing top 5 rows
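
#### `monotonically_increasing_id` guarantees unique, increasing IDs, but not consecutive ones: according to the Spark documentation, the partition ID goes into the upper 31 bits and the record number within the partition into the lower 33 bits. The first rows shown above read 0, 1, 2, ... because they belong to partition 0. The sketch below imitates that layout in plain Python; the function `simulated_ids` is illustrative only.

```python
def simulated_ids(rows_per_partition):
    """Imitate the documented ID layout: (partition_id << 33) + row_number."""
    ids = []
    for partition_id, n_rows in enumerate(rows_per_partition):
        for row_number in range(n_rows):
            ids.append((partition_id << 33) + row_number)
    return ids

# two partitions holding 3 and 2 rows
print(simulated_ids([3, 2]))
# [0, 1, 2, 8589934592, 8589934593]
```

#### If a dense 0..n-1 index is needed, `row_number()` over a `Window` (both imported at the top of this notebook) is a common alternative, at the cost of an ordering step.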



In [64]:
oversampled_df.count()

6344649

In [65]:
# we create a view of the "oversampled_df" dataframe to use sparkSQL

oversampled_df.createOrReplaceTempView("isFraud")

24/05/13 22:59:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [66]:
limit_major_df = num_major_df / 2

In [67]:
limit_major_df

3173460.0

In [68]:
limit_oversampled_df = num_df_bank_par - limit_major_df

In [69]:
limit_oversampled_df = int(limit_oversampled_df)

In [70]:
limit_oversampled_df

3181563

In [71]:
type(limit_oversampled_df)

int

In [72]:
# we use this query to select some rows of the "oversampled_df" dataframe

query = f"SELECT * FROM isFraud LIMIT {limit_oversampled_df}"

In [73]:
oversampled_df = spark.sql(query)

In [74]:
oversampled_df.show(5)

+-------+-----+
|isFraud|index|
+-------+-----+
|      1|    0|
|      1|    1|
|      1|    2|
|      1|    3|
|      1|    4|
+-------+-----+
only showing top 5 rows



In [75]:
oversampled_df.count()

3181563

In [76]:
oversampled_df.show(10)

+-------+-----+
|isFraud|index|
+-------+-----+
|      1|    0|
|      1|    1|
|      1|    2|
|      1|    3|
|      1|    4|
|      1|    5|
|      1|    6|
|      1|    7|
|      1|    8|
|      1|    9|
+-------+-----+
only showing top 10 rows



In [77]:
# we drop the unnecessary columns in the "major_df" dataframe

major_df = major_df.drop("step","amount","CASH_OUT","CASH_IN","PAYMENT","TRANSFER","DEBIT","CC","CM")
major_df.show()

+-------+
|isFraud|
+-------+
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
+-------+
only showing top 20 rows



In [78]:
major_df.count()

6346920

In [79]:
# now, we need to add an index column to the "major_df" dataframe

major_df = major_df.withColumn("index",monotonically_increasing_id())
major_df.show(5)

+-------+-----+
|isFraud|index|
+-------+-----+
|      0|    0|
|      0|    1|
|      0|    2|
|      0|    3|
|      0|    4|
+-------+-----+
only showing top 5 rows



In [80]:
major_df.count()

6346920

In [81]:
limit_major_df = int(limit_major_df)

In [82]:
limit_major_df

3173460

In [83]:
# we create a view from "major_df" dataframe to do some queries

major_df.createOrReplaceTempView("isFraud")

In [84]:
# we use this query to select some rows of the "major_df" dataframe

query = f"SELECT * FROM isFraud LIMIT {limit_major_df}"

In [85]:
major_df = spark.sql(query)

In [86]:
major_df.show(5)

+-------+-----+
|isFraud|index|
+-------+-----+
|      0|    0|
|      0|    1|
|      0|    2|
|      0|    3|
|      0|    4|
+-------+-----+
only showing top 5 rows



In [87]:
major_df.count()

3173460

In [88]:
combined_df = major_df.unionAll(oversampled_df)

In [89]:
combined_df.show(50)

+-------+-----+
|isFraud|index|
+-------+-----+
|      0|    0|
|      0|    1|
|      0|    2|
|      0|    3|
|      0|    4|
|      0|    5|
|      0|    6|
|      0|    7|
|      0|    8|
|      0|    9|
|      0|   10|
|      0|   11|
|      0|   12|
|      0|   13|
|      0|   14|
|      0|   15|
|      0|   16|
|      0|   17|
|      0|   18|
|      0|   19|
|      0|   20|
|      0|   21|
|      0|   22|
|      0|   23|
|      0|   24|
|      0|   25|
|      0|   26|
|      0|   27|
|      0|   28|
|      0|   29|
|      0|   30|
|      0|   31|
|      0|   32|
|      0|   33|
|      0|   34|
|      0|   35|
|      0|   36|
|      0|   37|
|      0|   38|
|      0|   39|
|      0|   40|
|      0|   41|
|      0|   42|
|      0|   43|
|      0|   44|
|      0|   45|
|      0|   46|
|      0|   47|
|      0|   48|
|      0|   49|
+-------+-----+
only showing top 50 rows



In [90]:
combined_df.count()

                                                                                

6355023

#### The combined dataframe has 6,355,023 rows, the same number of records as the original dataset. Let's check whether the two classes of "isFraud" are now balanced.

In [91]:
class_1 = combined_df.filter(f.col("isFraud")==1)
class_0 = combined_df.filter(f.col("isFraud")==0)
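
#### The two filters can also be replaced by a single aggregation that returns the size of every class at once. A small sketch, assuming `df` is a Spark DataFrame; the helper name `class_counts` is hypothetical.

```python
def class_counts(df, label):
    """Return {label value: row count} for a Spark DataFrame."""
    # groupBy(...).count() adds a column named "count" next to the label
    rows = df.groupBy(label).count().collect()
    return {row[label]: row["count"] for row in rows}

# usage with the notebook's dataframe:
# class_counts(combined_df, "isFraud")
```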

In [92]:
class_1.show(50)

+-------+-----+
|isFraud|index|
+-------+-----+
|      1|    0|
|      1|    1|
|      1|    2|
|      1|    3|
|      1|    4|
|      1|    5|
|      1|    6|
|      1|    7|
|      1|    8|
|      1|    9|
|      1|   10|
|      1|   11|
|      1|   12|
|      1|   13|
|      1|   14|
|      1|   15|
|      1|   16|
|      1|   17|
|      1|   18|
|      1|   19|
|      1|   20|
|      1|   21|
|      1|   22|
|      1|   23|
|      1|   24|
|      1|   25|
|      1|   26|
|      1|   27|
|      1|   28|
|      1|   29|
|      1|   30|
|      1|   31|
|      1|   32|
|      1|   33|
|      1|   34|
|      1|   35|
|      1|   36|
|      1|   37|
|      1|   38|
|      1|   39|
|      1|   40|
|      1|   41|
|      1|   42|
|      1|   43|
|      1|   44|
|      1|   45|
|      1|   46|
|      1|   47|
|      1|   48|
|      1|   49|
+-------+-----+
only showing top 50 rows



In [93]:
class_1.count()

3181563

In [94]:
class_0.show(50)

+-------+-----+
|isFraud|index|
+-------+-----+
|      0|    0|
|      0|    1|
|      0|    2|
|      0|    3|
|      0|    4|
|      0|    5|
|      0|    6|
|      0|    7|
|      0|    8|
|      0|    9|
|      0|   10|
|      0|   11|
|      0|   12|
|      0|   13|
|      0|   14|
|      0|   15|
|      0|   16|
|      0|   17|
|      0|   18|
|      0|   19|
|      0|   20|
|      0|   21|
|      0|   22|
|      0|   23|
|      0|   24|
|      0|   25|
|      0|   26|
|      0|   27|
|      0|   28|
|      0|   29|
|      0|   30|
|      0|   31|
|      0|   32|
|      0|   33|
|      0|   34|
|      0|   35|
|      0|   36|
|      0|   37|
|      0|   38|
|      0|   39|
|      0|   40|
|      0|   41|
|      0|   42|
|      0|   43|
|      0|   44|
|      0|   45|
|      0|   46|
|      0|   47|
|      0|   48|
|      0|   49|
+-------+-----+
only showing top 50 rows



                                                                                

In [95]:
class_0.count()

3173460

#### The two classes of "isFraud" now have almost the same number of samples (3,181,563 and 3,173,460), and their total matches the size of the original dataset. Now we need to merge the original dataframe "df_bank_par" with the "combined_df" dataframe.

In [96]:
df_bank_par.show(10)

+----+------+-------+--------+-------+-------+--------+-----+---+---+
|step|amount|isFraud|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT| CC| CM|
+----+------+-------+--------+-------+-------+--------+-----+---+---+
|   1|358831|      0|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|
|   1| 60836|      0|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|
|   1|349505|      0|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|
|   1|206097|      0|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|
|   1|   636|      0|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|
|   1|172986|      0|     1.0|    0.0|    0.0|     0.0|  0.0|1.0|0.0|
|   1| 10856|      0|     1.0|    0.0|    0.0|     0.0|  0.0|1.0|0.0|
|   1|154442|      0|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|
|   1| 18622|      0|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|
|   1|  8787|      0|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|
+----+------+-------+--------+-------+-------+--------+-----+---+---+
only showing top 10 rows

In [97]:
df_bank_par = df_bank_par.drop("isFraud")

In [98]:
df_bank_par.show(10)

+----+------+--------+-------+-------+--------+-----+---+---+
|step|amount|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT| CC| CM|
+----+------+--------+-------+-------+--------+-----+---+---+
|   1|358831|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|
|   1| 60836|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|
|   1|349505|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|
|   1|206097|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|
|   1|   636|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|
|   1|172986|     1.0|    0.0|    0.0|     0.0|  0.0|1.0|0.0|
|   1| 10856|     1.0|    0.0|    0.0|     0.0|  0.0|1.0|0.0|
|   1|154442|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|
|   1| 18622|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|
|   1|  8787|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|
+----+------+--------+-------+-------+--------+-----+---+---+
only showing top 10 rows



In [99]:
combined_df.count(), df_bank_par.count()

                                                                                

(6355023, 6355023)

In [100]:
combined_df.printSchema()

root
 |-- isFraud: integer (nullable = true)
 |-- index: long (nullable = false)



In [101]:
df_bank_par.printSchema()

root
 |-- step: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- CASH_OUT: double (nullable = true)
 |-- PAYMENT: double (nullable = true)
 |-- CASH_IN: double (nullable = true)
 |-- TRANSFER: double (nullable = true)
 |-- DEBIT: double (nullable = true)
 |-- CC: double (nullable = true)
 |-- CM: double (nullable = true)



In [102]:
# now, we need to add an index column to the "df_bank_par" dataframe

df_bank_par = df_bank_par.withColumn("index",monotonically_increasing_id())
df_bank_par.show(5)

+----+------+--------+-------+-------+--------+-----+---+---+-----+
|step|amount|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT| CC| CM|index|
+----+------+--------+-------+-------+--------+-----+---+---+-----+
|   1|358831|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|    0|
|   1| 60836|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|    1|
|   1|349505|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|    2|
|   1|206097|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|    3|
|   1|   636|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|    4|
+----+------+--------+-------+-------+--------+-----+---+---+-----+
only showing top 5 rows



In [103]:
df_bank_par = df_bank_par.join(combined_df,on=['index']).drop('index')
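
#### Because "combined_df" is the union of two dataframes that were each indexed starting at 0, an index value can occur in it twice: once with isFraud = 0 and once with isFraud = 1. An inner join on such a key returns one output row per matching pair, which is consistent with the output of the next cell, where each transaction is listed with both labels. A plain-Python sketch of that join behaviour (all names are illustrative):

```python
def inner_join(left, right):
    """Inner join two lists of (key, value) pairs on the key."""
    joined = []
    for key_l, value_l in left:
        for key_r, value_r in right:
            if key_l == key_r:
                joined.append((key_l, value_l, value_r))
    return joined

features = [(0, "amount=358831"), (1, "amount=60836")]
# labels from two dataframes that were both indexed starting at 0
labels = [(0, 0), (1, 0), (0, 1), (1, 1)]

result = inner_join(features, labels)
for row in result:
    print(row)
```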

In [104]:
df_bank_par.show(50)

+----+------+--------+-------+-------+--------+-----+---+---+-------+
|step|amount|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT| CC| CM|isFraud|
+----+------+--------+-------+-------+--------+-----+---+---+-------+
|   1|358831|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|      0|
|   1|358831|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|      1|
|   1|154442|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|      0|
|   1|154442|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|      1|
|   2| 31876|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|      0|
|   2| 31876|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|      1|
|   2|  8048|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|
|   2|  8048|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|
|   3| 10728|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|
|   3| 10728|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|
|   4|  3581|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|
|   4|  3581|     0.

                                                                                

#### Let's check again the number of samples of each class of the label "isFraud" in this dataset.

In [105]:
class_1 = df_bank_par.filter(f.col("isFraud")==1)
class_0 = df_bank_par.filter(f.col("isFraud")==0)

In [106]:
df_bank_par.show(50)

+----+------+--------+-------+-------+--------+-----+---+---+-------+
|step|amount|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT| CC| CM|isFraud|
+----+------+--------+-------+-------+--------+-----+---+---+-------+
|   1|358831|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|      0|
|   1|358831|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|      1|
|   1|154442|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|      0|
|   1|154442|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|      1|
|   2| 31876|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|      0|
|   2| 31876|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|      1|
|   2|  8048|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|
|   2|  8048|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|
|   3| 10728|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|
|   3| 10728|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|
|   4|  3581|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|
|   4|  3581|     0.

                                                                                

In [107]:
type(df_bank_par)

pyspark.sql.dataframe.DataFrame

In [108]:
df_bank_par.count()

                                                                                

Py4JJavaError: An error occurred while calling o1165.count.
: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.notEnoughMemoryToBuildAndBroadcastTableError(QueryExecutionErrors.scala:2212)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:187)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:224)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:219)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
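
#### The error message names two possible workarounds. Below is a minimal sketch of the first one, assuming the notebook's `spark` session object; the helper name `disable_broadcast_joins` is hypothetical. `spark.sql.autoBroadcastJoinThreshold` is a runtime SQL setting, whereas `spark.driver.memory` has to be supplied before the driver JVM starts, so it is not changed here.

```python
def disable_broadcast_joins(spark):
    """Turn off automatic broadcast joins on an existing SparkSession."""
    # -1 disables broadcasting, as suggested by the error message
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    return spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

# usage in the notebook:
# disable_broadcast_joins(spark)
# df_bank_par.count()
```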


In [109]:
class_0.show(50)



+----+-------+--------+-------+-------+--------+-----+---+---+-------+
|step| amount|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT| CC| CM|isFraud|
+----+-------+--------+-------+-------+--------+-----+---+---+-------+
|   1| 358831|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|      0|
|   1| 154442|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|      0|
|   2|  31876|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|      0|
|   2|   8048|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|
|   3|  10728|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|
|   4|   3581|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|
|   6|   9381|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|
|   7|   8014|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|
|   7| 968033|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|      0|
|   7|  12667|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|
|   8|   2143|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|
|  10|

                                                                                

In [None]:
type(class_0)

In [None]:
class_0.count()

In [110]:
class_1.show(50)



+----+-------+--------+-------+-------+--------+-----+---+---+-------+
|step| amount|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT| CC| CM|isFraud|
+----+-------+--------+-------+-------+--------+-----+---+---+-------+
|   1| 358831|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|      1|
|   1| 154442|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|      1|
|   2|  31876|     0.0|    0.0|    1.0|     0.0|  0.0|1.0|0.0|      1|
|   2|   8048|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|
|   3|  10728|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|
|   4|   3581|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|
|   6|   9381|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|
|   7|   8014|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|
|   7| 968033|     0.0|    0.0|    0.0|     1.0|  0.0|1.0|0.0|      1|
|   7|  12667|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|
|   8|   2143|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|
|  10|

                                                                                

In [None]:
class_1.count()

#### Our latest valid and "clean" dataframe is *df_bank_par* as follows:

In [None]:
df_bank_par.show(10)

#### We have balanced classes in "isFraud". Let's check this with a histogram.

In [None]:
df_bank_par.printSchema()

In [None]:
type(df_bank_par)

In [None]:
## convert pyspark dataframe to pandas dataframe (optional)

##df_bank_par_pd = df_bank_par.toPandas()

In [None]:
### histogram: "isFraud"

###histogram(df_bank_par, 'isFraud', bins=15, yname='frequency')

In [None]:
type(df_bank_par)

In [None]:
df_bank_par.show(5)

In [None]:
df_bank_par.printSchema()


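#### If we want to work with the PySpark dataframe df_bank_par through the pandas API, we can use the method to_pandas_on_spark, which returns a pandas-on-Spark dataframe (a plain pandas dataframe would require toPandas instead).

In [111]:
# pandas-on-Spark dataframe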

df_bank_par_pandas = df_bank_par.to_pandas_on_spark()
df_bank_par_pandas.head(10)

24/05/13 23:00:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

Unnamed: 0,step,amount,CASH_OUT,PAYMENT,CASH_IN,TRANSFER,DEBIT,CC,CM,isFraud
0,1,358831,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0
1,1,358831,0.0,0.0,0.0,1.0,0.0,1.0,0.0,1
2,1,154442,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0
3,1,154442,0.0,0.0,1.0,0.0,0.0,1.0,0.0,1
4,2,31876,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0
5,2,31876,0.0,0.0,1.0,0.0,0.0,1.0,0.0,1
6,2,8048,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0
7,2,8048,0.0,1.0,0.0,0.0,0.0,0.0,1.0,1
8,3,10728,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0
9,3,10728,0.0,1.0,0.0,0.0,0.0,0.0,1.0,1


In [None]:
df_bank_par_pandas.describe()

In [None]:
type(df_bank_par_pandas)

#### Now, let's compute correlations to better understand the relationships between the variables.

In [115]:
df_bank_par.count()

6074421

In [113]:
# let´s remove the null values and duplicated values from the df_bank_par dataframe

df_bank_par = df_bank_par.dropna()

In [114]:
df_bank_par = df_bank_par.dropDuplicates()

#### Let´s create a function to find a correlation between the target variable "isFraud" and the features. 

In [116]:
# definition of the function "correlation_df"

def correlation_df(df, target_var, feature_cols, method):
    """Return the correlation of target_var with each column in feature_cols."""
    # assemble the target and the features into a single vector column
    input_cols = [target_var] + feature_cols
    df_cor = df.select(input_cols)
    assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
    df_cor = assembler.transform(df_cor)

    # calculate the correlation matrix (the target is row/column 0)
    correlation_matrix = Correlation.corr(df_cor, "features", method=method).head()[0]

    # extract the correlation coefficient between the target and each feature
    target_corr_list = [correlation_matrix[i, 0] for i in range(1, len(feature_cols) + 1)]

    # pair each feature name with its correlation coefficient
    correlation_data = [(feature_cols[i], float(target_corr_list[i])) for i in range(len(feature_cols))]

    # build a dataframe with the feature names, the correlations and their absolute values
    result_df = spark.createDataFrame(correlation_data, ["feature", "correlation"])
    result_df = result_df.withColumn("abs_correlation", f.abs("correlation"))

    return result_df
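
#### With method='pearson', each entry of the correlation matrix is the Pearson coefficient: the covariance of two columns divided by the product of their standard deviations. The plain-Python version below can be handy for checking a result on a small sample; the function name `pearson` is illustrative.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

print(pearson([1, 2, 3, 4], [2, 4, 6, 8]))   # perfectly linear, increasing
print(pearson([1, 2, 3, 4], [8, 6, 4, 2]))   # perfectly linear, decreasing
```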


In [None]:
target = "isFraud"

indep_cols = [x for x in df_bank_par.columns if x not in [target] ]

corr_values_df = correlation_df(df=df_bank_par, target_var= target, feature_cols= indep_cols, method='pearson')

print(f"The correlation between {target} and the other features is: ")

corr_values_df.show()


In [None]:
target = "amount"

indep_cols = [x for x in df_bank_par.columns if x not in [target] ]

corr_values_df = correlation_df(df=df_bank_par, target_var= target, feature_cols= indep_cols, method='pearson')

print(f"The correlation between {target} and the other features is: ")

corr_values_df.show()


In [117]:
target = "step"

indep_cols = [x for x in df_bank_par.columns if x not in [target] ]

corr_values_df = correlation_df(df=df_bank_par, target_var= target, feature_cols= indep_cols, method='pearson')

print(f"The correlation between {target} and the other features is: ")

corr_values_df.show()

The correlation between step and the other features is: 


                                                                                

+--------+--------------------+--------------------+
| feature|         correlation|     abs_correlation|
+--------+--------------------+--------------------+
|  amount| 0.01658219979913117| 0.01658219979913117|
|CASH_OUT|-0.01777480265254028| 0.01777480265254028|
| PAYMENT|0.015392054905182785|0.015392054905182785|
| CASH_IN|0.001488787061744...|0.001488787061744...|
|TRANSFER|0.001999575610757...|0.001999575610757...|
|   DEBIT|0.002118837746395...|0.002118837746395...|
|      CC|-0.01539205490514...|0.015392054905146698|
|      CM|0.015392054905182785|0.015392054905182785|
| isFraud|7.243720138771031E-4|7.243720138771031E-4|
+--------+--------------------+--------------------+



## 4. Construction of models

## A. Random Forest

### 4.1 train/test split

In [None]:
df_bank_par.printSchema()

In [None]:
#y = df_bank_par["isFraud"]  # variable "y"
#x = df_bank_par["CC"]   # variable "x"
#train_x, test_y, train_y , test_y = train_test_split(x,y,test_size=0.3, random_state=77)

In [118]:
train,test = df_bank_par.randomSplit([0.7,0.3])
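
#### `randomSplit` assigns rows to the splits at random according to the normalised weights, so the resulting sizes are only approximately 70/30; it also accepts an optional `seed` argument for a reproducible split. Using the row counts reported in the next cells, the realised proportions can be checked in plain Python:

```python
train_rows = 4251531   # train.count()
test_rows = 1822890    # test.count()
total_rows = train_rows + test_rows

train_share = train_rows / total_rows
test_share = test_rows / total_rows
print(total_rows)      # 6074421
print(round(train_share, 3), round(test_share, 3))
```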

In [119]:
type(train) , type(test)

(pyspark.sql.dataframe.DataFrame, pyspark.sql.dataframe.DataFrame)

In [None]:
train.show(10)

In [120]:
train.count()

4251531

In [121]:
test.count()

1822890

### 4.2 Training of the model: Random Forest

#### Let´s apply random forest to the dataset. 
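
#### A classifier's feature vector normally contains only the predictors; if the label is listed in `inputCols` it becomes part of "features", and the model can read the answer directly. Below is a small sketch of how the input list could be built so that "isFraud" is left out; the helper `feature_columns` is hypothetical.

```python
def feature_columns(all_columns, label):
    """Return every column name except the label, keeping the order."""
    return [name for name in all_columns if name != label]

all_columns = ['step', 'amount', 'CASH_OUT', 'PAYMENT', 'CASH_IN',
               'TRANSFER', 'DEBIT', 'CC', 'CM', 'isFraud']
input_cols = feature_columns(all_columns, 'isFraud')
print(input_cols)

# with PySpark this list would be passed on as, for example:
# assembler = VectorAssembler(inputCols=input_cols, outputCol='features')
# rf = RandomForestClassifier(featuresCol='features', labelCol='isFraud')
```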

In [125]:
# assemble the columns into a single vector column

columns = ['step','amount','CASH_OUT','PAYMENT','CASH_IN','TRANSFER','DEBIT','CC','CM','isFraud']

assembler = VectorAssembler(inputCols=columns, outputCol='features')

train = assembler.transform(train)

train.show(10)

+----+------+--------+-------+-------+--------+-----+---+---+-------+--------------------+
|step|amount|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT| CC| CM|isFraud|            features|
+----+------+--------+-------+-------+--------+-----+---+---+-------+--------------------+
|   1|    79|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|(10,[0,1,3,8,9],[...|
|   1|   121|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|(10,[0,1,3,8],[1....|
|   1|   141|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|(10,[0,1,3,8],[1....|
|   1|   207|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|(10,[0,1,3,8,9],[...|
|   1|   320|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|(10,[0,1,3,8],[1....|
|   1|   365|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|(10,[0,1,3,8,9],[...|
|   1|   402|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|(10,[0,1,3,8],[1....|
|   1|   471|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|(10,[0,1,3,8],[1....|

In [132]:
columns = ['step','amount','CASH_OUT','PAYMENT','CASH_IN','TRANSFER','DEBIT','CC','CM','isFraud']

assembler = VectorAssembler(inputCols=columns, outputCol='features')

test = assembler.transform(test)

test.show(10)

+----+------+--------+-------+-------+--------+-----+---+---+-------+--------------------+
|step|amount|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT| CC| CM|isFraud|            features|
+----+------+--------+-------+-------+--------+-----+---+---+-------+--------------------+
|   1|   121|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|(10,[0,1,3,8,9],[...|
|   1|   353|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|(10,[0,1,3,8],[1....|
|   1|   798|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|(10,[0,1,3,8,9],[...|
|   1|   895|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|(10,[0,1,3,8,9],[...|
|   1|   942|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|(10,[0,1,3,8],[1....|
|   1|  1006|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|(10,[0,1,3,8],[1....|
|   1|  1091|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|(10,[0,1,3,8,9],[...|
|   1|  2211|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|(10,[0,1,3,8,9],[...|

In [126]:
type(train)

pyspark.sql.dataframe.DataFrame

In [128]:
# train the model "random forest" (rf)

rf = RandomForestClassifier(featuresCol='features', labelCol='isFraud')
model_RF = rf.fit(train)

In [129]:
type(model_RF)

pyspark.ml.classification.RandomForestClassificationModel

In [133]:
# make predictions on the test set with the random forest model

predictions = model_RF.transform(test)


In [134]:
type(predictions)

pyspark.sql.dataframe.DataFrame

In [135]:
predictions.show(50)

+----+------+--------+-------+-------+--------+-----+---+---+-------+--------------------+--------------------+--------------------+----------+
|step|amount|CASH_OUT|PAYMENT|CASH_IN|TRANSFER|DEBIT| CC| CM|isFraud|            features|       rawPrediction|         probability|prediction|
+----+------+--------+-------+-------+--------+-----+---+---+-------+--------------------+--------------------+--------------------+----------+
|   1|   121|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|(10,[0,1,3,8,9],[...|[2.1237789279892,...|[0.10618894639946...|       1.0|
|   1|   353|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      0|(10,[0,1,3,8],[1....|[18.1237789279892...|[0.90618894639946...|       0.0|
|   1|   798|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|(10,[0,1,3,8,9],[...|[2.1237789279892,...|[0.10618894639946...|       1.0|
|   1|   895|     0.0|    1.0|    0.0|     0.0|  0.0|0.0|1.0|      1|(10,[0,1,3,8,9],[...|[2.1237789279892,...|[0.10618894639946...|       1.0|
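
The `rawPrediction`, `probability` and `prediction` columns are related: for a random forest, the raw values are per-class scores accumulated over the trees, and `probability` is the same vector normalized to sum to 1. The sketch below reproduces the first `probability` entry from the first `rawPrediction` entry, assuming the forest uses Spark's default of 20 trees (the notebook does not set `numTrees`); the variable names are illustrative.

```python
# Spark's default numTrees, assumed because the notebook does not set it.
num_trees = 20

# First rawPrediction entry (score for class 0) of two rows shown above.
raw_class0_scores = [2.1237789279892, 18.1237789279892]

results = []
for raw0 in raw_class0_scores:
    # Normalizing by the total score (one unit per tree) gives the class-0 probability.
    p0 = raw0 / num_trees
    p1 = 1.0 - p0
    # The predicted class is the one with the larger probability.
    prediction = 0.0 if p0 >= p1 else 1.0
    results.append((p0, p1, prediction))
    print(f"raw0={raw0:.4f} -> p0={p0:.4f}, p1={p1:.4f}, prediction={prediction}")
```

With these inputs the class-0 probabilities come out near 0.106 and 0.906, which agrees with the `probability` and `prediction` columns in the table.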

#### We can clearly compare the actual values and predicted values with the output below:

In [136]:
predictions.select("isFraud","prediction").show(50)

+-------+----------+
|isFraud|prediction|
+-------+----------+
|      1|       1.0|
|      0|       0.0|
|      1|       1.0|
|      1|       1.0|
|      0|       0.0|
|      0|       0.0|
|      1|       1.0|
|      1|       1.0|
|      1|       1.0|
|      0|       0.0|
|      1|       1.0|
|      0|       0.0|
|      0|       0.0|
|      1|       1.0|
|      0|       0.0|
|      0|       0.0|
|      1|       1.0|
|      0|       0.0|
|      0|       0.0|
|      1|       1.0|
|      0|       0.0|
|      0|       0.0|
|      1|       1.0|
|      0|       0.0|
|      0|       0.0|
|      1|       1.0|
|      1|       1.0|
|      0|       0.0|
|      1|       1.0|
|      1|       1.0|
|      0|       0.0|
|      0|       0.0|
|      1|       1.0|
|      1|       1.0|
|      0|       0.0|
|      1|       1.0|
|      1|       1.0|
|      1|       1.0|
|      1|       1.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      1|       1.0|
|      0|       0.0|
|      1|       1.0|

#### At a glance, the predicted values match the actual values, at least for the first fifty rows.

### 4.3 Evaluation of the model

#### We need to evaluate our random forest machine learning algorithm.

In [138]:
# metricName defaults to "f1", so request accuracy explicitly
evaluator = MulticlassClassificationEvaluator(labelCol="isFraud", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

In [139]:
type(accuracy)

float

In [141]:
print(f"The accuracy is {accuracy}")

The accuracy is 1.0


In [143]:
Test_Error = (1 - accuracy)
print(f"The Test Error is {Test_Error}")


The Test Error is 0.0


#### Let's check the confusion matrix.

In [146]:
preds_and_labels = predictions.select(["prediction","isFraud"])
preds_and_labels = preds_and_labels.withColumn("isFraud", f.col("isFraud").cast(FloatType())).orderBy("prediction")

In [147]:
preds_and_labels.show(20)

+----------+-------+
|prediction|isFraud|
+----------+-------+
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
+----------+-------+
only showing top 20 rows



In [148]:
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

In [149]:
type(metrics)

pyspark.mllib.evaluation.MulticlassMetrics

In [152]:
print("The Confusion Matrix is:")

metrics.confusionMatrix().toArray()

The Confusion Matrix is:


array([[922791.,      0.],
       [     0., 900099.]])
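
Precision, recall and F1 for the fraud class can be derived directly from this matrix. The sketch below is plain Python and assumes the usual `MulticlassMetrics` layout (rows are actual classes, columns are predicted classes, labels in ascending order); the helper `safe_div` and the variable names are illustrative.

```python
# Confusion matrix copied from the output above.
# Rows: actual class (0 = legitimate, 1 = fraud); columns: predicted class.
confusion = [[922791.0, 0.0],
             [0.0, 900099.0]]

tn, fp = confusion[0]  # actual 0: predicted 0, predicted 1
fn, tp = confusion[1]  # actual 1: predicted 0, predicted 1


def safe_div(numerator, denominator):
    """Divide, returning 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


total = tn + fp + fn + tp
cm_accuracy = safe_div(tp + tn, total)
precision = safe_div(tp, tp + fp)  # share of predicted frauds that are real
recall = safe_div(tp, tp + fn)     # share of real frauds that are caught
f1 = safe_div(2 * precision * recall, precision + recall)

print(f"rows={int(total)} accuracy={cm_accuracy} precision={precision} recall={recall} f1={f1}")
```

The matrix total, 1822890, equals the row count of the test set, so every test row is accounted for.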

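#### According to the confusion matrix, every transaction in the test set is classified correctly. A perfect score like this is suspicious: `isFraud`, the label, is included in the `columns` list passed to `VectorAssembler`, so the model sees the answer among its features (label leakage). The result therefore says little about how the model generalizes, and overfitting cannot be ruled out either.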
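
One way to keep the label out of the feature vector is to derive the input columns from the full column list. This is a minimal sketch in plain Python; `label_col` and `feature_columns` are illustrative names that do not appear in the notebook.

```python
# Column list used in the notebook; 'isFraud' is the target, not a predictor.
columns = ['step', 'amount', 'CASH_OUT', 'PAYMENT', 'CASH_IN',
           'TRANSFER', 'DEBIT', 'CC', 'CM', 'isFraud']
label_col = 'isFraud'

# Keep every column except the label as a model input.
feature_columns = [c for c in columns if c != label_col]

print(feature_columns)
```

`feature_columns` could then be passed as `inputCols` to `VectorAssembler`, with `labelCol='isFraud'` kept on the classifier. The model would have to be retrained and all metrics recomputed before drawing any conclusion about its quality.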

## B. Logistic Regression

## 5. Storage

### 5.1 Model

In [None]:
def saveModel(name_model):
    # build the file name under which the model will be stored
    model_file = f"{name_model}.pkl"
    return model_file

In [160]:
model_RF.save("rf.model")

In [None]:
model_RF.para

In [161]:
df_bank_par.rdd.saveAsPickleFile("model")

In [None]:
requirements.txt