#Introduction

**How do you process data, fit a Spark ML linear regression model, evaluate its performance, save the model, and make predictions on new data?**


**Apache Spark provides a machine learning library, MLlib, that covers many types of models.**

#Import the Required Libraries

In [2]:
!pip install pyspark

Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [3]:
# Data processing
import pandas as pd

# Create synthetic dataset
from sklearn.datasets import make_regression  # our problem is a linear regression task

# Modeling
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, PipelineModel

In [5]:
#Install findspark
!pip install findspark 

Successfully installed findspark-2.0.1


In [7]:
# Import findspark
import findspark
findspark.init()

#import pyspark
import pyspark
from pyspark.sql import SparkSession

#Create SparkSession
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()

#Create a Dataset for Linear Regression

In [8]:
# Create a synthetic dataset
X, y = make_regression(n_samples=1000000, n_features=2, noise=0.3, bias=2, random_state=42)

# Convert the data from numpy array to a pandas dataframe
pdf = pd.DataFrame({'feature1': X[:, 0], 'feature2': X[:, 1], 'dependent_variable': y})

# Convert pandas dataframe to spark dataframe
sdf = spark.createDataFrame(pdf)

# Check data summary statistics
sdf.summary().show()  # display() only renders tables in Databricks; outside it, it just prints the schema
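`summary()` computes, for every column, the count, mean, standard deviation, minimum, approximate 25%/50%/75% percentiles and maximum. As a rough local analogue, pandas' `DataFrame.describe()` reports the same eight statistics (with exact percentiles and `std` instead of Spark's `stddev` label). The sketch below runs it on a small, hypothetical stand-in for the synthetic data rather than the 1,000,000-row set:

```python
import numpy as np
import pandas as pd

# Hypothetical small stand-in for the synthetic data (not the 1,000,000-row set)
rng = np.random.default_rng(42)
small = pd.DataFrame({
    "feature1": rng.standard_normal(100),
    "feature2": rng.standard_normal(100),
})
small["dependent_variable"] = 3.0 * small["feature1"] + 2.0

# describe() reports count, mean, std, min, 25%, 50%, 75% and max for each column;
# Spark's summary() reports the same statistics, labelling std as "stddev"
stats = small.describe()
print(stats)
```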

In [13]:
pdf.head()

|   | feature1 | feature2 | dependent_variable |
|---|---|---|---|
| 0 | 0.313498 | -0.441196 | 12.470413 |
| 1 | 0.781497 | 2.459872 | 152.772101 |
| 2 | 0.760127 | -0.551061 | 45.344113 |
| 3 | -0.212218 | 0.527536 | 2.333974 |
| 4 | -0.034571 | 0.239026 | 7.6265 |


In [15]:
sdf

DataFrame[feature1: double, feature2: double, dependent_variable: double]

#Train Test Split

In [None]:
# Train test split
trainDF, testDF = sdf.randomSplit([.8, .2], seed=42)
# Print the number of records
print(f'There are {trainDF.cache().count()} records in the training dataset.')
print(f'There are {testDF.cache().count()} records in the testing dataset.')
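`randomSplit([.8, .2], seed=42)` does not cut the data at an exact 80% boundary. Conceptually, every row draws a uniform random number and lands in the split whose cumulative-weight interval contains it (Spark also normalizes the weights, so `[8, 2]` would behave the same). That is why the two counts printed above are only approximately 80/20. A plain NumPy sketch of that idea, offered as an illustration rather than Spark's partition-level implementation:

```python
import numpy as np

def random_split_sizes(n_rows, weights, seed):
    """Count how many rows land in each split when every row draws one uniform number."""
    weights = np.asarray(weights, dtype=float)
    bounds = np.cumsum(weights / weights.sum())  # cumulative boundaries, e.g. [0.8, 1.0]
    bounds[-1] = 1.0                              # guard against floating-point round-off
    draws = np.random.default_rng(seed).random(n_rows)  # uniform draws in [0, 1)
    split_ids = np.searchsorted(bounds, draws, side="right")  # index of the interval hit
    return np.bincount(split_ids, minlength=len(weights))

sizes = random_split_sizes(1_000_000, [0.8, 0.2], seed=42)
print(sizes)  # close to, but generally not exactly, [800000 200000]
```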

#Vector Assembler

In [None]:
# Spark ML's LinearRegression expects all features in a single vector column
vecAssembler = VectorAssembler(inputCols=['feature1', 'feature2'], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)

# display() renders tables only in Databricks; show() works in any environment
vecTrainDF.show(5)
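Conceptually, `VectorAssembler` packs the listed numeric columns of each row into one vector, in the order given by `inputCols`. The same transformation on a tiny pandas DataFrame, as an illustration only: the `assemble` helper is hypothetical, and Spark stores the result as an ML `Vector` rather than a Python list.

```python
import pandas as pd

def assemble(df, input_cols, output_col):
    """Return a copy of df with input_cols packed, in order, into one list per row."""
    out = df.copy()
    out[output_col] = df.apply(lambda row: [float(row[c]) for c in input_cols], axis=1)
    return out

# Two rows rounded from the pdf.head() output above
rows = pd.DataFrame({
    "feature1": [0.31, 0.78],
    "feature2": [-0.44, 2.46],
    "dependent_variable": [12.47, 152.77],
})
assembled = assemble(rows, ["feature1", "feature2"], "features")
print(assembled["features"].iloc[0])  # [0.31, -0.44]
```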

#Fit the Spark ML Linear Regression Model

In [None]:
# Create a linear regression estimator
lr = LinearRegression(featuresCol="features", labelCol="dependent_variable")
# Fit the linear regression model
lrModel = lr.fit(vecTrainDF)
# Model intercept and coefficients
print(f'The intercept of the model is {lrModel.intercept:.2f} and the coefficients of the model are {lrModel.coefficients[0]:.2f} and {lrModel.coefficients[1]:.2f}')
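Because the data are synthetic, the printed intercept and coefficients can be checked against the true values. `make_regression` returns its generating coefficients when called with `coef=True` (the intercept is the `bias` argument, here 2), and passing `coef=True` does not change the generated `X` and `y`. With its default `regParam=0.0`, Spark's `LinearRegression` solves an ordinary least-squares problem. The sketch below therefore uses NumPy's least squares on the full dataset as a local stand-in for the Spark fit, which is an assumption made to keep the example runnable without a cluster:

```python
import numpy as np
from sklearn.datasets import make_regression

# Regenerate the notebook's data with coef=True to obtain the true coefficients
X, y, true_coef = make_regression(
    n_samples=1_000_000, n_features=2, noise=0.3, bias=2, random_state=42, coef=True
)

# Ordinary least squares with an explicit column of ones for the intercept
design = np.column_stack([np.ones(len(X)), X])
solution, *_ = np.linalg.lstsq(design, y, rcond=None)
ols_intercept, ols_coefs = solution[0], solution[1:]

print(f"true coefficients:   {np.round(true_coef, 2)}, true intercept (bias): 2")
print(f"fitted coefficients: {np.round(ols_coefs, 2)}, fitted intercept: {ols_intercept:.2f}")
```

The Spark model is fitted on the 80% training split only, so its values may differ from these in the last decimal places.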

**Alternatively, we can build a pipeline and fit the model through it. A pipeline typically chains the data-processing steps and the model-fitting step, so a single fit() call runs them all in order.**

In [None]:
# Create the pipeline
stages = [vecAssembler, lr]
pipeline = Pipeline(stages=stages)

# Fit the whole pipeline (vector assembler + regression) on the raw training data
pipelineModel = pipeline.fit(trainDF)
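The pipeline idea can be sketched in plain Python: stages run in order; an Estimator (something with `fit`) is fitted on the current data and replaced by the model it returns, while a Transformer is used as-is; each stage's output feeds the next. All class names below are hypothetical stand-ins, not Spark APIs, and the data is a dict of NumPy arrays instead of a Spark DataFrame:

```python
import numpy as np

class Assembler:
    """Hypothetical stand-in for VectorAssembler: a Transformer with nothing to fit."""
    def __init__(self, input_cols, output_col):
        self.input_cols, self.output_col = input_cols, output_col

    def transform(self, data):
        # Stack the input columns side by side into an (n_rows, n_features) matrix
        return {**data, self.output_col: np.column_stack([data[c] for c in self.input_cols])}

class LeastSquares:
    """Hypothetical stand-in for LinearRegression: an Estimator whose fit() returns a model."""
    def __init__(self, features_col, label_col):
        self.features_col, self.label_col = features_col, label_col

    def fit(self, data):
        X = data[self.features_col]
        design = np.column_stack([np.ones(len(X)), X])  # ones column for the intercept
        sol, *_ = np.linalg.lstsq(design, data[self.label_col], rcond=None)
        return LeastSquaresModel(self.features_col, sol[0], sol[1:])

class LeastSquaresModel:
    """Fitted model: a Transformer that appends a 'prediction' column."""
    def __init__(self, features_col, intercept, coefficients):
        self.features_col = features_col
        self.intercept, self.coefficients = intercept, coefficients

    def transform(self, data):
        return {**data, "prediction": self.intercept + data[self.features_col] @ self.coefficients}

class SimplePipeline:
    """Run the stages in order: fit Estimators, then pass the data through each result."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            data = model.transform(data)  # the next stage sees the transformed data
            fitted.append(model)
        return SimplePipelineModel(fitted)

class SimplePipelineModel:
    """Apply every fitted stage in order, like PipelineModel.transform."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data

# Usage on noiseless toy data: y = 2 + 3*x1 - 1.5*x2
rng = np.random.default_rng(0)
train = {"feature1": rng.standard_normal(200), "feature2": rng.standard_normal(200)}
train["dependent_variable"] = 2 + 3 * train["feature1"] - 1.5 * train["feature2"]

pipeline_model = SimplePipeline([
    Assembler(["feature1", "feature2"], "features"),
    LeastSquares("features", "dependent_variable"),
]).fit(train)
predictions = pipeline_model.transform(train)["prediction"]
```

One simplification: this sketch also transforms the data after the last stage during fit, which Spark's `Pipeline` skips because nothing consumes that output.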

#Model Performance Evaluation

In [None]:
# Make predictions on the test dataset
predDF = pipelineModel.transform(testDF)
# Inspect the features, actual values and predictions
predDF.select("features", "dependent_variable", "prediction").show(5)
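For a linear regression model, each value in the `prediction` column is the intercept plus the dot product of the coefficients with that row's `features` vector. A minimal check with hypothetical numbers (not the values fitted above):

```python
import numpy as np

intercept = 2.0                        # hypothetical fitted intercept
coefficients = np.array([40.0, 60.0])  # hypothetical fitted coefficients
features = np.array([0.5, -1.0])       # one row of the "features" column

# prediction = intercept + coefficients . features
prediction = intercept + coefficients @ features
print(prediction)  # -38.0
```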

**After obtaining the predicted values, we pass the name of the prediction column and the name of the actual-value (label) column to RegressionEvaluator.**

metricName can be set to any of the following values (among others):

1) rmse: root mean squared error (the default)

2) mse: mean squared error

3) r2: coefficient of determination (R squared)

4) mae: mean absolute error
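The four metrics above follow their standard definitions. The sketch below computes them with NumPy on a tiny hand-made example; the `regression_metrics` helper is hypothetical, and RegressionEvaluator performs the equivalent computation on the `prediction` and label columns of a Spark DataFrame:

```python
import numpy as np

def regression_metrics(y_true, y_pred):
    """RMSE, MSE, R2 and MAE using their standard definitions."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    residuals = y_true - y_pred
    mse = np.mean(residuals ** 2)                   # mean squared error
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)  # total sum of squares
    return {
        "rmse": np.sqrt(mse),                       # root mean squared error
        "mse": mse,
        "r2": 1 - np.sum(residuals ** 2) / ss_tot,  # coefficient of determination
        "mae": np.mean(np.abs(residuals)),          # mean absolute error
    }

m = regression_metrics([3.0, -0.5, 2.0, 7.0], [2.5, 0.0, 2.0, 8.0])
print({k: round(float(v), 3) for k, v in m.items()})
# {'rmse': 0.612, 'mse': 0.375, 'r2': 0.949, 'mae': 0.5}
```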

In [None]:
# Create a regression evaluator
regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="dependent_variable", metricName="rmse")

# RMSE
rmse = regressionEvaluator.evaluate(predDF)
print(f"The RMSE for the linear regression model is {rmse:0.2f}")

# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(predDF)
print(f"The MSE for the linear regression model is {mse:0.2f}")

# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"The R2 for the linear regression model is {r2:0.2f}")

# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(predDF)
print(f"The MAE for the linear regression model is {mae:0.2f}")

#Save the Model

**Save the pipeline model to the AWS S3 bucket**

In [None]:
# Path where the model is saved ('...' is a placeholder for your own storage location)
pipelinePath = '/.../model/linear_regression_pipeline_model'
# Save the model to the path
pipelineModel.write().overwrite().save(pipelinePath)

In [None]:
# Confirm that the model was saved by loading it back and listing its stages
# (%fs ls only works in Databricks notebooks)
print(PipelineModel.load(pipelinePath).stages)
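The `.overwrite()` call matters because Spark's ML writer refuses to save into a path that already exists unless overwrite is requested; that is what lets the save cell be re-run. The same rule in a small, hypothetical JSON-based saver (a sketch of the behavior only, not Spark's on-disk format, which stores `metadata` and `stages` directories):

```python
import json
import os
import shutil
import tempfile

def save_params(params, path, overwrite=False):
    """Save params as JSON under path, refusing to replace an existing path unless overwrite=True."""
    if os.path.exists(path):
        if not overwrite:
            raise FileExistsError(f"Path {path} already exists; use overwrite=True")
        shutil.rmtree(path)  # overwrite: remove the old directory first
    os.makedirs(path)
    with open(os.path.join(path, "params.json"), "w") as f:
        json.dump(params, f)

def load_params(path):
    """Read back the parameters written by save_params."""
    with open(os.path.join(path, "params.json")) as f:
        return json.load(f)

# Hypothetical model parameters, saved to a temporary directory
model_dir = os.path.join(tempfile.mkdtemp(), "linear_regression_pipeline_model")
save_params({"intercept": 2.0, "coefficients": [40.0, 60.0]}, model_dir)

refused = False
try:
    save_params({"intercept": 2.1, "coefficients": [41.0, 59.0]}, model_dir)  # no overwrite
except FileExistsError as err:
    refused = True
    print("refused:", err)

save_params({"intercept": 2.1, "coefficients": [41.0, 59.0]}, model_dir, overwrite=True)
print(load_params(model_dir))  # {'intercept': 2.1, 'coefficients': [41.0, 59.0]}
```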

#Make Predictions for New Data

In [None]:
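# Create a new synthetic dataset that follows the same linear relationship as the training data.
# make_regression draws its coefficients from random_state (and n_samples), so calling it with
# random_state=0 would produce a different relationship; instead, recover the training
# coefficients with coef=True and generate fresh features around them.
import numpy as np
_, _, coef = make_regression(n_samples=1000000, n_features=2, noise=0.3, bias=2, random_state=42, coef=True)
rng = np.random.default_rng(0)
X_new = rng.standard_normal((1000, 2))
y_new = X_new @ coef + 2 + rng.normal(scale=0.3, size=1000)  # bias=2, noise=0.3 as in training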

# Convert the data from numpy array to a pandas dataframe
pdf_new = pd.DataFrame({'feature1': X_new[:, 0], 'feature2': X_new[:, 1], 'dependent_variable': y_new})

# Convert pandas dataframe to spark dataframe
sdf_new = spark.createDataFrame(pdf_new)

# Check data summary statistics
sdf_new.summary().show()
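Calling `make_regression` again with a different `random_state` does not produce new samples from the same relationship: the function draws its ground-truth coefficients from the same random generator as the features, and because the features are drawn first, even the same `random_state` with a different `n_samples` yields different coefficients. A model fitted on the `random_state=42` data would therefore be expected to predict such rows poorly. A quick check:

```python
import numpy as np
from sklearn.datasets import make_regression

# Same settings, different random_state: the ground-truth coefficients change as well
_, _, coef_42 = make_regression(n_samples=1000, n_features=2, noise=0.3, bias=2, random_state=42, coef=True)
_, _, coef_0 = make_regression(n_samples=1000, n_features=2, noise=0.3, bias=2, random_state=0, coef=True)
print(coef_42)
print(coef_0)
```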

In [None]:
# Load the saved model
loadedPipelineModel = PipelineModel.load(pipelinePath)

# Make prediction for the new dataset
predDF_new = loadedPipelineModel.transform(sdf_new)

# Take a look at the data
predDF_new.select("features", "dependent_variable", "prediction").show(5)

#Putting All the Code Together

In [None]:
###### Step 1: Import Libraries
# Data processing
import pandas as pd
# Create synthetic dataset
from sklearn.datasets import make_regression
# Modeling
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
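from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql import SparkSession
# Start (or reuse) a local Spark session
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()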
###### Step 2: Create Dataset For Linear Regression
# Create a synthetic dataset
X, y = make_regression(n_samples=1000000, n_features=2, noise=0.3, bias=2, random_state=42)
# Convert the data from numpy array to a pandas dataframe
pdf = pd.DataFrame({'feature1': X[:, 0], 'feature2': X[:, 1], 'dependent_variable': y})
# Convert pandas dataframe to spark dataframe
sdf = spark.createDataFrame(pdf)
# Check data summary statistics
sdf.summary().show()
###### Step 3: Train Test Split
# Train test split
trainDF, testDF = sdf.randomSplit([.8, .2], seed=42)
# Print the number of records
print(f'There are {trainDF.cache().count()} records in the training dataset.')
print(f'There are {testDF.cache().count()} records in the testing dataset.')
###### Step 4: Vector Assembler
# Linear regression expects a vector input
vecAssembler = VectorAssembler(inputCols=['feature1', 'feature2'], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)
# Take a look at the data (display() renders tables only in Databricks)
vecTrainDF.show(5)
###### Step 5: Fit Spark ML Linear Regression Model
# Create linear regression
lr = LinearRegression(featuresCol="features", labelCol="dependent_variable")
# Fit the linear regression model
lrModel = lr.fit(vecTrainDF)
# Print model intercept and coefficients
print(f'The intercept of the model is {lrModel.intercept:.2f} and the coefficients of the model are {lrModel.coefficients[0]:.2f} and {lrModel.coefficients[1]:.2f}')
# Create pipeline
stages = [vecAssembler, lr]
pipeline = Pipeline(stages=stages)
# Fit the pipeline model
pipelineModel = pipeline.fit(trainDF)
###### Step 6: Model Performance Evaluation
# Make predictions on testing dataset
predDF = pipelineModel.transform(testDF)
# Take a look at the output
predDF.select("features", "dependent_variable", "prediction").show(5)
# Create regression evaluator
regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="dependent_variable", metricName="rmse")
# RMSE
rmse = regressionEvaluator.evaluate(predDF)
print(f"The RMSE for the linear regression model is {rmse:0.2f}")
# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(predDF)
print(f"The MSE for the linear regression model is {mse:0.2f}")
# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"The R2 for the linear regression model is {r2:0.2f}")
# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(predDF)
print(f"The MAE for the linear regression model is {mae:0.2f}")
# Compare actual and predicted values
predDF.select("dependent_variable", "prediction").show(5)
###### Step 7: Save Model
# Path to save the model
pipelinePath = '/.../model/linear_regression_pipeline_model'
# Save the model to the path
pipelineModel.write().overwrite().save(pipelinePath)
# Confirm the model is saved by loading it back (%fs ls only works in Databricks)
print(PipelineModel.load(pipelinePath).stages)
###### Step 8: Make Predictions For New Data
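# Create a new synthetic dataset that follows the training relationship:
# recover the training coefficients with coef=True (a different random_state would change them)
import numpy as np
_, _, coef = make_regression(n_samples=1000000, n_features=2, noise=0.3, bias=2, random_state=42, coef=True)
rng = np.random.default_rng(0)
X_new = rng.standard_normal((1000, 2))
y_new = X_new @ coef + 2 + rng.normal(scale=0.3, size=1000)  # bias=2, noise=0.3 as in training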
# Convert the data from numpy array to a pandas dataframe
pdf_new = pd.DataFrame({'feature1': X_new[:, 0], 'feature2': X_new[:, 1], 'dependent_variable': y_new})
# Convert pandas dataframe to spark dataframe
sdf_new = spark.createDataFrame(pdf_new)
# Check data summary statistics
sdf_new.summary().show()
# Load the saved model
loadedPipelineModel = PipelineModel.load(pipelinePath)
# Make prediction for the new dataset
predDF_new = loadedPipelineModel.transform(sdf_new)
# Take a look at the data
predDF_new.select("features", "dependent_variable", "prediction").show(5)
# Actual vs. predicted
predDF_new.select("dependent_variable", "prediction").show(5)