<a href="https://colab.research.google.com/github/hhmida/ESB/blob/main/LinearRegression_Spark_ML_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Pipeline avec Spark ML
---

Le pipeline permet d'organiser les différentes transformations et prétraitements sur les données jusqu'à la création d'un modèle. Il facilite ainsi sa configuration et sa réutilisation. Un pipeline est composé de stages de **transformers** et **estimators**. 

**Transformers**

Il transforme un DataFrame en un nouveau DataFrame en lui ajoutant généralement une ou plusieurs colonnes. Les transformers commence par *fit()* sur le DataFrame et possèdent aussi une méthode *transform()* pour transformer le DataFrame.

Exemple : StandardScaler
```python
from pyspark.ml.feature import StandardScaler
dataFrame = spark.read.csv("data.csv")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)
scalerModel = scaler.fit(dataFrame)
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
```

**Estimators**

C'est une abstraction d'un algorithme d'apprentissage qui sera entrainé sur un dataset. Sa méthode *fit()* dont le paramètre est un DataFrame produit un modèle. Ce modèle peut être utilisé ensuite pour transformer (méthode *transform()*) des données afin d'appliquer le modèle pour prédire leur classe par exemple.

Exemple : LinearRegression

```python
from pyspark.ml.regression import LinearRegression

#charger et préparer le dataset
# ...
# Créer le modèle
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lrModel = lr.fit(trainingDF)

# Appliquer  sur les données de test
lrModel.transform(testDF)
```
Nous allons appliquer un pipeline pour un problème de classification afin de prédire si un vol serait en retard ou non.

### Installer Pyspark télécharger le dataset



In [None]:
!pip install pyspark  

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!wget https://github.com/hhmida/datasets/raw/master/flights.csv

--2022-12-12 15:54:35--  https://github.com/hhmida/datasets/raw/master/flights.csv
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/hhmida/datasets/master/flights.csv [following]
--2022-12-12 15:54:35--  https://raw.githubusercontent.com/hhmida/datasets/master/flights.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 17698856 (17M) [text/plain]
Saving to: ‘flights.csv.1’


2022-12-12 15:54:35 (132 MB/s) - ‘flights.csv.1’ saved [17698856/17698856]



## Importer les librairies Spark SQL et Spark ML 

In [None]:
from pyspark.sql import SparkSession
spark =  SparkSession.builder.appName('Spark ML').getOrCreate()

# Pour créer un pipeline
from pyspark.ml import Pipeline

# Estimator de type Régression logistique pour la classification
from pyspark.ml.classification import LogisticRegression

# La préparation des données pour la regression logistique
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler

## Charger et explorer le dataset

Notre dataset est sous forme de fichier csv **flights.csv**

In [None]:
flightSchema = "DayofMonth int,DayOfWeek int,Carrier string,OriginAirportID string,DestAirportID string, DepDelay int, ArrDelay int, Late int"
data = spark.read.csv('flights.csv', schema=flightSchema, header=True)
data.show()

+----------+---------+-------+---------------+-------------+--------+--------+----+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|Late|
+----------+---------+-------+---------------+-------------+--------+--------+----+
|        21|        2|     WN|          10721|        13342|      26|      57|   1|
|        13|        1|     AA|          15016|        12892|      51|      27|   1|
|         5|        5|     FL|          10397|        11433|       9|       4|   0|
|        22|        1|     US|          11278|        14100|      35|      71|   1|
|        23|        4|     WN|          12451|        10693|       9|       5|   0|
|         5|        7|     AA|          11298|        15016|      39|      42|   1|
|         4|        6|     UA|          13930|        14307|      71|      58|   1|
|        10|        3|     9E|          14307|        11433|      68|     140|   1|
|        29|        7|     UA|          14057|        14771|     130|     12

In [None]:
data.count()

637083

In [None]:
from pyspark.sql.functions import *
data.groupby().agg(min("DepDelay"),max("DepDelay"),mean("DepDelay"),min("ArrDelay"),max("ArrDelay"),mean("ArrDelay")).show()

+-------------+-------------+-----------------+-------------+-------------+-----------------+
|min(DepDelay)|max(DepDelay)|    avg(DepDelay)|min(ArrDelay)|max(ArrDelay)|    avg(ArrDelay)|
+-------------+-------------+-----------------+-------------+-------------+-----------------+
|          -52|          149|28.89959706976956|          -86|          149|29.31866177562421|
+-------------+-------------+-----------------+-------------+-------------+-----------------+



In [None]:
from pyspark.ml.stat import Correlation
va = VectorAssembler(inputCols=["DayofMonth", "DayOfWeek", "DepDelay", "ArrDelay","Late"], outputCol="features")

In [None]:
newData =va.transform(data)

In [None]:
newData.show()

+----------+---------+-------+---------------+-------------+--------+--------+----+--------------------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|Late|            features|
+----------+---------+-------+---------------+-------------+--------+--------+----+--------------------+
|        21|        2|     WN|          10721|        13342|      26|      57|   1|[21.0,2.0,26.0,57...|
|        13|        1|     AA|          15016|        12892|      51|      27|   1|[13.0,1.0,51.0,27...|
|         5|        5|     FL|          10397|        11433|       9|       4|   0|[5.0,5.0,9.0,4.0,...|
|        22|        1|     US|          11278|        14100|      35|      71|   1|[22.0,1.0,35.0,71...|
|        23|        4|     WN|          12451|        10693|       9|       5|   0|[23.0,4.0,9.0,5.0...|
|         5|        7|     AA|          11298|        15016|      39|      42|   1|[5.0,7.0,39.0,42....|
|         4|        6|     UA|          13930|        1

In [None]:
r1 = Correlation.corr(newData, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

Pearson correlation matrix:
DenseMatrix([[ 1.        ,  0.02932506,  0.00345172,  0.00521996,  0.00760964],
             [ 0.02932506,  1.        , -0.00797621, -0.01716528, -0.01259032],
             [ 0.00345172, -0.00797621,  1.        ,  0.89696607,  0.70494444],
             [ 0.00521996, -0.01716528,  0.89696607,  1.        ,  0.80351383],
             [ 0.00760964, -0.01259032,  0.70494444,  0.80351383,  1.        ]])


## Découper le dataset en training set et test set
Dans ce qui suit 70% des données pour le training et 30% pour le test.

In [None]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print ("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 445647  Testing Rows: 191436


## Définir le Pipeline
Les étapes d'un pipeline dépendent de type d'apprentissage (supervisé, non supervisé, ...) et le type du problème et l'algorithme utilisé (classification, régression linéaire, arbre de décision, ...).

Dans cet exemple, il s'agit de problème de classification avec l'algorithme de régression logistique. Alors, le pipeline est constitué des éléments suivants :

- **StringIndexer** pour transformer les colonnes catégoriques en colonne numériques.
- **VectorAssembler** pour créer un vecteur des valeurs numériques continues.
- **MinMaxScaler** pour normaliser les valeurs numériques dans l'intervalle [0,1].
- **VectorAssembler** pour regrouper les colonnes catégoriques et numériques en une seule colonne vecteur.
- **LogisticRegression** l'algorithme d'apprentissage pour le modèle de classification.

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
monthdayIndexer = StringIndexer(inputCol="DayofMonth", outputCol="DayofMonthIdx")
weekdayIndexer = StringIndexer(inputCol="DayOfWeek", outputCol="DayOfWeekIdx")
carrierIndexer = StringIndexer(inputCol="Carrier", outputCol="CarrierIdx")
originIndexer = StringIndexer(inputCol="OriginAirportID", outputCol="OriginAirportIdx")
destIndexer = StringIndexer(inputCol="DestAirportID", outputCol="DestAirportIdx")
numVect = VectorAssembler(inputCols = ["DepDelay"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normNums")
featVect = VectorAssembler(inputCols=["DayofMonthIdx", "DayOfWeekIdx", "CarrierIdx", "OriginAirportIdx", "DestAirportIdx", "normNums"], outputCol="features")
lr = LinearRegression(labelCol="ArrDelay", featuresCol="features")
pipeline = Pipeline(stages=[monthdayIndexer, weekdayIndexer, carrierIndexer, originIndexer, destIndexer, numVect, minMax, featVect, lr])

### Exécuter le Pipeline sur le training set


In [None]:
piplineModel = pipeline.fit(train)
print ("Pipeline complete!")

Pipeline complete!


## Tester le modèle sur le test set

In [None]:
prediction = piplineModel.transform(test)
predicted = prediction.select("features", col("prediction"), col("ArrDelay").alias("trueLabel"))
predicted.show(100, truncate=False)

+---------------------------------------------+--------------------+---------+
|features                                     |prediction          |trueLabel|
+---------------------------------------------+--------------------+---------+
|[25.0,2.0,10.0,1.0,25.0,0.7263681592039801]  |91.63212261353219   |113      |
|[25.0,2.0,10.0,18.0,12.0,0.2288557213930348] |-2.4353049663154707 |39       |
|[25.0,2.0,10.0,8.0,12.0,0.4129353233830846]  |32.655151321598794  |83       |
|[25.0,2.0,10.0,48.0,16.0,0.2537313432835821] |1.5203968897891684  |-12      |
|[25.0,2.0,10.0,38.0,53.0,0.3383084577114428] |16.105886790902183  |86       |
|[25.0,2.0,10.0,38.0,53.0,0.4129353233830846] |30.253032772769387  |27       |
|[25.0,2.0,10.0,38.0,22.0,0.24378109452736318]|-0.43801368503966387|62       |
|[25.0,2.0,10.0,38.0,5.0,0.5323383084577115]  |55.0187648228639    |55       |
|[25.0,2.0,10.0,38.0,7.0,0.5621890547263682]  |60.58886077898133   |42       |
|[25.0,2.0,10.0,38.0,49.0,0.2537313432835821] |0.249

## Évaluer le modèle

### Matrice de confusion
Elle calcule le nombre de :
- True Positives (TP)
- True Negatives (TN)
- False Positives (FP)
- False Negatives (FN)

Plusieurs mesures sont calculées à partir de cette matrice comme :
$$precision = \frac{TP}{TP+FP}$$
$$recall(TPR) = \frac{TP}{TP+FN}$$
$$Accuracy = \frac{TP+TN}{TP+TN+FP+FN}$$

In [None]:
tp = float(predicted.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND truelabel == 1").count())
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", tp / (tp + fp)),
 ("Recall", tp / (tp + fn))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           89464.0|
|       FP|            7162.0|
|       TN|           81711.0|
|       FN|           13099.0|
|Precision|  0.92587916295821|
|   Recall|0.8722833770463032|
+---------+------------------+



### AUC

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="Late", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(prediction)
print ("AUC = ", auc)

AUC =  0.949390862764023


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
myevaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse =myevaluator.evaluate(predicted)
print("RMSE=", rmse)

RMSE= 17.21261793810399
