# <font color='blue'>Random Forest Classification</font>
# <font color='blue'>Big Data Real-Time Analytics with Python and Spark</font>

# <font color='blue'>Chapter 11</font>

### *********** Attention: *********** 
Use Java JDK 1.8 or 11 and Apache Spark 2.4.2

****** If you get the error message "name 'sc' is not defined", stop pyspark and delete the metastore_db directory located in the same directory as this Jupyter notebook ******

## <font color='blue'>Spark MLLib - Classification - Random Forest</font>

<strong> Description </strong>
<ul style="list-style-type:square">
  <li>One of the most popular algorithms.</li>
  <li>It is an ensemble method.</li>
  <li>A Random Forest model builds many decision trees, and each tree predicts the outcome independently. The forest then takes a vote across the trees to choose the final prediction.</li>
</ul>
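
The voting step described in the list above can be sketched in plain Python. This is only an illustration of majority voting: the helper name `majority_vote` and the five votes are hypothetical, and Spark's own implementation may combine tree outputs differently (for example by aggregating class probabilities).

```python
from collections import Counter

def majority_vote(tree_predictions):
    """Return the class predicted by the largest number of trees."""
    counts = Counter(tree_predictions)
    winner, _ = counts.most_common(1)[0]
    return winner

# Hypothetical votes from five trees for a single customer
votes = [1.0, 0.0, 1.0, 1.0, 0.0]
forest_prediction = majority_vote(votes)
print(forest_prediction)  # 1.0 (three of the five trees voted for class 1.0)
```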

<dl>
  <dt>Advantages</dt>
  <dd>- Usually offers good accuracy</dd>
  <dd>- Efficient with many predictor variables</dd>
  <dd>- Works very well when parallelized</dd>
  <dd>- Handles missing values well</dd>
  <br />
  <dt>Disadvantages</dt>
  <dd>- Slower to train and to predict than a single decision tree</dd>
  <dd>- Bias can occur frequently</dd>
  <br />
  <dt>Applications</dt>
  <dd>- Scientific research</dd>
  <dd>- Medical diagnosis</dd>
</dl>

# Learning from the **Profiles** of Past Customers

## Classify customers according to whether or not they are likely to repay the credit

---

In [1]:
# Imports
import math
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import PCA
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
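# Spark Session - used when working with DataFrames in Spark
spSession = SparkSession.builder.master("local").appName("DSA-SparkMLLib").getOrCreate()
# SparkContext used to create RDDs (already available as `sc` when started through pyspark)
sc = spSession.sparkContext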

In [3]:
# Loading the data and creating an RDD
bankRDD = sc.textFile("data/bank.csv")

In [4]:
bankRDD.cache()

data/bank.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
bankRDD.count()

542

In [6]:
bankRDD.take(5)

['"age";"job";"marital";"education";"default";"balance";"housing";"loan";"contact";"day";"month";"duration";"campaign";"pdays";"previous";"poutcome";"y"',
 '30;"unemployed";"married";"primary";"no";1787;"no";"no";"cellular";19;"oct";79;1;-1;0;"unknown";"no"',
 '33;"services";"married";"secondary";"no";4789;"yes";"yes";"cellular";11;"may";220;1;339;4;"failure";"yes"',
 '35;"management";"single";"tertiary";"no";1350;"yes";"no";"cellular";16;"apr";185;1;330;1;"failure";"yes"',
 '30;"management";"married";"tertiary";"no";1476;"yes";"yes";"unknown";3;"jun";199;4;-1;0;"unknown";"yes"']

In [7]:
# Removing the first line of the file (header)
firstLine = bankRDD.first()
bankRDD2 = bankRDD.filter(lambda x: x != firstLine)
bankRDD2.count()

541

---

## Data Cleaning

## Transforming data into numeric values (lookup mapping)

In [8]:
# Transforming the data into numeric values
def transformToNumeric(inputStr):

    attList = inputStr.replace("\"", "").split(";")

    # Converting strings to a numeric representation
    age = float(attList[0])
    outcome = 0.0 if attList[16] == "no" else 1.0    # 0.0 if the attribute at index 16 is "no", otherwise 1.0
    single = 1.0 if attList[2] == "single" else 0.0  # 1.0 if the attribute at index 2 is "single", otherwise 0.0
    married = 1.0 if attList[2] == "married" else 0.0  # 1.0 if the attribute at index 2 is "married", otherwise 0.0
    divorced = 1.0 if attList[2] == "divorced" else 0.0
    primary = 1.0 if attList[3] == "primary" else 0.0
    secondary = 1.0 if attList[3] == "secondary" else 0.0
    tertiary = 1.0 if attList[3] == "tertiary" else 0.0
    default = 0.0 if attList[4] == "no" else 1.0
    balance = float(attList[5])
    loan = 0.0 if attList[7] == "no" else 1.0

    # Row() builds an organized record from the numeric representation
    linhas = Row(OUTCOME = outcome,
                 AGE = age,
                 SINGLE = single,
                 MARRIED = married,
                 DIVORCED = divorced,
                 PRIMARY = primary,
                 SECONDARY = secondary,
                 TERTIARY = tertiary,
                 DEFAULT = default,
                 BALANCE = balance,
                 LOAN = loan)

    return linhas
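
The function above encodes `marital` and `education` by hand as one indicator column per category (one-hot encoding). A minimal generic sketch of the same idea follows; the helper `one_hot` and the category lists are assumptions made for illustration and are not part of the notebook.

```python
def one_hot(value, categories):
    """Return one 0.0/1.0 indicator per category; all zeros if the value is not listed."""
    return [1.0 if value == category else 0.0 for category in categories]

marital_levels = ["single", "married", "divorced"]
education_levels = ["primary", "secondary", "tertiary"]

print(one_hot("married", marital_levels))    # [0.0, 1.0, 0.0]
print(one_hot("unknown", education_levels))  # [0.0, 0.0, 0.0]
```

As in the notebook's function, a value outside the listed categories (such as `"unknown"`) ends up with every indicator set to 0.0.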

## Apply the function to the RDD

In [9]:
# Applying the cleaning function to the dataset
bankRDD3 = bankRDD2.map(transformToNumeric)
bankRDD3.take(15)  # viewing a few records from the dataset

[Row(AGE=30.0, BALANCE=1787.0, DEFAULT=0.0, DIVORCED=0.0, LOAN=0.0, MARRIED=1.0, OUTCOME=0.0, PRIMARY=1.0, SECONDARY=0.0, SINGLE=0.0, TERTIARY=0.0),
 Row(AGE=33.0, BALANCE=4789.0, DEFAULT=0.0, DIVORCED=0.0, LOAN=1.0, MARRIED=1.0, OUTCOME=1.0, PRIMARY=0.0, SECONDARY=1.0, SINGLE=0.0, TERTIARY=0.0),
 Row(AGE=35.0, BALANCE=1350.0, DEFAULT=0.0, DIVORCED=0.0, LOAN=0.0, MARRIED=0.0, OUTCOME=1.0, PRIMARY=0.0, SECONDARY=0.0, SINGLE=1.0, TERTIARY=1.0),
 Row(AGE=30.0, BALANCE=1476.0, DEFAULT=0.0, DIVORCED=0.0, LOAN=1.0, MARRIED=1.0, OUTCOME=1.0, PRIMARY=0.0, SECONDARY=0.0, SINGLE=0.0, TERTIARY=1.0),
 Row(AGE=59.0, BALANCE=0.0, DEFAULT=0.0, DIVORCED=0.0, LOAN=0.0, MARRIED=1.0, OUTCOME=0.0, PRIMARY=0.0, SECONDARY=1.0, SINGLE=0.0, TERTIARY=0.0),
 Row(AGE=35.0, BALANCE=747.0, DEFAULT=0.0, DIVORCED=0.0, LOAN=0.0, MARRIED=0.0, OUTCOME=1.0, PRIMARY=0.0, SECONDARY=0.0, SINGLE=1.0, TERTIARY=1.0),
 Row(AGE=36.0, BALANCE=307.0, DEFAULT=0.0, DIVORCED=0.0, LOAN=0.0, MARRIED=1.0, OUTCOME=1.0, PRIMARY=0.0, SECO

---

## Exploratory Data Analysis

In [10]:
# Convert to a DataFrame
bankDF = spSession.createDataFrame(bankRDD3)

In [11]:
# Descriptive statistics
bankDF.describe().show()

+-------+------------------+------------------+--------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+
|summary|               AGE|           BALANCE|             DEFAULT|           DIVORCED|               LOAN|           MARRIED|            OUTCOME|           PRIMARY|         SECONDARY|            SINGLE|          TERTIARY|
+-------+------------------+------------------+--------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+
|  count|               541|               541|                 541|                541|                541|               541|                541|               541|               541|               541|               541|
|   mean| 41.26987060998152|1444.7818853974122|0.022181146025878003|0.10905730129390019|0.16266173752310

In [12]:
# Correlation between the variables
for i in bankDF.columns:
    if not isinstance(bankDF.select(i).take(1)[0][0], str):
        print("Correlation of OUTCOME with", i, bankDF.stat.corr('OUTCOME', i))

# AGE is negatively correlated with OUTCOME, that is, with whether or not the customer receives the credit.

Correlation of OUTCOME with AGE -0.1823210432736525
Correlation of OUTCOME with BALANCE 0.036574866119976804
Correlation of OUTCOME with DEFAULT -0.04536965206737378
Correlation of OUTCOME with DIVORCED -0.07812659940926987
Correlation of OUTCOME with LOAN -0.030420586112717318
Correlation of OUTCOME with MARRIED -0.3753241299133561
Correlation of OUTCOME with OUTCOME 1.0
Correlation of OUTCOME with PRIMARY -0.12561548832677982
Correlation of OUTCOME with SECONDARY 0.026392774894072973
Correlation of OUTCOME with SINGLE 0.46323284934360515
Correlation of OUTCOME with TERTIARY 0.08494840766635618
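
`bankDF.stat.corr` computes the Pearson correlation coefficient. As a rough illustration of what that number means, the sketch below applies the formula in plain Python to the AGE and OUTCOME values of the first five records shown earlier. The helper name `pearson` is an assumption, and five rows are far too few to reproduce the values printed above.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations, and the two sums of squared deviations
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

ages = [30.0, 33.0, 35.0, 30.0, 59.0]
outcomes = [0.0, 1.0, 1.0, 1.0, 0.0]
r = pearson(ages, outcomes)
print(round(r, 4))  # negative for this small sample
```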


---

## Data Pre-Processing for the Algorithm - Dense Matrix

In [13]:
# Creating a (label, features) pair: the target plus a dense vector of predictors
def transformaVar(row):
    obj = (row["OUTCOME"],                    # label
           Vectors.dense([row["AGE"],         # predictor
                          row["BALANCE"],     # predictor
                          row["DEFAULT"],     # predictor
                          row["DIVORCED"],    # predictor
                          row["LOAN"],        # predictor
                          row["MARRIED"],     # predictor
                          row["PRIMARY"],     # predictor
                          row["SECONDARY"],   # predictor
                          row["SINGLE"],      # predictor
                          row["TERTIARY"]]))  # predictor
    return obj

## Apply the function to the RDD

In [14]:
bankRDD4 = bankDF.rdd.map(transformaVar)

In [15]:
bankRDD4.collect()

[(0.0, DenseVector([30.0, 1787.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0])),
 (1.0, DenseVector([33.0, 4789.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0])),
 (1.0, DenseVector([35.0, 1350.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0])),
 (1.0, DenseVector([30.0, 1476.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0])),
 (0.0, DenseVector([59.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0])),
 (1.0, DenseVector([35.0, 747.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0])),
 (1.0, DenseVector([36.0, 307.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0])),
 (0.0, DenseVector([39.0, 147.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0])),
 (0.0, DenseVector([41.0, 221.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0])),
 (1.0, DenseVector([43.0, -88.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0])),
 (0.0, DenseVector([39.0, 9374.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0])),
 (0.0, DenseVector([43.0, 264.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0])),
 (0.0, DenseVector([36.0, 1109.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0])),
 (1.0, D

In [16]:
bankDF = spSession.createDataFrame(bankRDD4,["label", "features"])
bankDF.select("features", "label").show(10)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[30.0,1787.0,0.0,...|  0.0|
|[33.0,4789.0,0.0,...|  1.0|
|[35.0,1350.0,0.0,...|  1.0|
|[30.0,1476.0,0.0,...|  1.0|
|[59.0,0.0,0.0,0.0...|  0.0|
|[35.0,747.0,0.0,0...|  1.0|
|[36.0,307.0,0.0,0...|  1.0|
|[39.0,147.0,0.0,0...|  0.0|
|[41.0,221.0,0.0,0...|  0.0|
|[43.0,-88.0,0.0,0...|  1.0|
+--------------------+-----+
only showing top 10 rows



In [17]:
# Applying dimensionality reduction with PCA
bankPCA = PCA(k = 3, inputCol = "features", outputCol = "pcaFeatures")
pcaModel = bankPCA.fit(bankDF)
pcaResult = pcaModel.transform(bankDF).select("label","pcaFeatures")
pcaResult.show(truncate = False)

+-----+------------------------------------------------------------+
|label|pcaFeatures                                                 |
+-----+------------------------------------------------------------+
|0.0  |[-1787.018897197381,28.86209683775489,-0.06459982604876296] |
|1.0  |[-4789.020177138492,29.922562636340885,-0.9830243513096447] |
|1.0  |[-1350.022213163262,34.10110809796657,0.8951427168301616]   |
|1.0  |[-1476.0189517184556,29.051333993596376,0.3952723868021922] |
|0.0  |[-0.037889185366455545,58.9897182000177,-0.729079238366194] |
|1.0  |[-747.0223377634923,34.488291981817554,0.9045654956970024]  |
|1.0  |[-307.0230691022592,35.799850539655154,0.5170631523785959]  |
|0.0  |[-147.0250121617634,38.90107856650326,-0.8069627548799431]  |
|0.0  |[-221.0262985348787,40.853633675694894,0.53730363658032]    |
|1.0  |[87.9723868768871,43.06265944115107,-0.06701642871171626]   |
|0.0  |[-9374.023105550941,32.9764588379908,-0.9511484606914545]   |
|0.0  |[-264.02755731528384,42.824
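
In the output above, the first principal component is almost exactly the negated BALANCE value and the second is close to AGE. This suggests that the unscaled features with the largest variance dominate the projection. The sketch below compares the variance of AGE and BALANCE for the first five records shown earlier and then standardizes one column; the helper `standardize` is an assumption for illustration, not part of the notebook.

```python
import statistics

# AGE and BALANCE of the first five records shown earlier in the notebook
ages = [30.0, 33.0, 35.0, 30.0, 59.0]
balances = [1787.0, 4789.0, 1350.0, 1476.0, 0.0]

var_age = statistics.pvariance(ages)
var_balance = statistics.pvariance(balances)
print(var_age, var_balance)  # BALANCE varies on a much larger scale than AGE

def standardize(values):
    """Rescale to zero mean and unit variance (population standard deviation)."""
    mean = statistics.mean(values)
    std = statistics.pstdev(values)
    return [(v - mean) / std for v in values]

scaled_balances = standardize(balances)
print(statistics.pvariance(scaled_balances))  # approximately 1.0 after scaling
```

In Spark, one option would be to apply `pyspark.ml.feature.StandardScaler` to the `features` column before fitting PCA. Whether that improves this particular model is not something the notebook evaluates.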

## Dimensionality Reduction with PCA

In [18]:
# Label indexing is a prerequisite for Decision Trees
stringIndexer = StringIndexer(inputCol = "label", outputCol = "indexed")
si_model = stringIndexer.fit(pcaResult)
obj_final = si_model.transform(pcaResult)
obj_final.collect()

[Row(label=0.0, pcaFeatures=DenseVector([-1787.0189, 28.8621, -0.0646]), indexed=0.0),
 Row(label=1.0, pcaFeatures=DenseVector([-4789.0202, 29.9226, -0.983]), indexed=1.0),
 Row(label=1.0, pcaFeatures=DenseVector([-1350.0222, 34.1011, 0.8951]), indexed=1.0),
 Row(label=1.0, pcaFeatures=DenseVector([-1476.019, 29.0513, 0.3953]), indexed=1.0),
 Row(label=0.0, pcaFeatures=DenseVector([-0.0379, 58.9897, -0.7291]), indexed=0.0),
 Row(label=1.0, pcaFeatures=DenseVector([-747.0223, 34.4883, 0.9046]), indexed=1.0),
 Row(label=1.0, pcaFeatures=DenseVector([-307.0231, 35.7999, 0.5171]), indexed=1.0),
 Row(label=0.0, pcaFeatures=DenseVector([-147.025, 38.9011, -0.807]), indexed=0.0),
 Row(label=0.0, pcaFeatures=DenseVector([-221.0263, 40.8536, 0.5373]), indexed=0.0),
 Row(label=1.0, pcaFeatures=DenseVector([87.9724, 43.0627, -0.067]), indexed=1.0),
 Row(label=0.0, pcaFeatures=DenseVector([-9374.0231, 32.9765, -0.9511]), indexed=0.0),
 Row(label=0.0, pcaFeatures=DenseVector([-264.0276, 42.8248, -0
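
By default, `StringIndexer` orders labels by descending frequency, so the most frequent label receives index 0.0. A minimal plain-Python sketch of that behavior is shown below; the helper `fit_label_index` and the sample labels are hypothetical, and the sketch does not try to reproduce how Spark breaks ties between equally frequent labels.

```python
from collections import Counter

def fit_label_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    ordered = [label for label, _ in Counter(labels).most_common()]
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical label column in which "no" is more frequent than "yes"
labels = ["no", "yes", "no", "no", "yes"]
mapping = fit_label_index(labels)
indexed = [mapping[label] for label in labels]
print(mapping)  # {'no': 0.0, 'yes': 1.0}
print(indexed)  # [0.0, 1.0, 0.0, 0.0, 1.0]
```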

## Machine Learning

In [19]:
# Training and test data
(dados_treino, dados_teste) = obj_final.randomSplit([0.7, 0.3]) # Random split

In [20]:
dados_treino.count()

397

In [21]:
dados_teste.count()

144
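
The counts above (397 and 144) are roughly 73% and 27% of the 541 records rather than an exact 70/30 split, because `randomSplit` assigns rows at random and treats the weights as expected proportions. The sketch below imitates that idea in plain Python with an independent random draw per row. The helper `random_split` and the seed value are assumptions, and this is not Spark's actual sampling code.

```python
import random

def random_split(rows, train_weight, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        # Each row lands in train with probability train_weight
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(541))  # stand-in for the 541 records
train, test = random_split(rows, 0.7, seed=42)
print(len(train), len(test))  # sizes are close to 70/30 but not exact
```

Because the notebook's split is unseeded, the counts and the metrics that follow can change from run to run. Passing a seed, as in `obj_final.randomSplit([0.7, 0.3], seed=42)`, would make the split reproducible; the value 42 is arbitrary.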

In [22]:
# Creating the model
rfClassifier = RandomForestClassifier(labelCol = "indexed", featuresCol = "pcaFeatures")
modelo = rfClassifier.fit(dados_treino)

In [23]:
# Predictions on the test data
predictions = modelo.transform(dados_teste)
predictions.select("prediction", "indexed", "label", "pcaFeatures").collect()

[Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-9374.0231, 32.9765, -0.9511])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-7190.0255, 37.3733, 0.7344])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-2843.0225, 34.1656, -0.5582])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-2349.0201, 30.4898, 0.47])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-2030.0265, 40.6915, -0.9059])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-1906.0356, 54.7786, -0.049])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-1877.0356, 54.785, 0.2511])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-1831.0215, 32.8212, -0.8522])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-1811.0266, 40.8265, -0.5194])),
 Row(prediction=1.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-1699.0234

In [24]:
# Evaluating accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol = "prediction", labelCol = "indexed", metricName = "accuracy")
evaluator.evaluate(predictions)      

0.75

In [25]:
# Confusion Matrix
predictions.groupBy("indexed", "prediction").count().show()

+-------+----------+-----+
|indexed|prediction|count|
+-------+----------+-----+
|    1.0|       1.0|   27|
|    0.0|       1.0|   15|
|    1.0|       0.0|   21|
|    0.0|       0.0|   81|
+-------+----------+-----+
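
The counts in the confusion matrix above are enough to recompute the accuracy by hand and to derive precision and recall for class 1.0. A short sketch follows, with the counts copied from the table; treating 1.0 as the positive class is an assumption made for illustration.

```python
# Counts copied from the confusion matrix: (indexed, prediction) -> count
counts = {
    (1.0, 1.0): 27,  # true positives
    (0.0, 1.0): 15,  # false positives
    (1.0, 0.0): 21,  # false negatives
    (0.0, 0.0): 81,  # true negatives
}

tp = counts[(1.0, 1.0)]
fp = counts[(0.0, 1.0)]
fn = counts[(1.0, 0.0)]
tn = counts[(0.0, 0.0)]
total = tp + fp + fn + tn

accuracy = (tp + tn) / total
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * precision * recall / (precision + recall)

print(total)     # 144, the size of the test set
print(accuracy)  # 0.75, matching the evaluator output
print(round(precision, 4), round(recall, 4), round(f1, 4))
```

Recall for class 1.0 is noticeably lower than the overall accuracy, since 21 of the 48 actual positives were predicted as 0.0.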

