## <font color='blue'>Spark MLLib - Classificação - Decision Tree</font>

## Classificar as espécies de flores, listadas no dataset iris

In [1]:
# Imports 
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [1]:
# Spark Session - usada quando se trabalha com Dataframes no Spark
spSession = SparkSession.builder.master("local").appName("SparkMLLib").getOrCreate()

In [3]:
# Carregando os dados e gerando um RDD
irisRDD = sc.textFile("data/iris.csv")

In [4]:
# Colocando o RDD em cache. Esse processo otimiza a performance.
irisRDD.cache()

data/iris.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
irisRDD.count()

151

In [6]:
irisRDD.take(5)

['Sepal.Length,Sepal.Width,Petal.Length,Petal.Width,Species',
 '5.1,3.5,1.4,0.2,setosa',
 '4.9,3,1.4,0.2,setosa',
 '4.7,3.2,1.3,0.2,setosa',
 '4.6,3.1,1.5,0.2,setosa']

In [7]:
# Removendo a primeira linha do arquivo (cabeçalho)
irisRDD2 = irisRDD.filter(lambda x: "Sepal" not in x)
irisRDD2.count()

150

## Limpeza dos Dados

In [8]:
# Separando as colunas
irisRDD3 = irisRDD2.map(lambda l: l.split(","))

In [9]:
# Mapeando as colunas
irisRDD4 = irisRDD3.map(lambda p: Row(SEPAL_LENGTH = float(p[0]), SEPAL_WIDTH = float(p[1]), 
                                      PETAL_LENGTH = float(p[2]), PETAL_WIDTH = float(p[3]), 
                                      SPECIES = p[4] ))

In [10]:
# Criando um Dataframe
irisDF = spSession.createDataFrame(irisRDD4)
irisDF.cache()

DataFrame[PETAL_LENGTH: double, PETAL_WIDTH: double, SEPAL_LENGTH: double, SEPAL_WIDTH: double, SPECIES: string]

In [None]:
irisDF.cache()

In [11]:
irisDF.take(5)

[Row(PETAL_LENGTH=1.4, PETAL_WIDTH=0.2, SEPAL_LENGTH=5.1, SEPAL_WIDTH=3.5, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.4, PETAL_WIDTH=0.2, SEPAL_LENGTH=4.9, SEPAL_WIDTH=3.0, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.3, PETAL_WIDTH=0.2, SEPAL_LENGTH=4.7, SEPAL_WIDTH=3.2, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.5, PETAL_WIDTH=0.2, SEPAL_LENGTH=4.6, SEPAL_WIDTH=3.1, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.4, PETAL_WIDTH=0.2, SEPAL_LENGTH=5.0, SEPAL_WIDTH=3.6, SPECIES='setosa')]

In [12]:
# Criando um índice numérico para a coluna de label target
# Transforma uma coluna categórica em Numérica
stringIndexer = StringIndexer(inputCol = "SPECIES", outputCol = "IDX_SPECIES")
si_model = stringIndexer.fit(irisDF)
irisNormDF = si_model.transform(irisDF)

In [13]:
irisNormDF.select("SPECIES","IDX_SPECIES").distinct().collect()

[Row(SPECIES='versicolor', IDX_SPECIES=0.0),
 Row(SPECIES='setosa', IDX_SPECIES=2.0),
 Row(SPECIES='virginica', IDX_SPECIES=1.0)]

## Análise Exploratória de Dados

In [14]:
# Estatística descritiva
irisNormDF.describe().show()

+-------+------------------+------------------+------------------+------------------+---------+------------------+
|summary|      PETAL_LENGTH|       PETAL_WIDTH|      SEPAL_LENGTH|       SEPAL_WIDTH|  SPECIES|       IDX_SPECIES|
+-------+------------------+------------------+------------------+------------------+---------+------------------+
|  count|               150|               150|               150|               150|      150|               150|
|   mean| 3.758000000000001|1.1993333333333331| 5.843333333333332|3.0573333333333337|     null|               1.0|
| stddev|1.7652982332594662|0.7622376689603467|0.8280661279778634|0.4358662849366978|     null|0.8192319205190404|
|    min|               1.0|               0.1|               4.3|               2.0|   setosa|               0.0|
|    max|               6.9|               2.5|               7.9|               4.4|virginica|               2.0|
+-------+------------------+------------------+------------------+--------------

In [36]:
isinstance(irisNormDF.select("SPECIES").take(1)[0][0], str)

True

In [45]:
irisNormDF.dtypes

[('PETAL_LENGTH', 'double'),
 ('PETAL_WIDTH', 'double'),
 ('SEPAL_LENGTH', 'double'),
 ('SEPAL_WIDTH', 'double'),
 ('SPECIES', 'string'),
 ('IDX_SPECIES', 'double')]

In [15]:
# Correlação entre as variáveis
for i in irisNormDF.columns:
    # Realiza a análise de correlação, somente se a coluna não for do tipo String
    if not(isinstance(irisNormDF.select(i).take(1)[0][0], str)) :
        print("Correlação da variável IDX_SPECIES com", i, irisNormDF.stat.corr('IDX_SPECIES', i))

Correlação da variável IDX_SPECIES com PETAL_LENGTH -0.649241830764174
Correlação da variável IDX_SPECIES com PETAL_WIDTH -0.5803770334306263
Correlação da variável IDX_SPECIES com SEPAL_LENGTH -0.46003915650023686
Correlação da variável IDX_SPECIES com SEPAL_WIDTH 0.6183715308237433
Correlação da variável IDX_SPECIES com IDX_SPECIES 1.0


## Pré-Processamento dos Dados
Apache Spark Requer que os dados estejam em um padrão de dados, em um formato de Vetores, seja denso ou sparso.

In [46]:
# Criando um LabeledPoint (target, Vector[features])
# Criando um vector denso, pois não existem dados com valores = 0
def transformaVar(row) :
    obj = (row["SPECIES"], row["IDX_SPECIES"], Vectors.dense([row["SEPAL_LENGTH"], row["SEPAL_WIDTH"], 
                                                              row["PETAL_LENGTH"], row["PETAL_WIDTH"]]))
    return obj

In [47]:
# Transforma o Dataframe em RDD para ser possível utilizar a função Map
irisRDD5 = irisNormDF.rdd.map(transformaVar)

In [48]:
irisRDD5.take(5)

[('setosa', 2.0, DenseVector([5.1, 3.5, 1.4, 0.2])),
 ('setosa', 2.0, DenseVector([4.9, 3.0, 1.4, 0.2])),
 ('setosa', 2.0, DenseVector([4.7, 3.2, 1.3, 0.2])),
 ('setosa', 2.0, DenseVector([4.6, 3.1, 1.5, 0.2])),
 ('setosa', 2.0, DenseVector([5.0, 3.6, 1.4, 0.2]))]

In [49]:
irisDF = spSession.createDataFrame(irisRDD5,["species", "label", "features"])
irisDF.select("species","label","features").show(10)
irisDF.cache()

+-------+-----+-----------------+
|species|label|         features|
+-------+-----+-----------------+
| setosa|  2.0|[5.1,3.5,1.4,0.2]|
| setosa|  2.0|[4.9,3.0,1.4,0.2]|
| setosa|  2.0|[4.7,3.2,1.3,0.2]|
| setosa|  2.0|[4.6,3.1,1.5,0.2]|
| setosa|  2.0|[5.0,3.6,1.4,0.2]|
| setosa|  2.0|[5.4,3.9,1.7,0.4]|
| setosa|  2.0|[4.6,3.4,1.4,0.3]|
| setosa|  2.0|[5.0,3.4,1.5,0.2]|
| setosa|  2.0|[4.4,2.9,1.4,0.2]|
| setosa|  2.0|[4.9,3.1,1.5,0.1]|
+-------+-----+-----------------+
only showing top 10 rows



DataFrame[species: string, label: double, features: vector]

In [50]:
type(irisDF)

pyspark.sql.dataframe.DataFrame

## Machine Learning

In [51]:
# Dados de Treino e de Teste
(dados_treino, dados_teste) = irisDF.randomSplit([0.7, 0.3])

In [52]:
dados_treino.count()

110

In [53]:
dados_teste.count()

40

In [54]:
# Construindo o modelo com os dados de treino

# maxDepth representa o nível de iterações que o algoritmo irá executar até apresentar o resultado.
# Quanto maior o valor de maxDepth, mais demorado será o treinamento do modelo.
dtClassifer = DecisionTreeClassifier(maxDepth = 2, labelCol = "label", featuresCol = "features")
modelo = dtClassifer.fit(dados_treino)

In [58]:
# Número de Nodes utilizado durante o processamento
modelo.numNodes

5

In [57]:
# Nível de profundidade utilizada em cada Tree
modelo.depth

2

In [59]:
# Previsões com dados de teste
previsoes = modelo.transform(dados_teste)
previsoes.select("prediction","species","label").collect()

[Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=2.0, species='setosa', label=2.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', label=0.0),
 Row(prediction=0.0, species='versicolor', lab

In [63]:
previsoes.show()

+----------+-----+-----------------+--------------+--------------------+----------+
|   species|label|         features| rawPrediction|         probability|prediction|
+----------+-----+-----------------+--------------+--------------------+----------+
|    setosa|  2.0|[4.4,3.0,1.3,0.2]|[0.0,0.0,41.0]|       [0.0,0.0,1.0]|       2.0|
|    setosa|  2.0|[4.8,3.1,1.6,0.2]|[0.0,0.0,41.0]|       [0.0,0.0,1.0]|       2.0|
|    setosa|  2.0|[5.0,3.0,1.6,0.2]|[0.0,0.0,41.0]|       [0.0,0.0,1.0]|       2.0|
|    setosa|  2.0|[5.1,3.5,1.4,0.2]|[0.0,0.0,41.0]|       [0.0,0.0,1.0]|       2.0|
|    setosa|  2.0|[5.1,3.7,1.5,0.4]|[0.0,0.0,41.0]|       [0.0,0.0,1.0]|       2.0|
|    setosa|  2.0|[5.2,3.4,1.4,0.2]|[0.0,0.0,41.0]|       [0.0,0.0,1.0]|       2.0|
|    setosa|  2.0|[5.3,3.7,1.5,0.2]|[0.0,0.0,41.0]|       [0.0,0.0,1.0]|       2.0|
|    setosa|  2.0|[5.4,3.7,1.5,0.2]|[0.0,0.0,41.0]|       [0.0,0.0,1.0]|       2.0|
|    setosa|  2.0|[5.4,3.9,1.7,0.4]|[0.0,0.0,41.0]|       [0.0,0.0,1.0]|    

In [64]:
# Avaliando a acurácia
avaliador = MulticlassClassificationEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "accuracy")
avaliador.evaluate(previsoes)      

0.975

In [65]:
# Resumindo as previsões - Confusion Matrix
previsoes.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   13|
|  2.0|       2.0|    9|
|  1.0|       0.0|    1|
|  0.0|       0.0|   17|
+-----+----------+-----+

