# Spark

## I. Processiong de données et RDD

#### I.1. Définiton d'un SparkContext en local

- sc = SparkContext => pour RDD utilisé pour données non-structurées (par ex texte)
- spark = SparkSession etc... => pour DataFrame utilisé pour données structurées (par ex models de
ML).

       DF utilise de façon sous-jacente les bases d'un RDD (= SparkContext + SparkSession en sur-couche)

In [6]:
# Import de SparkContext du module pyspark
from pyspark import SparkContext
# Définiton d'un SparkContext en local
sc = SparkContext('local')
sc

#### I.2. Chargement du fichier "miserables_full.txt" et affichage des 10 premières lignes

In [None]:
# Chargement du fichier "miserables_full.txt" et affichage des 10 premières lignes
miserables = sc.textFile("miserables_full.txt")
miserables.take(10)

#### I.3. succession de méthodes

In [None]:
# Création d'une liste à partir du fichier texte
mots_sorted_3 = sc.textFile("miserables_full.txt") \
                  .map(lambda x : x.lower().replace(',', ' ').replace('.', ' ').replace('-', ' ').replace('’', ' ')) \
                  .flatMap(lambda line: line.split(" ")) \
                  .map(lambda x : (x,1)) \
                  .reduceByKey(lambda x,y : x + y) \
                  .sortBy(lambda couple: couple[1], ascending = False) \
                  .collect()
                
mots_sorted_3

## II. Spark SQL

In [None]:
# Import de Spark Session et SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Définition d'un SparkContext
SparkContext.getOrCreate() 

# Définition d'une SparkSession
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Introduction au DataFrame") \
    .getOrCreate()
    
spark

In [None]:
# Création d'un raccourci vers le SparkContext déjà créé
sc = SparkContext.getOrCreate()
sc

### Créer un DataFrame Spark

La structure RDD n'est pas optimisée pour effectuer des tâches par colonne ou du Machine Learning. La structure DataFrame a été créé pour répondre à ce besoin. Elle utilise de façon sous-jacente les bases d'un RDD mais a été structurée en colonnes autant qu'en lignes dans une structure SQL et une forme inspirée des DataFrame du module pandas.

La structure DataFrame possède deux grands avantages. Tout d'abord cette structure est similaire au DataFrame pandas et est donc facile à prendre en main. Elle est également performante : un DataFrame en PySpark est aussi rapide qu'un DataFrame en Scala et est la structure distribuée la plus optimisée en Machine Learning. Grâce à la structure DataFrame, nous pouvons donc faire des calculs performants à travers un langage familier, en évitant le coût d'entrée d'apprentissage d'un nouveau langage fonctionnel : Scala.

Dans cet exercice, vous apprendrez à manipuler un DataFrame PySpark pour explorer les données.

#### II.1. à partir d'un RDD

In [None]:
# Import de Row du package pyspark.sql
from pyspark.sql import Row

# Chargement du fichier '2008_raw.csv'
rdd = sc.textFile('2008_raw.csv').map(lambda line: line.split(","))

# Création d'un nouveau RDD en sélectionnant les variables explicatives
rdd_row = rdd.map(lambda line: Row(annee = line[0],
                                   mois = line[1],
                                   jours = line[2],
                                   flightNum = line[5]))

# Créer d'un data frame à partir d'un rdd
df = spark.createDataFrame(rdd_row)

# Affichage des 5 premières lignes
df.show(5)

#### II.2. à partir d'un fichier CSV (méthode plus commune)

In [None]:
# Lecture du fichier '2008.csv'
raw_df = spark.read.csv('2008.csv', header=True)

# Affichage du schéma des variables
raw_df.printSchema()

#### II.3. Explorer et manipuler un DataFrame

In [None]:
# Création d'un data frame ne contenant que les variables explicatives
flights1 = raw_df.select('annee', 'mois', 'jours', 'flightNum', 'origin', 'dest', 'distance', 'canceled', 'cancellationCode', 'carrierDelay')

# Affichage de 20 premières lignes
flights1.show() # 'show' affiche 20 lignes par défaut

In [None]:
# Création d'un DataFrame en spécifiant le type des colonnes
flights = raw_df.select(raw_df.annee.cast("int"),
                        raw_df.mois.cast("int"),
                        raw_df.jours.cast("int"),
                        raw_df.flightNum.cast("int"),
                        raw_df.origin.cast("string"),
                        raw_df.dest.cast("string"),
                        raw_df.distance.cast("int"),
                        raw_df.canceled.cast("boolean"),
                        raw_df.cancellationCode.cast("string"),
                        raw_df.carrierDelay.cast("int"))

# Affichage de 20 premières lignes
flights.show()

In [None]:
##select disctinct et count
# Calcul du nombre de vols ayant des numéros de vol distincts
flights.select('flightNum').distinct().count()

In [None]:
##methode describe (2 méthodes)

### Première méthode
# Affichage d'un résumé en utilisant l'option truncate de la méthode show
flights.describe().show(truncate = 8)

### Deuxième méthode
# Affichage d'un résumé en utilisant la méthode toPandas
flights.describe().toPandas()

###La méthode toPandas est à utiliser avec des DataFrames de petite taille 
####tels que des résumés d'informations, sinon elle peut affecter la distribution des données.

In [None]:
##groupBy (1)
# Affichage du résumé de la variable catégorielle 'cancellationCode'
flights.groupBy('cancellationCode').count().show()

##groupBy (2)
# Affichage du résumé de la variable catégorielle 'cancellationCode' et 'canceled'
flights.groupBy('cancellationCode', 'canceled').count().show()

##filter (1)
# Affichage des 20 premièrs vols annulés pour la raison "C"
flights.filter(flights.cancellationCode == 'C').show()

##filter (2)
# Calcul du nombre vols annulés par mois
flights.filter(flights.canceled == True).groupBy('mois').count().show()


#### II.4. Création et aggrégation de variables

In [9]:
# Création d'une nouvelle variable 'isLongFlight' et affichage des 10 premières lignes
flights.withColumn('isLongFlight', flights.distance > 1000 ).show(10)

L'enregistrement de la nouvelle colonne ne s'effectue nulle part. A cause du caractère immuable, aucune modification ne se fait par remplacement (in place). Pour enregistrer une nouvelle variable, il faut créer un nouvel objet ou de la créer dès la création du DataFrame.

#### II.5. Gestion des valeurs manquantes

In [None]:
## fillna
# Remplacement des valeurs manquantes par des 0 et affichage des 6 premières lignes
flights = flights.fillna(0, 'carrierDelay')
flights.show(6)

###replace
# Remplacement des codes d'annulation : df.replace(oldValue, newValue, 'columnName')
flights = flights.replace(['A','B','C'],['1','2','3'],'cancellationCode')
flights.show()

###orderby
# Ordonner le data frame par numéro de vol décroissant
flights = flights.orderBy(flights.flightNum.desc())
flights.show()


#### II.6. Requêtes SQL

- (a) Créer une vue SQL de flights que l'on appellera "flightsView".
- (b) Créer un DataFrame appelé sqlDF contenant uniquement la variable carrierDelay grâce à une requête SQL.
- (c) Afficher les premières lignes de sqlDF.

In [None]:
# Création d'une vue SQL
flights.createOrReplaceTempView("flightsView")

# Création d'un DataFrame ne contenant que la variable "flightsView"
sqlDF = spark.sql("SELECT carrierDelay FROM flightsView")

# Affichage des 10 premières lignes
sqlDF.show(10)

#### II.7. Sample & astuces d'affichage

In [None]:
# Affichage d'un dizaine de lignes de la base de données
flights.sample(False, .0001, seed = 222).toPandas()

In [None]:
# Fermeture de la session Spark
spark.stop()

## III. Régression avec PySpark

In [None]:
# Import de SparkSession et SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Définition d'un SparkContext en local
sc = SparkContext.getOrCreate()

# Construction d'une session Spark
spark = SparkSession \
    .builder \
    .appName("Introduction à Spark ML") \
    .getOrCreate()
    
spark

#### III.1. Importation de la base de données

- (a) Charger le fichier YearPredictionMSD.txt dans un DataFrame nommé df_raw.
- (b) Afficher un extrait de la base de données avec une méthode de votre choix.

In [None]:
# Chargement du fichier " YearPredictionMSD.txt" dans un DataFrame
df_raw = spark.read.csv('YearPredictionMSD.txt')

# Première méthode d'affichage 
df_raw.show(2, truncate = 4)
# Modifier les valeurs de 'truncate' ne permet pas de bien visualiser les données
# à cause du nombre de variables

# Deuxième méthode d'affichage
df_raw.sample(False, .00001, seed = 222).toPandas()
# Utiliser toPandas permet de mieux visualiser les données

- (c) Importer la fonction col du sous-module pyspark.sql.functions.
- (d) Créer un DataFrame df à partir de df_raw en changeant les types des colonnes relatives au timbre en double et l'année en int.
- (e) Afficher le schéma des variables du df.

In [None]:
# Importation de col du sous-module pyspark.sql.functions
from pyspark.sql.functions import col

# Convertir des colonnes relatives au timbre en double et l'année en int
exprs = [col(c).cast("double") for c in df_raw.columns[1:91]]
df = df_raw.select(df_raw._c0.cast('int'), *exprs)

# Affichage du schéma des variables "df"
df.printSchema()

(g) Afficher un résumé descriptif de la base de données df.###### 

In [None]:
# Affichage d'un résumé descriptif des données
df.describe().toPandas()

#### III.2. Mise en forme de la base en format svmlib

Pour pouvoir être utilisée par les algorithmes de Machine Learning de Spark ML, la base de données doit être un DataFrame contenant 2 colonnes :

    La colonne label contenant la variable à prédire (label en anglais).
    La colonne features contenant les variables explicatives (features en anglais).
La fonction DenseVector() issue du package pyspark.ml.linalg permet de regrouper plusieurs variables en une seule variable.

- (a) Importer la fonction DenseVector du package pyspark.ml.linalg.
- (b) Créer un rdd rdd_ml séparant la variable à expliquer des features (à mettre sous forme DenseVector).
- (c) Créer un DataFrame df_ml contenant notre base de données sous deux variables : 'labels' et 'features'.
- (d) Afficher un extrait de df_ml.

In [None]:
# Import de DenseVector du package pyspark.ml.linalg
from pyspark.ml.linalg import DenseVector

# Création d'un rdd en séparant la variable à expliquer des features
rdd_ml = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Création d'un DataFrame composé de deux variables : label et features
df_ml = spark.createDataFrame(rdd_ml, ['label', 'features'])

# Affichage des 10 premières lignes du DataFrame
df_ml.show(10)

In [None]:
# Décomposition des données en deux ensembles d'entraînement et de test
# Par défaut l'échantillon est aléatoirement réparti
train, test = df_ml.randomSplit([.8, .2], seed= 1234)

#### III.3. Regression linéaire

- (a) Importer la fonction LinearRegression du sous-module pyspark.ml.regression.
- (b) Créer lr, une fonction de régression linéaire distribuée pour l'appliquer à l'ensemble train.
- (c) Créer linearModel, le modèle issu de lr appliqué à train.

In [None]:
# Import de LinearRegression du package pyspark.ml.regression
from pyspark.ml.regression import LinearRegression

# Création d'une fonction de régression linéaire
lr = LinearRegression(labelCol='label', featuresCol= 'features')

# Apprentissage sur les données d'entraînement 
linearModel = lr.fit(train)

In [None]:
# Calcul des prédictions sur les données test
predicted = linearModel.transform(test)

# Affichage des prédictions
predicted.show()

#### III.4. Evaluation du modèle

In [None]:
# Calcul et affichage du RMSE
print("RMSE:", linearModel.summary.rootMeanSquaredError)

# Calcul et affichage du R2
print("R2:  ", linearModel.summary.r2)

- Afficher les coefficients coefficients du modèle. La fonction pprint du module pprint permet d'avoir un affichage plus élégant des données.
- Fermer la session spark en utilisant la méthode stop.

In [None]:
from pprint import pprint

# Affichage des Coefficients du modèle linéaire
pprint(linearModel.coefficients)

# Fermeture de la session Spark 
spark.stop()

##### Aller plus loin - Autres algorithmes de régression

Maintenant que vous avez appris à programmer une régression linéaire en utilisant Spark ML, vous n'êtes qu'à quelques pas de maîtriser tout algorithme de régression distribué sous Spark. Pour vous aider à retenir l'essentiel, en voici un aperçu :
- 1. Transformer la base de données en format svmlib :
      - Sélectionner les variables numériques à utiliser pour la régression.
      - Placer la variable à expliquer en première position.
      - Mapper un couple (label, vecteur de features) dans un RDD.
      - Convertir ce RDD en DataFrame et nommer les variables 'label' et 'features'.
- 2. Séparer la base de données en deux échantillons train et test.
- 3. Appliquer un modèle de classification.
- 4. Evaluer le modèle.


   Spark est en constante amélioration et possède aujourd'hui quelques régresseurs notables. Ils sont utilisables de la même façon en important ces fonction depuis pyspark.ml.regression. Vous êtes invité à consulter la documentation pour observer les différents paramètres à prendre en compte pour optimiser ces algorithmes :
- LinearRegression() pour effectuer une régression linéaire lorsque le label est présupposé suivre une loi normale.
- GeneralizedLinearRegression() pour effectuer une régression linéaire généralisée lorsque le label est présupposé suivre une autre loi que l'on spécifie dans le paramètre family (gaussian, binomial, poisson, gamma).
- AFTSurvivalRegression() pour effectuer une analyse de survie.

Il est également possible d'utiliser les algorithmes, qui gèrent également les variables catégorielles, détaillés dans l'exercice suivant :

- DecisionTreeRegressor() pour un arbre de décision.
- RandomForestRegressor() pour une forêt aléatoire d'arbres de décision.
- GBTRegressor() pour une forêt d'arbres gradient-boosted.

## IV. Utilisation des ML Pipelines

In [None]:
# Import de SparkSession et SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Définition d'un SparkContext en local
sc = SparkContext.getOrCreate()

# Construction d'une Session Spark
spark = SparkSession \
    .builder \
    .appName("Pipelines Spark ML") \
    .getOrCreate()
    
spark

#### IV.1. Les variables catégorielles

In [None]:
# Chargement du fichier 'HR_comma_sep.csv'
hr = spark.read.csv('HR_comma_sep.csv', header = True)

# Affichage d'un extrait du DataFrame
hr.sample(False, 0.001, seed=222).toPandas()

    La variable à prédire est la variable left. Elle indique si l'employé a quitté la boîte volontairement ou non.

- (c) Réordonner les variables de façon à placer la variable left en première colonne.
- (d) Afficher un résumé des variables.

In [None]:
# Ordonner les variables pour avoir le label en première colonne
hr = hr.select( 'left',
               'satisfaction_level',
               'last_evaluation',
               'number_project',
               'average_montly_hours',
               'time_spend_company',
               'Work_accident',
               'promotion_last_5years',
               'sales',
               'salary')

# Affichage d'une description des variables
hr.describe().toPandas()

ATTENTION !!!!


La transformation directe de la base de données en svmlib, comme vu dans l'exercice précédent, génère l'erreur suivante :  ValueError: could not convert string to float: 'sales'
   
   
En effet, la fonction DenseVector ne gère pas les strings, il faut donc indexer les variables non numériques.

La fonction StringIndexer du package pyspark.ml.feature permet d'indexer les variables selon la fréquence de leurs modalités: la modalité la plus fréquente aura pour indice 0.0, la modalité suivante 1.0, etc. 

Pour cela, la fonction prend en entrée une variable et en créé une variable indexée.

    La fonction StringIndexer est comme un estimateur, de la même façon qu'une régression ou un arbre de décision. Elle s'utilise donc en deux étapes :

    - Créer un indexeur en spécifiant les colonnes d'entrée et de sortie (paramètres inputCol et outputCol) et chercher des modalités dans la base de données grâce à la méthode fit.
    - Appliquer l'indexeur à la base de données par le biais de la méthode transform.

- (e) Importer la fonction StringIndexer depuis le package pyspark.ml.feature.
- (f) Créer un indexeur SalesIndexer transformant une variable sales en une variable indexedSales.
- (g) Indexer la variable sales de hr dans un nouveau DataFrame nommé hrSalesIndexed.
- (h) Afficher un extrait de hrSalesIndexed.

    La fonction StringIndexer du package pyspark.ml.feature permet d'indexer les variables selon la fréquence de leurs modalités: 
la modalité la plus fréquente aura pour indice 0.0, la modalité suivante 1.0, etc. Pour cela, la fonction prend en entrée une variable et en créé une variable indexée.

La fonction StringIndexer est comme un estimateur, de la même façon qu'une régression ou un arbre de décision. Elle s'utilise donc en deux étapes :

- Créer un indexeur en spécifiant les colonnes d'entrée et de sortie (paramètres inputCol et outputCol) et chercher des modalités dans la base de données grâce à la méthode fit.
- Appliquer l'indexeur à la base de données par le biais de la méthode transform.

In [None]:
# Import de StringIndexer du package pyspark.ml.feature
from pyspark.ml.feature import StringIndexer

# Création d'un indexeur transformant une variable sales en indexedSales
salesIndexer = StringIndexer(inputCol='sales', outputCol='indexedSales').fit(hr)

# Création d'un DataFrame hrSalesIndexed indexant la variable sales
hrSalesIndexed = salesIndexer.transform(hr)

# Affichage d'un extrait du DataFrame hrSalesIndexed 
hrSalesIndexed.sample(False, 0.001, seed = 222).toPandas()

- (i) Importer IndexToString depuis le package pyspark.ml.feature.
- (j) Créer une variable salesReconstructed à partir de indexedSales.
- (k) Appliquer ce transformateur à hrSalesIndexed en créant une nouvelle table hrSalesReconstructed.
- (l) Afficher un extrait de cette table

In [None]:
# Insérez votre code ici
# Import de IndexToString du package pyspark.ml.feature
from pyspark.ml.feature import IndexToString

# Création d'une nouvelle colonne salesReconstructed
SalesReconstructor = IndexToString(inputCol='indexedSales',
                                   outputCol='salesReconstructed',
                                   labels = salesIndexer.labels)

# Appliquer le transformateur SalesReconstructor
hrSalesReconstructed = SalesReconstructor.transform(hrSalesIndexed)

# Affichage d'un extrait de la base de données
hrSalesReconstructed.sample(False, 0.001 , seed = 222).toPandas()

### On voit apparaître une nouvelle colonne 'salesReconstructed' égale à la colonne 'sales'

## IV.2. Les pipelines

- (a) Importer la fonction Pipeline depuis le package pyspark.ml.
- (b) Créer un indexeur SalesIndexer transformant une variable sales en indexedSales.
- (c) Créer un indexeur SalaryIndexer transformant une variable salary en indexedSalary.
- (d) Créer une Pipeline indexer qui applique les deux transformations.
- (e) Indexer les variables de hr dans un nouveau DataFrame nommé hrIndexed.
- (f) Afficher un extrait de hrIndexed.

In [None]:
# Import de Pipeline du package pyspark.ml
from pyspark.ml import Pipeline

# Création des indexeurs
SalesIndexer = StringIndexer(inputCol='sales', outputCol='indexedSales')
SalaryIndexer = StringIndexer(inputCol='salary', outputCol='indexedSalary')

# Création d'une Pipeline
indexer = Pipeline(stages =  [SalaryIndexer, SalesIndexer])

# Indexer les variables de hr
hrIndexed = indexer.fit(hr).transform(hr)

# Affichage d'un extrait de hrIndexed
hrIndexed.sample(False, 0.001 , seed = 222).toPandas()

#### IV.3. Mise en forme de la base en format svmlib

- (a) Créer une base de données hrNumeric excluant les anciennes variables non indexées.
- (b) Créer hrLibsvm, une base de données au format svmlib à partir de hrNumeric.
- (c) Afficher un extrait de la nouvelle base de données.

In [None]:
# Import de DenseVector du package pyspark.ml.linalg
from pyspark.ml.linalg import DenseVector

# Création d'une base de données excluant les variables non indexées
hrNumeric = hrIndexed.select('left',
                             'satisfaction_level',
                             'last_evaluation',
                             'number_project',
                             'average_montly_hours',
                             'time_spend_company',
                             'Work_accident',
                             'promotion_last_5years',
                             'indexedSales',
                             'indexedSalary')

# Création d'une variable DenseVector contenant les features en passant par la structure RDD
hrRdd = hrNumeric.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Transformation en DataFrame et nommage des variables pour obtenir une base au format svmlib
hrLibsvm = spark.createDataFrame(hrRdd, ['label', 'features'])

# Affichage d'un extrait de hrLibsvm
hrLibsvm.sample(False, .001, seed = 222).toPandas()

#### IV.4. Application d'un classifieur Spark ML

La base de données d'apprentissage actuelle est au format svmlib. Il ne reste qu'à spécifier que:

- La variable label est catégorielle.
- Certaines features sont catégorielles et d'autres sont continues.


Pour cela, deux transformateurs seront créés et intégrés dans une Pipeline générale :


    -# Pour pouvoir classifier sur notre label,
    -# on crée une variable 'indexedLabel' comme vu précédemment.

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(hr_ml)

    -# Pour les features, on utilise la fonction 'VectorIndexer'
    -# qui définie un seuil de nombre de modalités:
    -# ici, si une variable a plus de 5 modalités, alors elle sera considérée comme continue

featureIndexer = VectorIndexer(inputCol="features",
                               outputCol="indexedFeatures",
                               maxCategories = 5).fit(hr_ml)
                               
                               
La fonction VectorIndexer a été développée de façon à n'indexer que les variables qui ont moins d'un certain nombre de modalités. Ce nombre est spécifié par l'argument maxCategories. Il faut donc déterminer ce paramètre en regardant le nombre maximal de modalités des variables catégorielles.

In [None]:
# Import de VectorIndexer du package pyspark.ml.feature
from pyspark.ml.feature import VectorIndexer

# Création d'un transformateur indexant les features
featureIndexer = VectorIndexer(inputCol="features",
                               outputCol="indexedFeatures",
                               maxCategories = 10).fit(hrLibsvm)

##### RandomForest

In [None]:
# Import du classifieur RandomForestClassifier du package pyspark.ml.classification
from pyspark.ml.classification import RandomForestClassifier

# Création des transformateurs
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(hrLibsvm)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories = 10).fit(hrLibsvm)

# Création d'un classifieur 
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", predictionCol='prediction', seed = 222)

# Création d'un transformateur permettant de rétablir les labels des prédictions
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Création d'une Pipeline 
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Décomposition des données en deux ensembles: données d'entraînement et de test
(train, test) = hrLibsvm.randomSplit([0.7, 0.3], seed = 222)

# Apprentissage du modèle en utilisant les données d'entraînement
model = pipeline.fit(train)

In [None]:
# Import du classifieur RandomForestClassifier du package pyspark.ml.classification
from pyspark.ml.classification import RandomForestClassifier

# Création des transformateurs
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(hrLibsvm)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories = 10).fit(hrLibsvm)

# Création d'un classifieur 
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", predictionCol='prediction', seed = 222)

# Création d'un transformateur permettant de rétablir les labels des prédictions
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Création d'une Pipeline 
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Décomposition des données en deux ensembles: données d'entraînement et de test
(train, test) = hrLibsvm.randomSplit([0.7, 0.3], seed = 222)

# Apprentissage du modèle en utilisant les données d'entraînement
model = pipeline.fit(train)

#### IV.5. Evaluation du modele

Une fois le modèle d'apprentissage automatique construit, il est important de vérifier la fiabilité des prédictions, à la fois pour le comparer à d'autres modèles de classification mais également pour optimiser ses paramètres.

Pour cela, il existe un sous-module pyspark.ml.evaluation contenant toutes les métriques d'évaluation. En particulier, vous y trouverez la fonction MulticlassClassificationEvaluator permettant d'évaluer des modèles de classification.

Cette fonction prend 3 arguments principaux :

metricName : métrique à utiliser, typiquement : 'accuracy'.
labelCol : nom de la colonne à prédire.
predictionCol : nom de la colonne de prédictions.
L'évaluateur créé possède une méthode evaluate permettant de l'appliquer à un échantillon.

- (a) Importer la fonction MulticlassClassificationEvaluator.
- (b) Créer evaluator l'évaluateur d'accuracy du modèle.
- (c) Calculer et afficher la précision accuracy de la prédiction sur l'échantillon test.

In [None]:
# Import d'un évaluateur MulticlassClassificationEvaluator du package pyspark.ml.evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Création d'un évaluateur 
evaluator = MulticlassClassificationEvaluator(metricName='accuracy',
                                              labelCol= 'indexedLabel',
                                              predictionCol= 'prediction')
# Calcul et affichage de la précision du modèle 
accuracy = evaluator.evaluate(predictions)
print(accuracy)

## V. Model Tuning

In [None]:
# Import de SparkSession et de SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Création d'un SparkContext
sc = SparkContext.getOrCreate()

# Création d'une session Spark
spark = SparkSession \
    .builder \
    .appName("ML Tuning") \
    .getOrCreate()
        
spark

#### V.1. Importation de la base de données

In [None]:
# Chargement de la base de données brute
df_full = spark.read.csv('YearPredictionMSD.txt', header=False)

# On infère les bons types des colonnes
from pyspark.sql.functions import col
exprs = [col(c).cast("double") for c in df_full.columns[1:13]]

df_casted = df_full.select(df_full._c0.cast('int'),
                           *exprs)

# Enfin, par soucis de rapidité des calculs,
# on ne traitera qu'un extrait de la base de données 
df = df_casted.sample(False, .1, seed = 222)

df.sample(False, .001, seed = 222).toPandas()

- (b) Convertir cette base de données au format svmlib dans une variable df_ml.
- (c) Afficher un extrait de cette base de données.

In [None]:
from pyspark.ml.linalg import DenseVector

# Conversion de la base de données au format svmlib
rdd_ml = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
df_ml = spark.createDataFrame(rdd_ml, ['label', 'features'])

df_ml.show()

- (d) Séparer df_ml en un ensemble train et un ensemble test comprenant respectivement 80% et 20% des données.

In [None]:
# Décomposition des données en deux ensembles d'entraînement et de test
# Par défaut l'échantillon est aléatoirement réparti
train, test = df_ml.randomSplit([0.8, 0.2], seed=222)

Dans un premier temps, nous allons instancier un modèle de régression linéaire.

- (e) Importer la fonction LinearRegression depuis le package pyspark.ml.regression.
- (f) Créer un estimateur lr permettant d'effectuer une régression linéaire entre le label 'label'et les features 'features'.

In [None]:
# Import de LinearRegression du package pyspark.ml.regression
from pyspark.ml.regression import LinearRegression

# Création d'un estimateur LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol = 'label')

#### V.2. Création d'une grille de paramètres

In [None]:
# Import de ParamGridBuilder du package pyspark.ml.tuning
from pyspark.ml.tuning import ParamGridBuilder

# Création d'une grille de paramètres
param_grid = ParamGridBuilder().\
    addGrid(lr.regParam, [0, 0.5, 1]).\
    addGrid(lr.elasticNetParam, [0, 0.5, 1]).\
    build()

#### V.3. Choix d'une métrique d'évaluation

In [None]:
# Import de RegressionEvaluator du package pyspark.ml.evaluation
from pyspark.ml.evaluation import RegressionEvaluator

# Création d'un évaluateur ayant pour métrique d'évaluation r2
ev = RegressionEvaluator(predictionCol='prediction',
                                labelCol='label',
                                metricName='r2')

#### V.4. Réglage des paramètres par validation croisée

In [None]:
# Import de CrossValidator du package pyspark.ml.tuning
from pyspark.ml.tuning import CrossValidator

# Création d'un objet CrossValidator à 3 folds
cv = CrossValidator(estimator = lr,
                    estimatorParamMaps = param_grid,
                    evaluator=ev,
                    numFolds=3)

#### V.5. Application du modèle

In [None]:
# Import de la bibliothèque time et calcul du temps au début de l'exécution (t0)
from time import time
t0 = time()

lr = LinearRegression(featuresCol = 'features', labelCol = 'label')
ev = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='r2')
cv = CrossValidator(estimator = lr, estimatorParamMaps = param_grid, evaluator = ev, numFolds = 3)

cv_model = cv.fit(train)

tt = time() - t0
print("Réalisé en {} secondes".format(round(tt,3)))

In [None]:
# Calcul des prédictions des données d'entraînement
pred_train = cv_model.transform(train)

# Calcul des prédictions des données de test
pred_test  = cv_model.transform(test)

In [None]:
ev.setMetricName('rmse').evaluate(pred_test)

#### V.6. Exploitation des résultats

In [None]:
# Affichage des coefficients du modèle
cv_model.bestModel.coefficients

Les paramètres de la grille obtenus pour le meilleur modèle ne sont pas accessibles directement, mais sont stockés au sein de l'objet java :

    cv_model.bestModel._java_obj.getRegParam()
    cv_model.bestModel._java_obj.getElasticNetParam()

Cette information est utile pour vérifier que notre grille est bien adaptée au modèle : il faut éviter que le paramètre choisi soit sur un bord de notre intervalle.
 
 
 Il faut ici passer par l'objet java parce que cette option n'est pas encore disponible directement dans PySpark.