Machine learning avec Pyspark
========
Régression avec PySpark
--------
- - - -

Spark ML est un module très récent, développé en parallèle par Databricks et UC Berkeley AMPLab et lancé fin 2015. Spark ML permet d'exécuter la majorité des algorithmes de Machine Learning de façon distribuée pour un très grand gain en performance.

Dans cet exercice, nous étudierons le cas d'une régression simple de façon à comprendre comment préparer les données et faire face à un problème de Machine Learning grâce à Spark ML. Des algorithmes plus poussés font l'objectif de l'exercice suivant.

* __(a)__ Exécuter la cellule ci-dessous pour construire une SparkSession pour notre exercice.

In [1]:
# Import de SparkSession et SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Définition d'un SparkContext en local
sc = SparkContext.getOrCreate()

# Construction d'une session Spark
spark = SparkSession \
    .builder \
    .appName("Introduction à Spark ML") \
    .getOrCreate()
    
spark

## 1. Importation de la base de données

Dans cet exercice, la base de données utilisée est Year Prediction MSD (https://archive.ics.uci.edu/ml/datasets/YearPredictionMSD). Elle contient des caractéristiques audio de 515345 chansons parues entre 1922 et 2011. Ces chansons sont essentiellement des tubes commerciaux occidentaux.

Cette base de données contient 91 variables :
* Une variable contenant l'année de la chanson.
* 12 variables contenant une projection à 12 dimensions du timbre audio de la chanson.
* 78 variables contenant des informations de covariance du timbre audio.

L'objectif est d'estimer l'année de sortie d'une chanson en fonction de ses caractéristiques audio. Pour cela nous allons implémenter une régression linéaire simple sur les informations du timbre pour prédire l'année de sortie.

* __(a)__ Charger le fichier YearPredictionMSD.txt dans un DataFrame nommé df_raw.
* __(b)__ Afficher un extrait de la base de données avec une méthode de votre choix.

In [2]:
# Chargement du fichier " YearPredictionMSD.txt" dans un DataFrame
df_raw = spark.read.csv('data/YearPredictionMSD.txt')

# Première méthode d'affichage
df_raw.show(2, truncate = 4)
# Modifier les valeurs de 'truncate' ne permet pas de bien visualiser les données
# à cause du nombre de variables

# Deuxième méthode d'affichage
# df_raw.sample(False, .00001, seed = 222).toPandas()
# Utiliser toPandas permet de mieux visualiser les données

+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
| _c0| _c1| _c2| _c3| _c4| _c5| _c6| _c7| _c8| _c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|_c28|_c29|_c30|_c31|_c32|_c33|_c34|_c35|_c36|_c37|_c38|_c39|_c40|_c41|_c42|_c43|_c44|_c45|_c46|_c47|_c48|_c49|_c50|_c51|_c52|_c53|_c54|_c55|_c56|_c57|_c58|_c59|_c60|_c61|_c62|_c63|_c64|_c65|_c66|_c67|_c68|_c69|_c70|_c71|_c72|_c73|_c74|_c75|_c76|_c77|_c78|_c79|_c80|_c81|_c82|_c83|_c84|_c85|_c86|_c87|_c88|_c89|_c90|
+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+

<div class="alert alert-warning">
Afficher une base données avec la méthode show() est plus rapide. Cependant, cet affichage peut parfois être incompréhensible lorsqu'il y a trop de variables.<br>
On peut alors sélectionner quelques variables et tronquer l'affichage pour le rendre propre ou alors privilégier la succession des méthodes sample() et toPandas() en faisant attention à choisir un nombre raisonnable de lignes à afficher. Il faut bien garder en tête que même si la méthode sample() est relativement rapide, elle passe néanmoins par un décompte du nombre de ligne, une opération simple mais qui n'est pas rapide en Spark.
</div>

L'exercice précédent montre que le parsing effectué par PySpark a tendance à enregistrer toutes les variables en string même lorsqu'elles sont numériques. La vérification de cette information peut s'effectuer en utilisant la méthode printSchema() :

```python
df_raw.printSchema()
```

Pour modifier le type de chacune des variables, il faudrait changer son type comme suit :

```python
df_raw.select(df_raw._c0.cast("double"),
                  df_raw._c1.cast("double"),
                  df_raw._c2.cast("double"),
                  df_raw._c3.cast("double"),
                  ...)
```

Une telle tâche devient de plus en plus fastidieuse quand le nombre de variables devient important. Cette démarche peut être automatisée grâce à la fonction col issue du sous-module pyspark.sql.functions. La fonction col permet de nommer directement une colonne et d'automatiser ce type de démarche au sein d'une boucle. Les deux lignes suivantes permettent alors de changer toutes les colonnes en double dans un nouveau DataFrame df :

```python
exprs = [col(c).cast("double") for c in df_raw.columns]
df = df_raw.select(*exprs)
```

* __(c)__ Importer la fonction col du sous-module pyspark.sql.functions.
* __(d)__ Créer un DataFrame df à partir de df_raw en changeant les types des colonnes relatives au timbre en double et l'année en int.
* __(e)__ Afficher le schéma des variables du df.

In [3]:
from pyspark.sql.functions import col

exprs = [col(c).cast("double") for c in df_raw.columns[1:91]]
df = df_raw.select(df_raw._c0.cast("int"), *exprs )
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: double (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: double (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: double (nullable = true)
 |-- _c11: double (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: double (nullable = true)
 |-- _c15: double (nullable = true)
 |-- _c16: double (nullable = true)
 |-- _c17: double (nullable = true)
 |-- _c18: double (nullable = true)
 |-- _c19: double (nullable = true)
 |-- _c20: double (nullable = true)
 |-- _c21: double (nullable = true)
 |-- _c22: double (nullable = true)
 |-- _c23: double (nullable = true)
 |-- _c24: double (nullable = true)
 |-- _c25: double (nullable = true)
 |-- _c26: double (nullable = true)
 |-- _c27: double (nullable = tr

<div class="alert alert-warning">
Inférer le bon type des variables peut paraître superflu et certains algorithmes fonctionnent même lorsque les variables numériques sont de type string. Cependant, il s'agit d'une mesure de sécurité importante car cela peut entraîner beaucoup de bugs potentiels.<br>
Une deuxième mesure de sécurité à prendre en compte est de supprimer ou remplacer les valeurs manquantes. La base de données est ici dépourvue de valeurs manquantes.
</div>

* __(g)__ Afficher un résumé descriptif de la base de données df.

In [None]:
# Affichage d'un résumé descriptif des données
df.describe().toPandas()

<div class="alert alert-warning">
Une bonne pratique est de mettre la variable à prédire en première position.
</div>

## 2. Mise en forme de la base en format svmlib

Pour pouvoir être utilisée par les algorithmes de Machine Learning de Spark ML, la base de données doit être un DataFrame contenant 2 colonnes :
* La colonne label contenant la variable à prédire (label en anglais).
* La colonne features contenant les variables explicatives (features en anglais).

La fonction DenseVector() issue du package pyspark.ml.linalg permet de regrouper plusieurs variables en une seule variable.

<div class="alert alert-warning">
Pour pouvoir utiliser la fonction DenseVector(), il faut utiliser la méthode map après avoir transformer le DataFrame en rdd.
</div>

__Exemple :__

```python
rdd_ml = df.rdd.map(lambda x: (x[0], DenseVector(x[1:]))) # en supposant que la variable à expliquer est en première position
```

__Exemple :__

```python
df_ml = spark.createDataFrame(rdd_ml, ['label', 'features'])
```

* __(a)__ Importer la fonction DenseVector du package pyspark.ml.linalg.
* __(b)__ Créer un rdd rdd_ml séparant la variable à expliquer des features (à mettre sous forme DenseVector).
* __(c)__ Créer un DataFrame df_ml contenant notre base de données sous deux variables : 'label' et 'features'.
* __(d)__ Afficher un extrait de df_ml.

In [4]:
# Import de DenseVector du package pyspark.ml.linalg
from pyspark.ml.linalg import DenseVector

# Création d'un rdd en séparant la variable à expliquer des features
rdd_ml = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Création d'un DataFrame composé de deux variables : label et features
df_ml = spark.createDataFrame(rdd_ml, ['label', 'features'])

# Affichage des 10 premières lignes du DataFrame
df_ml.show(10)

+-----+--------------------+
|label|            features|
+-----+--------------------+
| 2001|[49.94357,21.4711...|
| 2001|[48.73215,18.4293...|
| 2001|[50.95714,31.8560...|
| 2001|[48.2475,-1.89837...|
| 2001|[50.9702,42.20998...|
| 2001|[50.54767,0.31568...|
| 2001|[50.57546,33.1784...|
| 2001|[48.26892,8.97526...|
| 2001|[49.75468,33.9958...|
| 2007|[45.17809,46.3423...|
+-----+--------------------+
only showing top 10 rows



Afin d'évaluer les performances du modèle de régression, il faut mettre de côté une partie des données qui attesteront de la qualité du modèle une fois entraîné. Pour cela, il faut systématiquement diviser les données en un ensemble d'entraînement et un ensemble de test.

<div class="alert alert-info">
Usuellement, la taille de jeu de test est comprise entre 15% et 30% de la quantité totale de données disponibles. Le choix de la répartition dépend essentiellement de la quantité et de la qualité des données disponibles.
</div>

<div class="alert alert-info">
La méthode randomSplit permet de séparer un DataFrame en deux. Par exemple, la création d'un DataFrame d'entraînement contenant 70% des données et un de test en contenant 30%, se fait de la façon suivante :<br>
train, test = df.randomSplit([.7, .3], seed= 222)
</div>

<div class="alert alert-success">
Imposer un seed sert simplement à rendre les résultats reproductibles.
</div>

* __(e)__ Créer deux DataFrames appelés train et test contenant respectivement 80% et 20% des données.

In [5]:
train, test = df_ml.randomSplit([.8, .2], seed= 222)

## 3. Régression linéaire

Spark ML contient de nombreuses fonctions de Machine Learning, commençons ici par la plus basique : la régression linéaire. Elle est présente sous le nom LinearRegression (https://spark.apache.org/docs/latest/ml-classification-regression.html#linear-regression) dans le module pyspark.ml.regression. Cette fonction permet d'effectuer une régression linéaire de façon distribuée et effectue les calculs sur les différents clusters prédéfinis dans la SparkSession, quel que soit leur nombre ou la taille de la base de données.

Pour l'utiliser, il faut procéder avec les deux étapes habituelles :
* Créer la fonction avec les paramètres spécifiques au contexte.
* Utiliser la méthode fit pour l'appliquer aux données.

<div class="alert alert-success">
Cliquez ici (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.LinearRegression.html#pyspark.ml.regression.LinearRegression) pour plus d'informations sur les différents paramètres disponibles de LinearRegression.
</div>

__Exemple :__

```python
lr = LinearRegression(labelCol='label', featuresCol= 'features', maxIter=10, regParam=0.3)
```

* __(a)__ Importer la fonction LinearRegression du sous-module pyspark.ml.regression.
* __(b)__ Créer lr, une fonction de régression linéaire distribuée pour l'appliquer à l'ensemble train.
* __(c)__ Créer linearModel, le modèle issu de lr appliqué à train.

In [None]:
# Import de LinearRegression du package pyspark.ml.regression
from pyspark.ml.regression import LinearRegression

# Création d'une fonction de régression linéaire
lr = LinearRegression(labelCol='label', featuresCol= 'features')

# Apprentissage sur les données d'entraînement 
linearModel = lr.fit(train)

Maintenant que le modèle d'apprentissage est construit, il est possible d'effectuer des prédictions sur les données test.

Pour cela, les modèles Spark ML possèdent une méthode transform() permettant d'effectuer des prédictions en prenant en seul argument la base de données de test.

* __(d)__ Créer un DataFrame predicted contenant les données prédites et les labels correspondants.
* __(e)__ Afficher un extrait de predicted.

In [None]:
# Calcul des prédictions sur les données test
predicted = linearModel.transform(test)

# Affichage des prédictions
predicted.show()

## 4. Evaluation du modèle

Afin d'évaluer la qualité du modèle, il est possible de chercher les informations au sein de l'attribut summary de notre modèle.

<div class="alert alert-info">
Taper linearModel.summary. puis appuyez sur Tab pour faire apparaître tous les différents attributs du summary
</div>

* __(a)__ Calculer et afficher le RMSE (Root Mean Squared Error) de la régression.
* __(b)__ Calculer et afficher le R 22  de la régression.

In [None]:
# Calcul et affichage du RMSE
print("RMSE:", linearModel.summary.rootMeanSquaredError)

# Calcul et affichage du R2
print("R2:  ", linearModel.summary.r2)

<div class="alert alert-info">
Bien que la RMSE indique une erreur moyenne relativement faible (inférieur à 10), le R 22  reste très faible (inférieur à 0.25). On peut donc penser que le modèle de régression est dans ce cas un mauvais indicateur de la décennie dans laquelle la chanson a été composée.
</div>

Une fois les résultats obtenus, il est possible d'optimiser le modèle par rapport à ces mesures. De façon générale, l'optimisation des modèles se fait en modifiant les variables explicatives utilisées, en utilisant un autre modèle de régression et/ou en changeant les paramètres du modèle grâce à la documentation de PySpark.

Une fois que le modèle prédit adéquatement l'année de sortie, Spark ML offre la possibilité de récupérer les coefficients grâce aux attributs coefficients et intercept du modèle.

* __(c)__ Afficher les coefficients coefficients du modèle. La fonction pprint du module pprint permet d'avoir un affichage plus élégant des données.
* __(d)__ Fermer la session spark en utilisant la méthode stop.

In [None]:
from pprint import pprint

# Affichage des Coefficients du modèle linéaire
pprint(linearModel.coefficients)

# Fermeture de la session Spark 
spark.stop()

<div class="alert alert-success">
Maintenant que vous avez appris à programmer une régression linéaire en utilisant Spark ML, vous n'êtes qu'à quelques pas de maîtriser tout algorithme de régression distribué sous Spark. Pour vous aider à retenir l'essentiel, en voici un aperçu :<br>
• 1. Transformer la base de données en format svmlib :<br>
  • Sélectionner les variables numériques à utiliser pour la régression.<br>
  • Placer la variable à expliquer en première position.<br>
  • Mapper un couple (label, vecteur de features) dans un RDD.<br>
  • Convertir ce RDD en DataFrame et nommer les variables 'label' et 'features'.<br>
• 2. Séparer la base de données en deux échantillons train et test.<br>
• 3. Appliquer un modèle de classification.<br>
• 4. Evaluer le modèle.
</div>

<div class="alert alert-success">
Spark est en constante amélioration et possède aujourd'hui quelques régresseurs notables. Ils sont utilisables de la même façon en important ces fonction depuis pyspark.ml.regression. Vous êtes invité à consulter la documentation pour observer les différents paramètres à prendre en compte pour optimiser ces algorithmes :<br>
  • LinearRegression() pour effectuer une régression linéaire lorsque le label est présupposé suivre une loi normale.<br>
  • GeneralizedLinearRegression() pour effectuer une régression linéaire généralisée lorsque le label est présupposé suivre une autre loi que l'on spécifie dans le paramètre family (gaussian, binomial, poisson, gamma).<br>
  • AFTSurvivalRegression() pour effectuer une analyse de survie.<br><br>

Il est également possible d'utiliser les algorithmes, qui gèrent également les variables catégorielles, détaillés dans l'exercice suivant :<br>
  • DecisionTreeRegressor() pour un arbre de décision.<br>
  • RandomForestRegressor() pour une forêt aléatoire d'arbres de décision.<br>
  • GBTRegressor() pour une forêt d'arbres gradient-boosted.
</div>