# Pr√©diction de l'Attrition Client Bancaire : Pipeline de Machine Learning Distribu√© avec Apache Spark

* imports : 

In [1]:


from pyspark.sql import SparkSession
from pymongo import MongoClient
import pyspark
import pandas as pd
import os
import sys
import numpy as np

from pyspark import StorageLevel
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from imblearn.over_sampling import SMOTE
import numpy as np
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F
from pyspark.ml import PipelineModel

import shutil


## üü¢ Construction du Pipeline de Machine Learning

### 1Ô∏è‚É£ R√©cup√©ration des donn√©es pr√©trait√©es

In [2]:

# Configuration des variables d'environnement Python
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable


# Configurez la variable d'environnement HADOOP_HOME
os.environ['HADOOP_HOME'] = r'C:\hadoop'
os.environ['PATH'] = os.environ['HADOOP_HOME'] + r'\bin;' + os.environ['PATH']



In [3]:


print("üîç V√©rification de la version PySpark...")
print(f"Version PySpark: {pyspark.__version__}")



üîç V√©rification de la version PySpark...
Version PySpark: 3.5.7


In [4]:


# Cr√©er une session Spark simple (sans MongoDB JAR)
spark = SparkSession.builder \
    .appName("MongoDB-PySpark-PyMongo") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.python.worker.timeout", "600") \
    .master("local[*]") \
    .config("spark.hadoop.io.nativeio.NativeIO$Windows.enabled", "false") \
    .getOrCreate()
    
    # .config("spark.hadoop.io.nativeio.NativeIO.disable.native", "true") \
    


In [5]:

# Connexion √† MongoDB avec PyMongo
print("\nüîå Connexion √† MongoDB...")
client = MongoClient("mongodb://localhost:27017/")
db = client["Attrition_Client_Bancaire_db"]
collection = db["clients_pretraite"]



üîå Connexion √† MongoDB...


In [6]:

# Compter les documents
total_docs = collection.count_documents({})
print(f"üìä Total documents dans MongoDB : {total_docs}")

# Lire les donn√©es
data = list(collection.find())

# Convertir en DataFrame Pandas
df_pandas = pd.DataFrame(data)

display(df_pandas.head(5))


üìä Total documents dans MongoDB : 10000


Unnamed: 0,_id,CreditScore,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited,Geography_Index,Gender_Index
0,690e19f332753e87679009a5,619.0,3.7612,2,0.0,1,1,1,101348.88,1,0,1
1,690e19f332753e87679009a6,608.0,3.73767,1,83807.86,1,0,1,112542.58,0,2,1
2,690e19f332753e87679009a7,502.0,3.7612,8,159660.8,3,1,0,113931.57,1,0,1
3,690e19f332753e87679009a8,699.0,3.688879,1,0.0,2,0,0,93826.63,0,0,1
4,690e19f332753e87679009a9,850.0,3.78419,2,125510.82,1,1,1,79084.1,0,2,1


In [7]:

# Supprimer le champ _id 
if '_id' in df_pandas.columns:
    df_pandas = df_pandas.drop('_id', axis=1)

print(f"‚úÖ Donn√©es charg√©es dans Pandas : {len(df_pandas)} lignes")
print(f"üìã Colonnes : {list(df_pandas.columns)}")


‚úÖ Donn√©es charg√©es dans Pandas : 10000 lignes
üìã Colonnes : ['CreditScore', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary', 'Exited', 'Geography_Index', 'Gender_Index']


In [8]:

# Convertir en DataFrame Spark
df = spark.createDataFrame(df_pandas)

print("\n‚úÖ Donn√©es converties en Spark DataFrame :")
df.printSchema()
df.show(5)

print(f"\nüìä Nombre total de lignes : {df.count()}")

client.close()


‚úÖ Donn√©es converties en Spark DataFrame :
root
 |-- CreditScore: double (nullable = true)
 |-- Age: double (nullable = true)
 |-- Tenure: long (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: long (nullable = true)
 |-- HasCrCard: long (nullable = true)
 |-- IsActiveMember: long (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: long (nullable = true)
 |-- Geography_Index: long (nullable = true)
 |-- Gender_Index: long (nullable = true)

+-----------+------------------+------+---------+-------------+---------+--------------+---------------+------+---------------+------------+
|CreditScore|               Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|Geography_Index|Gender_Index|
+-----------+------------------+------+---------+-------------+---------+--------------+---------------+------+---------------+------------+
|      619.0|3.7612001156935624|     2|      0.0|            1|        1|   

### 2Ô∏è‚É£ Gestion du d√©s√©quilibre de classes (undersampling / SMOTE)

In [9]:

data = df.select("Exited", "CreditScore", "Age", "Balance", "EstimatedSalary",
                 "Tenure", "NumOfProducts", "HasCrCard", "IsActiveMember", "Geography_Index", "Gender_Index" ).collect()

# display(data)


X = np.array([[row["CreditScore"], row["Age"], row["Balance"], row["EstimatedSalary"],
               row["Tenure"], row["NumOfProducts"], row["HasCrCard"], row["IsActiveMember"],row["Geography_Index"], row["Gender_Index"] ]
              for row in data])

y = np.array([row["Exited"] for row in data])

print("Avant SMOTE :")
print("Classe 0 :", sum(y == 0))
print("Classe 1 :", sum(y == 1))

Avant SMOTE :
Classe 0 : 7963
Classe 1 : 2037


In [10]:

smote = SMOTE(random_state=42, sampling_strategy="auto")
X_resampled, y_resampled = smote.fit_resample(X, y)

print("\nApr√®s SMOTE :")
unique, counts = np.unique(y_resampled, return_counts=True)
for cls, cnt in zip(unique, counts):
    print(f"Classe {cls} : {cnt}")



Apr√®s SMOTE :
Classe 0 : 7963
Classe 1 : 7963


In [11]:
# Create the resampled data with proper column names
resampled_data = [
    Row(
        Exited=int(y_resampled[i]),
        CreditScore=float(X_resampled[i][0]),
        Age=float(X_resampled[i][1]),
        Balance=float(X_resampled[i][2]),
        EstimatedSalary=float(X_resampled[i][3]),
        Tenure=int(X_resampled[i][4]),
        NumOfProducts=int(X_resampled[i][5]),
        HasCrCard=int(X_resampled[i][6]),
        IsActiveMember=int(X_resampled[i][7]),
        Geography_Index=int(X_resampled[i][8]),
        Gender_Index=int(X_resampled[i][9])
    )
    for i in range(len(y_resampled))
]

# Create pandas DataFrame
df_pd = pd.DataFrame(resampled_data)

# Create Spark DataFrame directly from the list of Row objects
df_smote = spark.createDataFrame(resampled_data)


print(f"Total count: {df_smote.count()}")

print("\n‚úÖ Donn√©es √©quilibr√©es :")
df_smote.groupBy("Exited").count().show()

print("\nüìà Statistiques descriptives :")
df_smote.describe().show()

Total count: 15926

‚úÖ Donn√©es √©quilibr√©es :
+------+-----+
|Exited|count|
+------+-----+
|     0| 7963|
|     1| 7963|
+------+-----+


üìà Statistiques descriptives :
+-------+------------------+-----------------+-------------------+-----------------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+
|summary|            Exited|      CreditScore|                Age|          Balance|   EstimatedSalary|            Tenure|     NumOfProducts|          HasCrCard|     IsActiveMember|   Geography_Index|      Gender_Index|
+-------+------------------+-----------------+-------------------+-----------------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+
|  count|             15926|            15926|              15926|            15926|             15926|             15926|             15926|              15926|     

### 3Ô∏è‚É£ S√©lection et assemblage des features (VectorAssembler)

In [None]:
feature_cols = [
 'CreditScore',
 'Age',
 'Balance',
 'EstimatedSalary',
 'Tenure',
 'NumOfProducts',
 'HasCrCard',
 'IsActiveMember',
 'Geography_Index',
 'Gender_Index'
]


assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="assembled_features"
)



### 4Ô∏è‚É£ Normalisation des features (StandardScaler ou MinMaxScaler)


In [None]:

# Cr√©ation du StandardScaler
scaler = StandardScaler(
    inputCol="assembled_features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True
)



### 5Ô∏è‚É£ Choix du mod√®le de mlib.

In [14]:
lr_m = LogisticRegression(featuresCol="scaled_features", labelCol="Exited")


### 6Ô∏è‚É£ Construction d‚Äôun Pipeline int√©grant toutes les √©tapes


In [15]:

pipeline_lr = Pipeline(stages=[assembler, scaler, lr_m])



#### ‚ûï Split et entra√Ænement


In [16]:
train_df, test_df = df_smote.randomSplit([0.8, 0.2], seed=42)

In [17]:
nbr_train = train_df.count()
nbr_test = test_df.count()
total = df_smote.count()

pourcentage_train = (nbr_train / total) * 100
pourcentage_test = (nbr_test / total) * 100


In [18]:

print(f"train : {nbr_train} - {pourcentage_train:.2f}%")
print(f"test : {nbr_test} - {pourcentage_test:.2f}%")

train : 12756 - 80.10%
test : 3170 - 19.90%


## üü¢ Entra√Ænement et Validation Crois√©e

### 1Ô∏è‚É£ D√©finition de la grille d‚Äôhyperparam√®tres

In [19]:



paramGrid = ParamGridBuilder()\
    .addGrid(lr_m.regParam, [0.01, 0.05 , 0.1, 0.2, 0.5])\
    .addGrid(lr_m.elasticNetParam, [0.0,0.3, 0.5, 0.7, 1.0])\
    .addGrid(lr_m.maxIter, [5])\
    .build()



evaluator = BinaryClassificationEvaluator(
    labelCol="Exited",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
    
    
crossval = (
    CrossValidator()
    .setEstimator(pipeline_lr)
    .setEvaluator(evaluator)
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(3)
)




### 2Ô∏è‚É£ Entra√Ænement avec CrossValidator

In [20]:
cv_model = crossval.fit(train_df)

In [21]:
predictions = cv_model.transform(test_df)

In [22]:
auc = evaluator.evaluate(predictions)
print(f"AUC sur le test set : {auc:.4f}")

AUC sur le test set : 0.8290


In [23]:
best_model = cv_model.bestModel
best_lr = best_model.stages[-1]

print("Meilleur regParam :", best_lr._java_obj.getRegParam())
print("Meilleur elasticNetParam :", best_lr._java_obj.getElasticNetParam())


Meilleur regParam : 0.05
Meilleur elasticNetParam : 0.0


In [25]:
# --- Accuracy ---
evaluator_acc = MulticlassClassificationEvaluator(
    labelCol="Exited", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator_acc.evaluate(predictions)

# --- Pr√©cision ---
evaluator_prec = MulticlassClassificationEvaluator(
    labelCol="Exited", predictionCol="prediction", metricName="weightedPrecision"
)
precision = evaluator_prec.evaluate(predictions)

# --- Rappel ---
evaluator_rec = MulticlassClassificationEvaluator(
    labelCol="Exited", predictionCol="prediction", metricName="weightedRecall"
)
recall = evaluator_rec.evaluate(predictions)

# --- F1-score ---
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="Exited", predictionCol="prediction", metricName="f1"
)
f1 = evaluator_f1.evaluate(predictions)

print(f"‚úÖ √âvaluation du mod√®le :")
print(f"AUC-ROC : {auc:.4f}")
print(f"Accuracy : {accuracy:.4f}")
print(f"Precision : {precision:.4f}")
print(f"Recall : {recall:.4f}")
print(f"F1-score : {f1:.4f}")


‚úÖ √âvaluation du mod√®le :
AUC-ROC : 0.8290
Accuracy : 0.7372
Precision : 0.7377
Recall : 0.7372
F1-score : 0.7370


In [26]:
predictions.show(5)

+------+-----------+------------------+---------+---------------+------+-------------+---------+--------------+---------------+------------+--------------------+--------------------+--------------------+--------------------+----------+
|Exited|CreditScore|               Age|  Balance|EstimatedSalary|Tenure|NumOfProducts|HasCrCard|IsActiveMember|Geography_Index|Gender_Index|  assembled_features|     scaled_features|       rawPrediction|         probability|prediction|
+------+-----------+------------------+---------+---------------+------+-------------+---------+--------------+---------------+------------+--------------------+--------------------+--------------------+--------------------+----------+
|     0|      433.0|3.4011973816621555|141325.56|        93839.3|     1|            1|        1|             1|              0|           0|[433.0,3.40119738...|[-2.3812490335084...|[1.22943310972841...|[0.77371933981225...|       0.0|
|     0|      433.0|3.7376696182833684|122189.66|       

* matrice de confusion : 

In [27]:
confusion = (
    predictions.groupBy("Exited", "prediction")
    .count()
    .orderBy("Exited", "prediction")
)

print("\n===== üîç Matrice de confusion =====")
confusion.show()



confusion_matrix = (
    predictions.groupBy("Exited")
    .pivot("prediction", [0.0, 1.0]) 
    .agg(F.count("prediction"))
    .withColumnRenamed("0.0", "Pr√©dit: 0")
    .withColumnRenamed("1.0", "Pr√©dit: 1")
    .orderBy("Exited")
)


print("\n===== üîç Matrice de confusion =====")
confusion_matrix.show()



===== üîç Matrice de confusion =====
+------+----------+-----+
|Exited|prediction|count|
+------+----------+-----+
|     0|       0.0| 1124|
|     0|       1.0|  455|
|     1|       0.0|  378|
|     1|       1.0| 1213|
+------+----------+-----+


===== üîç Matrice de confusion =====
+------+---------+---------+
|Exited|Pr√©dit: 0|Pr√©dit: 1|
+------+---------+---------+
|     0|     1124|      455|
|     1|      378|     1213|
+------+---------+---------+



* Score par combinaison d'hyperparams : 

In [28]:
print("\n===== ‚öôÔ∏è Scores pour chaque combinaison test√©e =====")
param_combinations = paramGrid
metrics = cv_model.avgMetrics

for i, params in enumerate(param_combinations):
    reg_param = params[lr_m.regParam]
    elastic_param = params[lr_m.elasticNetParam]
    auc_score = metrics[i]
    print(f"Combinaison {i+1}: regParam={reg_param}, elasticNetParam={elastic_param} --> AUC={auc_score:.4f}")



===== ‚öôÔ∏è Scores pour chaque combinaison test√©e =====
Combinaison 1: regParam=0.01, elasticNetParam=0.0 --> AUC=0.8360
Combinaison 2: regParam=0.01, elasticNetParam=0.3 --> AUC=0.8360
Combinaison 3: regParam=0.01, elasticNetParam=0.5 --> AUC=0.8359
Combinaison 4: regParam=0.01, elasticNetParam=0.7 --> AUC=0.8358
Combinaison 5: regParam=0.01, elasticNetParam=1.0 --> AUC=0.8354
Combinaison 6: regParam=0.05, elasticNetParam=0.0 --> AUC=0.8360
Combinaison 7: regParam=0.05, elasticNetParam=0.3 --> AUC=0.8352
Combinaison 8: regParam=0.05, elasticNetParam=0.5 --> AUC=0.8331
Combinaison 9: regParam=0.05, elasticNetParam=0.7 --> AUC=0.8314
Combinaison 10: regParam=0.05, elasticNetParam=1.0 --> AUC=0.8268
Combinaison 11: regParam=0.1, elasticNetParam=0.0 --> AUC=0.8359
Combinaison 12: regParam=0.1, elasticNetParam=0.3 --> AUC=0.8329
Combinaison 13: regParam=0.1, elasticNetParam=0.5 --> AUC=0.8285
Combinaison 14: regParam=0.1, elasticNetParam=0.7 --> AUC=0.8191
Combinaison 15: regParam=0.1, 

## üü¢ Sauvegarde et D√©ploiement

### 1Ô∏è‚É£ Sauvegarde du mod√®le

* Sauvegarder le mod√®le optimis√© au format natif Spark (model.write().save())

In [29]:


model_path = "../models/best_lr_model"

# Supprimer l'ancien mod√®le s'il existe
if os.path.exists(model_path):
    shutil.rmtree(model_path)

try:
    best_model.write().overwrite().save(model_path)
    print(f"‚úÖ Mod√®le sauvegard√© avec succ√®s dans : {model_path}")
    
except Exception as e:
    print(f"‚ùå Erreur lors de la sauvegarde : {str(e)}")


‚ùå Erreur lors de la sauvegarde : An error occurred while calling o32685.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:106)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationSco

In [30]:


model_path_file = f"file:///{os.path.abspath(model_path).replace(os.sep, '/')}"

if os.path.exists(model_path):
    shutil.rmtree(model_path)

try:
    best_model.write().overwrite().save(model_path_file)
    print(f"‚úÖ Mod√®le sauvegard√© avec file:// : {model_path_file}")
except Exception as e:
    print(f"‚ùå √âchec avec file:// : {str(e)}")


‚ùå √âchec avec file:// : An error occurred while calling o32774.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:106)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withS