## Machine Learning avec SPARK

Prédire la note attribuée à un restaurant par un client

- Importation des librairies utiles

In [3]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf

#### Importation des données depuis le compte de stokage

In [5]:
#spark.conf.set(
# storage account
# password
#               )

In [6]:
datasets = {
  dataset: spark.read.load( 
    "wasbs://default@storagestudent.blob.core.windows.net/datasets/S8-5/Exo/restaurant-data-with-consumer-ratings/{0}.csv".format(dataset), 
    format="csv",
    header="true"
                           )
  for dataset in ["geoplaces2", "rating_final", "userprofile"]
            } 

In [7]:
rating = datasets['rating_final']
user = datasets['userprofile']
place = datasets['geoplaces2']

#### Nettoyage des données (Recodage du label, typage et jointures)

- Dataset "rating"
    - Recodage du label dans le dataset rating : transformer la variable rating pour qu'elle vale 1 lorsque rating vaut 2 et 0 le reste du temps
    - Vérification des types de données dans chaque colonne

In [10]:
display(rating.head(5))

userID,placeID,rating,food_rating,service_rating
U1077,135085,2,2,2
U1077,135038,2,2,1
U1077,132825,2,2,2
U1077,135060,1,2,2
U1068,135104,1,1,2


In [11]:
# Fonction pour recoder le label

def change(a):
  if a == '2':
    b = '1'
  else:
    b = '0'
  return b

udf_change = udf(change)

In [12]:
rating = rating.withColumn("rating_label", udf_change('rating'))
display(rating.head(5))

userID,placeID,rating,food_rating,service_rating,rating_label
U1077,135085,2,2,2,1
U1077,135038,2,2,1,1
U1077,132825,2,2,2,1
U1077,135060,1,2,2,0
U1068,135104,1,1,2,0


In [13]:
rating.dtypes

In [14]:
rating = rating.select(
  F.col('userID'),
  F.col('placeID').cast(IntegerType()),
  F.col('rating').cast(IntegerType()),
  F.col('food_rating').cast(IntegerType()),
  F.col('service_rating').cast(IntegerType()),
  F.col('rating_label').cast(IntegerType())
                       )

In [15]:
rating.dtypes

- Dataset "user"
    - Vérification des types de données dans chaque colonne

In [17]:
display(user.head(5))

userID,latitude,longitude,smoker,drink_level,dress_preference,ambience,transport,marital_status,hijos,birth_year,interest,personality,religion,activity,color,weight,budget,height
U1001,22.139997,-100.978803,False,abstemious,informal,family,on foot,single,independent,1989,variety,thrifty-protector,none,student,black,69,medium,1.77
U1002,22.150087,-100.983325,False,abstemious,informal,family,public,single,independent,1990,technology,hunter-ostentatious,Catholic,student,red,40,low,1.87
U1003,22.119847,-100.946527,False,social drinker,formal,family,public,single,independent,1989,none,hard-worker,Catholic,student,blue,60,low,1.69
U1004,18.867,-99.183,False,abstemious,informal,family,public,single,independent,1940,variety,hard-worker,none,professional,green,44,medium,1.53
U1005,22.183477,-100.959891,False,abstemious,no preference,family,public,single,independent,1992,none,thrifty-protector,Catholic,student,black,65,medium,1.69


In [18]:
user.dtypes

In [19]:
col_int = ['birth_year', 'weight']
col_float = ['latitude', 'longitude', 'height']

for col in col_int:
    user = user.withColumn(col, user[col].cast(IntegerType()))

for col in col_float:
    user = user.withColumn(col, user[col].cast(DoubleType()))

- Dataset "place"
    - Vérification des types de données dans chaque colonne

In [21]:
display(place.head(5))

placeID,latitude,longitude,the_geom_meter,name,address,city,state,country,fax,zip,alcohol,smoking_area,dress_code,accessibility,price,url,Rambience,franchise,area,other_services
134999,18.915421,-99.184871,0101000020957F000088568DE356715AC138C0A525FC464A41,Kiku Cuernavaca,Revolucion,Cuernavaca,Morelos,Mexico,?,?,No_Alcohol_Served,none,informal,no_accessibility,medium,kikucuernavaca.com.mx,familiar,f,closed,none
132825,22.1473922,-100.983092,0101000020957F00001AD016568C4858C1243261274BA54B41,puesto de tacos,esquina santos degollado y leon guzman,s.l.p.,s.l.p.,mexico,?,78280,No_Alcohol_Served,none,informal,completely,low,?,familiar,f,open,none
135106,22.1497088,-100.9760928,0101000020957F0000649D6F21634858C119AE9BF528A34B41,El Rinc�n de San Francisco,Universidad 169,San Luis Potosi,San Luis Potosi,Mexico,?,78000,Wine-Beer,only at bar,informal,partially,medium,?,familiar,f,open,none
132667,23.7526973,-99.1633594,0101000020957F00005D67BCDDED8157C1222A2DC8D84D4941,little pizza Emilio Portes Gil,calle emilio portes gil,victoria,tamaulipas,?,?,?,No_Alcohol_Served,none,informal,completely,low,?,familiar,t,closed,none
132613,23.7529035,-99.165076,0101000020957F00008EBA2D06DC8157C194E03B7B504E4941,carnitas_mata,lic. Emilio portes gil,victoria,Tamaulipas,Mexico,?,?,No_Alcohol_Served,permitted,informal,completely,medium,?,familiar,t,closed,none


In [22]:
place.dtypes

In [23]:
col_int = ['PlaceID', 'Zip']
col_float = ['latitude', 'longitude']

for col in col_int:
    place = place.withColumn(col, place[col].cast(IntegerType()))

for col in col_float:
    place = place.withColumn(col, place[col].cast(DoubleType()))

- joindre les datasets user et rating

In [25]:
user_columns_to_drop = ['latitude', 'longitude']
place_columns_to_drop = ['latitude', 'longitude', 'the_geom_meter']

user = user.drop(*user_columns_to_drop)
place = place.drop(*place_columns_to_drop)

In [26]:
user.count(), rating.count()

In [27]:
dataset = rating.join(user, 'userID', 'left').join(place, 'placeID', 'left').dropna()
display(dataset.head(5))

placeID,userID,rating,food_rating,service_rating,rating_label,smoker,drink_level,dress_preference,ambience,transport,marital_status,hijos,birth_year,interest,personality,religion,activity,color,weight,budget,height,name,address,city,state,country,fax,Zip,alcohol,smoking_area,dress_code,accessibility,price,url,Rambience,franchise,area,other_services
135085,U1077,2,2,2,1,False,social drinker,elegant,family,public,married,kids,1987,technology,thrifty-protector,Catholic,student,blue,65,medium,1.71,Tortas Locas Hipocampo,Venustiano Carranza 719 Centro,San Luis Potosi,SLP,Mexico,?,78000,No_Alcohol_Served,not permitted,informal,no_accessibility,medium,?,familiar,f,closed,none
135038,U1077,2,2,1,1,False,social drinker,elegant,family,public,married,kids,1987,technology,thrifty-protector,Catholic,student,blue,65,medium,1.71,Restaurant la Chalita,Guajardo Sn San Luis Potosi Centro,San Luis Potosi,SLP,Mexico,?,78000,No_Alcohol_Served,section,informal,no_accessibility,medium,?,familiar,f,closed,none
132825,U1077,2,2,2,1,False,social drinker,elegant,family,public,married,kids,1987,technology,thrifty-protector,Catholic,student,blue,65,medium,1.71,puesto de tacos,esquina santos degollado y leon guzman,s.l.p.,s.l.p.,mexico,?,78280,No_Alcohol_Served,none,informal,completely,low,?,familiar,f,open,none
135060,U1077,1,2,2,0,False,social drinker,elegant,family,public,married,kids,1987,technology,thrifty-protector,Catholic,student,blue,65,medium,1.71,Restaurante Marisco Sam,Ignacio Allende 785 Centro,San Luis Potosi,SLP,Mexico,?,78310,No_Alcohol_Served,none,informal,no_accessibility,medium,?,familiar,f,closed,none
132732,U1068,0,0,0,0,False,casual drinker,informal,friends,public,single,independent,1988,technology,thrifty-protector,Catholic,student,blue,72,low,1.57,Taqueria EL amigo,Calle Mezquite Fracc Framboyanes,Cd Victoria,Tamaulipas,Mexico,?,87018,No_Alcohol_Served,none,casual,completely,low,?,familiar,f,open,none


#### Data Preprocessing

In [29]:
from pyspark.ml.feature import StringIndexer, IndexToString

label_indexer = StringIndexer(inputCol="rating_label", outputCol="label")
string_columns = ["drink_level", "smoker", "dress_preference", "transport", "marital_status", "hijos", "interest", "personality", "religion", "activity", "color", "budget", "smoking_area", "dress_code", "accessibility", "price", "area"]
string_indexers = [StringIndexer(inputCol=col, outputCol=col+"Index") for col in string_columns]

from pyspark.ml.feature import OneHotEncoderEstimator

one_hot_encoder = OneHotEncoderEstimator(
  inputCols=[col+"Index" for col in string_columns],
  outputCols=[col+"Vec" for col in string_columns]
)

from pyspark.ml.feature import VectorAssembler

vector_assembler = VectorAssembler(
    inputCols=[col+"Vec" for col in string_columns] + ["birth_year", "weight", "height"],
    outputCol="features"
)

from pyspark.ml import Pipeline

preprocessing_pipeline = Pipeline(
  stages= [label_indexer, *string_indexers, one_hot_encoder, vector_assembler]
)

In [30]:
preprocessing_pipeline_fitted = preprocessing_pipeline.fit(dataset)
dataset_preprocessed = preprocessing_pipeline_fitted.transform(dataset)

In [31]:
display(dataset_preprocessed.select("label", "features").head(5))

label,features
1.0,"List(0, 54, List(2, 8, 12, 15, 19, 22, 24, 28, 32, 39, 44, 45, 46, 48, 50, 51, 52, 53), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1987.0, 65.0, 1.71))"
1.0,"List(0, 54, List(2, 8, 12, 15, 19, 22, 24, 28, 32, 39, 43, 45, 46, 48, 50, 51, 52, 53), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1987.0, 65.0, 1.71))"
1.0,"List(0, 54, List(2, 8, 12, 15, 19, 22, 24, 28, 32, 39, 42, 45, 47, 51, 52, 53), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1987.0, 65.0, 1.71))"
0.0,"List(0, 54, List(2, 8, 12, 15, 19, 22, 24, 28, 32, 39, 42, 45, 46, 48, 50, 51, 52, 53), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1987.0, 65.0, 1.71))"
0.0,"List(0, 54, List(0, 2, 4, 8, 11, 14, 19, 22, 24, 28, 32, 40, 42, 47, 51, 52, 53), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1988.0, 72.0, 1.57))"


#### Construction des modèles de prédiction

In [33]:
train_set, test_set = dataset_preprocessed.randomSplit([0.8, 0.2], seed=100)

- Entraînez un arbre de décision et une forêt d'arbres aléatoire
- Evaluez ensuite les performances de chacun de ces modèles
- Faire une grid search dans chaque cas afin de déterminer des valeurs optimales pour les hyper paramètres.

##### Entrainement d'un arbre de décision

In [36]:
from pyspark.ml.classification import DecisionTreeClassifier
dt_model = DecisionTreeClassifier(labelCol="label", featuresCol="features")

In [37]:
dt_model_fitted = dt_model.fit(train_set)
predictions_train = dt_model_fitted.transform(train_set)
predictions_test = dt_model_fitted.transform(test_set)

In [38]:
display(predictions_test.select("label", "prediction", "probability").head(5))

label,prediction,probability
0.0,0.0,"List(1, 2, List(), List(0.5471698113207547, 0.4528301886792453))"
0.0,0.0,"List(1, 2, List(), List(0.9090909090909091, 0.09090909090909091))"
0.0,0.0,"List(1, 2, List(), List(0.8425925925925926, 0.1574074074074074))"
0.0,0.0,"List(1, 2, List(), List(0.7083333333333334, 0.2916666666666667))"
1.0,1.0,"List(1, 2, List(), List(0.31645569620253167, 0.6835443037974683))"


- Evaluation des performances de l'arbre de décision

In [40]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator_auc = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")

evaluation_train = evaluator_auc.evaluate(predictions_train)
evaluation_test = evaluator_auc.evaluate(predictions_test)

print(
  "AUC train : {0} - AUC test : {1}"
  .format(
    evaluator_auc.evaluate(predictions_train), 
    evaluator_auc.evaluate(predictions_test)
  )
)

In [41]:
from pyspark.sql.types import *
from pyspark.sql import functions as F

cf_count = predictions_test.select(F.col("prediction"), F.col("label").cast(IntegerType()))\
                           .groupBy("prediction")\
                           .pivot("label")\
                           .count() 

display(cf_count)

prediction,0,1
0.0,59,16
1.0,16,36


- Résultats du modèle d'arbre de décision:
    - Précision : 69%
    - Rappel : 69%
    - Accuracy : 74%
    - F1-Score : 69%

- Gridsearch et cross validation pour améliorer le modèle d'arbre de décision

In [44]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder().addGrid(dt_model.maxDepth, [4, 6, 8, 10, 12]).addGrid(dt_model.impurity, ["entropy", "gini"]).build()
 
cv_dt_model = CrossValidator(estimator=dt_model, estimatorParamMaps=paramGrid, evaluator=evaluator_auc, numFolds=5)
cv_dt_model_fitted = cv_dt_model.fit(train_set)

In [45]:
bestModel = cv_dt_model_fitted .bestModel

bestModelParameters = {
  key.name: value
  for key, value in bestModel.extractParamMap().items()
  if key.name in ["maxDepth", "impurity"]
}

print(bestModelParameters)

In [46]:
predictions_train_after_cv = cv_dt_model_fitted .transform(train_set)
predictions_test_after_cv = cv_dt_model_fitted .transform(test_set)

print(
  "AUC test : {0} - AUC test : {1}".format(
    evaluator_auc.evaluate(predictions_train_after_cv), 
    evaluator_auc.evaluate(predictions_test_after_cv)
  )
)

In [47]:
cf_count = predictions_test_after_cv.select(F.col("prediction"), F.col("label").cast(IntegerType()))\
                           .groupBy("prediction")\
                           .pivot("label")\
                           .count() 

display(cf_count)

prediction,0,1
0.0,52,27
1.0,23,25


- Résultats du modèle d'arbre de décision après la GridSearch
    - Précision : 52%
    - Rappel : 48%
    - Accuracy : 50%
    - F1-score : 51%

##### Entrainement d'un modèle de forêt d'arbres alétoires

In [50]:
from pyspark.ml.classification import RandomForestClassifier
rf_model = RandomForestClassifier(labelCol="label", featuresCol="features")

In [51]:
rf_model_fitted = rf_model.fit(train_set)
predictions_train = rf_model_fitted.transform(train_set)
predictions_test = rf_model_fitted.transform(test_set)

In [52]:
display(predictions_test.select("label", "prediction", "probability").head(5))

label,prediction,probability
0.0,0.0,"List(1, 2, List(), List(0.68075426208091, 0.3192457379190899))"
0.0,0.0,"List(1, 2, List(), List(0.7387417600927898, 0.2612582399072102))"
0.0,0.0,"List(1, 2, List(), List(0.6526659397515636, 0.34733406024843627))"
0.0,0.0,"List(1, 2, List(), List(0.6420908527645731, 0.35790914723542694))"
1.0,0.0,"List(1, 2, List(), List(0.5451490675851518, 0.45485093241484825))"


In [53]:
evaluator_auc = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")

evaluation_train = evaluator_auc.evaluate(predictions_train)
evaluation_test = evaluator_auc.evaluate(predictions_test)

print(
  "AUC train : {0} - AUC test : {1}"
  .format(
    evaluator_auc.evaluate(predictions_train), 
    evaluator_auc.evaluate(predictions_test)
  )
)

In [54]:
cf_count = predictions_test.select(F.col("prediction"), F.col("label").cast(IntegerType()))\
                           .groupBy("prediction")\
                           .pivot("label")\
                           .count() 

display(cf_count)

prediction,0,1
0.0,56,26
1.0,19,26


- Résultats du modèle de forêt d'arbres alétoire
    - Précision : 57%
    - Rappel : 50%
    - Accuracy : 65%
    - F1-score : 53%

- Gridsearch et cross validation pour améliorer le modèle

In [57]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder().addGrid(rf_model.numTrees, [5, 10, 20, 30])\
                              .addGrid(rf_model.maxDepth, [4, 6, 8, 10, 12])\
                              .addGrid(rf_model.impurity, ["entropy", "gini"])\
                              .build()
 
cv_rf_model = CrossValidator(estimator=rf_model, estimatorParamMaps=paramGrid, evaluator=evaluator_auc, numFolds=5)
cv_rf_model_fitted = cv_rf_model.fit(train_set)

In [58]:
bestModel = cv_rf_model_fitted.bestModel

bestModelParameters = {
  key.name: value
  for key, value in bestModel.extractParamMap().items()
  if key.name in ["numTrees", "maxDepth", "impurity"]
}

print(bestModelParameters)

In [59]:
predictions_train_after_cv = cv_rf_model_fitted.transform(train_set)
predictions_test_after_cv = cv_rf_model_fitted.transform(test_set)

print(
  "AUC train : {0} - AUC test : {1}".format(
    evaluator_auc.evaluate(predictions_train_after_cv), 
    evaluator_auc.evaluate(predictions_test_after_cv)
  )
)

In [60]:
cf_count = predictions_test.select(F.col("prediction"), F.col("label").cast(IntegerType()))\
                           .groupBy("prediction")\
                           .pivot("label")\
                           .count() 

display(cf_count)

prediction,0,1
0.0,56,26
1.0,19,26


- Résultats du modèle de forêt d'arbres alétoire après la GridSearch
    - Précision : 58%
    - Rappel : 50%
    - Accuracy : 65%
    - F1-score : 54%

In [62]:
bestModel.featureImportances

In [63]:
from itertools import chain
import pandas as pd

pd.DataFrame(sorted([
  (bestModel.featureImportances[attr["idx"]], attr["name"]) 
  for attr in chain(*predictions_test_after_cv.schema["features"].metadata["ml_attr"]["attrs"].values())
                     ], reverse=True), columns=["Coeff", "Features"])

Unnamed: 0,Coeff,Features
0,0.085938,height
1,0.073859,birth_year
2,0.069878,weight
3,0.062324,drink_levelVec_casual drinker
4,0.035473,interestVec_variety
5,0.029115,religionVec_Catholic
6,0.029041,transportVec_on foot
7,0.028493,smokerVec_false
8,0.028045,budgetVec_low
9,0.025597,colorVec_white
