### Spark MLLib - Classificação - Decision Tree

**Descrição**

    . Fácil de compreender e fácil de explicar
    . Variáveis preditoras são usadas para construir uma árvore que pregressivamente prevê valores target
    . Dados de treino são usados para construit a árvore de decisão e preve o valor target.
    . A árvore de decisão se torna um modelo que é usaod para fazer previsões com novos dados.

**Vantagens**

    - Fácil de interpretar e explicar
    - Funciona com valores missing
    - Veloz

**Desvantagens**

    - Acurácia limitada
    - Bias podem ocorrer com frequencia
    - Não funciona muito bem com muitas variáveis preditoras

**Aplicação**

    - Aprovação de crédito
    - Categorização preliminar

### Classificar as espécies de flores, listadas no dataset iris

In [1]:
# imports dos pacotes
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
# Spark Session - usada quando se trabalha com Dataframes no Spark
spSession = SparkSession.builder.master("local").appName("SparkMLLib-Classification").getOrCreate()

In [3]:
# Carregando os dados e gerando um RDD
irisRDD = sc.textFile("iris.csv")

In [4]:
# Colocando o RDD em cache. Esse processo otimiza a performance
irisRDD.cache()

iris.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
# Contando o numero de elemntos do RDD
irisRDD.count()

151

In [6]:
# Mostrando os pimeiros elementos do RDD
irisRDD.take(7)

['Sepal.Length,Sepal.Width,Petal.Length,Petal.Width,Species',
 '5.1,3.5,1.4,0.2,setosa',
 '4.9,3,1.4,0.2,setosa',
 '4.7,3.2,1.3,0.2,setosa',
 '4.6,3.1,1.5,0.2,setosa',
 '5,3.6,1.4,0.2,setosa',
 '5.4,3.9,1.7,0.4,setosa']

In [11]:
# Removendo a primeira linha do arquivo (cabeçalho)
header = irisRDD.first()
irisRDD2 = irisRDD.filter(lambda line: line != header)

In [12]:
irisRDD2.take(5)

['5.1,3.5,1.4,0.2,setosa',
 '4.9,3,1.4,0.2,setosa',
 '4.7,3.2,1.3,0.2,setosa',
 '4.6,3.1,1.5,0.2,setosa',
 '5,3.6,1.4,0.2,setosa']

### Limpeza dos dados

In [13]:
# Separando as colunas
irisRDD3 = irisRDD2.map(lambda columns: columns.split(","))

In [17]:
# Mapeando as colunas - Convertendo os valores para numericos
irisRDD4 = irisRDD3.map(lambda p: Row(SEPAL_LENGTH = float(p[0]), SEPAL_WIDTH = float(p[1]),
                                      PETAL_LENGTH = float(p[2]), PETAL_WIDTH = float(p[3]), 
                                      SPECIES = p[4]))

In [18]:
# Criando um dataframe - Spark Session
irisDF = spSession.createDataFrame(irisRDD4)

In [19]:
# Alocando o dataframe para o cache
irisDF.cache()

DataFrame[PETAL_LENGTH: double, PETAL_WIDTH: double, SEPAL_LENGTH: double, SEPAL_WIDTH: double, SPECIES: string]

In [20]:
irisDF.take(5)

[Row(PETAL_LENGTH=1.4, PETAL_WIDTH=0.2, SEPAL_LENGTH=5.1, SEPAL_WIDTH=3.5, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.4, PETAL_WIDTH=0.2, SEPAL_LENGTH=4.9, SEPAL_WIDTH=3.0, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.3, PETAL_WIDTH=0.2, SEPAL_LENGTH=4.7, SEPAL_WIDTH=3.2, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.5, PETAL_WIDTH=0.2, SEPAL_LENGTH=4.6, SEPAL_WIDTH=3.1, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.4, PETAL_WIDTH=0.2, SEPAL_LENGTH=5.0, SEPAL_WIDTH=3.6, SPECIES='setosa')]

In [23]:
# Criando um índice numérico para a coluna de label target
stringIndexer = StringIndexer(inputCol = "SPECIES", outputCol = "IDX_SPECIES")
si_model = stringIndexer.fit(irisDF)
irisNormDF = si_model.transform(irisDF) 

In [24]:
irisNormDF.select("SPECIES", "IDX_SPECIES").distinct().collect()

[Row(SPECIES='versicolor', IDX_SPECIES=0.0),
 Row(SPECIES='setosa', IDX_SPECIES=2.0),
 Row(SPECIES='virginica', IDX_SPECIES=1.0)]

### Análise Exploratória de Dados

In [25]:
# Estatistica Descritiva
irisNormDF.describe().show()

+-------+------------------+------------------+------------------+------------------+---------+------------------+
|summary|      PETAL_LENGTH|       PETAL_WIDTH|      SEPAL_LENGTH|       SEPAL_WIDTH|  SPECIES|       IDX_SPECIES|
+-------+------------------+------------------+------------------+------------------+---------+------------------+
|  count|               150|               150|               150|               150|      150|               150|
|   mean| 3.758000000000001|1.1993333333333331| 5.843333333333332|3.0573333333333337|     null|               1.0|
| stddev|1.7652982332594662|0.7622376689603467|0.8280661279778634|0.4358662849366978|     null|0.8192319205190404|
|    min|               1.0|               0.1|               4.3|               2.0|   setosa|               0.0|
|    max|               6.9|               2.5|               7.9|               4.4|virginica|               2.0|
+-------+------------------+------------------+------------------+--------------

In [26]:
# Correlação entre as variáveis
for column in irisNormDF.columns:
    if not(isinstance(irisNormDF.select(column).take(1)[0][0], str)):
        print("Correlação da variavel IDX_SPECIES com ", column, irisNormDF.stat.corr("IDX_SPECIES", column))

Correlação da variavel IDX_SPECIES com  PETAL_LENGTH -0.649241830764174
Correlação da variavel IDX_SPECIES com  PETAL_WIDTH -0.5803770334306264
Correlação da variavel IDX_SPECIES com  SEPAL_LENGTH -0.46003915650023686
Correlação da variavel IDX_SPECIES com  SEPAL_WIDTH 0.6183715308237433
Correlação da variavel IDX_SPECIES com  IDX_SPECIES 1.0


### Pré-Processamento dos Dados

In [30]:
# Criando um LabeledPoint (target, Vector(features))
# Remove colunas não relevantes para o modelo ou com baixa correlaçao
def transformaVar(row):
    obj = (row["SPECIES"], row["IDX_SPECIES"], Vectors.dense([row["SEPAL_LENGTH"], row["SEPAL_WIDTH"],
                                                              row["PETAL_LENGTH"], row["PETAL_WIDTH"]]))
    return obj

In [31]:
# Converte o dataframe para RDD - funçao map funciona em RDD´s
irisRDD5 = irisNormDF.rdd.map(transformaVar)

In [32]:
irisRDD5.take(5)

[('setosa', 2.0, DenseVector([5.1, 3.5, 1.4, 0.2])),
 ('setosa', 2.0, DenseVector([4.9, 3.0, 1.4, 0.2])),
 ('setosa', 2.0, DenseVector([4.7, 3.2, 1.3, 0.2])),
 ('setosa', 2.0, DenseVector([4.6, 3.1, 1.5, 0.2])),
 ('setosa', 2.0, DenseVector([5.0, 3.6, 1.4, 0.2]))]

In [35]:
# Transformando novamente para Dataframe
irisDF = spSession.createDataFrame(irisRDD5, ["species", "label", "features"])
irisDF.select("species", "label", "features").show(10)
irisDF.cache()

+-------+-----+-----------------+
|species|label|         features|
+-------+-----+-----------------+
| setosa|  2.0|[5.1,3.5,1.4,0.2]|
| setosa|  2.0|[4.9,3.0,1.4,0.2]|
| setosa|  2.0|[4.7,3.2,1.3,0.2]|
| setosa|  2.0|[4.6,3.1,1.5,0.2]|
| setosa|  2.0|[5.0,3.6,1.4,0.2]|
| setosa|  2.0|[5.4,3.9,1.7,0.4]|
| setosa|  2.0|[4.6,3.4,1.4,0.3]|
| setosa|  2.0|[5.0,3.4,1.5,0.2]|
| setosa|  2.0|[4.4,2.9,1.4,0.2]|
| setosa|  2.0|[4.9,3.1,1.5,0.1]|
+-------+-----+-----------------+
only showing top 10 rows



DataFrame[species: string, label: double, features: vector]

### Machine Learning

In [36]:
# dados de Treino e de teste
(dados_treino, dados_teste) = irisDF.randomSplit([0.7, 0.3])

In [37]:
dados_treino.count()

102

In [38]:
dados_teste.count()

48

In [39]:
# Construindo o modelo com os dados de treino
dtClassifier = DecisionTreeClassifier(maxDepth = 2, labelCol = "label", featuresCol = "features")
modelo = dtClassifier.fit(dados_treino)

In [41]:
# Verificando alguns dados sobre a Decision Tree
print("Numero de Nodes: ", modelo.numNodes)

Numero de Nodes:  5


In [42]:
print("Profundidade dos nodes: ", modelo.depth)

Profundidade dos nodes:  2


In [43]:
# Previsões com dados de teste
previsoes = modelo.transform(dados_teste)
previsoes.select("prediction", "species", "label").collect()

[Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Ro

In [44]:
# Avaliando a acurácia do modelo
avaliador = MulticlassClassificationEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "accuracy")
avaliador.evaluate(previsoes)

0.9375

In [45]:
# Resumindo as previsões - Confusion Matrix
previsoes.groupBy("label", "prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   17|
|  0.0|       1.0|    1|
|  2.0|       2.0|   12|
|  1.0|       0.0|    2|
|  0.0|       0.0|   16|
+-----+----------+-----+



### Fim