UNIVERSIDADE ESTADUAL DO CEARÁ \
MESTRADO ACADÊMICO EM CIÊNCIA DA COMPUTAÇÃO \
MINERAÇÃO MASSIVA DE DADOS

Daniel Gleison Moreira Lira \
daniel.gleison@aluno.uece.br

Atualizado em 23/10/2020



# Mecanismo de predição de fraudes financeiras utilizando aprendizado de máquina e computação distribuída
---


## Database

https://www.kaggle.com/ntnu-testimon/paysim1

PS_20174392719_1491204439457_log.csv\
Date created: 2017-03-31

### Predicted attribute:
Class of fraud detection

### Number of Instances:
6.353.307

### Number of Attributes:
11 attributes (5 Decimal, 3 Integer, 3 String) and the class

### Attribute Information:

This is a sample of 1 row with headers explanation:

1,PAYMENT,1060.31,C429214117,1089.0,28.69,M1591654462,0.0,0.0,0,0

step - maps a unit of time in the real world. In this case 1 step is 1 hour of time. Total steps 744 (30 days simulation).

type - CASH-IN, CASH-OUT, DEBIT, PAYMENT and TRANSFER.

amount - amount of the transaction in local currency.

nameOrig - customer who started the transaction

oldbalanceOrg - initial balance before the transaction

newbalanceOrig - new balance after the transaction

nameDest - customer who is the recipient of the transaction

oldbalanceDest - initial balance recipient before the transaction. Note that there is not information for customers that start with M (Merchants).

newbalanceDest - new balance recipient after the transaction. Note that there is not information for customers that start with M (Merchants).

isFraud - This is the transactions made by the fraudulent agents inside the simulation. In this specific dataset the fraudulent behavior of the agents aims to profit by taking control or customers accounts and try to empty the funds by transferring to another account and then cashing out of the system.

isFlaggedFraud - The business model aims to control massive transfers from one account to another and flags illegal attempts. An illegal attempt in this dataset is an attempt to transfer more than 200.000 in a single transaction.

### Missing Attribute Values: 
None

### Class Distribution: 
2 Classes \
6.354.407 without fraud and 8.213 with fraud

## References:

1. E. A. Lopez-Rojas , A. Elmir, and S. Axelsson. "PaySim: A financial mobile money simulator for fraud detection". In: The 28th European Modeling and Simulation Symposium-EMSS, Larnaca, Cyprus. 2016

In [1]:
# Spark Lib
import findspark
findspark.init()

In [2]:
# Load libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.util import MLUtils

from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.feature import VectorAssembler, VectorIndexer

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LinearSVC, OneVsRest

from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

from pyspark.ml.linalg import Vectors
from pyspark.mllib.util import MLUtils

## SKLearn Lib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.model_selection import cross_val_score
from joblib import dump, load
import pickle

import time
start_time = time.time()
%matplotlib inline

## Criação do ambiente Spark

In [3]:
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("MachineLearningFraud") \
        .getOrCreate()
spark

## Importação e compreensão dos dados

In [98]:
data_path='./data/'
df_original = spark.read.format('csv')\
                   .options(sep=',',header='true',inferschema='true').\
                   load(data_path+'PS_20174392719_1491204439457_log.csv')
df_original.show()

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT|  9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT|  1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|    181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|    181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT| 11668.14|C2048537720|      41554.0|      29885.86|M123070170

## Balanceamento do dataset

In [5]:
df0 = df_original[df_original.isFraud==0]
df1 = df_original[df_original.isFraud==1]
df0.groupby('isFraud').count().show()
df1.groupby('isFraud').count().show()

+-------+-------+
|isFraud|  count|
+-------+-------+
|      0|6354407|
+-------+-------+

+-------+-----+
|isFraud|count|
+-------+-----+
|      1| 8213|
+-------+-----+



In [6]:
guessedFraction = 1.0
noOfSamples = df1.count()
df0 = df0.sample(True, guessedFraction).limit(noOfSamples)
df0.groupby('isFraud').count().show()

+-------+-----+
|isFraud|count|
+-------+-----+
|      0| 8213|
+-------+-----+



In [7]:
df_balanceado = df0.union(df1)
df_balanceado.count()

16426

## Análise exploratória dos dados

In [8]:
df_original = df_balanceado

In [9]:
df_original.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [10]:
df_original.count()

16426

In [11]:
df_original.groupBy('IsFraud').count().show()

+-------+-----+
|IsFraud|count|
+-------+-----+
|      1| 8213|
|      0| 8213|
+-------+-----+



In [12]:
df_original.groupBy('isFlaggedFraud').count().show()

+--------------+-----+
|isFlaggedFraud|count|
+--------------+-----+
|             1|   16|
|             0|16410|
+--------------+-----+



### Estatística descritiva

In [13]:
df_original.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
step,16426,185.98344088639962,238.10960108876253,1,743
type,16426,,,CASH_IN,TRANSFER
amount,16426,782382.2715463294,1839555.886107465,0.0,1.0E7
nameOrig,16426,,,C1000036340,C999864329
oldbalanceOrg,16426,1290386.3020224043,2968752.08120663,0.0,5.958504037E7
newbalanceOrig,16426,574737.2564872755,2140685.0270821685,0.0,4.958504037E7
nameDest,16426,,,C1000039615,M999221400
oldbalanceDest,16426,744096.2936320467,3034475.953579213,0.0,2.3623051682E8
newbalanceDest,16426,1190471.7977517343,3504874.881273131,0.0,2.3672649466E8


### Identificação de valores ausentes

In [14]:
from pyspark.sql.functions import isnull, when, count, col
df_original.select([count(when(isnull(c), c)).alias(c) for c in df_original.columns]).show()

+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+



### Matrix de correlação

In [15]:
df_original.toPandas().corr()

Unnamed: 0,step,amount,oldbalanceOrg,newbalanceOrig,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
step,1.0,0.314259,0.127043,-0.112525,-0.040873,0.033887,0.766185,0.046107
amount,0.314259,1.0,0.720727,0.144021,-0.006876,0.252523,0.372702,0.069244
oldbalanceOrg,0.127043,0.720727,1.0,0.784409,0.067099,0.201479,0.121025,0.068658
newbalanceOrig,-0.112525,0.144021,0.784409,1.0,0.125671,0.086783,-0.178614,0.105656
oldbalanceDest,-0.040873,-0.006876,0.067099,0.125671,1.0,0.899212,-0.065861,-0.007657
newbalanceDest,0.033887,0.252523,0.201479,0.086783,0.899212,1.0,0.025461,-0.010606
isFraud,0.766185,0.372702,0.121025,-0.178614,-0.065861,0.025461,1.0,0.031225
isFlaggedFraud,0.046107,0.069244,0.068658,0.105656,-0.007657,-0.010606,0.031225,1.0


## Transformação do dataset

### Indexação dos atributos

In [16]:
indexer = StringIndexer(inputCol='type', outputCol='indexType').fit(df_original)
df_indexado = indexer.transform(df_original)

indexer = StringIndexer(inputCol='isFraud', outputCol='label').fit(df_original)
df_indexado = indexer.transform(df_indexado)

labelReverse = IndexToString().setInputCol('label')
indexTypeReverse = IndexToString().setInputCol('indexType')

df_indexado.show(5)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+---------+-----+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|indexType|label|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+---------+-----+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|      2.0|  0.0|
|   1|PAYMENT|7861.64|C1912850431|    176087.23|     168225.59| M633326333|           0.0|           0.0|      0|             0|      2.0|  0.0|
|   1|PAYMENT|4024.36|C1265012928|       2671.0|           0.0|M1176932104|           0.0|           0.0|      0|             0|      2.0|  0.0|
|   1|  DEBIT|5337.77| C712410124|      41720.0|      36382.23| C195600860|       41898.0|      40348.79|      0|             0|  

### Exclusão de atributos

In [17]:
df_selecionado = df_indexado.drop('type','nameOrig','nameDest','isFraud','isFlaggedFraud')
df_selecionado.show(5)

+----+-------+-------------+--------------+--------------+--------------+---------+-----+
|step| amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|indexType|label|
+----+-------+-------------+--------------+--------------+--------------+---------+-----+
|   1|9839.64|     170136.0|     160296.36|           0.0|           0.0|      2.0|  0.0|
|   1|7861.64|    176087.23|     168225.59|           0.0|           0.0|      2.0|  0.0|
|   1|4024.36|       2671.0|           0.0|           0.0|           0.0|      2.0|  0.0|
|   1|5337.77|      41720.0|      36382.23|       41898.0|      40348.79|      4.0|  0.0|
|   1|5337.77|      41720.0|      36382.23|       41898.0|      40348.79|      4.0|  0.0|
+----+-------+-------------+--------------+--------------+--------------+---------+-----+
only showing top 5 rows



## Seleção dos atributos

In [18]:
df_selecionado.show(5)

+----+-------+-------------+--------------+--------------+--------------+---------+-----+
|step| amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|indexType|label|
+----+-------+-------------+--------------+--------------+--------------+---------+-----+
|   1|9839.64|     170136.0|     160296.36|           0.0|           0.0|      2.0|  0.0|
|   1|7861.64|    176087.23|     168225.59|           0.0|           0.0|      2.0|  0.0|
|   1|4024.36|       2671.0|           0.0|           0.0|           0.0|      2.0|  0.0|
|   1|5337.77|      41720.0|      36382.23|       41898.0|      40348.79|      4.0|  0.0|
|   1|5337.77|      41720.0|      36382.23|       41898.0|      40348.79|      4.0|  0.0|
+----+-------+-------------+--------------+--------------+--------------+---------+-----+
only showing top 5 rows



## Criação da matrix de classificação

In [19]:
ignore = ['label']
list = [x for x in df_selecionado.columns if x not in ignore]

assembler = VectorAssembler(
            inputCols= list,
            outputCol='features')

df_transformado = (assembler.transform(df_selecionado).select('label','features'))
df_transformado.show(truncate = False, n = 5)
df_transformado.count()

+-----+---------------------------------------------------+
|label|features                                           |
+-----+---------------------------------------------------+
|0.0  |[1.0,9839.64,170136.0,160296.36,0.0,0.0,2.0]       |
|0.0  |[1.0,7861.64,176087.23,168225.59,0.0,0.0,2.0]      |
|0.0  |[1.0,4024.36,2671.0,0.0,0.0,0.0,2.0]               |
|0.0  |[1.0,5337.77,41720.0,36382.23,41898.0,40348.79,4.0]|
|0.0  |[1.0,5337.77,41720.0,36382.23,41898.0,40348.79,4.0]|
+-----+---------------------------------------------------+
only showing top 5 rows



16426

## Divisão do dataset para treinamento e teste

In [20]:
train_sample = 0.7
test_sample = 0.3
seed = 1234

(train, test) = df_transformado.randomSplit([train_sample, test_sample],seed)

num_train = df_transformado.count() * train_sample
num_test = df_transformado.count() * test_sample

print('Percentual da base de treinamento', train_sample*100, '%')
print('Percentual da base de teste', test_sample*100, '%')
print('Quantidade de registros da base de treinamento:', train.count())
print('Quantidade de registros da base de treinamento:', test.count())

Percentual da base de treinamento 70.0 %
Percentual da base de teste 30.0 %
Quantidade de registros da base de treinamento: 11559
Quantidade de registros da base de treinamento: 4867


In [21]:
train.groupby('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 5773|
|  1.0| 5786|
+-----+-----+



In [22]:
test.groupby('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 2440|
|  1.0| 2427|
+-----+-----+



## Treinamento, execução e avaliação dos modelos de predição

### Decision Tree (DT)

In [23]:
# Treinamento do modelo de predição
start_time = time.time()
trainer = DecisionTreeClassifier(featuresCol='features', labelCol='label', predictionCol='prediction', probabilityCol='probability',\
                                 rawPredictionCol='rawPrediction', maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,\
                                 maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity='gini', seed=None)
model_dt = trainer.fit(train)
time_dt_train = time.time() - start_time

In [24]:
# Execução do modelo de predição na base de teste
start_time = time.time()
result_dt = model_dt.transform(test)
time_dt_pred = time.time() - start_time

In [25]:
# Cálculo da acurácia do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='accuracy')
accuracy_dt = evaluator.evaluate(result_dt) * 100

In [26]:
# Cálculo do recall do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='weightedRecall')
recall_dt = evaluator.evaluate(result_dt) * 100

In [27]:
# Cálculo da precisão do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='weightedPrecision')
precision_dt = evaluator.evaluate(result_dt) * 100

In [28]:
# Cálculo da F1 score do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='f1')
f1_dt = evaluator.evaluate(result_dt) * 100

In [29]:
# Matriz de confusão
y_true = result_dt.select("label").toPandas()
y_pred = result_dt.select("prediction").toPandas()
mc_dt = confusion_matrix(y_true, y_pred)
tn_dt, fp_dt, fn_dt, tp_dt = confusion_matrix(y_true, y_pred).ravel()
print(mc_dt)

[[2440    0]
 [  15 2412]]


In [30]:
# Exibição dos resultados
evaluator_dt = spark.createDataFrame(
    [(round(accuracy_dt,2), round(recall_dt,2), round(precision_dt,2), round(f1_dt,2),\
      int(fp_dt), int(fn_dt),\
      round(time_dt_train,2), round(time_dt_pred,2))],\
    ['acurácia','recall','precisão','f1 score',\
     'falso positivo', 'falso negativo',\
     'tempo treinamento','tempo predição'])
print("Resultados do modelo Decision Tree:")
evaluator_dt.show()

Resultados do modelo Decision Tree:
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+
|acurácia|recall|precisão|f1 score|falso positivo|falso negativo|tempo treinamento|tempo predição|
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+
|   99.69| 99.69|   99.69|   99.69|             0|            15|             9.18|          0.09|
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+



In [31]:
result_dt.show(truncate = False, n = 5)

+-----+-------------------------------------+-------------+-----------------------------------------+----------+
|label|features                             |rawPrediction|probability                              |prediction|
+-----+-------------------------------------+-------------+-----------------------------------------+----------+
|0.0  |(7,[0,1,4],[1.0,10782.94,100585.0])  |[1087.0,15.0]|[0.9863883847549909,0.013611615245009074]|0.0       |
|0.0  |(7,[0,1,4],[1.0,12401.9,349763.0])   |[1087.0,15.0]|[0.9863883847549909,0.013611615245009074]|0.0       |
|0.0  |(7,[0,1,4],[1.0,12401.9,349763.0])   |[1087.0,15.0]|[0.9863883847549909,0.013611615245009074]|0.0       |
|0.0  |(7,[0,1,4],[1.0,34629.16,38735.74])  |[1087.0,15.0]|[0.9863883847549909,0.013611615245009074]|0.0       |
|0.0  |(7,[0,1,4],[1.0,193605.38,249452.05])|[1087.0,15.0]|[0.9863883847549909,0.013611615245009074]|0.0       |
+-----+-------------------------------------+-------------+-------------------------------------

### Random Forest (RF)

In [32]:
# Treinamento do modelo de predição
start_time = time.time()
trainer = RandomForestClassifier(featuresCol='features', labelCol='label', predictionCol='prediction', probabilityCol='probability',\
                                 rawPredictionCol='rawPrediction', maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,\
                                 numTrees=50, featureSubsetStrategy='auto', seed=None, subsamplingRate=1.0,\
                                 maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity='gini')
model_rf = trainer.fit(train)
time_rf_train = time.time() - start_time

In [33]:
# Execução do modelo de predição na base de teste
start_time = time.time()
result_rf = model_rf.transform(test)
time_rf_pred = time.time() - start_time

In [34]:
# Cálculo da acurácia do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='accuracy')
accuracy_rf = evaluator.evaluate(result_rf) * 100

In [35]:
# Cálculo do recall do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='weightedRecall')
recall_rf = evaluator.evaluate(result_rf) * 100

In [36]:
# Cálculo da precisão do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='weightedPrecision')
precision_rf = evaluator.evaluate(result_rf) * 100

In [37]:
# Cálculo da F1 score do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='f1')
f1_rf = evaluator.evaluate(result_rf) * 100

In [38]:
# Matriz de confusão
y_true = result_rf.select("label").toPandas()
y_pred = result_rf.select("prediction").toPandas()
mc_rf = confusion_matrix(y_true, y_pred)
tn_rf, fp_rf, fn_rf, tp_rf = confusion_matrix(y_true, y_pred).ravel()
print(mc_rf)

[[2440    0]
 [  14 2413]]


In [39]:
# Exibição dos resultados
evaluator_rf = spark.createDataFrame(
    [(round(accuracy_rf,2), round(recall_rf,2), round(precision_rf,2), round(f1_rf,2),\
      int(fp_rf), int(fn_rf),\
      round(time_rf_train,2), round(time_rf_pred,2))],\
    ['acurácia','recall','precisão','f1 score',\
     'falso positivo', 'falso negativo',\
     'tempo treinamento','tempo predição'])
print("Resultados do modelo Random Forest:")
evaluator_rf.show()

Resultados do modelo Random Forest:
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+
|acurácia|recall|precisão|f1 score|falso positivo|falso negativo|tempo treinamento|tempo predição|
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+
|   99.71| 99.71|   99.71|   99.71|             0|            14|             9.96|          0.09|
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+



In [40]:
result_rf.show(truncate = False, n = 5)

+-----+-------------------------------------+---------------------------------------+-----------------------------------------+----------+
|label|features                             |rawPrediction                          |probability                              |prediction|
+-----+-------------------------------------+---------------------------------------+-----------------------------------------+----------+
|0.0  |(7,[0,1,4],[1.0,10782.94,100585.0])  |[48.93452715114445,1.0654728488555614] |[0.9786905430228887,0.021309456977111223]|0.0       |
|0.0  |(7,[0,1,4],[1.0,12401.9,349763.0])   |[48.93452715114445,1.0654728488555614] |[0.9786905430228887,0.021309456977111223]|0.0       |
|0.0  |(7,[0,1,4],[1.0,12401.9,349763.0])   |[48.93452715114445,1.0654728488555614] |[0.9786905430228887,0.021309456977111223]|0.0       |
|0.0  |(7,[0,1,4],[1.0,34629.16,38735.74])  |[49.1429428335959,0.8570571664041015]  |[0.982858856671918,0.017141143328082028] |0.0       |
|0.0  |(7,[0,1,4],[1.0,1936

### Neural Network Perceptron (NNP)

In [41]:
# Treinamento do modelo de predição
start_time =  time.time()
layers = [7, 5, 5, 2]
trainer = MultilayerPerceptronClassifier(featuresCol='features', labelCol='label',\
          maxIter=100, layers=layers, blockSize=128, seed=1234)
model_nnp = trainer.fit(train)
time_nnp_train = time.time() - start_time

In [42]:
# Execução do modelo de predição na base de teste
start_time =  time.time()
result_nnp = model_nnp.transform(test)
time_nnp_pred = time.time() - start_time

In [43]:
# Cálculo da acurácia do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='accuracy')
accuracy_nnp = evaluator.evaluate(result_nnp) * 100

In [44]:
# Cálculo do recall do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='weightedRecall')
recall_nnp = evaluator.evaluate(result_nnp) * 100

In [45]:
# Cálculo da precisão do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='weightedPrecision')
precision_nnp = evaluator.evaluate(result_nnp) * 100

In [46]:
# Cálculo da F1 score do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='f1')
f1_nnp = evaluator.evaluate(result_nnp) * 100

In [47]:
# Matriz de confusão
y_true = result_nnp.select("label").toPandas()
y_pred = result_nnp.select("prediction").toPandas()
mc_nnp = confusion_matrix(y_true, y_pred)
tn_nnp, fp_nnp, fn_nnp, tp_nnp = confusion_matrix(y_true, y_pred).ravel()
print(mc_nnp)

[[1902  538]
 [  47 2380]]


In [48]:
# Exibição dos resultados
evaluator_nnp = spark.createDataFrame(
    [(round(accuracy_nnp,2), round(recall_nnp,2), round(precision_nnp,2), round(f1_nnp,2),\
      int(fp_nnp), int(fn_nnp),\
      round(time_nnp_train,2), round(time_nnp_pred,2))],\
    ['acurácia','recall','precisão','f1 score',\
     'falso positivo', 'falso negativo',\
     'tempo treinamento','tempo predição'])
print("Resultados do modelo Neural Network Perceptron:")
evaluator_nnp.show()

Resultados do modelo Neural Network Perceptron:
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+
|acurácia|recall|precisão|f1 score|falso positivo|falso negativo|tempo treinamento|tempo predição|
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+
|   87.98| 87.98|    89.6|   87.86|           538|            47|            18.01|          0.07|
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+



In [49]:
result_nnp.show(truncate = False, n = 5)

+-----+-------------------------------------+-----------------------------------------+---------------------------------------+----------+
|label|features                             |rawPrediction                            |probability                            |prediction|
+-----+-------------------------------------+-----------------------------------------+---------------------------------------+----------+
|0.0  |(7,[0,1,4],[1.0,10782.94,100585.0])  |[1.133290816479007,-0.016314562083979967]|[0.759438830350663,0.24056116964933716]|0.0       |
|0.0  |(7,[0,1,4],[1.0,12401.9,349763.0])   |[1.133290816479007,-0.016314562083979967]|[0.759438830350663,0.24056116964933716]|0.0       |
|0.0  |(7,[0,1,4],[1.0,12401.9,349763.0])   |[1.133290816479007,-0.016314562083979967]|[0.759438830350663,0.24056116964933716]|0.0       |
|0.0  |(7,[0,1,4],[1.0,34629.16,38735.74])  |[0.920516837987315,0.17737734588834053]  |[0.6776819948854382,0.3223180051145617]|0.0       |
|0.0  |(7,[0,1,4],[1.0,1936

### Naive Bayes (NB)

In [50]:
# Treinamento do modelo de predição
start_time =  time.time()
trainer = NaiveBayes(smoothing=1.0, modelType='multinomial')
model_nb = trainer.fit(train)
time_nb_train = time.time() - start_time

In [51]:
# Execução do modelo de predição na base de teste
start_time =  time.time()
result_nb = model_nb.transform(test)
time_nb_pred = time.time() - start_time

In [52]:
# Cálculo da acurácia do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='accuracy')
accuracy_nb = evaluator.evaluate(result_nb) * 100

In [53]:
# Cálculo do recall do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='weightedRecall')
recall_nb = evaluator.evaluate(result_nb) * 100

In [54]:
# Cálculo da precisão do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='weightedPrecision')
precision_nb = evaluator.evaluate(result_nb) * 100

In [55]:
# Cálculo da F1 score do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='f1')
f1_nb = evaluator.evaluate(result_nb) * 100

In [56]:
# Matriz de confusão
y_true = result_nb.select("label").toPandas()
y_pred = result_nb.select("prediction").toPandas()
mc_nb = confusion_matrix(y_true, y_pred)
tn_nb, fp_nb, fn_nb, tp_nb = confusion_matrix(y_true, y_pred).ravel()
print(mc_nb)

[[1549  891]
 [ 301 2126]]


In [57]:
# Exibição dos resultados
evaluator_nb = spark.createDataFrame(
    [(round(accuracy_nb,2), round(recall_nb,2), round(precision_nb,2), round(f1_nb,2),\
      int(fp_nb), int(fn_nb),\
      round(time_nb_train,2), round(time_nb_pred,2))],\
    ['acurácia','recall','precisão','f1 score',\
     'falso positivo', 'falso negativo',\
     'tempo treinamento','tempo predição'])
print("Resultados do modelo Naive Bayes:")
evaluator_nb.show()

Resultados do modelo Naive Bayes:
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+
|acurácia|recall|precisão|f1 score|falso positivo|falso negativo|tempo treinamento|tempo predição|
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+
|   75.51| 75.51|   77.12|   75.15|           891|           301|             6.51|          0.06|
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+



In [58]:
result_nb.show(truncate = False, n = 5)

+-----+-------------------------------------+-----------------------------------------+-----------+----------+
|label|features                             |rawPrediction                            |probability|prediction|
+-----+-------------------------------------+-----------------------------------------+-----------+----------+
|0.0  |(7,[0,1,4],[1.0,10782.94,100585.0])  |[-185597.63318901055,-241851.82351438346]|[1.0,0.0]  |0.0       |
|0.0  |(7,[0,1,4],[1.0,12401.9,349763.0])   |[-551910.7905068266,-809608.9740410324]  |[1.0,0.0]  |0.0       |
|0.0  |(7,[0,1,4],[1.0,12401.9,349763.0])   |[-551910.7905068266,-809608.9740410324]  |[1.0,0.0]  |0.0       |
|0.0  |(7,[0,1,4],[1.0,34629.16,38735.74])  |[-184955.59537742924,-131224.79568210608]|[0.0,1.0]  |1.0       |
|0.0  |(7,[0,1,4],[1.0,193605.38,249452.05])|[-1081538.196092862,-808275.1143704863]  |[0.0,1.0]  |1.0       |
+-----+-------------------------------------+-----------------------------------------+-----------+----------+
o

### Logistic Regression (LR)

In [59]:
# Treinamento do modelo de predição
start_time =  time.time()
trainer = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)
model_lr = trainer.fit(train)
time_lr_train = time.time() - start_time

In [60]:
# Execução do modelo de predição na base de teste
start_time =  time.time()
result_lr = model_lr.transform(test)
time_lr_pred = time.time() - start_time

In [61]:
# Cálculo da acurácia do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='accuracy')
accuracy_lr = evaluator.evaluate(result_lr) * 100

In [62]:
# Cálculo do recall do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='weightedRecall')
recall_lr = evaluator.evaluate(result_lr) * 100

In [63]:
# Cálculo da precisão do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='weightedPrecision')
precision_lr = evaluator.evaluate(result_lr) * 100

In [64]:
# Cálculo da F1 score do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='f1')
f1_lr = evaluator.evaluate(result_lr) * 100

In [65]:
# Matriz de confusão
y_true = result_lr.select("label").toPandas()
y_pred = result_lr.select("prediction").toPandas()
mc_lr = confusion_matrix(y_true, y_pred)
tn_lr, fp_lr, fn_lr, tp_lr = confusion_matrix(y_true, y_pred).ravel()
print(mc_lr)

[[2438    2]
 [  64 2363]]


In [66]:
# Exibição dos resultados
evaluator_lr = spark.createDataFrame(
    [(round(accuracy_lr,2), round(recall_lr,2), round(precision_lr,2), round(f1_lr,2),\
      int(fp_lr), int(fn_lr),\
      round(time_lr_train,2), round(time_lr_pred,2))],\
    ['acurácia','recall','precisão','f1 score',\
     'falso positivo', 'falso negativo',\
     'tempo treinamento','tempo predição'])
print("Resultados do modelo Logistic Regression:")
evaluator_lr.show()

Resultados do modelo Logistic Regression:
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+
|acurácia|recall|precisão|f1 score|falso positivo|falso negativo|tempo treinamento|tempo predição|
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+
|   98.64| 98.64|   98.68|   98.64|             2|            64|              9.1|          0.03|
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+



In [67]:
result_lr.show(truncate = False, n = 5)

+-----+-------------------------------------+----------------------------------------+----------------------------------------+----------+
|label|features                             |rawPrediction                           |probability                             |prediction|
+-----+-------------------------------------+----------------------------------------+----------------------------------------+----------+
|0.0  |(7,[0,1,4],[1.0,10782.94,100585.0])  |[1.5862996712454818,-1.5862996712454818]|[0.8300948556474514,0.16990514435254858]|0.0       |
|0.0  |(7,[0,1,4],[1.0,12401.9,349763.0])   |[1.4886690792839434,-1.4886690792839434]|[0.815878424625442,0.18412157537455792] |0.0       |
|0.0  |(7,[0,1,4],[1.0,12401.9,349763.0])   |[1.4886690792839434,-1.4886690792839434]|[0.815878424625442,0.18412157537455792] |0.0       |
|0.0  |(7,[0,1,4],[1.0,34629.16,38735.74])  |[1.5487225374262987,-1.5487225374262987]|[0.8247291499651642,0.1752708500348357] |0.0       |
|0.0  |(7,[0,1,4],[1.0,1936

## Suport Vector Machines (SVM)

In [68]:
# Treinamento do modelo de predição
start_time =  time.time()
trainer = LinearSVC(featuresCol='features', labelCol='label',\
                    maxIter=100, regParam=0.1)
model_svm = trainer.fit(train)
time_svm_train = time.time() - start_time

In [69]:
# Execução do modelo de predição na base de teste
start_time =  time.time()
result_svm = model_svm.transform(test)
time_svm_pred = time.time() - start_time

In [70]:
# Cálculo da acurácia do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='accuracy')
accuracy_svm = evaluator.evaluate(result_svm) * 100

In [71]:
# Cálculo do recall do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='weightedRecall')
recall_svm = evaluator.evaluate(result_svm) * 100

In [72]:
# Cálculo da precisão do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='weightedPrecision')
precision_svm = evaluator.evaluate(result_svm) * 100

In [73]:
# Cálculo da F1 score do modelo de predição
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',\
            metricName='f1')
f1_svm = evaluator.evaluate(result_svm) * 100

In [74]:
# Matriz de confusão
y_true = result_svm.select("label").toPandas()
y_pred = result_svm.select("prediction").toPandas()
mc_svm = confusion_matrix(y_true, y_pred)
tn_svm, fp_svm, fn_svm, tp_svm = confusion_matrix(y_true, y_pred).ravel()
print(mc_svm)

[[2211  229]
 [ 119 2308]]


In [75]:
# Exibição dos resultados
evaluator_svm = spark.createDataFrame(
    [(round(accuracy_svm,2), round(recall_svm,2), round(precision_svm,2), round(f1_svm,2),\
      int(fp_svm), int(fn_svm),\
      round(time_svm_train,2), round(time_svm_pred,2))],\
    ['acurácia','recall','precisão','f1 score',\
     'falso positivo', 'falso negativo',\
     'tempo treinamento','tempo predição'])
print("Resultados do modelo Suport Vector Machines:")
evaluator_svm.show()

Resultados do modelo Suport Vector Machines:
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+
|acurácia|recall|precisão|f1 score|falso positivo|falso negativo|tempo treinamento|tempo predição|
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+
|   92.85| 92.85|   92.94|   92.85|           229|           119|           729.52|          0.08|
+--------+------+--------+--------+--------------+--------------+-----------------+--------------+



In [76]:
result_svm.show(truncate = False, n = 5)

+-----+-------------------------------------+------------------------------------------+----------+
|label|features                             |rawPrediction                             |prediction|
+-----+-------------------------------------+------------------------------------------+----------+
|0.0  |(7,[0,1,4],[1.0,10782.94,100585.0])  |[-0.20519130192207402,0.20519130192207402]|1.0       |
|0.0  |(7,[0,1,4],[1.0,12401.9,349763.0])   |[-0.1943811660229432,0.1943811660229432]  |1.0       |
|0.0  |(7,[0,1,4],[1.0,12401.9,349763.0])   |[-0.1943811660229432,0.1943811660229432]  |1.0       |
|0.0  |(7,[0,1,4],[1.0,34629.16,38735.74])  |[-0.21240636640930594,0.21240636640930594]|1.0       |
|0.0  |(7,[0,1,4],[1.0,193605.38,249452.05])|[-0.2327208356740759,0.2327208356740759]  |1.0       |
+-----+-------------------------------------+------------------------------------------+----------+
only showing top 5 rows



## Resultados

In [77]:
models = ['Decision Tree','Random Forest','Neural Network Perceptron','Naive Bayes','Logistic Regression','Suport Vector Machines']

### Accuracy

In [78]:
print('Ranking Accuracy %')

list = ((models[0],accuracy_dt),\
        (models[1],accuracy_rf),\
        (models[2],accuracy_nnp),\
        (models[3],accuracy_nb),\
        (models[4],accuracy_lr),\
        (models[5],accuracy_nb))
df_acuracia = spark.createDataFrame(list, ['Modelo', 'Acuracia'])
df_acuracia.sort(df_acuracia.Acuracia.desc()).show(truncate = False)


Ranking Accuracy %
+-------------------------+-----------------+
|Modelo                   |Acuracia         |
+-------------------------+-----------------+
|Random Forest            |99.71234846928293|
|Decision Tree            |99.69180193137457|
|Logistic Regression      |98.64392849804808|
|Neural Network Perceptron|87.98027532360797|
|Naive Bayes              |75.50852681323197|
|Suport Vector Machines   |75.50852681323197|
+-------------------------+-----------------+



### Recall

In [79]:
print('Ranking')

list = ((models[0],recall_dt),\
        (models[1],recall_rf),\
        (models[2],recall_nnp),\
        (models[3],recall_nb),\
        (models[4],recall_lr),\
        (models[5],recall_svm))
df_recall = spark.createDataFrame(list, ['Modelo', 'Recall'])
df_recall.sort(df_recall.Recall.desc()).show(truncate = False)

Ranking
+-------------------------+-----------------+
|Modelo                   |Recall           |
+-------------------------+-----------------+
|Random Forest            |99.71234846928292|
|Decision Tree            |99.69180193137457|
|Logistic Regression      |98.64392849804808|
|Suport Vector Machines   |92.84980480788987|
|Neural Network Perceptron|87.98027532360797|
|Naive Bayes              |75.50852681323197|
+-------------------------+-----------------+



### Precision

In [80]:
print('Ranking Precision %')

list = ((models[0],precision_dt),\
        (models[1],precision_rf),\
        (models[2],precision_nnp),\
        (models[3],precision_nb),\
        (models[4],precision_lr),\
        (models[5],precision_svm))
df_precision = spark.createDataFrame(list, ['Modelo', 'Precisao'])
df_precision.sort(df_precision.Precisao.desc()).show(truncate = False)

Ranking Precision %
+-------------------------+-----------------+
|Modelo                   |Precisao         |
+-------------------------+-----------------+
|Random Forest            |99.71398951306045|
|Decision Tree            |99.69368501529692|
|Logistic Regression      |98.67543661321872|
|Suport Vector Machines   |92.93838143225915|
|Neural Network Perceptron|89.59701340348573|
|Naive Bayes              |77.11625293876516|
+-------------------------+-----------------+



### F1 score

In [81]:
list = ((models[0],f1_dt),\
        (models[1],f1_rf),\
        (models[2],f1_nnp),\
        (models[3],f1_nb),\
        (models[4],f1_lr),\
        (models[5],f1_svm))
df_f1 = spark.createDataFrame(list, ['Modelo', 'F1'])
df_f1.sort(df_f1.F1.desc()).show(truncate = False)

+-------------------------+-----------------+
|Modelo                   |F1               |
+-------------------------+-----------------+
|Random Forest            |99.71234387889739|
|Decision Tree            |99.69179646661719|
|Logistic Regression      |98.64366223187787|
|Suport Vector Machines   |92.8465827587081 |
|Neural Network Perceptron|87.86002379155052|
|Naive Bayes              |75.15152754554644|
+-------------------------+-----------------+



### Tempo de Treinamento

In [82]:
list = ((models[0],time_dt_train),\
        (models[1],time_rf_train),\
        (models[2],time_nnp_train),\
        (models[3],time_nb_train),\
        (models[4],time_lr_train),\
        (models[5],time_svm_train))
df_time_train = spark.createDataFrame(list, ['Modelo', 'Tempo_Treinamento'])
df_time_train.sort(df_time_train.Tempo_Treinamento.asc()).show(truncate = False)

+-------------------------+------------------+
|Modelo                   |Tempo_Treinamento |
+-------------------------+------------------+
|Naive Bayes              |6.50537896156311  |
|Logistic Regression      |9.096431016921997 |
|Decision Tree            |9.177486658096313 |
|Random Forest            |9.961931467056274 |
|Neural Network Perceptron|18.005624532699585|
|Suport Vector Machines   |729.5157809257507 |
+-------------------------+------------------+



### Tempo de Predição

In [83]:
list = ((models[0],time_dt_pred),\
        (models[1],time_rf_pred),\
        (models[2],time_nnp_pred),\
        (models[3],time_nnp_pred),\
        (models[4],time_nnp_pred),\
        (models[5],time_nnp_pred))
df_time_pred = spark.createDataFrame(list, ['Modelo', 'Tempo_Predicao'])
df_time_pred.sort(df_time_pred.Tempo_Predicao.asc()).show(truncate = False)

+-------------------------+-------------------+
|Modelo                   |Tempo_Predicao     |
+-------------------------+-------------------+
|Neural Network Perceptron|0.07020425796508789|
|Naive Bayes              |0.07020425796508789|
|Logistic Regression      |0.07020425796508789|
|Suport Vector Machines   |0.07020425796508789|
|Decision Tree            |0.0850825309753418 |
|Random Forest            |0.08755016326904297|
+-------------------------+-------------------+



### Falso Positivo

In [84]:
list = ((models[0],int(fp_dt)),\
        (models[1],int(fp_rf)),\
        (models[2],int(fp_nnp)),\
        (models[3],int(fp_nb)),\
        (models[4],int(fp_lr)),\
        (models[5],int(fp_svm)))
df_fp = spark.createDataFrame(list, ['Modelo', 'Falso_Positivo'])
df_fp.sort(df_fp.Falso_Positivo.asc()).show(truncate = False)

+-------------------------+--------------+
|Modelo                   |Falso_Positivo|
+-------------------------+--------------+
|Random Forest            |0             |
|Decision Tree            |0             |
|Logistic Regression      |2             |
|Suport Vector Machines   |229           |
|Neural Network Perceptron|538           |
|Naive Bayes              |891           |
+-------------------------+--------------+



### Falso Negativo

In [85]:
list = ((models[0],int(fn_dt)),\
        (models[1],int(fn_rf)),\
        (models[2],int(fn_nnp)),\
        (models[3],int(fn_nb)),\
        (models[4],int(fn_lr)),\
        (models[5], int(fn_svm)))
df_fn = spark.createDataFrame(list, ['Modelo', 'Falso_Negativo'])
df_fn.sort(df_fn.Falso_Negativo.asc()).show(truncate = False)

+-------------------------+--------------+
|Modelo                   |Falso_Negativo|
+-------------------------+--------------+
|Random Forest            |14            |
|Decision Tree            |15            |
|Neural Network Perceptron|47            |
|Logistic Regression      |64            |
|Suport Vector Machines   |119           |
|Naive Bayes              |301           |
+-------------------------+--------------+



### Comparativo

In [86]:
df = df_acuracia.join(df_fp, "Modelo")
df = df.join(df_fn, "Modelo")
df = df.join(df_time_train, "Modelo")
df = df.join(df_time_pred, "Modelo")
df.sort(df.Acuracia.desc()).show(truncate = False)

+-------------------------+-----------------+--------------+--------------+------------------+-------------------+
|Modelo                   |Acuracia         |Falso_Positivo|Falso_Negativo|Tempo_Treinamento |Tempo_Predicao     |
+-------------------------+-----------------+--------------+--------------+------------------+-------------------+
|Random Forest            |99.71234846928293|0             |14            |9.961931467056274 |0.08755016326904297|
|Decision Tree            |99.69180193137457|0             |15            |9.177486658096313 |0.0850825309753418 |
|Logistic Regression      |98.64392849804808|2             |64            |9.096431016921997 |0.07020425796508789|
|Neural Network Perceptron|87.98027532360797|538           |47            |18.005624532699585|0.07020425796508789|
|Naive Bayes              |75.50852681323197|891           |301           |6.50537896156311  |0.07020425796508789|
|Suport Vector Machines   |75.50852681323197|229           |119           |729.5

## Teste de classificação do melhor modelo

In [87]:
df_selecionado.show(1)

+----+-------+-------------+--------------+--------------+--------------+---------+-----+
|step| amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|indexType|label|
+----+-------+-------------+--------------+--------------+--------------+---------+-----+
|   1|9839.64|     170136.0|     160296.36|           0.0|           0.0|      2.0|  0.0|
+----+-------+-------------+--------------+--------------+--------------+---------+-----+
only showing top 1 row



In [88]:
#valores para teste
entradas = [1.0, 9839.64,170136.0, 160296.36,0.0,0.0,2.0]
resultado = 0.0

In [89]:
#criando o dataframe Spark para execução do modelo
df_teste = spark.createDataFrame(
    [(resultado,\
      Vectors.dense([entradas[0],entradas[1],entradas[2],entradas[3],entradas[4],entradas[5],entradas[6]]))],
    ['label', 'features'])

In [90]:
#resultado da predição utilizando o modelo DT
resultado = model_dt.transform(df_teste)
resultado.select('label','prediction').show()

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
+-----+----------+



In [110]:
spark.stop()