## <center style="color:blue;">**SparkAttriNet**</center>

In the banking sector, anticipating customer churn is essential to reduce the attrition rate and strengthen customer retention.

This project uses PySpark to analyze large volumes of data, MLlib to train a predictive model, MongoDB to store the transformed data, and Streamlit to visualize the results and support decision-making.

<br>

### <span style="color:green;">**Data Loading:**</span>

#### <span style="color:orange;">**1. Establish the Connection to MongoDB:**</span>

```python
import pandas as pd
from pymongo import MongoClient

# Connect to the local MongoDB instance
client = MongoClient("mongodb://localhost:27017/")

# Database and collection holding the bank customer records
db = client["sparkattrinet"]

collection = db["bank"]
```

#### <span style="color:orange;">**2. Create the Spark Session:**</span>

```python
from pyspark.sql import SparkSession

# Memory settings only take effect if no Spark JVM is running yet
spark = (
    SparkSession.builder
    .appName("SparkAttriNet - Prédiction de l'Attrition Client Bancaire")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)
```

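#### <span style="color:orange;">**3. Retrieve the Data and Save It as CSV:**</span>

```python
import os

# Fetch every document from the collection
docs = list(collection.find())

# Convert each ObjectId to a plain string before exporting
for doc in docs:
    doc["_id"] = str(doc["_id"])

# Temporary CSV file, read back by Spark in the next step
os.makedirs("../data/temp", exist_ok=True)
pd.DataFrame(docs).to_csv("../data/temp/data.csv", index=False)
```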

#### <span style="color:orange;">**4. Load the Data as a PySpark DataFrame:**</span>

```python
df = spark.read.csv(
    "../data/temp/data.csv",
    header=True,
    inferSchema=True
)

# The MongoDB identifier is not a feature
df = df.drop("_id")

df.printSchema()

df.show(5, truncate=False)
```

```
root
 |-- Sex: double (nullable = true)
 |-- Region: double (nullable = true)
 |-- CreditScore: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)
```

```
+---+------+-----------+---+------+---------+-------------+---------+--------------+---------------+------+
|Sex|Region|CreditScore|Age|Tenure|Balance  |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---+------+-----------+---+------+---------+-------------+---------+--------------+---------------+------+
|1.0|0.0   |619.0      |42 |2     |0.0      |1            |1        |1             |101348.88      |1     |
|1.0|2.0   |608.0      |41 |1     |83807.86 |1            |0        |1             |112542.58      |0     |
|1.
```
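The schema reports `Sex`, `Region` and `CreditScore` as `double` but `Age` or `Tenure` as `integer`. With `inferSchema=True`, Spark chooses a type per column from the text it reads, so a column written as `1.0`, `0.0`, ... (presumably because those values are stored as floating-point numbers in MongoDB) cannot be typed as integer. The helper below is a hypothetical, much cruder illustration of that idea, not Spark's algorithm, which also considers types such as long, boolean and timestamp:

```python
def infer_column_type(values):
    """Toy inference: 'integer' if every value parses as an int,
    'double' if every value parses as a float, otherwise 'string'."""
    def parses(cast, text):
        try:
            cast(text)
            return True
        except ValueError:
            return False

    if all(parses(int, v) for v in values):
        return "integer"
    if all(parses(float, v) for v in values):
        return "double"
    return "string"


print(infer_column_type(["1.0", "1.0", "0.0"]))  # Sex-like column: double
print(infer_column_type(["42", "41", "39"]))     # Age-like column: integer
```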

<br>

### <span style="color:green;">**Handling Class Imbalance:**</span>

#### <span style="color:orange;">**1. Identify the Majority Class:**</span>

```python
# Collect to pandas on the driver (about 10,000 rows)
p_df = df.toPandas()

print(p_df["Exited"].value_counts())
```

```
Exited
0    7959
1    2037
Name: count, dtype: int64
```

```
  [PACKAGE_NOT_INSTALLED] PyArrow >= 11.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
```
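A short calculation from these two counts shows why the imbalance matters: churned customers are only about a fifth of the 9,996 rows, so a trivial model that always predicts `Exited = 0` would already score about 79.6 % accuracy without learning anything.

```latex
\begin{aligned}
N &= 7959 + 2037 = 9996 \\
\frac{2037}{9996} &\approx 0.204 \quad \text{(share of churned customers, Exited} = 1\text{)} \\
\frac{7959}{9996} &\approx 0.796 \quad \text{(accuracy of always predicting 0)} \\
\frac{7959}{2037} &\approx 3.91 \quad \text{(majority-to-minority ratio)}
\end{aligned}
```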


#### <span style="color:orange;">**2. Apply SMOTE Oversampling:**</span>

```python
from imblearn.over_sampling import SMOTE

# Separate the features from the label
X = p_df.drop(columns=["Exited"])
y = p_df["Exited"]

# Create synthetic minority-class rows until both classes have the same size
smote = SMOTE(random_state=42)
X_res, y_res = smote.fit_resample(X, y)

p_df_res = pd.concat([X_res, y_res], axis=1)

print(p_df_res["Exited"].value_counts())
```

```
Exited
1    7959
0    7959
Name: count, dtype: int64
```
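For intuition, the core step of SMOTE can be sketched in NumPy: pick a minority sample, pick one of its k nearest minority neighbors, and create a new point at a random position on the segment between them. `smote_sketch` is a hypothetical, simplified helper, not imblearn's implementation; `k=5` mirrors imblearn's default `k_neighbors=5`. Because it interpolates, applied to encoded categories such as `Sex` or `Region` it yields fractional codes; imblearn's `SMOTENC` is intended for such mixed numeric/categorical data.

```python
import numpy as np


def smote_sketch(X_min, n_new, k=5, seed=42):
    """Create n_new synthetic minority rows by interpolating between a random
    minority sample and one of its k nearest minority neighbors."""
    rng = np.random.default_rng(seed)

    # Pairwise Euclidean distances between minority samples
    diffs = X_min[:, None, :] - X_min[None, :, :]
    dists = np.sqrt((diffs ** 2).sum(axis=2))
    np.fill_diagonal(dists, np.inf)  # a sample is not its own neighbor
    neighbors = np.argsort(dists, axis=1)[:, :k]

    synthetic = np.empty((n_new, X_min.shape[1]))
    for i in range(n_new):
        a = rng.integers(len(X_min))       # random minority sample
        b = neighbors[a, rng.integers(k)]  # one of its k nearest neighbors
        gap = rng.random()                 # uniform draw in [0, 1)
        synthetic[i] = X_min[a] + gap * (X_min[b] - X_min[a])
    return synthetic


# Usage on toy data: 20 minority rows with 3 features
X_min = np.random.default_rng(0).normal(size=(20, 3))
X_new = smote_sketch(X_min, n_new=30)
print(X_new.shape)  # (30, 3)
```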


#### <span style="color:orange;">**3. Convert Back to PySpark:**</span>

```python
# Overwrite the temporary CSV with the balanced data and reload it in Spark
p_df_res.to_csv("../data/temp/data.csv", index=False)

df = spark.read.csv(
    "../data/temp/data.csv",
    header=True,
    inferSchema=True
)

df.groupBy("Exited").count().show()
```

```
+------+-----+
|Exited|count|
+------+-----+
|     1| 7959|
|     0| 7959|
+------+-----+
```
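One point worth checking: SMOTE runs here on the full dataset, before the train/test split performed later, so synthetic rows interpolated from a customer can end up in the test set while that customer sits in the training set, which tends to inflate test scores. A leak-free order is to split the original rows first and oversample only the training part. The stdlib sketch below shows that ordering, with a plain random oversampler (duplicating minority rows) standing in for SMOTE; `split_then_oversample` is a hypothetical helper.

```python
import random


def split_then_oversample(rows, labels, test_ratio=0.2, seed=42):
    """Split first, then balance only the training part by duplicating
    minority rows (a stand-in for SMOTE). The test set keeps real rows only."""
    rng = random.Random(seed)
    idx = list(range(len(rows)))
    rng.shuffle(idx)
    n_test = int(len(idx) * test_ratio)
    test = [(rows[i], labels[i]) for i in idx[:n_test]]
    train = [(rows[i], labels[i]) for i in idx[n_test:]]

    # Group training rows by class and grow each class to the majority size
    by_class = {}
    for row, label in train:
        by_class.setdefault(label, []).append((row, label))
    target = max(len(group) for group in by_class.values())

    balanced = []
    for group in by_class.values():
        extra = [rng.choice(group) for _ in range(target - len(group))]
        balanced.extend(group + extra)
    rng.shuffle(balanced)
    return balanced, test


# Usage: 100 toy rows, 20 % of them in the minority class
rows = list(range(100))
labels = [1 if i < 20 else 0 for i in rows]
train, test = split_then_oversample(rows, labels)
```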



<br>

### <span style="color:green;">**Feature Selection, Assembly and Standardization:**</span>

#### <span style="color:orange;">**1. Select and Assemble the Features:**</span>

```python
from pyspark.ml.feature import VectorAssembler

# Every column except the label is a feature
feature_cols = [col for col in df.columns if col != "Exited"]

# Combine the feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

df_vec = assembler.transform(df)

df_vec = df_vec.select("features", "Exited")

df_vec.show(2, truncate=False)

df_vec.count()
```

```
+-------------------------------------------------------+------+
|features                                               |Exited|
+-------------------------------------------------------+------+
|[1.0,0.0,619.0,42.0,2.0,0.0,1.0,1.0,1.0,101348.88]     |1     |
|[1.0,2.0,608.0,41.0,1.0,83807.86,1.0,0.0,1.0,112542.58]|0     |
+-------------------------------------------------------+------+
only showing top 2 rows
```

```
15918
```
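Conceptually, `VectorAssembler` concatenates the columns listed in `inputCols`, in that order, into one vector per row. A plain-Python equivalent for a single row, using the first row shown above (`assemble` is a hypothetical helper, not the Spark API):

```python
def assemble(row, input_cols):
    """Concatenate the selected columns of a row, in input_cols order,
    into a single list of floats (the label is not included)."""
    return [float(row[c]) for c in input_cols]


feature_cols = ["Sex", "Region", "CreditScore", "Age", "Tenure", "Balance",
                "NumOfProducts", "HasCrCard", "IsActiveMember", "EstimatedSalary"]

first_row = {"Sex": 1.0, "Region": 0.0, "CreditScore": 619.0, "Age": 42,
             "Tenure": 2, "Balance": 0.0, "NumOfProducts": 1, "HasCrCard": 1,
             "IsActiveMember": 1, "EstimatedSalary": 101348.88, "Exited": 1}

print(assemble(first_row, feature_cols))
# [1.0, 0.0, 619.0, 42.0, 2.0, 0.0, 1.0, 1.0, 1.0, 101348.88]
```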

#### <span style="color:orange;">**2. Standardize the Features:**</span>

```python
from pyspark.ml.feature import StandardScaler

# Center each feature (withMean) and scale it to unit standard deviation (withStd)
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True
)

df_scaled = scaler.fit(df_vec).transform(df_vec)
```

```python
# Keep only the scaled vector and the label
df_scaled = df_scaled.select("scaled_features", "Exited")

df_scaled.show(5, truncate=False)
```

```
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|scaled_features                                                                                                                                                                                           |Exited|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|[1.0732410343916199,-1.0189022035876123,-0.3294205389157971,0.09798913406328658,-1.039040761758242,-1.3342758555096281,-0.7320491812025733,0.7748578372390554,1.3051274493010556,0.01416412973324428]     |1     |
|[1.0732410343916199,1.623344785208039,-0.44980039292599105,-0.0019142180163445482,-1.4063310916569278,0.030677462805769884,-0.7320491812025733,-1.29047
```
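With `withMean=True` and `withStd=True`, each feature is transformed as z = (x - mean) / std, where, according to Spark's documentation, std is the corrected (n - 1) sample standard deviation. The NumPy sketch below separates fitting from transforming (helper names are hypothetical). Note that the scaler above is fit on the whole dataset before the split; fitting it on the training rows only and reusing those statistics for the test rows avoids leaking test information into training.

```python
import numpy as np


def fit_standard_scaler(X):
    """Learn the per-column mean and corrected sample std (ddof=1)."""
    return X.mean(axis=0), X.std(axis=0, ddof=1)


def transform_standard_scaler(X, mean, std):
    """Center and scale; guard against division by zero for constant columns."""
    safe_std = np.where(std == 0, 1.0, std)
    return (X - mean) / safe_std


# Usage: statistics come from the training rows only
rng = np.random.default_rng(42)
X_train = rng.normal(loc=600, scale=100, size=(80, 2))
X_test = rng.normal(loc=600, scale=100, size=(20, 2))

mean, std = fit_standard_scaler(X_train)
Z_train = transform_standard_scaler(X_train, mean, std)
Z_test = transform_standard_scaler(X_test, mean, std)
```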

<br>

### <span style="color:green;">**Data Splitting and Model Selection:**</span>

#### <span style="color:orange;">**1. Split the Data:**</span>

```python
train_df, test_df = df_scaled.randomSplit([0.8, 0.2], seed=42)

# Count each split once: every .count() call launches a Spark job
n_train, n_test = train_df.count(), test_df.count()
n_total = n_train + n_test

print("- Train count:", n_train)
print("- Test count:", n_test)

print(f"\n- Train percentage: {n_train * 100 / n_total:.2f} %")
print(f"- Test percentage: {n_test * 100 / n_total:.2f} %")
```

```
- Train count: 12797
- Test count: 3121

- Train percentage: 80.39 %
- Test percentage: 19.61 %
```
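The split is 80.39 % / 19.61 % rather than exactly 80 / 20 because `randomSplit` treats the weights as probabilities: each row is assigned to a split independently by a uniform random draw, so split sizes only approximate the requested ratio. The stdlib simulation below (a hypothetical helper, not Spark's code) reproduces that per-row behavior on the same number of rows:

```python
import random


def bernoulli_split(n_rows, train_weight=0.8, seed=42):
    """Assign each row independently: train if a uniform draw < train_weight."""
    rng = random.Random(seed)
    n_train = sum(1 for _ in range(n_rows) if rng.random() < train_weight)
    return n_train, n_rows - n_train


n_train, n_test = bernoulli_split(15918)
print(f"Train: {n_train} ({n_train * 100 / 15918:.2f} %), Test: {n_test}")
```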


#### <span style="color:orange;">**2. Choose the Model:**</span>

**Problem Characteristics:**

- **Problem type:** binary classification (Exited = 0 or 1).

- **Number of features:** 10, a mix of numeric and encoded categorical features.

- **Dataset size:** 15,918 rows (almost 16,000) after balancing with SMOTE.

- **Preprocessing:** features standardized with StandardScaler.

**Logistic Regression:**

- **Advantages:**

    - Simple and fast to train.

    - Interpretable: since the features are standardized, coefficient magnitudes can be compared to judge the influence of each feature.

    - Works well with standardized features, which help the optimizer converge and make the regularization penalty treat all features equally.

    - Well suited to medium-sized datasets (~10k–100k rows).

- **Limitations:**

    - Linear model: it does not capture complex interactions between features unless they are added explicitly.

    - Less accurate when the relationship between the features and the log-odds of churn is non-linear.
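The interpretability and linearity points can be made precise. Logistic regression models the churn probability as a sigmoid of a linear score, so the log-odds are linear in the features; with standardized features, each coefficient gives the multiplicative change in the odds of churn for a one-standard-deviation increase in that feature, all others held fixed:

```latex
\begin{aligned}
p(\mathbf{x}) &= P(\text{Exited} = 1 \mid \mathbf{x}) = \frac{1}{1 + e^{-(b + \mathbf{w}^\top \mathbf{x})}} \\
\log \frac{p(\mathbf{x})}{1 - p(\mathbf{x})} &= b + \mathbf{w}^\top \mathbf{x} \\
\frac{\text{odds}(x_j + 1)}{\text{odds}(x_j)} &= e^{w_j}
\end{aligned}
```

For example, a hypothetical coefficient of 0.7 on `Age` would mean the odds of churn are multiplied by e^0.7 ≈ 2.01 for each additional standard deviation of age.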

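```python
from pyspark.ml.classification import LogisticRegression

# regParam sets the regularization strength; with the default
# elasticNetParam=0.0 the penalty is pure L2
lr = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="Exited",
    maxIter=100,
    regParam=0.01
)
```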
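With `elasticNetParam=0.0`, the objective documented for Spark MLlib's linear methods reduces to the mean log-loss plus (regParam / 2)·‖w‖², with the intercept left unpenalized. Spark minimizes it with L-BFGS; the NumPy sketch below uses plain gradient descent instead, only to make that objective concrete. The function names, learning rate and iteration count are assumptions for illustration.

```python
import numpy as np


def fit_logreg_l2(X, y, reg_param=0.01, lr=0.5, n_iter=500):
    """Minimize mean log-loss + (reg_param / 2) * ||w||^2 by gradient descent.
    The intercept b is not regularized."""
    n, d = X.shape
    w, b = np.zeros(d), 0.0
    for _ in range(n_iter):
        p = 1.0 / (1.0 + np.exp(-(X @ w + b)))  # predicted P(y = 1)
        grad_w = X.T @ (p - y) / n + reg_param * w
        grad_b = np.mean(p - y)
        w -= lr * grad_w
        b -= lr * grad_b
    return w, b


def predict(X, w, b, threshold=0.5):
    """Label 1 when the predicted probability exceeds the threshold."""
    return (1.0 / (1.0 + np.exp(-(X @ w + b))) > threshold).astype(int)


# Usage on toy standardized data whose true boundary is x0 + 0.5 * x1 = 0
rng = np.random.default_rng(0)
X = rng.normal(size=(400, 2))
y = (X[:, 0] + 0.5 * X[:, 1] > 0).astype(float)
w, b = fit_logreg_l2(X, y)
accuracy = (predict(X, w, b) == y).mean()
```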

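#### <span style="color:orange;">**3. Save the Variables with ``joblib``:**</span>

```python
import os
import joblib

# Collect to pandas: scaled_features becomes a column of Spark DenseVector objects
df_scaled_pd = df_scaled.toPandas()
train_df_pd = train_df.toPandas()
test_df_pd = test_df.toPandas()

os.makedirs("../data/scaled", exist_ok=True)
os.makedirs("../utils", exist_ok=True)

# The CSV stores each vector as its string representation
df_scaled_pd.to_csv("../data/scaled/data.csv", index=False)

# The pickles keep the DenseVector objects, so loading them requires pyspark
joblib.dump(df_scaled_pd, "../utils/df_scaled.pkl")
joblib.dump(train_df_pd, "../utils/train_df.pkl")
joblib.dump(test_df_pd, "../utils/test_df.pkl")
```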



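`joblib.dump` returns the list of files it wrote; the cell displays the value returned by the last call:

```
['../utils/test_df.pkl']
```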

<br>

### <span style="color:green;">**Building the Pipeline:**</span>

```python
from pyspark.ml import Pipeline

# Chain the stages defined above: feature assembly, standardization, classifier
pipeline = Pipeline(stages=[assembler, scaler, lr])
```
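`Pipeline.fit` runs the stages in order: transformers such as the `VectorAssembler` only transform the data, while estimators such as the `StandardScaler` and `LogisticRegression` are fit on the output of the preceding stages, and the call returns a `PipelineModel` that replays the fitted stages on new data. Since the assembler expects the individual feature columns, this pipeline would be fit on a split of the balanced DataFrame `df` (for example, with hypothetical names, `raw_train, raw_test = df.randomSplit([0.8, 0.2], seed=42)` then `pipeline.fit(raw_train)`) rather than on `train_df`, which only holds `scaled_features` and `Exited`; this also keeps the scaler's statistics limited to training rows. The toy pure-Python classes below (hypothetical, not the `pyspark.ml` API) illustrate that fit/transform chaining:

```python
class MeanCenterer:
    """Toy estimator: learns the mean on fit, subtracts it on transform."""
    def fit(self, rows):
        self.mean = sum(rows) / len(rows)
        return self

    def transform(self, rows):
        return [r - self.mean for r in rows]


class Doubler:
    """Toy transformer: nothing to learn."""
    def transform(self, rows):
        return [2 * r for r in rows]


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        # Replay every fitted stage in order
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):
                stage = stage.fit(rows)   # estimator: learn from the current data
            rows = stage.transform(rows)  # feed the output to the next stage
            fitted.append(stage)
        return ToyPipelineModel(fitted)


# Usage: the mean is learned from the training rows only
train = [1.0, 2.0, 3.0]
model = ToyPipeline([Doubler(), MeanCenterer()]).fit(train)
print(model.transform([10.0]))  # [16.0]
```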