## Classificar clientes de acordo com a possibilidade de pagar ou não o crédito

Exemplo de utilização do algoritmo RandomForest da biblioteca Spark MLLib


Apache Spark 2.4.2

In [1]:
import math
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import PCA
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
# Spark session- para se trabalhar com DataFrame do Spark
spSession = SparkSession.builder.master("local").appName("DSA-SparkMLLib").getOrCreate()

In [3]:
#carregamento do dataset para o formato RDD
bankRDD = sc.textFile("data/bank.csv")

In [4]:
#colocar dados na cache
bankRDD.cache()

data/bank.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
#número de exemplos + cabeçalho
bankRDD.count()

542

In [6]:
bankRDD.take(5)

['"age";"job";"marital";"education";"default";"balance";"housing";"loan";"contact";"day";"month";"duration";"campaign";"pdays";"previous";"poutcome";"y"',
 '30;"unemployed";"married";"primary";"no";1787;"no";"no";"cellular";19;"oct";79;1;-1;0;"unknown";"no"',
 '33;"services";"married";"secondary";"no";4789;"yes";"yes";"cellular";11;"may";220;1;339;4;"failure";"yes"',
 '35;"management";"single";"tertiary";"no";1350;"yes";"no";"cellular";16;"apr";185;1;330;1;"failure";"yes"',
 '30;"management";"married";"tertiary";"no";1476;"yes";"yes";"unknown";3;"jun";199;4;-1;0;"unknown";"yes"']

## Limpeza dos dados

In [7]:
#remover o cabeçalho
bankRDD2 = bankRDD.filter(lambda line: "balance" not in line)
bankRDD2.count()

541

In [8]:
bankRDD2.take(5)

['30;"unemployed";"married";"primary";"no";1787;"no";"no";"cellular";19;"oct";79;1;-1;0;"unknown";"no"',
 '33;"services";"married";"secondary";"no";4789;"yes";"yes";"cellular";11;"may";220;1;339;4;"failure";"yes"',
 '35;"management";"single";"tertiary";"no";1350;"yes";"no";"cellular";16;"apr";185;1;330;1;"failure";"yes"',
 '30;"management";"married";"tertiary";"no";1476;"yes";"yes";"unknown";3;"jun";199;4;-1;0;"unknown";"yes"',
 '59;"blue-collar";"married";"secondary";"no";0;"yes";"no";"unknown";5;"may";226;1;-1;0;"unknown";"no"']

In [9]:
#Transformar os dados para valores numéricos
def transform_to_numeric(line):
    #remove "" (aspas) e faz a quebra por ";"
    attList = line.replace("\"","").split(";")
    
    age = float(attList[0])
    #marital
    single = 1.0 if attList[2] == "single" else 0.0
    married = 1.0 if attList[2] == "married" else 0.0
    divorced = 1.0 if attList[2] == "divorced" else 0.0
    #education
    primary = 1.0 if attList[3] == "primary" else 0.0
    secondary = 1.0 if attList[3] == "secondary" else 0.0
    tertiary = 1.0 if attList[3] == "tertiary" else 0.0
    #outros
    default = 0.0 if attList[4] == "no" else 1.0
    balance = float(attList[5])
    housing_loan = 0.0 if attList[6] == "no" else 1.0
    loan = 0.0 if attList[7] == "no" else 1.0
    outcome = 0.0 if attList[16] == "no" else 1.0
    
    #linhas com os objetos transformados que farão parte da análise
    linhas = Row(OUTCOME = outcome, AGE = age, SINGLE = single, MARRIED = married, DIVORCED = divorced,
                 PRIMARY = primary, SECONDARY = secondary, TERTIARY = tertiary, DEFAULT = default, 
                 BALANCE = balance, LOAN = loan, HOUSING_LOAN = housing_loan)
    
    return linhas
    

In [10]:
#aplicando a função de limpeza ao conjunto de dados
bankRDD3 = bankRDD2.map(transform_to_numeric)
bankRDD3.take(5)

[Row(AGE=30.0, BALANCE=1787.0, DEFAULT=0.0, DIVORCED=0.0, HOUSING_LOAN=0.0, LOAN=0.0, MARRIED=1.0, OUTCOME=0.0, PRIMARY=1.0, SECONDARY=0.0, SINGLE=0.0, TERTIARY=0.0),
 Row(AGE=33.0, BALANCE=4789.0, DEFAULT=0.0, DIVORCED=0.0, HOUSING_LOAN=1.0, LOAN=1.0, MARRIED=1.0, OUTCOME=1.0, PRIMARY=0.0, SECONDARY=1.0, SINGLE=0.0, TERTIARY=0.0),
 Row(AGE=35.0, BALANCE=1350.0, DEFAULT=0.0, DIVORCED=0.0, HOUSING_LOAN=1.0, LOAN=0.0, MARRIED=0.0, OUTCOME=1.0, PRIMARY=0.0, SECONDARY=0.0, SINGLE=1.0, TERTIARY=1.0),
 Row(AGE=30.0, BALANCE=1476.0, DEFAULT=0.0, DIVORCED=0.0, HOUSING_LOAN=1.0, LOAN=1.0, MARRIED=1.0, OUTCOME=1.0, PRIMARY=0.0, SECONDARY=0.0, SINGLE=0.0, TERTIARY=1.0),
 Row(AGE=59.0, BALANCE=0.0, DEFAULT=0.0, DIVORCED=0.0, HOUSING_LOAN=1.0, LOAN=0.0, MARRIED=1.0, OUTCOME=0.0, PRIMARY=0.0, SECONDARY=1.0, SINGLE=0.0, TERTIARY=0.0)]

## Análise exploratória de dados

In [11]:
#converter para Dataframe
bankDF = spSession.createDataFrame(bankRDD3)

In [12]:
type(bankDF)

pyspark.sql.dataframe.DataFrame

In [13]:
#estatística descritiva
bankDF.describe().show()

+-------+------------------+------------------+--------------------+-------------------+------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+
|summary|               AGE|           BALANCE|             DEFAULT|           DIVORCED|      HOUSING_LOAN|               LOAN|           MARRIED|            OUTCOME|           PRIMARY|         SECONDARY|            SINGLE|          TERTIARY|
+-------+------------------+------------------+--------------------+-------------------+------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+
|  count|               541|               541|                 541|                541|               541|                541|               541|                541|               541|               541|               541|               541|
|   mean| 41.26987060998152|

In [14]:
#correlação entre as variáveis
for i in bankDF.columns:
    if not(isinstance(bankDF.select(i).take(1)[0][0], str)):
        print("correlação da variável OUTCOME com", i, bankDF.stat.corr('OUTCOME', i))

correlação da variável OUTCOME com AGE -0.1823210432736525
correlação da variável OUTCOME com BALANCE 0.036574866119976804
correlação da variável OUTCOME com DEFAULT -0.04536965206737378
correlação da variável OUTCOME com DIVORCED -0.07812659940926987
correlação da variável OUTCOME com HOUSING_LOAN 0.05032284653513401
correlação da variável OUTCOME com LOAN -0.030420586112717318
correlação da variável OUTCOME com MARRIED -0.3753241299133561
correlação da variável OUTCOME com OUTCOME 1.0
correlação da variável OUTCOME com PRIMARY -0.12561548832677982
correlação da variável OUTCOME com SECONDARY 0.026392774894072973
correlação da variável OUTCOME com SINGLE 0.46323284934360515
correlação da variável OUTCOME com TERTIARY 0.08494840766635618


## Pré-processamento dos dados

In [15]:
#criação do LabeledPoint (target, Vector[features]) - formato requerido pelo Spark
def transform_var(row):
    obj = (row["OUTCOME"], Vectors.dense([row["AGE"], row["BALANCE"], row["DEFAULT"], row["DIVORCED"], 
                                          row["HOUSING_LOAN"], row["LOAN"], row["MARRIED"], row["PRIMARY"], 
                                          row["SECONDARY"], row["SINGLE"], row["TERTIARY"]]))
    
    return obj

In [16]:
#converte o DataFrame em RDD novamente para poder executar a função
bankRDD4 = bankDF.rdd.map(transform_var)

In [17]:
#bankRDD4.collect()
bankRDD4.take(5)

[(0.0,
  DenseVector([30.0, 1787.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0])),
 (1.0,
  DenseVector([33.0, 4789.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0])),
 (1.0,
  DenseVector([35.0, 1350.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0])),
 (1.0,
  DenseVector([30.0, 1476.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0])),
 (0.0, DenseVector([59.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0]))]

In [18]:
#converte para DataFrame
bankDF_1 = spSession.createDataFrame(bankRDD4, ["label", "features"])
bankDF_1.select("features", "label").show(10)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[30.0,1787.0,0.0,...|  0.0|
|[33.0,4789.0,0.0,...|  1.0|
|[35.0,1350.0,0.0,...|  1.0|
|[30.0,1476.0,0.0,...|  1.0|
|[59.0,0.0,0.0,0.0...|  0.0|
|[35.0,747.0,0.0,0...|  1.0|
|[36.0,307.0,0.0,0...|  1.0|
|[39.0,147.0,0.0,0...|  0.0|
|[41.0,221.0,0.0,0...|  0.0|
|[43.0,-88.0,0.0,0...|  1.0|
+--------------------+-----+
only showing top 10 rows



In [19]:
#duplicação do DateFrame para aplicar um passo diferente e realizar a comparação
bankDF_2 = bankDF_1

In [20]:
#Indexação é pré-requisito para Decision Trees
stringIndexer = StringIndexer(inputCol = "label", outputCol = "indexed")
si_model = stringIndexer.fit(bankDF_1)
obj_comum = si_model.transform(bankDF_1)
#obj_comum.collect()
obj_comum.take(5)

[Row(label=0.0, features=DenseVector([30.0, 1787.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0]), indexed=0.0),
 Row(label=1.0, features=DenseVector([33.0, 4789.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([35.0, 1350.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([30.0, 1476.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0]), indexed=1.0),
 Row(label=0.0, features=DenseVector([59.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0]), indexed=0.0)]

In [21]:
#Vamos comparar o resultado do algoritmo RandomForest com e sem PCA, por isso, vamos criar uma versão do dataset
#usando PCA com 3 componentes
bankPCA = PCA(k = 3, inputCol = "features", outputCol = "pcaFeatures")
pcaModel = bankPCA.fit(bankDF_2)
pcaResult = pcaModel.transform(bankDF_2).select("label", "pcaFeatures")
pcaResult.show(truncate = False)

+-----+-------------------------------------------------------------+
|label|pcaFeatures                                                  |
+-----+-------------------------------------------------------------+
|0.0  |[-1787.018896688212,28.860399462744468,-0.1280147985324101]  |
|1.0  |[-4789.020151899341,29.910671811919972,-1.1873535094324654]  |
|1.0  |[-1350.0221889042236,34.089813318413796,0.7195970243045985]  |
|1.0  |[-1476.0189274234879,29.0402338703401,0.16333904159114176]   |
|0.0  |[-0.03786531116986686,58.977608630868865,-0.9375316432593557]|
|1.0  |[-747.02233755074,34.48658287707456,0.8977247669235038]      |
|1.0  |[-307.02304514047654,35.78871235524398,0.28043105820705017]  |
|0.0  |[-147.0249882455921,38.88983490904783,-1.0046260851665119]   |
|0.0  |[-221.02627459762581,40.842288730459394,0.29813543281478216] |
|1.0  |[87.97241072606795,43.051283841804064,-0.2932102577592304]   |
|0.0  |[-9374.023079005105,32.96339467956594,-1.1790049813332166]   |
|0.0  |[-264.0275333

In [22]:
#é necessário fazer a Indexação nesse conjunto também
stringIndexer = StringIndexer(inputCol = "label", outputCol = "indexed")
si_model = stringIndexer.fit(pcaResult)
obj_pca = si_model.transform(pcaResult)
#obj_pca.collect()
obj_pca.take(5)

[Row(label=0.0, pcaFeatures=DenseVector([-1787.0189, 28.8604, -0.128]), indexed=0.0),
 Row(label=1.0, pcaFeatures=DenseVector([-4789.0202, 29.9107, -1.1874]), indexed=1.0),
 Row(label=1.0, pcaFeatures=DenseVector([-1350.0222, 34.0898, 0.7196]), indexed=1.0),
 Row(label=1.0, pcaFeatures=DenseVector([-1476.0189, 29.0402, 0.1633]), indexed=1.0),
 Row(label=0.0, pcaFeatures=DenseVector([-0.0379, 58.9776, -0.9375]), indexed=0.0)]

## Machine Learning

In [23]:
#Dados de treino e teste para versão sem PCA
(dados_treino_comum, dados_teste_comum) = obj_comum.randomSplit([0.7, 0.3])

In [24]:
dados_treino_comum.count()

394

In [25]:
dados_teste_comum.count()

147

In [26]:
#Criando o modelo
rfClassifer = RandomForestClassifier(labelCol = "indexed", featuresCol = "features")
modelo_comum = rfClassifer.fit(dados_treino_comum)

In [27]:
# Previsões com dados de teste
predictions_comum = modelo_comum.transform(dados_teste_comum)
predictions_comum.select("prediction", "indexed", "label", "features").take(10)

[Row(prediction=0.0, indexed=0.0, label=0.0, features=DenseVector([27.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0])),
 Row(prediction=0.0, indexed=0.0, label=0.0, features=DenseVector([31.0, 171.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0])),
 Row(prediction=0.0, indexed=0.0, label=0.0, features=DenseVector([32.0, 217.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0])),
 Row(prediction=0.0, indexed=0.0, label=0.0, features=DenseVector([32.0, 948.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0])),
 Row(prediction=0.0, indexed=0.0, label=0.0, features=DenseVector([32.0, 1031.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0])),
 Row(prediction=0.0, indexed=0.0, label=0.0, features=DenseVector([32.0, 2349.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0])),
 Row(prediction=0.0, indexed=0.0, label=0.0, features=DenseVector([32.0, 2693.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0])),
 Row(prediction=0.0, indexed=0.0, label=0.0, features=DenseVector([33.0, -988.0, 0.0, 0.0, 1.0, 1.0, 1.

In [28]:
# Avaliando a acurácia
evaluator_comum = MulticlassClassificationEvaluator(predictionCol = "prediction", labelCol = "indexed", metricName = "accuracy")
evaluator_comum.evaluate(predictions_comum)  

0.7687074829931972

In [29]:
# Confusion Matrix
predictions_comum.groupBy("indexed", "prediction").count().show()

+-------+----------+-----+
|indexed|prediction|count|
+-------+----------+-----+
|    1.0|       1.0|   31|
|    0.0|       1.0|    4|
|    1.0|       0.0|   30|
|    0.0|       0.0|   82|
+-------+----------+-----+



## Machine Learning com PCA

In [30]:
# Dados de Treino e de Teste
(dados_treino_pca, dados_teste_pca) = obj_pca.randomSplit([0.7, 0.3])

In [31]:
dados_treino_pca.count()

402

In [32]:
dados_teste_pca.count()

139

In [33]:
rfClassifer_pca = RandomForestClassifier(labelCol = "indexed", featuresCol = "pcaFeatures")
modelo_pca = rfClassifer_pca.fit(dados_treino_pca)

In [34]:
# Previsões com dados de teste
predictions_pca = modelo_pca.transform(dados_teste_pca)
predictions_pca.select("prediction", "indexed", "label", "pcaFeatures").take(10)

[Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-5426.0252, 37.4992, 0.1998])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-4073.0351, 53.372, -0.8574])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-4030.0229, 34.3965, -1.0844])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-4012.0313, 47.4138, 0.4222])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-3762.0275, 41.5764, 0.4076])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-2843.0225, 34.1634, -0.5605])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-2693.02, 30.2569, -1.0753])),
 Row(prediction=1.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-2349.0201, 30.4879, 0.3957])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-1981.0227, 34.7223, -0.8822])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-1972.0214

In [35]:
# Avaliando a acurácia
evaluator_pca = MulticlassClassificationEvaluator(predictionCol = "prediction", labelCol = "indexed", metricName = "accuracy")
evaluator_pca.evaluate(predictions_pca)      

0.7122302158273381

In [36]:
# Confusion Matrix
predictions_pca.groupBy("indexed", "prediction").count().show()

+-------+----------+-----+
|indexed|prediction|count|
+-------+----------+-----+
|    1.0|       1.0|   26|
|    0.0|       1.0|    8|
|    1.0|       0.0|   32|
|    0.0|       0.0|   73|
+-------+----------+-----+



In [37]:
#comapração dos resultados

print("Acurácia do modelo sem PCA:", evaluator_comum.evaluate(predictions_comum))
print("Acurácia do modelo com PCA:", evaluator_pca.evaluate(predictions_pca))

Acurácia do modelo sem PCA: 0.7687074829931972
Acurácia do modelo com PCA: 0.7122302158273381


A aplicação do algoritmo PCA para converter as features em 3 componentes apresentou resultados inferiores à utilização de todas as 11 features selecionadas. Talvez, a utilização de apenas 3 componentes não tenha sido suficiente.