## Spark MLlib - Classification - Decision Tree

### Business Problem
- Goal: Predict the species of an iris plant (Setosa, Versicolour, Virginica) from the lengths and widths of its petals and sepals.
    - Species is the target variable; the remaining variables are the features (predictor variables).

### Dataset
 - Name: Iris Data Set
 - Source: http://archive.ics.uci.edu/ml/datasets/Iris
 - Description: The data contains 150 samples of iris flowers, each with four measurements in centimeters and a species label.
 - Attributes:
    - 1. sepal length in cm
    - 2. sepal width in cm
    - 3. petal length in cm
    - 4. petal width in cm
    - 5. Species:
        - Iris Setosa
        - Iris Versicolour
        - Iris Virginica

### Technologies used
- Predictive model: Decision Tree
- JDK 1.8
- Apache Spark 2.4.2

### SparkSession and dataset import

In [2]:
from pyspark.sql import Row, SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
# Spark Session
spSession = SparkSession.builder.master("local").appName("SparkMLLib").getOrCreate()

In [4]:
# Loading the data into an RDD (the SparkContext is taken from the session, so no predefined `sc` is needed)
irisRDD = spSession.sparkContext.textFile('data/iris.csv')

In [5]:
# Caching the RDD - performance optimization
irisRDD.cache()

data/iris.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [6]:
irisRDD.count()

151

In [7]:
irisRDD.take(5)

['Sepal.Length,Sepal.Width,Petal.Length,Petal.Width,Species',
 '5.1,3.5,1.4,0.2,setosa',
 '4.9,3,1.4,0.2,setosa',
 '4.7,3.2,1.3,0.2,setosa',
 '4.6,3.1,1.5,0.2,setosa']

In [9]:
# Removing the header
irisRDD2 = irisRDD.filter(lambda x: 'Sepal' not in x)
irisRDD2.count()

150
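
The substring filter above drops every line that contains `Sepal`, which works here because no data row does. A slightly stricter variant, sketched below on a plain Python list rather than an RDD, removes only lines identical to the first line. The sample lines are copied from the output above; `drop_header` is a hypothetical helper name.

```python
def drop_header(lines):
    """Return the lines without the header (the first line) and any exact repeats of it."""
    if not lines:
        return []
    header = lines[0]
    return [line for line in lines if line != header]


# Sample lines copied from the irisRDD.take(5) output
sample = [
    'Sepal.Length,Sepal.Width,Petal.Length,Petal.Width,Species',
    '5.1,3.5,1.4,0.2,setosa',
    '4.9,3,1.4,0.2,setosa',
]
rows = drop_header(sample)
print(len(rows))
```

On the RDD, the same idea would be `header = irisRDD.first()` followed by `irisRDD.filter(lambda x: x != header)`.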

### Data cleaning

In [11]:
# Splitting the columns
irisRDD3 = irisRDD2.map(lambda l: l.split(","))

In [12]:
# Mapping the columns to named, typed fields
irisRDD4 = irisRDD3.map(lambda p: Row(SEPAL_LENGTH = float(p[0]), SEPAL_WIDTH = float(p[1]), 
                                      PETAL_LENGTH = float(p[2]), PETAL_WIDTH = float(p[3]), 
                                      SPECIES = p[4] ))
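
The split-and-convert steps above can be sketched for a single line in plain Python. This is only an illustration under assumptions: `IrisRecord` and `parse_line` are hypothetical names, and the field-count check is an addition that the notebook itself does not perform.

```python
from collections import namedtuple

# Field names mirror the Row fields used in the notebook
IrisRecord = namedtuple(
    "IrisRecord",
    ["SEPAL_LENGTH", "SEPAL_WIDTH", "PETAL_LENGTH", "PETAL_WIDTH", "SPECIES"],
)


def parse_line(line):
    """Split one CSV line and convert the four measurements to float."""
    parts = line.split(",")
    if len(parts) != 5:
        raise ValueError("expected 5 fields, got %d" % len(parts))
    return IrisRecord(
        float(parts[0]), float(parts[1]), float(parts[2]), float(parts[3]), parts[4]
    )


# '3' in the raw file becomes the float 3.0 after conversion
record = parse_line('4.9,3,1.4,0.2,setosa')
print(record)
```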

In [13]:
# Creating a DataFrame
irisDF = spSession.createDataFrame(irisRDD4)
irisDF.cache()

DataFrame[PETAL_LENGTH: double, PETAL_WIDTH: double, SEPAL_LENGTH: double, SEPAL_WIDTH: double, SPECIES: string]

In [14]:
irisDF.take(5)

[Row(PETAL_LENGTH=1.4, PETAL_WIDTH=0.2, SEPAL_LENGTH=5.1, SEPAL_WIDTH=3.5, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.4, PETAL_WIDTH=0.2, SEPAL_LENGTH=4.9, SEPAL_WIDTH=3.0, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.3, PETAL_WIDTH=0.2, SEPAL_LENGTH=4.7, SEPAL_WIDTH=3.2, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.5, PETAL_WIDTH=0.2, SEPAL_LENGTH=4.6, SEPAL_WIDTH=3.1, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.4, PETAL_WIDTH=0.2, SEPAL_LENGTH=5.0, SEPAL_WIDTH=3.6, SPECIES='setosa')]

In [15]:
# Creating a numeric index for the target label column
stringIndexer = StringIndexer(inputCol = "SPECIES", outputCol = "IDX_SPECIES")
si_model = stringIndexer.fit(irisDF)
irisNormDF = si_model.transform(irisDF)

In [16]:
irisNormDF.select("SPECIES","IDX_SPECIES").distinct().collect()

[Row(SPECIES='versicolor', IDX_SPECIES=0.0),
 Row(SPECIES='setosa', IDX_SPECIES=2.0),
 Row(SPECIES='virginica', IDX_SPECIES=1.0)]
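
By default, `StringIndexer` orders labels by descending frequency, so the most frequent label receives index 0.0. All three species occur 50 times here, so the assignment above is decided purely by tie-breaking, which may differ between Spark versions. The sketch below reproduces the frequency ordering in plain Python. The alphabetical tie-break and the name `index_labels` are assumptions made for illustration, not Spark's code, so for the iris labels it would not reproduce the exact mapping shown above.

```python
from collections import Counter


def index_labels(labels):
    """Map each distinct label to a float index, most frequent label first.

    Ties are broken alphabetically (an assumption made for this sketch).
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Toy labels with distinct frequencies: 'c' x3, 'b' x2, 'a' x1
mapping = index_labels(['a', 'b', 'b', 'c', 'c', 'c'])
print(mapping)
```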

### Exploratory data analysis

In [17]:
# Descriptive statistics
irisNormDF.describe().show()

+-------+------------------+------------------+------------------+------------------+---------+------------------+
|summary|      PETAL_LENGTH|       PETAL_WIDTH|      SEPAL_LENGTH|       SEPAL_WIDTH|  SPECIES|       IDX_SPECIES|
+-------+------------------+------------------+------------------+------------------+---------+------------------+
|  count|               150|               150|               150|               150|      150|               150|
|   mean| 3.758000000000001|1.1993333333333331| 5.843333333333332|3.0573333333333337|     null|               1.0|
| stddev|1.7652982332594662|0.7622376689603467|0.8280661279778634|0.4358662849366978|     null|0.8192319205190404|
|    min|               1.0|               0.1|               4.3|               2.0|   setosa|               0.0|
|    max|               6.9|               2.5|               7.9|               4.4|virginica|               2.0|
+-------+------------------+------------------+------------------+------------------+---------+------------------+

In [19]:
# Correlation between the target variable and each numeric column
for i in irisNormDF.columns:
    if not isinstance(irisNormDF.select(i).take(1)[0][0], str):
        print("Correlation of IDX_SPECIES with", i, irisNormDF.stat.corr('IDX_SPECIES', i))

Correlation of IDX_SPECIES with PETAL_LENGTH -0.649241830764174
Correlation of IDX_SPECIES with PETAL_WIDTH -0.5803770334306263
Correlation of IDX_SPECIES with SEPAL_LENGTH -0.46003915650023686
Correlation of IDX_SPECIES with SEPAL_WIDTH 0.6183715308237433
Correlation of IDX_SPECIES with IDX_SPECIES 1.0
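
`stat.corr` computes the Pearson correlation coefficient. Because `IDX_SPECIES` is an arbitrary index for a nominal variable, the signs and magnitudes above depend on how the labels happened to be numbered. A minimal pure-Python sketch of the coefficient follows, with `pearson` as a hypothetical helper name and toy inputs that are not taken from the dataset.

```python
import math


def pearson(xs, ys):
    """Pearson correlation of two equal-length numeric sequences.

    Raises ZeroDivisionError if either sequence is constant.
    """
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of equal length >= 2")
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# Reversing the order of the second sequence flips the sign of the coefficient
print(pearson([1, 2, 3], [2, 4, 6]), pearson([1, 2, 3], [6, 4, 2]))
```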


### Data preprocessing

In [21]:
# Building (species, label, features) tuples, with the measurements packed into a dense vector
# All four measurements are kept as features
def transformaVar(row) :
    obj = (row["SPECIES"], row["IDX_SPECIES"], Vectors.dense([row["SEPAL_LENGTH"], row["SEPAL_WIDTH"], 
                                                              row["PETAL_LENGTH"], row["PETAL_WIDTH"]]))
    return obj

In [25]:
# Converting to an RDD
irisRDD5 = irisNormDF.rdd.map(transformaVar)

In [26]:
type(irisRDD5)

pyspark.rdd.PipelinedRDD

In [23]:
irisRDD5.take(5)

[('setosa', 2.0, DenseVector([5.1, 3.5, 1.4, 0.2])),
 ('setosa', 2.0, DenseVector([4.9, 3.0, 1.4, 0.2])),
 ('setosa', 2.0, DenseVector([4.7, 3.2, 1.3, 0.2])),
 ('setosa', 2.0, DenseVector([4.6, 3.1, 1.5, 0.2])),
 ('setosa', 2.0, DenseVector([5.0, 3.6, 1.4, 0.2]))]

In [24]:
irisDF = spSession.createDataFrame(irisRDD5,["species", "label", "features"])
irisDF.select("species","label","features").show(10)
irisDF.cache()

+-------+-----+-----------------+
|species|label|         features|
+-------+-----+-----------------+
| setosa|  2.0|[5.1,3.5,1.4,0.2]|
| setosa|  2.0|[4.9,3.0,1.4,0.2]|
| setosa|  2.0|[4.7,3.2,1.3,0.2]|
| setosa|  2.0|[4.6,3.1,1.5,0.2]|
| setosa|  2.0|[5.0,3.6,1.4,0.2]|
| setosa|  2.0|[5.4,3.9,1.7,0.4]|
| setosa|  2.0|[4.6,3.4,1.4,0.3]|
| setosa|  2.0|[5.0,3.4,1.5,0.2]|
| setosa|  2.0|[4.4,2.9,1.4,0.2]|
| setosa|  2.0|[4.9,3.1,1.5,0.1]|
+-------+-----+-----------------+
only showing top 10 rows



DataFrame[species: string, label: double, features: vector]

### Machine Learning

In [27]:
# Splitting the dataset into training and test sets
(dados_treino, dados_teste) = irisDF.randomSplit([0.7, 0.3])

In [28]:
dados_treino.count()

109

In [30]:
dados_teste.count()

41
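
`randomSplit` assigns rows at random, so the 70/30 weights are met only approximately (109/41 here rather than 105/45), and the counts change from run to run unless a seed is passed, for example `randomSplit([0.7, 0.3], seed=42)`. The sketch below illustrates the idea with one random draw per row in plain Python. It is a simplification, not Spark's sampler, and `random_split` and the seed value are illustrative.

```python
import random


def random_split(rows, train_weight, seed):
    """Send each row to train with probability train_weight, otherwise to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rng.random() < train_weight:
            train.append(row)
        else:
            test.append(row)
    return train, test


rows = list(range(150))
train, test = random_split(rows, 0.7, seed=42)
# The sizes are close to, but not necessarily exactly, 105 and 45
print(len(train), len(test))
```

Reusing the same seed reproduces the same split, which keeps the reported metrics comparable across runs.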

In [32]:
# Building the model with the training data
dtClassifier = DecisionTreeClassifier(maxDepth = 2, labelCol = "label", featuresCol = "features")
modelo = dtClassifier.fit(dados_treino)

In [33]:
# Only the last expression in a cell is displayed, so the output below is the tree depth
modelo.numNodes
modelo.depth

2
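
`DecisionTreeClassifier` uses Gini impurity as its default split criterion. The sketch below computes it from the class counts at a node, along with the impurity decrease of a candidate split. `gini` and `split_impurity` are hypothetical helper names, and the counts are illustrative (a balanced three-class node and a split that isolates one class), not values read from the fitted model.

```python
def gini(counts):
    """Gini impurity of a node given its per-class counts."""
    total = sum(counts)
    if total == 0:
        return 0.0
    return 1.0 - sum((c / total) ** 2 for c in counts)


def split_impurity(left_counts, right_counts):
    """Size-weighted average impurity of the two children of a split."""
    n_left, n_right = sum(left_counts), sum(right_counts)
    n = n_left + n_right
    return (n_left / n) * gini(left_counts) + (n_right / n) * gini(right_counts)


parent = [50, 50, 50]                    # balanced node with three classes
left, right = [50, 0, 0], [0, 50, 50]    # split that isolates the first class
gain = gini(parent) - split_impurity(left, right)
print(gini(parent), gain)
```

With `maxDepth = 2` the tree can apply at most two such splits along any path from root to leaf.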

In [34]:
# Predictions on the test data
previsoes = modelo.transform(dados_teste)
previsoes.select("prediction","species","label").collect()

[Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=1.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=1.0, species='versicolor', label=0

In [35]:
# Evaluating accuracy
avaliador = MulticlassClassificationEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "accuracy")
avaliador.evaluate(previsoes)  

0.9024390243902439

In [36]:
# Summarizing the predictions - confusion matrix
previsoes.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   15|
|  0.0|       1.0|    3|
|  2.0|       2.0|   10|
|  1.0|       0.0|    1|
|  0.0|       0.0|   12|
+-----+----------+-----+
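
The accuracy reported earlier can be recovered from this table: the correct predictions are the rows where `label` equals `prediction`. A small check in plain Python, with the counts copied from the table (`confusion` and `recall` are illustrative names):

```python
# (label, prediction) -> count, copied from the groupBy output
confusion = {
    (1.0, 1.0): 15,
    (0.0, 1.0): 3,
    (2.0, 2.0): 10,
    (1.0, 0.0): 1,
    (0.0, 0.0): 12,
}

total = sum(confusion.values())
correct = sum(n for (label, pred), n in confusion.items() if label == pred)
accuracy = correct / total
print(correct, total, accuracy)

# Per-label recall: correct predictions divided by all rows with that true label
recall = {}
for label in sorted({lbl for lbl, _ in confusion}):
    label_total = sum(n for (lbl, _), n in confusion.items() if lbl == label)
    recall[label] = confusion.get((label, label), 0) / label_total
print(recall)
```

In this table 37 of the 41 test rows are classified correctly, and all four errors are confusions between labels 0.0 (versicolor) and 1.0 (virginica).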

