<a href="https://colab.research.google.com/github/EdgarTriquet/PySpark-Projects/blob/main/Pipeline_and_Ml_basics_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Decision Tree in Spark ML




## Setup

Check that compute resources are allocated to your notebook, i.e. that it is connected (see the RAM and disk indicators at the top right). If not, click the Connect button to obtain resources.

To access files stored on your Google Drive directly, enter the authentication code when prompted.

Adjust the name of your folder: MyDrive/ens/bdle/dir. **Replace dir.**

In [None]:
import os
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

drive_dir = "/content/drive/MyDrive/ens/bdle/dir"
os.makedirs(drive_dir, exist_ok=True)
os.listdir(drive_dir)

Mounted at /content/drive


[]

Install pyspark and findspark:


In [None]:
!pip install -q pyspark
!pip install -q findspark

Start the Spark session:

In [None]:
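import os
import pyspark

# Point SPARK_HOME at the installed pyspark package rather than hard-coding a
# Python-version-specific path (the original used python3.7/dist-packages).
# To locate it manually: !find /usr/local -name "pyspark"
os.environ["SPARK_HOME"] = os.path.dirname(pyspark.__file__)
os.environ["JAVA_HOME"] = "/usr"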

In [None]:
# Main imports
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkConf

# For DataFrames and UDFs
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *

# For timing
import time

# Initialise the environment variables for Spark
findspark.init()

# Start the Spark session
# -----------------------
def demarrer_spark():
  local = "local[*]"
  appName = "TP"
  configLocale = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "6G").\
  set("spark.driver.memory","6G").\
  set("spark.sql.catalogImplementation","in-memory")

  spark = SparkSession.builder.config(conf = configLocale).getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("ERROR")

  # Disable automatic broadcast joins
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")

  # Match the query execution environment to the size of the cluster (4 cores)
  spark.conf.set("spark.sql.shuffle.partitions","4")
  print("session started, its id is ", sc.applicationId)
  return spark
spark = demarrer_spark()

session started, its id is  local-1635498300481


In [None]:
# Use 8 shuffle partitions (Spark's default is 200; the session above set it to 4)
spark.conf.set("spark.sql.shuffle.partitions", "8")
print("Number of partitions used: ", spark.conf.get("spark.sql.shuffle.partitions"))

Number of partitions used:  8


## Data loading

In [None]:
data = [["young","high","no","fair","no"],
        ["young","high","no","excellent","no"],
        ["middle","high","no","fair","yes"],
        ["senior","low","yes","fair","yes"],
        ["senior","low","yes","excellent","no"],
        ["middle","low","yes","excellent","yes"],
        ["young","medium","no","fair","no"],
         ["young","low","yes","fair","yes"],
         ["senior","medium","yes","fair","yes"],
         ["young","medium","yes","excellent","yes"],
         ["middle","medium","no","excellent","yes"],
         ["middle","high","yes","fair","yes"],
         ["senior","medium","no","excellent","no"]]
    

In [None]:
data_df = spark.createDataFrame(spark.sparkContext.parallelize(data),'age String, income String, student String, rating String, label String')
data_df.columns

['age', 'income', 'student', 'rating', 'label']

## Basic functions

In [None]:
data_df.show()

+------+------+-------+---------+-----+
|   age|income|student|   rating|label|
+------+------+-------+---------+-----+
| young|  high|     no|     fair|   no|
| young|  high|     no|excellent|   no|
|middle|  high|     no|     fair|  yes|
|senior|   low|    yes|     fair|  yes|
|senior|   low|    yes|excellent|   no|
|middle|   low|    yes|excellent|  yes|
| young|medium|     no|     fair|   no|
| young|   low|    yes|     fair|  yes|
|senior|medium|    yes|     fair|  yes|
| young|medium|    yes|excellent|  yes|
|middle|medium|     no|excellent|  yes|
|middle|  high|    yes|     fair|  yes|
|senior|medium|     no|excellent|   no|
+------+------+-------+---------+-----+



In [None]:
from pyspark.ml.feature import StringIndexer

# For every column, add an 'indexed_<name>' column holding a numeric index
# in place of the string value
df = data_df
for field in df.columns:
  indexer = StringIndexer(inputCol=field, outputCol='indexed_'+field)
  df = indexer.fit(df).transform(df)
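
To make the indexing step concrete, here is a minimal plain-Python sketch of how the indices could be assigned. It is not Spark's implementation: the helper `string_index` is hypothetical, and it assumes the default ordering (most frequent value first) with ties broken alphabetically, which is consistent with the `indexed_age` values shown later in this notebook.

```python
from collections import Counter

def string_index(values):
    """Map each distinct string to a float index, most frequent first.

    Assumption: ties in frequency are broken alphabetically.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

# The 'age' column of the toy data set, in row order
ages = ["young", "young", "middle", "senior", "senior", "middle", "young",
        "young", "senior", "young", "middle", "middle", "senior"]

age_index = string_index(ages)
indexed_ages = [age_index[a] for a in ages]
print(age_index)
```

Here `young` appears 5 times and gets index 0.0, while `middle` and `senior` both appear 4 times and are ordered alphabetically.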

In [None]:
from pyspark.ml.feature import IndexToString

# Recover the original string from its index
age_rev_indexer = IndexToString(inputCol="indexed_age",
                                outputCol='original_age')
df_age_retrieved = age_rev_indexer.transform(df)

In [None]:
from pyspark.ml.feature import OneHotEncoder

# Compute the one-hot encoding of the indexed age column
age_onehotenc = OneHotEncoder(inputCol='indexed_age', outputCol='cat_age')
# Keep one slot per category (by default the last category is dropped)
age_onehotenc.setDropLast(False)
df_age_onehot = age_onehotenc.fit(df).transform(df)
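
The effect of `setDropLast(False)` can be illustrated with a small plain-Python sketch. The helper `one_hot` is hypothetical and returns a dense list, whereas Spark stores the result as a vector column; the point is only the difference between keeping and dropping the last category.

```python
def one_hot(index, num_categories, drop_last=False):
    """Dense one-hot vector for a category index.

    With drop_last=True the last category has no slot of its own and is
    encoded as an all-zeros vector.
    """
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[int(index)] = 1.0
    return vec

# 'senior' has index 2.0 among the 3 age categories
kept = one_hot(2.0, 3, drop_last=False)
dropped = one_hot(2.0, 3, drop_last=True)
print(kept, dropped)
```

Dropping the last category avoids a redundant column for linear models; for tree models the distinction matters less.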

In [None]:
from pyspark.ml.feature import VectorAssembler

# Concatenate several columns into a single vector column
cols = ['indexed_age', 'indexed_income']
vec_assembler = VectorAssembler(inputCols=cols, outputCol='ageIncomeVec')
df_age_income_vec = vec_assembler.transform(df)

In [None]:
from pyspark.ml.feature import VectorIndexer

# Mark vector components with at most maxCategories distinct values as
# categorical and replace their values with category indices
vecIndexer = VectorIndexer(inputCol='ageIncomeVec', outputCol='indexed_ageIncomeVec', maxCategories=3)
df_age_income_vec_idx = vecIndexer.fit(df_age_income_vec).\
transform(df_age_income_vec)
df_age_income_vec_idx.show()

+------+------+-------+---------+-----+-----------+--------------+---------------+--------------+-------------+------------+--------------------+
|   age|income|student|   rating|label|indexed_age|indexed_income|indexed_student|indexed_rating|indexed_label|ageIncomeVec|indexed_ageIncomeVec|
+------+------+-------+---------+-----+-----------+--------------+---------------+--------------+-------------+------------+--------------------+
| young|  high|     no|     fair|   no|        0.0|           1.0|            1.0|           0.0|          1.0|   [0.0,1.0]|           [0.0,1.0]|
| young|  high|     no|excellent|   no|        0.0|           1.0|            1.0|           1.0|          1.0|   [0.0,1.0]|           [0.0,1.0]|
|middle|  high|     no|     fair|  yes|        1.0|           1.0|            1.0|           0.0|          0.0|   [1.0,1.0]|           [1.0,1.0]|
|senior|   low|    yes|     fair|  yes|        2.0|           2.0|            0.0|           0.0|          0.0|   [2.0,2.0]|
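
In the output above, `ageIncomeVec` and `indexed_ageIncomeVec` look identical. A plain-Python sketch of the idea behind `VectorIndexer` suggests why: both components already take the values 0, 1 and 2, so re-indexing maps each value to itself. The helper `fit_vector_indexer` is hypothetical, assumes categories are numbered in ascending order of their raw value, and uses a made-up third column to show a feature that stays continuous.

```python
def fit_vector_indexer(rows, max_categories):
    """Return {position: {raw value: category index}} for categorical features.

    A feature is treated as categorical when it has at most max_categories
    distinct values; other features are left out (kept as continuous).
    """
    category_maps = {}
    for pos in range(len(rows[0])):
        distinct = sorted({row[pos] for row in rows})
        if len(distinct) <= max_categories:
            category_maps[pos] = {v: float(i) for i, v in enumerate(distinct)}
    return category_maps

# Columns 0 and 1: indexed age and income; column 2: hypothetical continuous feature
rows = [[0.0, 1.0, 0.5],
        [1.0, 1.0, 1.7],
        [2.0, 2.0, 2.9],
        [0.0, 0.0, 3.1]]

maps = fit_vector_indexer(rows, max_categories=3)
print(maps)
```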

## Pipeline

In [None]:
from pyspark.ml import Pipeline

label = 'label'
features_col = [c for c in data_df.columns if c != label]
prefix = 'indexed_'

# Index the label and the feature columns (strings -> numeric indices)
label_string_indexer = StringIndexer(inputCol=label, outputCol=prefix+label)
features_str_col = [prefix + c for c in features_col]
features_string_indexer = StringIndexer(inputCols=features_col, outputCols=features_str_col)

# Assemble the indexed features into one vector column
vec_assembler = VectorAssembler(inputCols=features_string_indexer.getOutputCols(), outputCol='vector')

# Flag components with at most 3 distinct values as categorical
vec_indexer = VectorIndexer(inputCol='vector', outputCol='features', maxCategories=3)

# Stages of the pipeline, applied in order
stages = [label_string_indexer, features_string_indexer, vec_assembler, vec_indexer]

pipeline = Pipeline(stages=stages)
train_data = pipeline.fit(data_df).transform(data_df).select("features", "indexed_label").withColumnRenamed("indexed_label", "label")
train_data.show()

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[0.0,1.0,1.0,0.0]|  1.0|
|[0.0,1.0,1.0,1.0]|  1.0|
|[1.0,1.0,1.0,0.0]|  0.0|
|[2.0,2.0,0.0,0.0]|  0.0|
|[2.0,2.0,0.0,1.0]|  1.0|
|[1.0,2.0,0.0,1.0]|  0.0|
|    (4,[2],[1.0])|  1.0|
|    (4,[1],[2.0])|  0.0|
|    (4,[0],[2.0])|  0.0|
|    (4,[3],[1.0])|  0.0|
|[1.0,0.0,1.0,1.0]|  0.0|
|[1.0,1.0,0.0,0.0]|  0.0|
|[2.0,0.0,1.0,1.0]|  1.0|
+-----------------+-----+
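
Some rows above are printed as `(4,[2],[1.0])` rather than as a bracketed list. This is the sparse vector notation `(size, [indices], [values])`; it appears to be used for rows where it is more compact. The hypothetical helper below, a plain-Python sketch, expands that notation into the dense form used by the other rows.

```python
def to_dense(size, indices, values):
    """Expand a sparse (size, indices, values) triple into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# Row 7 of the table: young, medium, no, fair
row7 = to_dense(4, [2], [1.0])
# Row 8 of the table: young, low, yes, fair
row8 = to_dense(4, [1], [2.0])
print(row7, row8)
```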



## Decision Tree

In [None]:
from pyspark.ml.classification import DecisionTreeClassificationModel, DecisionTreeClassifier

dt = DecisionTreeClassifier()
dtModel = dt.fit(train_data)
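
`DecisionTreeClassifier` uses Gini impurity by default when choosing splits. As a worked illustration in plain Python (not Spark's code; the helper names are hypothetical), the sketch below computes the impurity of the full toy data set and the impurity decrease obtained by splitting on `student`.

```python
from collections import Counter

def gini(labels):
    """Gini impurity: 1 minus the sum of squared class proportions."""
    n = len(labels)
    return 1.0 - sum((c / n) ** 2 for c in Counter(labels).values())

def split_gain(pairs):
    """Impurity decrease when (feature value, label) pairs are grouped by value."""
    labels = [lab for _, lab in pairs]
    groups = {}
    for value, lab in pairs:
        groups.setdefault(value, []).append(lab)
    weighted = sum(len(g) / len(pairs) * gini(g) for g in groups.values())
    return gini(labels) - weighted

# (student, label) for the 13 rows of the data set, in row order
student_label = [("no", "no"), ("no", "no"), ("no", "yes"), ("yes", "yes"),
                 ("yes", "no"), ("yes", "yes"), ("no", "no"), ("yes", "yes"),
                 ("yes", "yes"), ("yes", "yes"), ("no", "yes"), ("yes", "yes"),
                 ("no", "no")]

root_gini = gini([lab for _, lab in student_label])
gain_student = split_gain(student_label)
print(round(root_gini, 4), round(gain_student, 4))
```

With 8 `yes` and 5 `no` labels, the root impurity is 80/169, about 0.473; splitting on `student` lowers the weighted impurity by roughly 0.136.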

In [None]:
print(dtModel.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_feb70edf1be3, depth=3, numNodes=9, numClasses=2, numFeatures=4
  If (feature 2 in {0.0})
   If (feature 0 in {0.0,1.0})
    Predict: 0.0
   Else (feature 0 not in {0.0,1.0})
    If (feature 3 in {0.0})
     Predict: 0.0
    Else (feature 3 not in {0.0})
     Predict: 1.0
  Else (feature 2 not in {0.0})
   If (feature 0 in {1.0})
    Predict: 0.0
   Else (feature 0 not in {1.0})
    Predict: 1.0
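
The debug string above can be read as nested if/else rules. The function below is a hand-written transcription of those rules in plain Python (it is not generated by Spark, and `predict_from_rules` is a hypothetical name), checked against the 13 rows of `train_data`, with the sparse rows expanded to dense form.

```python
def predict_from_rules(features):
    """Apply the rules printed by toDebugString to one feature vector.

    Feature order: [age, income, student, rating], all as category indices.
    """
    age, _income, student, rating = features
    if student == 0.0:
        if age in (0.0, 1.0):
            return 0.0
        return 0.0 if rating == 0.0 else 1.0
    return 0.0 if age == 1.0 else 1.0

# (features, label) pairs copied from the train_data output
train_rows = [
    ([0.0, 1.0, 1.0, 0.0], 1.0), ([0.0, 1.0, 1.0, 1.0], 1.0),
    ([1.0, 1.0, 1.0, 0.0], 0.0), ([2.0, 2.0, 0.0, 0.0], 0.0),
    ([2.0, 2.0, 0.0, 1.0], 1.0), ([1.0, 2.0, 0.0, 1.0], 0.0),
    ([0.0, 0.0, 1.0, 0.0], 1.0), ([0.0, 2.0, 0.0, 0.0], 0.0),
    ([2.0, 0.0, 0.0, 0.0], 0.0), ([0.0, 0.0, 0.0, 1.0], 0.0),
    ([1.0, 0.0, 1.0, 1.0], 0.0), ([1.0, 1.0, 0.0, 0.0], 0.0),
    ([2.0, 0.0, 1.0, 1.0], 1.0),
]

correct = sum(predict_from_rules(x) == y for x, y in train_rows)
print(correct, "of", len(train_rows), "training rows reproduced")
```

Note that `income` (feature 1) is never tested by the tree, and that label 0.0 stands for `yes` because it is the more frequent class.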



In [None]:
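from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Despite its name, evaluatorPR uses the default metric, areaUnderROC;
# pass metricName="areaUnderPR" to score by precision-recall instead.
evaluatorPR = BinaryClassificationEvaluator()

# 2 x 2 grid: every combination of maxBins and minInstancesPerNode
dt_paramGrid = ParamGridBuilder().addGrid(dt.maxBins, [40, 42]).addGrid(dt.minInstancesPerNode, [10, 100]).build()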


In [None]:


# 5-fold cross-validation over the parameter grid, fitting up to 2 models in parallel
cv = CrossValidator(estimator=dt, estimatorParamMaps=dt_paramGrid, evaluator=evaluatorPR, numFolds=5, parallelism=2)

cvModel = cv.fit(train_data)

bestModel = cvModel.bestModel

print(bestModel)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_feb70edf1be3, depth=3, numNodes=9, numClasses=2, numFeatures=4
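
To get a feel for what the grid search costs, the plain-Python sketch below enumerates the same parameter combinations and counts the model fits: one per combination and fold, plus a final refit of the best combination on all rows. It also checks a side condition worth keeping in mind on a 13-row data set: as documented, `minInstancesPerNode` is the minimum number of rows each child must keep after a split, so a node needs at least twice that many rows to be split at all. The helper `can_split` is hypothetical.

```python
from itertools import product

grid = {"maxBins": [40, 42], "minInstancesPerNode": [10, 100]}
num_folds = 5
n_rows = 13

# Cartesian product of the grid values, one dict per combination
param_maps = [dict(zip(grid, combo)) for combo in product(*grid.values())]

# One fit per (combination, fold), plus the final refit on the full data
total_fits = len(param_maps) * num_folds + 1

def can_split(n_node_rows, min_instances_per_node):
    """A split needs both children to keep at least min_instances_per_node rows."""
    return n_node_rows >= 2 * min_instances_per_node

splittable = [pm for pm in param_maps
              if can_split(n_rows, pm["minInstancesPerNode"])]
print(len(param_maps), total_fits, splittable)
```

Under that reading, none of the grid values allows a split even when all 13 rows are used, so smaller values (for example 1 or 2) may be more informative on this toy data set.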


## END