In [1]:
! pip install imbalanced-learn

Collecting imbalanced-learn
  Downloading imbalanced_learn-0.14.0-py3-none-any.whl (239 kB)
     ------------------------------------ 240.0/240.0 kB 918.9 kB/s eta 0:00:00
Collecting scipy<2,>=1.11.4
  Downloading scipy-1.16.3-cp311-cp311-win_amd64.whl (38.7 MB)
     ---------------------------------------- 38.7/38.7 MB 9.1 MB/s eta 0:00:00
Collecting scikit-learn<2,>=1.4.2
  Downloading scikit_learn-1.7.2-cp311-cp311-win_amd64.whl (8.9 MB)
     ---------------------------------------- 8.9/8.9 MB 10.2 MB/s eta 0:00:00
Collecting joblib<2,>=1.2.0
  Downloading joblib-1.5.2-py3-none-any.whl (308 kB)
     ------------------------------------- 308.4/308.4 kB 18.6 MB/s eta 0:00:00
Collecting threadpoolctl<4,>=2.0.0
  Downloading threadpoolctl-3.6.0-py3-none-any.whl (18 kB)
Installing collected packages: threadpoolctl, scipy, joblib, scikit-learn, imbalanced-learn
Successfully installed imbalanced-learn-0.14.0 joblib-1.5.2 scikit-learn-1.7.2 scipy-1.16.3 threadpoolctl-3.6.0




In [2]:
# √âTAPE 1 : Arr√™ter compl√®tement Spark
import os
import sys

try:
    spark.stop()
    print("‚úì Spark session arr√™t√©e")
except:
    print("Aucune session Spark √† arr√™ter")

# Attendre un peu
import time
time.sleep(3)

# √âTAPE 2 : Configuration des variables d'environnement
# Trouver le chemin Python actuel
python_path = sys.executable
print(f"Python path: {python_path}")

os.environ['PYSPARK_PYTHON'] = python_path
os.environ['PYSPARK_DRIVER_PYTHON'] = python_path

# √âTAPE 3 : Cr√©er une nouvelle session Spark avec configuration robuste
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Bank Churn Prediction") \
    .master("local[1]") \
    .config("spark.driver.memory", "1g") \
    .config("spark.executor.memory", "1g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .config("spark.python.worker.reuse", "true") \
    .config("spark.executor.heartbeatInterval", "120s") \
    .config("spark.network.timeout", "800s") \
    .config("spark.rpc.askTimeout", "600s") \
    .config("spark.core.connection.ack.wait.timeout", "600s") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

print("‚úì Nouvelle session Spark cr√©√©e")

# √âTAPE 4 : R√©cup√©rer et pr√©parer les donn√©es
from pymongo import MongoClient
import pandas as pd

client = MongoClient("mongodb://localhost:27017/")
db = client["clients_bancaires"]
collection = db["df_clean_collection"]

# R√©cup√©rer les donn√©es
data = list(collection.find())
df_pandas = pd.DataFrame(data)

print(f"‚úì Donn√©es r√©cup√©r√©es: {len(df_pandas)} lignes")

# Supprimer _id et colonnes non n√©cessaires
columns_to_drop = ['_id', 'CustomerId', 'Surname']
for col in columns_to_drop:
    if col in df_pandas.columns:
        df_pandas = df_pandas.drop(col, axis=1)

# Traiter GeographyVec
df_pandas['Geography_France'] = df_pandas['GeographyVec'].apply(lambda x: float(x[0]) if isinstance(x, list) and len(x) > 0 else 0.0)
df_pandas['Geography_Germany'] = df_pandas['GeographyVec'].apply(lambda x: float(x[1]) if isinstance(x, list) and len(x) > 1 else 0.0)
df_pandas = df_pandas.drop('GeographyVec', axis=1)

# Convertir toutes les colonnes en float
for col in df_pandas.columns:
    if col not in ['Geography_France', 'Geography_Germany']:
        df_pandas[col] = pd.to_numeric(df_pandas[col], errors='coerce')

# Supprimer les lignes avec des valeurs nulles
df_pandas = df_pandas.dropna()

print(f"‚úì Donn√©es nettoy√©es: {len(df_pandas)} lignes")
print(f"‚úì Colonnes: {list(df_pandas.columns)}")

# √âTAPE 5 : Cr√©er le DataFrame Spark avec gestion d'erreur
try:
    # Convertir en Spark DataFrame
    df_spark = spark.createDataFrame(df_pandas)
    
    # Forcer l'action (cache pour √©viter les recalculs)
    df_spark = df_spark.cache()
    
    # Compter avec try-except
    count = df_spark.count()
    print(f"‚úì DataFrame Spark cr√©√© avec succ√®s: {count} lignes")
    
    # Afficher le sch√©ma
    print("\n‚úì Sch√©ma du DataFrame:")
    df_spark.printSchema()
    
    # Afficher un √©chantillon
    print("\n‚úì √âchantillon des donn√©es:")
    df_spark.show(5)
    
except Exception as e:
    print(f"‚ùå Erreur lors de la cr√©ation du DataFrame Spark: {e}")
    print("\n‚ö†Ô∏è Utilisation de Pandas/Scikit-learn √† la place...")
    
    # PLAN B : Utiliser directement Pandas et scikit-learn
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix, accuracy_score, f1_score
    from imblearn.under_sampling import RandomUnderSampler
    
    print("\n" + "="*60)
    print("PIPELINE AVEC SCIKIT-LEARN")
    print("="*60)
    
    # Pr√©parer X et y
    feature_columns = ['CreditScore', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 
                       'HasCrCard', 'IsActiveMember', 'EstimatedSalary', 'GenderIndex',
                       'Geography_France', 'Geography_Germany']
    
    X = df_pandas[feature_columns]
    y = df_pandas['Exited'].astype(int)
    
    print(f"\n‚úì Features: {feature_columns}")
    print(f"‚úì Target distribution:\n{y.value_counts()}")
    
    # Gestion du d√©s√©quilibre
    rus = RandomUnderSampler(random_state=42)
    X_balanced, y_balanced = rus.fit_resample(X, y)
    
    print(f"\n‚úì Apr√®s undersampling:\n{pd.Series(y_balanced).value_counts()}")
    
    # Split
    X_train, X_test, y_train, y_test = train_test_split(
        X_balanced, y_balanced, test_size=0.2, random_state=42
    )
    
    # Normalisation
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    print(f"\n‚úì Train: {len(X_train)} | Test: {len(X_test)}")
    
    # Entra√Æner plusieurs mod√®les
    models = {
        'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
        'Random Forest': RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42),
        'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, max_depth=5, random_state=42)
    }
    
    print("\n" + "="*60)
    print("ENTRA√éNEMENT ET √âVALUATION DES MOD√àLES")
    print("="*60)
    
    results = {}
    
    for name, model in models.items():
        print(f"\n{name}:")
        
        # Entra√Ænement
        model.fit(X_train_scaled, y_train)
        
        # Pr√©dictions
        y_pred = model.predict(X_test_scaled)
        y_pred_proba = model.predict_proba(X_test_scaled)[:, 1]
        
        # M√©triques
        acc = accuracy_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)
        auc = roc_auc_score(y_test, y_pred_proba)
        
        results[name] = {'accuracy': acc, 'f1': f1, 'auc': auc}
        
        print(f"  - Accuracy: {acc:.4f}")
        print(f"  - F1-Score: {f1:.4f}")
        print(f"  - AUC-ROC: {auc:.4f}")
        
        # Matrice de confusion
        print(f"\n  Confusion Matrix:")
        print(confusion_matrix(y_test, y_pred))
    
    # Meilleur mod√®le
    best_model_name = max(results, key=lambda x: results[x]['auc'])
    print(f"\nüèÜ Meilleur mod√®le: {best_model_name}")
    print(f"   AUC-ROC: {results[best_model_name]['auc']:.4f}")
    
    # Feature importance (si Random Forest ou Gradient Boosting)
    if best_model_name in ['Random Forest', 'Gradient Boosting']:
        best_model = models[best_model_name]
        importances = best_model.feature_importances_
        
        print(f"\n‚úì Feature Importance ({best_model_name}):")
        for feature, importance in sorted(zip(feature_columns, importances), 
                                         key=lambda x: x[1], reverse=True):
            print(f"  {feature}: {importance:.4f}")

Aucune session Spark √† arr√™ter
Python path: c:\Users\elkho\OneDrive\Desktop\IA\Briefs\breif6_PredictiondelAttritionClientBancaire\venv\Scripts\python.exe
‚úì Nouvelle session Spark cr√©√©e
‚úì Donn√©es r√©cup√©r√©es: 10000 lignes
‚úì Donn√©es nettoy√©es: 10000 lignes
‚úì Colonnes: ['CreditScore', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary', 'Exited', 'GenderIndex', 'Geography_France', 'Geography_Germany']
‚úì DataFrame Spark cr√©√© avec succ√®s: 10000 lignes

‚úì Sch√©ma du DataFrame:
root
 |-- CreditScore: long (nullable = true)
 |-- Age: long (nullable = true)
 |-- Tenure: long (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: long (nullable = true)
 |-- HasCrCard: long (nullable = true)
 |-- IsActiveMember: long (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: long (nullable = true)
 |-- GenderIndex: double (nullable = true)
 |-- Geography_France: double (nullable = true)
 |--

In [3]:
! pip install pyspark==3.5.1

Collecting pyspark==3.5.1
  Using cached pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting py4j==0.10.9.7
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.9
    Uninstalling py4j-0.10.9.9:
      Successfully uninstalled py4j-0.10.9.9
  Attempting uninstall: pyspark
    Found existing installation: pyspark 4.0.1
    Uninstalling pyspark-4.0.1:


ERROR: Could not install packages due to an OSError: [WinError 32] Le processus ne peut pas acc√©der au fichier car ce fichier est utilis√© par un autre processus: 'c:\\users\\elkho\\onedrive\\desktop\\ia\\briefs\\breif6_predictiondelattritionclientbancaire\\venv\\lib\\site-packages\\pyspark\\jars\\aircompressor-2.0.2.jar'
Check the permissions.



In [4]:
! pip install pymongo
! pip install pandas
! pip install seaborn













In [5]:
# ============================================
# √âTAPE 6 : CONSTRUCTION DU PIPELINE ML
# ============================================

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col
import pymongo
from pymongo import MongoClient
import pandas as pd
from pyspark.sql import SparkSession
from seaborn import boxplot

spark = (
    SparkSession.builder
        .appName("Prediction")
        .master("local[*]")
        .getOrCreate()
)
print("Spark Session created successfully")
print("Version de Spark :", spark.version)

# ============================================
# √âTAPE 6.1 : R√©cup√©rer les donn√©es depuis MongoDB
# ============================================

# Connexion √† MongoDB
# client = MongoClient("mongodb://localhost:27017/")
# db = client["bank_churn_db"]
# collection = db["preprocessed_data"]

client = MongoClient("mongodb://localhost:27017/")
db = client["clients_bancaires"]
collection = db["df_clean_collection"]

# R√©cup√©rer toutes les donn√©es
donnees_mongo = list(collection.find({}))

# Convertir en DataFrame Pandas
df_pandas = pd.DataFrame(donnees_mongo)
# ‚úÖ SOLUTION : Supprimer la colonne _id AVANT conversion Spark
if '_id' in df_pandas.columns:
    df_pandas = df_pandas.drop('_id', axis=1)
# Convertir en DataFrame Spark
df = spark.createDataFrame(df_pandas)

# Afficher les premi√®res lignes
print("Donn√©es charg√©es depuis MongoDB:")
df.show(5)

# Afficher le sch√©ma
print("Sch√©ma des donn√©es:")
df.printSchema()

# Fermer la connexion MongoDB
client.close()

# ============================================
# √âTAPE 6.2 : V√©rifier et g√©rer le d√©s√©quilibre de classes
# ============================================

# Compter combien de clients ont Exited=1 et Exited=0
nombre_exited_1 = df.filter(col("Exited") == 1).count()
nombre_exited_0 = df.filter(col("Exited") == 0).count()

print(f"Nombre de clients avec Exited=1 (churn): {nombre_exited_1}")
print(f"Nombre de clients avec Exited=0 (pas de churn): {nombre_exited_0}")

# Afficher la distribution
print("Distribution des classes:")
df.groupBy("Exited").count().show()

# Si les classes sont tr√®s d√©s√©quilibr√©es, faire undersampling
if nombre_exited_1 < nombre_exited_0:
    # La classe 1 est minoritaire
    nombre_a_garder = nombre_exited_1
    df_classe_1 = df.filter(col("Exited") == 1)
    df_classe_0 = df.filter(col("Exited") == 0)
    # √âchantillonner la classe 0 pour avoir le m√™me nombre
    df_classe_0_echantillon = df_classe_0.sample(False, nombre_a_garder / nombre_exited_0, seed=42)
    df_equilibre = df_classe_1.union(df_classe_0_echantillon)
else:
    # La classe 0 est minoritaire
    nombre_a_garder = nombre_exited_0
    df_classe_1 = df.filter(col("Exited") == 1)
    df_classe_0 = df.filter(col("Exited") == 0)
    # √âchantillonner la classe 1 pour avoir le m√™me nombre
    df_classe_1_echantillon = df_classe_1.sample(False, nombre_a_garder / nombre_exited_1, seed=42)
    df_equilibre = df_classe_0.union(df_classe_1_echantillon)

print("Apr√®s √©quilibrage:")
df_equilibre.groupBy("Exited").count().show()

# ============================================
# √âTAPE 6.3 : D√©finir les colonnes features
# ============================================

# Toutes les colonnes num√©riques (features)
colonnes_features = [
    "CreditScore",
    "Age",
    "Tenure",
    "Balance",
    "NumOfProducts",
    "HasCrCard",
    "IsActiveMember",
    "EstimatedSalary",
    "gender_indexed",      # D√©j√† encod√©e
    "geography_indexed"    # D√©j√† encod√©e
]

# Colonne cible
colonne_cible = "Exited"

# V√©rifier que toutes les colonnes existent
print("V√©rification des colonnes:")
for col_name in colonnes_features:
    if col_name in df_equilibre.columns:
        print(f"‚úì {col_name} existe")
    else:
        print(f"‚úó {col_name} MANQUANTE!")

# ============================================
# √âTAPE 6.4 : Assembler toutes les features dans un vecteur
# ============================================

# Cr√©er le VectorAssembler
assembler = VectorAssembler(
    inputCols=colonnes_features,
    outputCol="features_raw"
)

# Appliquer l'assembler
df_avec_features = assembler.transform(df_equilibre)

# Afficher le r√©sultat
print("Features assembl√©es (premiers exemples):")
df_avec_features.select(colonnes_features + ["features_raw"]).show(5, truncate=False)

# ============================================
# √âTAPE 6.5 : Normaliser les features avec StandardScaler
# ============================================

# Cr√©er le StandardScaler
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,  # Normaliser avec √©cart-type
    withMean=True  # Centrer sur la moyenne
)

# Entra√Æner le scaler sur les donn√©es
scaler_model = scaler.fit(df_avec_features)

# Appliquer la normalisation
df_final = scaler_model.transform(df_avec_features)

# Afficher le r√©sultat
print("Features normalis√©es (premiers exemples):")
df_final.select("features_raw", "features").show(5, truncate=False)

# ============================================
# √âTAPE 6.6 : Encoder la colonne cible (Exited) en "label"
# ============================================

# Cr√©er le StringIndexer pour la colonne cible
indexer_target = StringIndexer(inputCol="Exited", outputCol="label")

# Appliquer l'encodage
df_final = indexer_target.fit(df_final).transform(df_final)

# Afficher
print("Label encod√© (Exited -> label):")
df_final.select("Exited", "label").show(10)

# ============================================
# √âTAPE 6.7 : S√©parer les donn√©es en train (80%) et test (20%)
# ============================================

# S√©paration al√©atoire avec seed=42 pour reproductibilit√©
train_data, test_data = df_equilibre.randomSplit([0.8, 0.2], seed=42)

print(f"Nombre d'exemples d'entra√Ænement: {train_data.count()}")
print(f"Nombre d'exemples de test: {test_data.count()}")

# V√©rifier la distribution dans chaque set
print("Distribution dans train_data:")
train_data.groupBy("Exited").count().show()

print("Distribution dans test_data:")
test_data.groupBy("Exited").count().show()

# ============================================
# √âTAPE 6.8 : Choisir et cr√©er le mod√®le MLlib
# ============================================

# Random Forest Classifier (bon choix pour d√©butants)
modele = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=100,      # Nombre d'arbres dans la for√™t
    maxDepth=10,       # Profondeur maximale des arbres
    seed=42
)

print("Mod√®le choisi: Random Forest Classifier")

# ============================================
# √âTAPE 6.9 : Construire le Pipeline complet
# ============================================

# Cr√©er le pipeline avec toutes les √©tapes
pipeline = Pipeline(stages=[
    assembler,          # Assembler les features
    scaler,             # Normaliser
    indexer_target,     # Encoder la cible
    modele              # Entra√Æner le mod√®le
])

print("Pipeline cr√©√© avec les √©tapes:")
print("1. VectorAssembler")
print("2. StandardScaler")
print("3. StringIndexer (target)")
print("4. RandomForestClassifier")

# ============================================
# √âTAPE 6.10 : Entra√Æner le mod√®le
# ============================================

print("\n" + "="*50)
print("D√©but de l'entra√Ænement du mod√®le...")
print("="*50)

pipeline_model = pipeline.fit(train_data)

print("‚úì Entra√Ænement termin√©!")

# ============================================
# √âTAPE 6.11 : Faire des pr√©dictions sur les donn√©es de test
# ============================================

print("\nFaire des pr√©dictions sur les donn√©es de test...")
predictions = pipeline_model.transform(test_data)

# Afficher les pr√©dictions
print("\nExemples de pr√©dictions:")
predictions.select("Exited", "label", "prediction", "probability").show(20, truncate=False)

# ============================================
# √âTAPE 6.12 : √âvaluer les performances du mod√®le
# ============================================

# √âvaluateur pour l'AUC (Area Under Curve) - m√©trique principale
evaluator_auc = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = evaluator_auc.evaluate(predictions)
print(f"\n{'='*50}")
print(f"AUC (Area Under ROC): {auc:.4f}")
print(f"{'='*50}")

# √âvaluateur pour la pr√©cision (accuracy)
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy = evaluator_accuracy.evaluate(predictions)
print(f"Accuracy (Pr√©cision): {accuracy:.4f}")

# √âvaluateur pour la pr√©cision par classe (precision)
evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedPrecision"
)

precision = evaluator_precision.evaluate(predictions)
print(f"Precision: {precision:.4f}")

# √âvaluateur pour le rappel (recall)
evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedRecall"
)

recall = evaluator_recall.evaluate(predictions)
print(f"Recall: {recall:.4f}")

# ============================================
# √âTAPE 6.13 : Afficher la matrice de confusion (simple)
# ============================================

print("\nMatrice de confusion:")
predictions.groupBy("label", "prediction").count().orderBy("label", "prediction").show()

# ============================================
# √âTAPE 6.14 : Sauvegarder le mod√®le entra√Æn√©
# ============================================

# Sauvegarder le pipeline complet
chemin_modele = "models/churn_prediction_model"
pipeline_model.write().overwrite().save(chemin_modele)

print(f"\n‚úì Mod√®le sauvegard√© dans '{chemin_modele}'")

# Pour charger plus tard, utilisez :
# from pyspark.ml import PipelineModel
# pipeline_model_charge = PipelineModel.load("models/churn_prediction_model")

print("\n" + "="*50)
print("PIPELINE TERMIN√â AVEC SUCC√àS!")
print("="*50)

Spark Session created successfully
Version de Spark : 4.0.1
Donn√©es charg√©es depuis MongoDB:
+----------+--------+-----------+---+------+---------+-------------+---------+--------------+---------------+------+------------+-----------+
|CustomerId| Surname|CreditScore|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|GeographyVec|GenderIndex|
+----------+--------+-----------+---+------+---------+-------------+---------+--------------+---------------+------+------------+-----------+
|  15634602|Hargrave|        619| 42|     2|        0|            1|        1|             1|      101348.88|     1|  [1.0, 0.0]|        1.0|
|  15647311|    Hill|        608| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|  [0.0, 0.0]|        1.0|
|  15619304|    Onio|        502| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|  [1.0, 0.0]|        1.0|
|  15701354|    Boni|        699| 39|     1|        0

IllegalArgumentException: [FIELD_NOT_FOUND] No such struct field `gender_indexed` in `CustomerId`, `Surname`, `CreditScore`, `Age`, `Tenure`, `Balance`, `NumOfProducts`, `HasCrCard`, `IsActiveMember`, `EstimatedSalary`, `Exited`, `GeographyVec`, `GenderIndex`. SQLSTATE: 42704

In [None]:
import platform, pyspark
print(platform.python_version())
print(pyspark.version)

3.13.7
<module 'pyspark.version' from 'c:\\Users\\elkho\\OneDrive\\Desktop\\IA\\Briefs\\breif6_PredictiondelAttritionClientBancaire\\venv\\Lib\\site-packages\\pyspark\\version.py'>
