### Inicialização do pyspark

In [3]:
sc.version

'2.4.6'

In [1]:
import pyspark

spark = pyspark.sql.SparkSession.builder.appName('decision_tree').getOrCreate()
sc = spark.sparkContext

### Implementação de árvore de decisão para classificação MLlib

In [3]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
#Carrega e processo os dados para dentro de um RDD de LabeledPoint
data = MLUtils.loadLibSVMFile(sc, 'data/iris_libsvm.txt')

#Separa os dados em treino e teste
(trainingData, testData) = data.randomSplit([0.7, 0.3])

In [9]:
data.collect()[2]

LabeledPoint(0.0, (4,[0,1,2,3],[4.7,3.2,1.3,0.2]))

In [14]:
#Treina o modelo
#categoricalFeaturesInfo vazio indica que todas as features são contínuas
model = DecisionTree.trainClassifier(trainingData, numClasses=3, categoricalFeaturesInfo={}
                                    , impurity='gini', maxDepth=5, maxBins=32)

#Valida o modelo em instâncias de teste e calcula o erro no teste
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test error: ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())

Test error: 0.041666666666666664
Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 13 nodes
  If (feature 2 <= 2.45)
   Predict: 0.0
  Else (feature 2 > 2.45)
   If (feature 3 <= 1.75)
    If (feature 2 <= 5.05)
     Predict: 1.0
    Else (feature 2 > 5.05)
     If (feature 0 <= 6.05)
      Predict: 1.0
     Else (feature 0 > 6.05)
      Predict: 2.0
   Else (feature 3 > 1.75)
    If (feature 2 <= 4.85)
     If (feature 0 <= 5.95)
      Predict: 1.0
     Else (feature 0 > 5.95)
      Predict: 2.0
    Else (feature 2 > 4.85)
     Predict: 2.0



In [13]:
#Save and load the model
#model.save(sc, 'tmp/myDecisionTreeClassificationModel')
#sameModel = DecisionTreeModel.load(sc, 'tmp/myDecisionTreeClassificationModel')

Test error: 0.041666666666666664
Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 13 nodes
  If (feature 2 <= 2.45)
   Predict: 0.0
  Else (feature 2 > 2.45)
   If (feature 3 <= 1.75)
    If (feature 2 <= 5.05)
     Predict: 1.0
    Else (feature 2 > 5.05)
     If (feature 0 <= 6.05)
      Predict: 1.0
     Else (feature 0 > 6.05)
      Predict: 2.0
   Else (feature 3 > 1.75)
    If (feature 2 <= 4.85)
     If (feature 0 <= 5.95)
      Predict: 1.0
     Else (feature 0 > 5.95)
      Predict: 2.0
    Else (feature 2 > 4.85)
     Predict: 2.0



---

### Hiperparâmetros
### Hiperparâmetros de especificação do problema
<b>numClasses</b>: Número de classes da variável dependente (Apenas para classificação) 

<b>categoricalFeaturesInfo</b>: Dicionário especificando quais variáveis independentes são categóricas, não é obrigatório mas pode ajudar no tempo de treinamento do modelo

### Critérios de parada do modelo
<b>maxDepth</b>: Profundidade máxima da árvore

<b>minInstancesPerNode</b>: Para que um ramo seja splitado, cada uma de suas folhas deve ter o mínimo de datapoints especificados neste parâmetro

<b>minInfoGain</b>: Para que um ramo seja splitado, deve ter um ganho de informação mínimo especificado neste parâmetro

### Parâmetros de tunagem
<b>maxBins</b>: Número máximo de bins utilizado para discretizar variáveis contínuas

<b>maxMemoryInMB</b>: Quantidade de memória que será utilizada para coletar estatísticas dos dados. Por padrão é 246mb para que o algoritmo rode na maioria dos ambientes. Aumentar este parâmetro pode deixar o treinamento mais rápido, se houver memória disponível

<b>subsamplingRate</b>: Fração dos dados de treinamento utilizados para treinar o modelo. Este parâmetro é relevante apenas para ensembles (RandomForest ou GradientBoostedTrees)

<b>impurity</b>: Especifica qual cálculo será utilizado para realizar o split 'gini' ou 'entropy'

---

### Árvore de decisão para classificação no pyspark.ML

Utilizando o exemplo do dataset da pesquisa de satisfação do aeroporto de São Francisco

In [1]:
from pyspark import SparkContext
from pyspark import sql

In [2]:
sc = SparkContext(master='local', appName='PesquisaSatisfacaoSFO')
spark = sql.SparkSession(sc)

In [3]:
# sc.stop()

In [4]:
#Para conseguir visualizar a árvore, o spark deve ser inicializado da seguinte forma:
# from pyspark import SparkConf, SparkContext
# from pyspark.sql import SparkSession

# jar_file = 'C:\\Repositorios\\pyspark_decision_tree\\spark-tree-plotting\\target\\scala-2.11\\spark-tree-plotting_0.2.jar'

# spark = SparkSession\
#     .builder\
#     .config("spark.jars", jar_file)\
#     .appName("PesquisaSatisfacaoSFO")\
#     .getOrCreate()

# sc = spark.sparkContext

In [5]:
pesquisa = spark.read.csv('data//2013_SFO_Customer_Survey.csv', inferSchema=True, header=True)

In [6]:
pesquisa.printSchema()

root
 |-- RESPNUM: integer (nullable = true)
 |-- CCGID: integer (nullable = true)
 |-- RUN: integer (nullable = true)
 |-- INTDATE: integer (nullable = true)
 |-- GATE: integer (nullable = true)
 |-- STRATA: integer (nullable = true)
 |-- PEAK: integer (nullable = true)
 |-- METHOD: integer (nullable = true)
 |-- AIRLINE: integer (nullable = true)
 |-- FLIGHT: integer (nullable = true)
 |-- DEST: integer (nullable = true)
 |-- DESTGEO: integer (nullable = true)
 |-- DESTMARK: integer (nullable = true)
 |-- ARRTIME: string (nullable = true)
 |-- DEPTIME: string (nullable = true)
 |-- Q2PURP1: integer (nullable = true)
 |-- Q2PURP2: integer (nullable = true)
 |-- Q2PURP3: integer (nullable = true)
 |-- Q2PURP4: integer (nullable = true)
 |-- Q2PURP5: integer (nullable = true)
 |-- Q2PURP6: string (nullable = true)
 |-- Q3GETTO1: integer (nullable = true)
 |-- Q3GETTO2: integer (nullable = true)
 |-- Q3GETTO3: integer (nullable = true)
 |-- Q3GETTO4: integer (nullable = true)
 |-- Q3GETT

In [7]:
colunasEstudo = pesquisa.columns[35:50]
df = pesquisa.select(colunasEstudo)

---

In [8]:
map_medias = map(lambda col: 'mean(' + col + ') ' + col, colunasEstudo)
medias = list(map_medias)
medias[:5]

['mean(Q7A_ART) Q7A_ART',
 'mean(Q7B_FOOD) Q7B_FOOD',
 'mean(Q7C_SHOPS) Q7C_SHOPS',
 'mean(Q7D_SIGNS) Q7D_SIGNS',
 'mean(Q7E_WALK) Q7E_WALK']

In [9]:
df_medias = df.selectExpr(medias)

In [10]:
df_medias.toPandas()

Unnamed: 0,Q7A_ART,Q7B_FOOD,Q7C_SHOPS,Q7D_SIGNS,Q7E_WALK,Q7F_SCREENS,Q7G_INFOARR,Q7H_INFODEP,Q7I_WIFI,Q7J_ROAD,Q7K_PARK,Q7L_AIRTRAIN,Q7M_LTPARK,Q7N_RENTAL,Q7O_WHOLE
0,4.300707,3.951909,3.921641,3.931542,4.114851,4.029986,4.645262,4.667044,4.27157,4.501839,4.82546,4.83819,5.032815,4.92645,3.874399


---

In [11]:
df.selectExpr('mean(Q7O_WHOLE) Q7O_WHOLE').show()

+------------------+
|         Q7O_WHOLE|
+------------------+
|3.8743988684582744|
+------------------+



---

In [12]:
treinamento = df.withColumn('label', df['Q7O_WHOLE']*1.0).na.replace(0,3).replace(6,3)

In [13]:
treinamento.limit(1).toPandas()

Unnamed: 0,Q7A_ART,Q7B_FOOD,Q7C_SHOPS,Q7D_SIGNS,Q7E_WALK,Q7F_SCREENS,Q7G_INFOARR,Q7H_INFODEP,Q7I_WIFI,Q7J_ROAD,Q7K_PARK,Q7L_AIRTRAIN,Q7M_LTPARK,Q7N_RENTAL,Q7O_WHOLE,label
0,3,4,3,3,3,3,3,3,3,3,3,3,3,3,3,3.0


In [14]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

colunasInput = colunasEstudo[0:-1]
va = VectorAssembler(inputCols=colunasInput, outputCol='features')
dtreg = DecisionTreeRegressor(labelCol='label', featuresCol='features', maxDepth=4)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='label')
grid = ParamGridBuilder().addGrid(dtreg.maxDepth, [3, 5, 7, 10]).build()
cv = CrossValidator(estimator=dtreg, estimatorParamMaps=grid, evaluator=evaluator, numFolds=10)
pipeline = Pipeline(stages=[va, dtreg])

In [15]:
modelo = pipeline.fit(treinamento)

In [16]:
display(modelo.stages[1])

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_e70b928fd122) of depth 4 with 31 nodes

In [17]:
predicoes = modelo.transform(treinamento)

In [18]:
predicoes.limit(1).toPandas()

Unnamed: 0,Q7A_ART,Q7B_FOOD,Q7C_SHOPS,Q7D_SIGNS,Q7E_WALK,Q7F_SCREENS,Q7G_INFOARR,Q7H_INFODEP,Q7I_WIFI,Q7J_ROAD,Q7K_PARK,Q7L_AIRTRAIN,Q7M_LTPARK,Q7N_RENTAL,Q7O_WHOLE,label,features,prediction
0,3,4,3,3,3,3,3,3,3,3,3,3,3,3,3,3.0,"[3.0, 4.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, ...",3.676829


In [19]:
avaliador = RegressionEvaluator()
avaliador.evaluate(predicoes, {evaluator.metricName: 'rmse'})

0.555808023551782

### feature importance

In [20]:
#Visualização das importâncias das features
modelo.stages[1].featureImportances

SparseVector(14, {0: 0.0653, 1: 0.1173, 2: 0.0099, 3: 0.5219, 4: 0.0052, 5: 0.2403, 8: 0.0028, 10: 0.0059, 13: 0.0314})

In [21]:
#Extrai featureImportances para um array np
importanciaFeatures = modelo.stages[1].featureImportances.toArray()

In [22]:
importanciaFeatures

array([0.06525145, 0.11726446, 0.0099403 , 0.52193029, 0.00523214,
       0.24034459, 0.        , 0.        , 0.0027749 , 0.        ,
       0.00585446, 0.        , 0.        , 0.03140742])

In [23]:
#Utiliza a função map para iterar pelos campos do dataframe e extrair o nome de cada campo
nomeFeatures = map(lambda feat: feat.name, df.schema.fields)

In [24]:
#Concatena o map dos nomes dos campos com os valores de importância
importanciaFeaturesMap = zip(importanciaFeatures, nomeFeatures)

In [25]:
importanciaFeaturesMap

<zip at 0x12abc203748>

In [26]:
#Cria um dataframe trocando de posição (nome da feature antes, valor de importância depois)
dfImportanciaFeatures = spark.createDataFrame(sc.parallelize(importanciaFeaturesMap).map(lambda r: [r[1], float(r[0])]))

In [27]:
#Renomeia colunas do dataframe
dfImportanciaFeatures = dfImportanciaFeatures.withColumnRenamed('_1', 'Feature').withColumnRenamed('_2', 'Importancia')

In [28]:
dfImportanciaFeatures.orderBy('Importancia').show()

+------------+--------------------+
|     Feature|         Importancia|
+------------+--------------------+
|Q7L_AIRTRAIN|                 0.0|
| Q7H_INFODEP|                 0.0|
|  Q7M_LTPARK|                 0.0|
|    Q7J_ROAD|                 0.0|
| Q7G_INFOARR|                 0.0|
|    Q7I_WIFI|0.002774900310175427|
|    Q7E_WALK|0.005232136454289954|
|    Q7K_PARK|0.005854457703262646|
|   Q7C_SHOPS|0.009940295196927703|
|  Q7N_RENTAL| 0.03140742223542566|
|     Q7A_ART| 0.06525145453249198|
|    Q7B_FOOD| 0.11726445589519406|
| Q7F_SCREENS|   0.240344587239121|
|   Q7D_SIGNS|  0.5219302904331116|
+------------+--------------------+



### Visualização da árvore

Instalação manual da biblioteca que gera a visualização, necessário instalar o sbt

git clone https://github.com/julioasotodv/spark-tree-plotting.git

cd spark-tree-plotting

sbt assembly

In [29]:
arvore = modelo.stages[1]

In [32]:
#Representação em texto da árvore
arvore_str = arvore.toDebugString

---

In [35]:
import json

# Parser
def parse(lines):
    block = []
    while lines :

        if lines[0].startswith('If'):
            bl = ' '.join(lines.pop(0).split()[1:]).replace('(', '').replace(')', '')
            block.append({'name':bl, 'children':parse(lines)})


            if lines[0].startswith('Else'):
                be = ' '.join(lines.pop(0).split()[1:]).replace('(', '').replace(')', '')
                block.append({'name':be, 'children':parse(lines)})
        elif not lines[0].startswith(('If','Else')):
            block2 = lines.pop(0)
            block.append({'name':block2})
        else:
            break	
    return block

# Convert Tree to JSON
def tree_json(tree):
    data = []
    for line in tree.splitlines() : 
        if line.strip():
            line = line.strip()
            data.append(line)
        else : break
        if not line : break
    res = []
    res.append({'name':'Root', 'children':parse(data[1:])})
    with open('C:\Repositorios\pyspark_decision_tree\structure.json', 'w') as outfile:
        json.dump(res[0], outfile)
    print ('Conversion Success !')

In [36]:
tree_json(arvore_str)

Conversion Success !
