* Master DAC, BDLE, 2021 
* Author: Mohamed-Amine Baazizi
* Affiliation: LIP6 - Faculté des Sciences - Sorbonne Université
* Email: mohamed-amine.baazizi@lip6.fr


# Decision Tree in Spark ML




## Preparation

Check that compute resources are allocated to your notebook, i.e. that it is connected (see the RAM and disk indicators at the top right). Otherwise, click the Connect button to obtain resources.




To access the files stored on your Google Drive directly, mount the drive with the cell below. Enter the authentication code when prompted.

Adjust the name of your folder: MyDrive/ens/bdle/dir. **Replace dir.**

In [None]:
import os
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

drive_dir = "/content/drive/MyDrive/ens/bdle/dir"
os.makedirs(drive_dir, exist_ok=True)
os.listdir(drive_dir)

Mounted at /content/drive


[]

Install pyspark and findspark:


In [None]:
!pip install -q pyspark
!pip install -q findspark



Start the Spark session

In [None]:
import os
# !find /usr/local -name "pyspark"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.7/dist-packages/pyspark"
os.environ["JAVA_HOME"] = "/usr"
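
The `SPARK_HOME` path above is tied to a Python 3.7 runtime and may not exist on other Colab images. As a hedged alternative, the sketch below locates an installed package's directory using only the standard library. The helper name `find_package_dir` is hypothetical, and `json` is used in the demonstration only so that the example runs without pyspark being installed.

```python
import importlib.util
import os

def find_package_dir(name):
    """Return the directory of an installed top-level package, or None if it is not found."""
    spec = importlib.util.find_spec(name)
    if spec is None or spec.origin is None:
        return None
    return os.path.dirname(spec.origin)

# Demonstration with a standard-library package.
json_dir = find_package_dir("json")
print(json_dir)

# In the notebook, the same call could feed SPARK_HOME (assumes pyspark is installed):
# spark_home = find_package_dir("pyspark")
# if spark_home is not None:
#     os.environ["SPARK_HOME"] = spark_home
```

Resolving the path at run time avoids editing the cell whenever the runtime's Python version changes.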

In [None]:
# Main imports
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkConf

# for DataFrames and UDFs
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *

# for timing
import time

# initialize the environment variables for Spark
findspark.init()

# Start the Spark session
# --------------------------
def demarrer_spark():
  local = "local[*]"
  appName = "TP"
  configLocale = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "6G").\
  set("spark.driver.memory","6G").\
  set("spark.sql.catalogImplementation","in-memory")

  spark = SparkSession.builder.config(conf = configLocale).getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("ERROR")

  # disable automatic broadcast joins
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")

  # Match the number of shuffle partitions to the size of the cluster (4 cores)
  spark.conf.set("spark.sql.shuffle.partitions","4")
  print("Session started, application id:", sc.applicationId)
  return spark
spark = demarrer_spark()

Session started, application id: local-1635498856257


In [None]:
# use 8 shuffle partitions instead of the default 200
spark.conf.set("spark.sql.shuffle.partitions", "8")
print("Number of shuffle partitions in use: ", spark.conf.get("spark.sql.shuffle.partitions"))


## Test: Course Example


In [None]:
data = [["young","high","no","fair","no"],
        ["young","high","no","excellent","no"],
        ["middle","high","no","fair","yes"],
        ["senior","medium","no","fair","yes"],
        ["senior","low","yes","fair","yes"],
        ["senior","low","yes","excellent","no"],
        ["middle","low","yes","excellent","yes"],
        ["young","medium","no","fair","no"],
        ["young","low","yes","fair","yes"],
        ["senior","medium","yes","fair","yes"],
        ["young","medium","yes","excellent","yes"],
        ["middle","medium","no","excellent","yes"],
        ["middle","high","yes","fair","yes"],
        ["senior","medium","no","excellent","no"]]

In [None]:
df = spark.createDataFrame(spark.sparkContext.parallelize(data),'age string, income string, student string, rating string, label string')

In [None]:
from pyspark.ml.feature import StringIndexer
field = 'age'
age_indexer = StringIndexer(inputCol=field, outputCol='indexed_'+field )
df_age_idx=age_indexer.fit(df).transform(df)

In [None]:
df_age_idx.show()

+------+------+-------+---------+-----+-----------+
|   age|income|student|   rating|label|indexed_age|
+------+------+-------+---------+-----+-----------+
| young|  high|     no|     fair|   no|        1.0|
| young|  high|     no|excellent|   no|        1.0|
|middle|  high|     no|     fair|  yes|        2.0|
|senior|medium|     no|     fair|  yes|        0.0|
|senior|   low|    yes|     fair|  yes|        0.0|
|senior|   low|    yes|excellent|   no|        0.0|
|middle|   low|    yes|excellent|  yes|        2.0|
| young|medium|     no|     fair|   no|        1.0|
| young|   low|    yes|     fair|  yes|        1.0|
|senior|medium|    yes|     fair|  yes|        0.0|
| young|medium|    yes|excellent|  yes|        1.0|
|middle|medium|     no|excellent|  yes|        2.0|
|middle|  high|    yes|     fair|  yes|        2.0|
|senior|medium|     no|excellent|   no|        0.0|
+------+------+-------+---------+-----+-----------+
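
The indices in `indexed_age` follow `StringIndexer`'s default ordering: most frequent value first, with ties broken alphabetically. The plain-Python sketch below replays that rule on the `age` column to show why `senior` gets 0.0, `young` 1.0 and `middle` 2.0. The helper `frequency_index` is a hypothetical name and is not part of the Spark API.

```python
from collections import Counter

# The 'age' column of the course example, in row order.
ages = ["young", "young", "middle", "senior", "senior", "senior", "middle",
        "young", "young", "senior", "young", "middle", "middle", "senior"]

def frequency_index(values):
    """Map each distinct value to an index: descending frequency, then alphabetical."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

age_index = frequency_index(ages)
print(age_index)
```

`senior` and `young` both occur five times, so the alphabetical tie-break places `senior` first; `middle` occurs four times and comes last.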



In [None]:
from pyspark.ml.feature import IndexToString
age_rev_indexer= IndexToString(inputCol=age_indexer.getOutputCol(),outputCol='original_age')
df_orig_age=age_rev_indexer.transform(df_age_idx)

In [None]:
df_orig_age.show()

+------+------+-------+---------+-----+-----------+------------+
|   age|income|student|   rating|label|indexed_age|original_age|
+------+------+-------+---------+-----+-----------+------------+
| young|  high|     no|     fair|   no|        1.0|       young|
| young|  high|     no|excellent|   no|        1.0|       young|
|middle|  high|     no|     fair|  yes|        2.0|      middle|
|senior|medium|     no|     fair|  yes|        0.0|      senior|
|senior|   low|    yes|     fair|  yes|        0.0|      senior|
|senior|   low|    yes|excellent|   no|        0.0|      senior|
|middle|   low|    yes|excellent|  yes|        2.0|      middle|
| young|medium|     no|     fair|   no|        1.0|       young|
| young|   low|    yes|     fair|  yes|        1.0|       young|
|senior|medium|    yes|     fair|  yes|        0.0|      senior|
| young|medium|    yes|excellent|  yes|        1.0|       young|
|middle|medium|     no|excellent|  yes|        2.0|      middle|
|middle|  high|    yes|  

In [None]:
from pyspark.ml.feature import OneHotEncoder 
age_onehotenc= OneHotEncoder(inputCol=age_indexer.getOutputCol(),outputCol='cat_age') 
age_onehotenc.setDropLast(False)
df_age_onehot= age_onehotenc.fit(df_age_idx).transform(df_age_idx)
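
With `dropLast` set to `False`, each of the three age categories keeps its own slot in `cat_age`. Spark prints such vectors in sparse form as `(size, [indices], [values])`. The plain-Python sketch below mimics that notation and expands it to a dense list; `one_hot` and `sparse_to_dense` are hypothetical helpers for illustration, not Spark functions.

```python
def one_hot(index, num_categories):
    """Return a one-hot vector in (size, indices, values) sparse notation."""
    return (num_categories, [int(index)], [1.0])

def sparse_to_dense(size, indices, values):
    """Expand sparse notation into a dense list of floats."""
    dense = [0.0] * size
    for position, value in zip(indices, values):
        dense[position] = value
    return dense

young = one_hot(1.0, 3)          # 'young' has index 1.0 in indexed_age
print(young)
print(sparse_to_dense(*young))
```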

In [None]:
df_age_onehot.show()

+------+------+-------+---------+-----+-----------+-------------+
|   age|income|student|   rating|label|indexed_age|      cat_age|
+------+------+-------+---------+-----+-----------+-------------+
| young|  high|     no|     fair|   no|        1.0|(3,[1],[1.0])|
| young|  high|     no|excellent|   no|        1.0|(3,[1],[1.0])|
|middle|  high|     no|     fair|  yes|        2.0|(3,[2],[1.0])|
|senior|medium|     no|     fair|  yes|        0.0|(3,[0],[1.0])|
|senior|   low|    yes|     fair|  yes|        0.0|(3,[0],[1.0])|
|senior|   low|    yes|excellent|   no|        0.0|(3,[0],[1.0])|
|middle|   low|    yes|excellent|  yes|        2.0|(3,[2],[1.0])|
| young|medium|     no|     fair|   no|        1.0|(3,[1],[1.0])|
| young|   low|    yes|     fair|  yes|        1.0|(3,[1],[1.0])|
|senior|medium|    yes|     fair|  yes|        0.0|(3,[0],[1.0])|
| young|medium|    yes|excellent|  yes|        1.0|(3,[1],[1.0])|
|middle|medium|     no|excellent|  yes|        2.0|(3,[2],[1.0])|
|middle|  

In [None]:
from pyspark.ml.feature import StringIndexer
label = 'label'
features_col= df.columns
features_col.remove(label)
prefix = 'indexed_'
label_string_indexer= StringIndexer(inputCol=label, outputCol=prefix+label)
features_str_col= list(map(lambda c:prefix+c, features_col))
features_string_indexer= StringIndexer(inputCols=features_col,outputCols=features_str_col)

In [None]:
from pyspark.ml.feature import VectorAssembler,VectorIndexer
vec_assembler= VectorAssembler(inputCols= features_string_indexer.getOutputCols(),outputCol= 'vector')
vec_indexer= VectorIndexer(inputCol='vector',outputCol='features', maxCategories=3)

In [None]:
from pyspark.ml import Pipeline
stages = [label_string_indexer,features_string_indexer,vec_assembler,vec_indexer]
pipeline = Pipeline(stages = stages)
train_data= pipeline.fit(df).transform(df)
train_data.select("features","indexed_label").show()

+-----------------+-------------+
|         features|indexed_label|
+-----------------+-------------+
|[1.0,1.0,0.0,0.0]|          1.0|
|[1.0,1.0,0.0,1.0]|          1.0|
|[2.0,1.0,0.0,0.0]|          0.0|
|        (4,[],[])|          0.0|
|[0.0,2.0,1.0,0.0]|          0.0|
|[0.0,2.0,1.0,1.0]|          1.0|
|[2.0,2.0,1.0,1.0]|          0.0|
|    (4,[0],[1.0])|          1.0|
|[1.0,2.0,1.0,0.0]|          0.0|
|    (4,[2],[1.0])|          0.0|
|[1.0,0.0,1.0,1.0]|          0.0|
|[2.0,0.0,0.0,1.0]|          0.0|
|[2.0,1.0,1.0,0.0]|          0.0|
|    (4,[3],[1.0])|          1.0|
+-----------------+-------------+
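
`VectorIndexer` decides feature by feature whether to treat a column as categorical: a feature with at most `maxCategories` distinct values is declared categorical. The sketch below replays that rule in plain Python on the 14 feature vectors shown above, with the sparse rows expanded to dense form. It illustrates the rule only and is not Spark's implementation; `categorical_features` is a hypothetical helper.

```python
# Feature vectors from the output above (age, income, student, rating), dense form.
rows = [
    [1, 1, 0, 0], [1, 1, 0, 1], [2, 1, 0, 0], [0, 0, 0, 0],
    [0, 2, 1, 0], [0, 2, 1, 1], [2, 2, 1, 1], [1, 0, 0, 0],
    [1, 2, 1, 0], [0, 0, 1, 0], [1, 0, 1, 1], [2, 0, 0, 1],
    [2, 1, 1, 0], [0, 0, 0, 1],
]

def categorical_features(rows, max_categories):
    """Return {feature position: True if it has at most max_categories distinct values}."""
    num_features = len(rows[0])
    distinct = [{row[j] for row in rows} for j in range(num_features)]
    return {j: len(values) <= max_categories for j, values in enumerate(distinct)}

with_three = categorical_features(rows, 3)
with_two = categorical_features(rows, 2)
print(with_three)
print(with_two)
```

With `maxCategories=3` all four features qualify as categorical. Under a threshold of 2, only `student` and `rating` would.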



In [None]:
from pyspark.ml.classification import DecisionTreeClassificationModel, DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol="features", labelCol= "indexed_label")
dtModel= dt.fit(train_data)
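
Spark's `DecisionTreeClassifier` uses Gini impurity by default when choosing splits. As a worked illustration on the course data, the sketch below computes the impurity of the full label set (9 `yes`, 5 `no`) and the gain of one candidate split, `age == middle` versus the rest. The helpers `gini` and `split_gain` are hypothetical names, and the code is plain Python rather than Spark's implementation.

```python
from collections import Counter

# (age, label) pairs from the course example, in row order.
rows = [("young", "no"), ("young", "no"), ("middle", "yes"), ("senior", "yes"),
        ("senior", "yes"), ("senior", "no"), ("middle", "yes"), ("young", "no"),
        ("young", "yes"), ("senior", "yes"), ("young", "yes"), ("middle", "yes"),
        ("middle", "yes"), ("senior", "no")]

def gini(labels):
    """Gini impurity: 1 minus the sum of squared class proportions."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((count / n) ** 2 for count in Counter(labels).values())

def split_gain(rows, predicate):
    """Impurity decrease when rows are split by predicate(value)."""
    left = [label for value, label in rows if predicate(value)]
    right = [label for value, label in rows if not predicate(value)]
    parent = [label for _, label in rows]
    n = len(rows)
    weighted = len(left) / n * gini(left) + len(right) / n * gini(right)
    return gini(parent) - weighted

parent_gini = gini([label for _, label in rows])
gain = split_gain(rows, lambda age: age == "middle")
print(round(parent_gini, 4), round(gain, 4))
```

The `middle` group is pure (4 `yes`), while the remaining 10 rows are evenly split, so the gain is 90/196 - 5/14, roughly 0.102.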

In [None]:
from pyspark.ml.classification import DecisionTreeClassificationModel, DecisionTreeClassifier
# train_data still holds the original string column "label"; the numeric target is "indexed_label"
dt = DecisionTreeClassifier(featuresCol="features", labelCol="indexed_label")
dtModel = dt.fit(train_data)
dtModel.transform(train_data).select("features", "indexed_label", "prediction").show()

+-----------------+-------------+----------+
|         features|indexed_label|prediction|
+-----------------+-------------+----------+
|[1.0,1.0,0.0,0.0]|          1.0|       1.0|
|[1.0,1.0,0.0,1.0]|          1.0|       1.0|
|[2.0,1.0,0.0,0.0]|          0.0|       0.0|
|        (4,[],[])|          0.0|       0.0|
|[0.0,2.0,1.0,0.0]|          0.0|       0.0|
|[0.0,2.0,1.0,1.0]|          1.0|       1.0|
|[2.0,2.0,1.0,1.0]|          0.0|       0.0|
|    (4,[0],[1.0])|          1.0|       1.0|
|[1.0,2.0,1.0,0.0]|          0.0|       0.0|
|    (4,[2],[1.0])|          0.0|       0.0|
|[1.0,0.0,1.0,1.0]|          0.0|       0.0|
|[2.0,0.0,0.0,1.0]|          0.0|       0.0|
|[2.0,1.0,1.0,0.0]|          0.0|       0.0|
|    (4,[3],[1.0])|          1.0|       1.0|
+-----------------+-------------+----------+



In [None]:
print(dtModel.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_1cca20a63964, depth=4, numNodes=13, numClasses=2, numFeatures=4
  If (feature 0 in {2.0})
   Predict: 0.0
  Else (feature 0 not in {2.0})
   If (feature 2 in {1.0})
    If (feature 3 in {0.0})
     Predict: 0.0
    Else (feature 3 not in {0.0})
     If (feature 0 in {1.0})
      Predict: 0.0
     Else (feature 0 not in {1.0})
      Predict: 1.0
   Else (feature 2 not in {1.0})
    If (feature 0 in {0.0})
     If (feature 3 in {0.0})
      Predict: 0.0
     Else (feature 3 not in {0.0})
      Predict: 1.0
    Else (feature 0 not in {0.0})
     Predict: 1.0
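
The debug string refers to features by position and to categories by index. Reading the indices off the earlier outputs (features ordered `age, income, student, rating`; `senior`=0, `young`=1, `middle`=2; `no`=0, `yes`=1 for `student`; `fair`=0, `excellent`=1 for `rating`; label `yes`=0, `no`=1), the tree can be transcribed as a plain Python function. This is a hand transcription for readability, not something Spark generates, and `predict` is a hypothetical name.

```python
def predict(age, student, rating):
    """Hand transcription of the printed tree; returns the label as a string."""
    if age == "middle":                            # feature 0 in {2.0}
        return "yes"
    if student == "yes":                           # feature 2 in {1.0}
        if rating == "fair":                       # feature 3 in {0.0}
            return "yes"
        return "yes" if age == "young" else "no"   # feature 0 in {1.0}
    if age == "senior":                            # feature 0 in {0.0}
        return "yes" if rating == "fair" else "no" # feature 3 in {0.0}
    return "no"

# Course example rows: age, income, student, rating, label.
data = [["young", "high", "no", "fair", "no"],
        ["young", "high", "no", "excellent", "no"],
        ["middle", "high", "no", "fair", "yes"],
        ["senior", "medium", "no", "fair", "yes"],
        ["senior", "low", "yes", "fair", "yes"],
        ["senior", "low", "yes", "excellent", "no"],
        ["middle", "low", "yes", "excellent", "yes"],
        ["young", "medium", "no", "fair", "no"],
        ["young", "low", "yes", "fair", "yes"],
        ["senior", "medium", "yes", "fair", "yes"],
        ["young", "medium", "yes", "excellent", "yes"],
        ["middle", "medium", "no", "excellent", "yes"],
        ["middle", "high", "yes", "fair", "yes"],
        ["senior", "medium", "no", "excellent", "no"]]

# Rows where the transcribed tree disagrees with the training label.
mismatches = [row for row in data if predict(row[0], row[2], row[3]) != row[4]]
print(len(mismatches))
```

The tree never tests feature 1 (`income`), which is why the function takes only three arguments.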



In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
dt_paramGrid= ParamGridBuilder().addGrid(dt.maxBins, [40,42]).addGrid(dt.minInstancesPerNode, [10,100]).build()

In [None]:
dt = DecisionTreeClassifier(featuresCol="features", labelCol= "label")
cv = CrossValidator(estimator=dt,estimatorParamMaps=dt_paramGrid ,evaluator=BinaryClassificationEvaluator(),numFolds=5,parallelism=2)
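
`ParamGridBuilder` produces the Cartesian product of the listed values, and `CrossValidator` fits one model per parameter combination per fold before refitting the best combination on the full dataset. The plain-Python sketch below counts the resulting fits for the grid and `numFolds` used here. The variable names are illustrative and the code does not call Spark.

```python
from itertools import product

# Same values as the parameter grid in this notebook.
grid = {"maxBins": [40, 42], "minInstancesPerNode": [10, 100]}
num_folds = 5

# Cartesian product of all parameter values, one dict per combination.
param_maps = [dict(zip(grid, combo)) for combo in product(*grid.values())]

# One fit per combination per fold, plus the final refit on all the data.
total_fits = len(param_maps) * num_folds + 1

for param_map in param_maps:
    print(param_map)
print(total_fits)
```

The count grows multiplicatively with each added parameter, which is worth keeping in mind before enlarging the grid.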

In [None]:
train_data.show()

+------+------+-------+---------+-----+-------------+-----------+--------------+---------------+--------------+-----------------+-----------------+
|   age|income|student|   rating|label|indexed_label|indexed_age|indexed_income|indexed_student|indexed_rating|           vector|         features|
+------+------+-------+---------+-----+-------------+-----------+--------------+---------------+--------------+-----------------+-----------------+
| young|  high|     no|     fair|   no|          1.0|        1.0|           1.0|            0.0|           0.0|[1.0,1.0,0.0,0.0]|[1.0,1.0,0.0,0.0]|
| young|  high|     no|excellent|   no|          1.0|        1.0|           1.0|            0.0|           1.0|[1.0,1.0,0.0,1.0]|[1.0,1.0,0.0,1.0]|
|middle|  high|     no|     fair|  yes|          0.0|        2.0|           1.0|            0.0|           0.0|[2.0,1.0,0.0,0.0]|[2.0,1.0,0.0,0.0]|
|senior|medium|     no|     fair|  yes|          0.0|        0.0|           0.0|            0.0|           0.0| 

In [None]:
train_data_final=train_data.select("features","indexed_label").withColumnRenamed("indexed_label","label")

In [None]:

cvModel=cv.fit(train_data_final)

In [None]:
cvModel.transform(train_data_final).select("features","label","prediction").show()

+-----------------+-----+----------+
|         features|label|prediction|
+-----------------+-----+----------+
|[1.0,1.0,0.0,0.0]|  1.0|       1.0|
|[1.0,1.0,0.0,1.0]|  1.0|       1.0|
|[2.0,1.0,0.0,0.0]|  0.0|       0.0|
|        (4,[],[])|  0.0|       0.0|
|[0.0,2.0,1.0,0.0]|  0.0|       0.0|
|[0.0,2.0,1.0,1.0]|  1.0|       1.0|
|[2.0,2.0,1.0,1.0]|  0.0|       0.0|
|    (4,[0],[1.0])|  1.0|       1.0|
|[1.0,2.0,1.0,0.0]|  0.0|       0.0|
|    (4,[2],[1.0])|  0.0|       0.0|
|[1.0,0.0,1.0,1.0]|  0.0|       0.0|
|[2.0,0.0,0.0,1.0]|  0.0|       0.0|
|[2.0,1.0,1.0,0.0]|  0.0|       0.0|
|    (4,[3],[1.0])|  1.0|       1.0|
+-----------------+-----+----------+



## Project Data Loading

