# Devfest 2017
# Apache Spark: Casos de Uso e Escalabilidade

## flavio.clesio@movile.com / eiti.kimura@movile.com

Este notebook mostra um pequeno exemplo de como usar o PySpark para tarefas de Machine Learning dentro do Spark MLLLib. 

### 1) Boiler Plate code

In [1]:
import os
import sys

SPARK_PATH = "/Users/flavio.clesio/Documents/spark-2.1.0" 

os.environ['SPARK_HOME'] = SPARK_PATH
os.environ['HADOOP_HOME'] = SPARK_PATH

sys.path.append(SPARK_PATH + "/bin")
sys.path.append(SPARK_PATH + "/python")
sys.path.append(SPARK_PATH + "/python/pyspark/")
sys.path.append(SPARK_PATH + "/python/lib")
sys.path.append(SPARK_PATH + "/python/lib/pyspark.zip")
sys.path.append(SPARK_PATH + "/python/lib/py4j-0.10.4-src.zip")

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.sql import SparkSession

sc = SparkContext("local", "test")
spark = SparkSession(sc)

data = spark.read.format("libsvm").load(SPARK_PATH + "/data/mllib/sample_libsvm_data.txt")

data.show(3)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 3 rows



### 2) Feature Extraction

Vamos usar aqui duas transformações do Spark MLLib que são o `StringIndexer` que vai incluir todos os labels em um único índice; e o `VectorIndexer` automaticamente vai indexar as features. O parâmetro `maxCategories` estará em `4` para que as features que tenham mais de `4` valores distintos sejam tratadas pelo modelo como contínuas. 

In [2]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

Vamos dividir a nossa base de dados em 70% para treino e 30% para testes.

In [3]:
(trainingData, testData) = data.randomSplit([0.7, 0.3])

### 3) Treinamento do modelo 

Primeiramente vamos chamar a classe `DecisionTreeClassifier` e vamos passar os objetos `StringIndexer` e `VectorIndexer` que construímos anteriormente.

In [4]:
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

Aqui vamos usar um recurso do Spark MLLib que é o Pipeline.  

O conceito do pipeline é colocar em uma sequência encadeada inúmeros algoritmos para o processamento dos dados até mesmo à parte de aprendizado. Cada passo dentro do pipeline é chamado de `PipelineStages` em que podem ser feitas transformações e o uso de estimadores de forma encadeada.  

Isso ajuda na simplificação do código, leitura, e principalmente debug. 

No nosso pipeline vamos encadear o `StringIndexer`, o `VectorIndexer` e a nossa árvore de decisão que está no objeto `dt`

In [5]:
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

Neste fase vamos realizar o treinamento do modelo chamando o método `.fit()`

In [6]:
model = pipeline.fit(trainingData)

Agora vamos realizar algumas predições usando a base de testes; e vamos mostrar 5 registros realizando a comparação entre o que foi previsto pelo modelo (_i.e._ a coluna `prediction`) e o que está na base de testes (_i.e._ a coluna `indexedLabel`).

In [7]:
predictions = model.transform(testData)

predictions.select("prediction", "indexedLabel", "features").show(5)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|(692,[123,124,125...|
|       1.0|         1.0|(692,[124,125,126...|
|       1.0|         1.0|(692,[124,125,126...|
|       1.0|         1.0|(692,[126,127,128...|
|       1.0|         1.0|(692,[126,127,128...|
+----------+------------+--------------------+
only showing top 5 rows



### 4) Gráfico da nossa árvore de decisão

Neste step, vamos ver como ficou a estrutura de decisão da nossa árvore gerada pelo algortimo.

In [8]:
print(model.stages[2].toDebugString)

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_42ceb20c9dc1d26ae66a) of depth 2 with 5 nodes
  If (feature 406 <= 20.0)
   If (feature 99 in {2.0})
    Predict: 0.0
   Else (feature 99 not in {2.0})
    Predict: 1.0
  Else (feature 406 > 20.0)
   Predict: 0.0



### 5) Serialização do modelo

Todo modelo do MLLib pode ser serializado, _i.e._ pode ser salvo em um arquivo `.parquet` ou `.json` para posterior uso.

Vamos salvar o nosso modelo, e realizar a carga novamente e ver a estrutura do modelo para checar se trata do mesmo modelo. 

In [9]:
model.write().overwrite().save(SPARK_PATH + "/data/mllib/tmp/myDecisionTreeClassificationModel")

In [10]:
sameModel = model.load(SPARK_PATH + "/data/mllib/tmp/myDecisionTreeClassificationModel")

In [11]:
print(sameModel.stages[2].toDebugString)

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_42ceb20c9dc1d26ae66a) of depth 2 with 5 nodes
  If (feature 406 <= 20.0)
   If (feature 99 in {2.0})
    Predict: 0.0
   Else (feature 99 not in {2.0})
    Predict: 1.0
  Else (feature 406 > 20.0)
   Predict: 0.0

