# CC DE BIGDATA - pyspark

Noms : **TAYAWELBA DAWAI HESED**

Classe : **Master 2 IADB**.

In [1]:
!pip install pyspark==3.5.1

!pip install findspark==2.0.1



In [2]:
# Importer les bibliothèques PySpark
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Initialiser Spark
import findspark
findspark.init()

In [4]:
# Créer une session Spark
spark = SparkSession.builder.appName("Regression").getOrCreate()


In [5]:
# Message de test
df = spark.sql("select 'Spark' as hello")
df.show()


+-----+
|hello|
+-----+
|Spark|
+-----+



In [6]:
# Bibliothèques de préparation de données
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [7]:
# Bibliothèques d'apprentissage et d'évaluation
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


In [8]:
# Chargement du jeu de données
df = spark.read.csv("/content/Concrete.csv", inferSchema=True, header=True)


In [9]:
# Afficher les 5 premières lignes
df.limit(5).toPandas()


Unnamed: 0,cement,slag,flyash,water,superplasticizer,coarseaggregate,fineaggregate,age,csMPa
0,540.0,0.0,0.0,162.0,2.5,1040.0,676.0,28,79.99
1,540.0,0.0,0.0,162.0,2.5,1055.0,676.0,28,61.89
2,332.5,142.5,0.0,228.0,0.0,932.0,594.0,270,40.27
3,332.5,142.5,0.0,228.0,0.0,932.0,594.0,365,41.05
4,198.6,132.4,0.0,192.0,0.0,978.4,825.5,360,44.3


In [10]:
# Nombre d'enregistrements et de colonnes
print(f"Nombre d'enregistrements: {df.count()}")
print(f"Nombre de colonnes: {len(df.columns)}")

Nombre d'enregistrements: 1030
Nombre de colonnes: 9


In [11]:
# Colonnes du jeu de données
df.columns

['cement',
 'slag',
 'flyash',
 'water',
 'superplasticizer',
 'coarseaggregate',
 'fineaggregate',
 'age',
 'csMPa']

# **Pré-traitement des données**

**Pas de suppression de colonnes nécessaire dans ce cas**

In [12]:
# Vérification du schéma de données
df.printSchema()

root
 |-- cement: double (nullable = true)
 |-- slag: double (nullable = true)
 |-- flyash: double (nullable = true)
 |-- water: double (nullable = true)
 |-- superplasticizer: double (nullable = true)
 |-- coarseaggregate: double (nullable = true)
 |-- fineaggregate: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- csMPa: double (nullable = true)



Valeurs manquantes

In [13]:
# Trouver le nombre de valeurs nulles par colonne
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()


Unnamed: 0,cement,slag,flyash,water,superplasticizer,coarseaggregate,fineaggregate,age,csMPa
0,1,4,0,0,3,4,11,3,0


In [14]:
df.select("cement").summary('mean').show()

df =df.fillna({'cement':281})

df.select("slag").summary('mean').show()

df =df.fillna({'slag':74})

df.select("superplasticizer").summary('mean').show()

df =df.fillna({'superplasticizer':6})

df.select("coarseaggregate").summary('mean').show()

df =df.fillna({'coarseaggregate':972})

df.select("fineaggregate").summary('mean').show()

df =df.fillna({'fineaggregate':773})

df.select("age").summary('mean').show()

df =df.fillna({'age':45})

+-------+------------------+
|summary|            cement|
+-------+------------------+
|   mean|281.23449951409106|
+-------+------------------+

+-------+-----------------+
|summary|             slag|
+-------+-----------------+
|   mean|74.18391812865497|
+-------+-----------------+

+-------+-----------------+
|summary| superplasticizer|
+-------+-----------------+
|   mean|6.202142161635831|
+-------+-----------------+

+-------+-----------------+
|summary|  coarseaggregate|
+-------+-----------------+
|   mean|972.6394736842101|
+-------+-----------------+

+-------+-----------------+
|summary|    fineaggregate|
+-------+-----------------+
|   mean|773.1452404317946|
+-------+-----------------+

+-------+-----------------+
|summary|              age|
+-------+-----------------+
|   mean|45.71372930866602|
+-------+-----------------+



In [15]:
# # Remplacement des valeurs manquantes par la moyenne pour les valeurs numériques
# for col in ['cement', 'slag', 'superplasticizer', 'coarseaggregate', 'fineaggregate', 'age']:
#   df = df.fillna({col: df.select(col).summary('mean').rdd.first()[0]})


In [16]:
df.printSchema()

root
 |-- cement: double (nullable = false)
 |-- slag: double (nullable = false)
 |-- flyash: double (nullable = true)
 |-- water: double (nullable = true)
 |-- superplasticizer: double (nullable = false)
 |-- coarseaggregate: double (nullable = false)
 |-- fineaggregate: double (nullable = false)
 |-- age: integer (nullable = false)
 |-- csMPa: double (nullable = true)



In [17]:
# Vérification des valeurs manquantes restantes
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()


Unnamed: 0,cement,slag,flyash,water,superplasticizer,coarseaggregate,fineaggregate,age,csMPa
0,0,0,0,0,0,0,0,0,0


In [18]:
# Corrélation (optionnel)
import six
for col in df.columns:
  if not isinstance(df.select(col).take(1)[0][0], six.string_types):
    print(f"Corrélation avec la résistance à la compression pour {col}:", df.stat.corr('csMPa', col))


Corrélation avec la résistance à la compression pour cement: 0.4983902048507596
Corrélation avec la résistance à la compression pour slag: 0.13470778480300238
Corrélation avec la résistance à la compression pour flyash: -0.10575491629731447
Corrélation avec la résistance à la compression pour water: -0.28963338498530294
Corrélation avec la résistance à la compression pour superplasticizer: 0.3658134397476175
Corrélation avec la résistance à la compression pour coarseaggregate: -0.16621456135112603
Corrélation avec la résistance à la compression pour fineaggregate: -0.16726113416813315
Corrélation avec la résistance à la compression pour age: 0.3284447729466021
Corrélation avec la résistance à la compression pour csMPa: 1.0


In [19]:
# Séparation des données en train et test
train, test = df.randomSplit([0.7, 0.3], seed=2018)
print(f"Nombre d'enregistrements dans l'ensemble d'entraînement: {train.count()}")
print(f"Nombre d'enregistrements dans l'ensemble de test: {test.count()}")


Nombre d'enregistrements dans l'ensemble d'entraînement: 732
Nombre d'enregistrements dans l'ensemble de test: 298


In [20]:
# Colonnes à mettre à l'échelle
cols_to_scale = ['cement', 'slag', 'flyash', 'water', 'superplasticizer', 'coarseaggregate', 'fineaggregate', 'age']


In [21]:
# Assemblage des features
s_assembler = VectorAssembler(inputCols=cols_to_scale, outputCol="feature")
train_assembled = s_assembler.transform(train)
train_assembled.show()

+------+-----+------+-----+----------------+---------------+-------------+---+-----+--------------------+
|cement| slag|flyash|water|superplasticizer|coarseaggregate|fineaggregate|age|csMPa|             feature|
+------+-----+------+-----+----------------+---------------+-------------+---+-----+--------------------+
| 102.0|153.0|   0.0|192.0|             0.0|          887.0|        942.0|  7| 7.68|[102.0,153.0,0.0,...|
| 102.0|153.0|   0.0|192.0|             0.0|          887.0|        942.0| 28|17.28|[102.0,153.0,0.0,...|
| 102.0|153.0|   0.0|192.0|             0.0|          887.0|        942.0| 90|25.46|[102.0,153.0,0.0,...|
| 108.3|162.4|   0.0|203.5|             0.0|          938.2|        849.0|  3| 2.33|[108.3,162.4,0.0,...|
| 108.3|162.4|   0.0|203.5|             0.0|          938.2|        849.0|  7| 7.72|[108.3,162.4,0.0,...|
| 108.3|162.4|   0.0|203.5|             0.0|          938.2|        849.0| 28|20.59|[108.3,162.4,0.0,...|
| 108.3|162.4|   0.0|203.5|             0.0|  

In [22]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="feature", outputCol="features", withMean=True, withStd=True)

In [23]:
scaler_model = scaler.fit(train_assembled)
train_scaled = scaler_model.transform(train_assembled)


In [24]:
# Renommer la colonne cible
train_scaled = train_scaled.withColumnRenamed("csMPa", "label")


In [25]:
# Sélection des colonnes pour le Machine Learning
train_data = train_scaled.select("features", "label")


In [26]:
train_data.toPandas()

Unnamed: 0,features,label
0,"[-1.7377891123666505, 0.8877701970978037, -0.8...",7.68
1,"[-1.7377891123666505, 0.8877701970978037, -0.8...",17.28
2,"[-1.7377891123666505, 0.8877701970978037, -0.8...",25.46
3,"[-1.6768070031877411, 0.9955874397573756, -0.8...",2.33
4,"[-1.6768070031877411, 0.9955874397573756, -0.8...",7.72
...,...,...
727,"[2.5019194305479946, -0.867127475978207, -0.83...",41.64
728,"[2.5019194305479946, -0.867127475978207, -0.83...",52.61
729,"[2.5019194305479946, -0.867127475978207, -0.83...",59.76
730,"[2.5019194305479946, -0.867127475978207, -0.83...",67.31


# Algorithmes de régression

# **1. Régression linéaire**

In [27]:
from pyspark.ml.regression import LinearRegression

In [28]:
# Création du modèle
lr_regressor = LinearRegression()
lr_model = lr_regressor.fit(train_data)

In [29]:
# Evaluation
evaluator_r2 = RegressionEvaluator(metricName="r2")
evaluator_mse = RegressionEvaluator(metricName="mse")

In [30]:
# Prédictions avec la régression linéaire
lr_predictions = lr_model.transform(train_data)

In [31]:
# Affichage des métriques d'évaluation (R-carré et MSE)
r2_lr = evaluator_r2.evaluate(lr_predictions)
print("R-carré (régression linéaire):", r2_lr)
mse_lr = evaluator_mse.evaluate(lr_predictions)
print("MSE (régression linéaire):", mse_lr)

R-carré (régression linéaire): 0.6283189927948849
MSE (régression linéaire): 108.1627886266966


# **2. Régression avec Decision Tree**

In [32]:
from pyspark.ml.regression import DecisionTreeRegressor

In [33]:
# Paramètre de profondeur maximale de l'arbre
paramGrid = ParamGridBuilder().addGrid(DecisionTreeRegressor.maxDepth, [2, 3, 4]).build()

In [34]:
# Cross-validation
tree_cv = CrossValidator(estimator=DecisionTreeRegressor(), estimatorParamMaps=paramGrid, evaluator=evaluator_r2, numFolds=3)


In [35]:
# Ajustement du modèle
tree_model = tree_cv.fit(train_data)

In [36]:
# Sélection du meilleur modèle
best_tree_model = tree_model.bestModel

In [37]:
# Prédictions avec le meilleur modèle Decision Tree
tree_predictions = best_tree_model.transform(train_data)

In [38]:
# Affichage du R-carré du meilleur modèle Decision Tree
best_tree_r2 = evaluator_r2.evaluate(tree_predictions)
print("R-carré (meilleur modèle Decision Tree):", best_tree_r2)


R-carré (meilleur modèle Decision Tree): 0.8046651715872161


# **3. Régression Isotonique**

In [39]:
from pyspark.ml.regression import IsotonicRegression

In [40]:
# Paramètre de régularisation (alpha)
paramGrid = ParamGridBuilder().addGrid(IsotonicRegression.isotonic, [True, False]).build()

In [41]:
# Cross-validation
iso_cv = CrossValidator(estimator=IsotonicRegression(), estimatorParamMaps=paramGrid, evaluator=evaluator_r2, numFolds=3)


In [42]:
# Ajustement du modèle
iso_model = iso_cv.fit(train_data)

In [43]:
# Sélection du meilleur modèle
best_iso_model = iso_model.bestModel

In [44]:
# Prédictions avec le meilleur modèle Isotonic Regression
iso_predictions = best_iso_model.transform(train_data)

In [45]:
# Affichage du R-carré du meilleur modèle Isotonic Regression
best_iso_r2 = evaluator_r2.evaluate(iso_predictions)
print("R-carré (meilleur modèle Isotonic Regression):", best_iso_r2)


R-carré (meilleur modèle Isotonic Regression): 0.29633685289968625


# **4. Random Forest**

In [46]:
from pyspark.ml.regression import RandomForestRegressor


In [47]:
# Paramètre du nombre d'arbres
paramGrid = ParamGridBuilder().addGrid(RandomForestRegressor.numTrees, [100, 200, 500]).build()


In [48]:
# Cross-validation
rf_cv = CrossValidator(estimator=RandomForestRegressor(), estimatorParamMaps=paramGrid, evaluator=evaluator_r2, numFolds=3)


In [49]:
# Ajustement du modèle
rf_model = rf_cv.fit(train_data)


In [50]:
# Sélection du meilleur modèle
best_rf_model = rf_model.bestModel


In [51]:
# Prédictions avec le meilleur modèle Random Forest
rf_predictions = best_rf_model.transform(train_data)


In [52]:
# Affichage du R-carré du meilleur modèle Random Forest
best_rf_r2 = evaluator_r2.evaluate(rf_predictions)
print("R-carré (meilleur modèle Random Forest):", best_rf_r2)


R-carré (meilleur modèle Random Forest): 0.8532781438142442


# **5. Gradient Boosting**

In [53]:
from pyspark.ml.regression import GBTRegressor

In [54]:
# Paramètre de la profondeur maximale des arbres
paramGrid = ParamGridBuilder().addGrid(GBTRegressor.maxDepth, [3, 5, 8]).build()


In [55]:
# Cross-validation
gb_cv = CrossValidator(estimator=GBTRegressor(), estimatorParamMaps=paramGrid, evaluator=evaluator_r2, numFolds=3)


In [56]:
# Ajustement du modèle
gb_model = gb_cv.fit(train_data)


In [57]:
# Sélection du meilleur modèle
best_gb_model = gb_model.bestModel


In [58]:
# Prédictions avec le meilleur modèle Gradient Boosting
gb_predictions = best_gb_model.transform(train_data)


In [59]:
# Affichage du R-carré du meilleur modèle Gradient Boosting
best_gb_r2 = evaluator_r2.evaluate(gb_predictions)
print("R-carré (meilleur modèle Gradient Boosting):", best_gb_r2)


R-carré (meilleur modèle Gradient Boosting): 0.9552266559983826


In [60]:
# Comparaison des performances des modèles
print("R-carré des modèles:")
print("-" * 20)
print(f"Régression linéaire: {r2_lr}")
print(f"Meilleur modèle Decision Tree: {best_tree_r2}")
print(f"Meilleur modèle Régression Isotonique: {best_iso_r2}")
print(f"Meilleur modèle Random Forest: {best_rf_r2}")
print(f"Meilleur modèle Gradient Boosting: {best_gb_r2}")


R-carré des modèles:
--------------------
Régression linéaire: 0.6283189927948849
Meilleur modèle Decision Tree: 0.8046651715872161
Meilleur modèle Régression Isotonique: 0.29633685289968625
Meilleur modèle Random Forest: 0.8532781438142442
Meilleur modèle Gradient Boosting: 0.9552266559983826


# **Le meilleur modèle : Gradient Boosting**

In [61]:

from pyspark.ml.regression import GBTRegressor
from pyspark.sql import SparkSession


In [62]:
# Sélection du meilleur modèle
best_gb_model = gb_model.bestModel

In [63]:
# Affichage des importances des variables
gbt_featureImportances = best_gb_model.featureImportances.toArray()
imp_scores = []
numeric_inputs = ['cement', 'slag', 'flyash', 'water', 'superplasticizer', 'coarseaggregate', 'fineaggregate', 'age']
for x in gbt_featureImportances:
  imp_scores.append(float(x))

In [64]:
# Création d'un DataFrame pour afficher les importances des variables
result = spark.createDataFrame(zip(numeric_inputs, imp_scores), schema=["features", "score"])
print(result.orderBy(result["score"].desc()).show(truncate=False))

+----------------+--------------------+
|features        |score               |
+----------------+--------------------+
|age             |0.2803447469739218  |
|cement          |0.23436999112424156 |
|water           |0.11736454299828429 |
|slag            |0.11420958302356582 |
|fineaggregate   |0.09821674394184006 |
|superplasticizer|0.07098007767118576 |
|coarseaggregate |0.06351287802438395 |
|flyash          |0.021001436242576797|
+----------------+--------------------+

None


In [65]:
# Prédictions avec le meilleur modèle Gradient Boosting
gb_predictions = best_gb_model.transform(train_data)

In [66]:
# Affichage des prédictions
print("Prédictions :")
gb_predictions.show()

Prédictions :
+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|[-1.7377891123666...| 7.68| 6.505047669801113|
|[-1.7377891123666...|17.28|18.621206404595718|
|[-1.7377891123666...|25.46| 27.41606387027266|
|[-1.6768070031877...| 2.33| 4.938830677981402|
|[-1.6768070031877...| 7.72|  8.57404743486597|
|[-1.6768070031877...|20.59|21.977694956555645|
|[-1.6768070031877...|29.23|28.537519485437638|
|[-1.6022733141912...|10.09| 9.325892247331563|
|[-1.6022733141912...|22.35|21.642629012043194|
|[-1.5383872950514...|10.35|10.878982136964563|
|[-1.5383872950514...|24.29|24.282629658654233|
|[-1.5383872950514...|33.19| 32.56178975656213|
|[-1.4473981162766...| 33.3| 32.28159309235297|
|[-1.4377184164069...| 6.88| 8.557619795489298|
|[-1.4377184164069...|27.87| 24.87226053451204|
|[-1.4377184164069...|36.59|32.812260965229655|
|[-1.4377184164069...|31.03|30.032121565270124|
|[-1.4212629266285...|13.2

In [67]:
# Affichage du R-carré du meilleur modèle Gradient Boosting
best_gb_r2 = evaluator_r2.evaluate(gb_predictions)
print("R-carré (meilleur modèle Gradient Boosting):", best_gb_r2)


R-carré (meilleur modèle Gradient Boosting): 0.9552266559983826


# Conclusion

Le meilleur modèle en termes de R-carré est le **Gradient Boosting**.