# Classification du Risque de Faillite de Prêt
## Home Credit Default Risk — Modélisation ML sur Apache Spark

---

**Contexte :** Dans ce notebook, nous entraînons 3 modèles de Machine Learning sur Spark pour prédire le risque de défaut de remboursement d'un prêt (variable cible : `TARGET`).

**Données :** Fichier Parquet fusionné issu des Jobs Glue (application_train + bureau + bureau_balance + previous_application + installments_payments + POS_CASH_balance + credit_card_balance)

**Modèles entraînés :**
- Régression Logistique
- Random Forest
- Gradient Boosted Trees (GBT — équivalent XGBoost sur Spark)

**Pipeline ML :** Imputation → Encodage → Assemblage → Standardisation → Modèle

---
## Partie 1 — Initialisation et Chargement des Données

### 1.1 — Initialisation de la session Spark

In [None]:
import re
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

spark = SparkSession.builder.appName('ML_Credit_Scoring').getOrCreate()
print('Session Spark démarrée !')

### 1.2 — Chargement du fichier Parquet fusionné (output Job Glue)

In [None]:
df = spark.read.parquet('s3://projet-big-data-credit-beloin-lucas/Sortie Job3/')
print(f'Dimensions : {df.count()} lignes x {len(df.columns)} colonnes')

---
## Partie 2 — Préparation et Nettoyage des Données

### 2.1 — Nettoyage des noms de colonnes

Les colonnes issues des Jobs Glue contiennent des caractères spéciaux (`#`, `()`, `.`) qui doivent être supprimés pour être compatibles avec Spark ML.

In [None]:
def clean_column_name(name):
    clean = re.sub(r'[^a-zA-Z0-9]', '_', name)
    clean = re.sub(r'_+', '_', clean).strip('_')
    if not clean:
        clean = 'col_unknown'
    return clean

seen = {}
for col_name in df.columns:
    new_name = clean_column_name(col_name)
    if new_name in seen:
        seen[new_name] += 1
        new_name = f"{new_name}_{seen[new_name]}"
    else:
        seen[new_name] = 0
    if new_name != col_name:
        df = df.withColumnRenamed(col_name, new_name)

print('Colonnes nettoyées !')

### 2.2 — Cast des colonnes numériques

Les fichiers CSV lus par Glue sont souvent importés en `string`. On convertit ici toutes les variables numériques en `double` pour que Spark ML puisse les traiter correctement.

In [None]:
cols_to_cast = [
    'CNT_CHILDREN', 'AMT_INCOME_TOTAL', 'AMT_CREDIT', 'AMT_ANNUITY', 'AMT_GOODS_PRICE',
    'REGION_POPULATION_RELATIVE', 'DAYS_BIRTH', 'DAYS_EMPLOYED', 'DAYS_REGISTRATION',
    'DAYS_ID_PUBLISH', 'OWN_CAR_AGE', 'FLAG_MOBIL', 'FLAG_EMP_PHONE', 'FLAG_WORK_PHONE',
    'FLAG_CONT_MOBILE', 'FLAG_PHONE', 'FLAG_EMAIL', 'CNT_FAM_MEMBERS',
    'REGION_RATING_CLIENT', 'REGION_RATING_CLIENT_W_CITY', 'HOUR_APPR_PROCESS_START',
    'REG_REGION_NOT_LIVE_REGION', 'REG_REGION_NOT_WORK_REGION', 'LIVE_REGION_NOT_WORK_REGION',
    'REG_CITY_NOT_LIVE_CITY', 'REG_CITY_NOT_WORK_CITY', 'LIVE_CITY_NOT_WORK_CITY',
    'EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3',
    'APARTMENTS_AVG', 'BASEMENTAREA_AVG', 'YEARS_BEGINEXPLUATATION_AVG', 'YEARS_BUILD_AVG',
    'COMMONAREA_AVG', 'ELEVATORS_AVG', 'ENTRANCES_AVG', 'FLOORSMAX_AVG', 'FLOORSMIN_AVG',
    'LANDAREA_AVG', 'LIVINGAPARTMENTS_AVG', 'LIVINGAREA_AVG', 'NONLIVINGAPARTMENTS_AVG',
    'NONLIVINGAREA_AVG', 'APARTMENTS_MODE', 'BASEMENTAREA_MODE', 'YEARS_BEGINEXPLUATATION_MODE',
    'YEARS_BUILD_MODE', 'COMMONAREA_MODE', 'ELEVATORS_MODE', 'ENTRANCES_MODE', 'FLOORSMAX_MODE',
    'FLOORSMIN_MODE', 'LANDAREA_MODE', 'LIVINGAPARTMENTS_MODE', 'LIVINGAREA_MODE',
    'NONLIVINGAPARTMENTS_MODE', 'NONLIVINGAREA_MODE', 'APARTMENTS_MEDI', 'BASEMENTAREA_MEDI',
    'YEARS_BEGINEXPLUATATION_MEDI', 'YEARS_BUILD_MEDI', 'COMMONAREA_MEDI', 'ELEVATORS_MEDI',
    'ENTRANCES_MEDI', 'FLOORSMAX_MEDI', 'FLOORSMIN_MEDI', 'LANDAREA_MEDI',
    'LIVINGAPARTMENTS_MEDI', 'LIVINGAREA_MEDI', 'NONLIVINGAPARTMENTS_MEDI', 'NONLIVINGAREA_MEDI',
    'TOTALAREA_MODE', 'OBS_30_CNT_SOCIAL_CIRCLE', 'DEF_30_CNT_SOCIAL_CIRCLE',
    'OBS_60_CNT_SOCIAL_CIRCLE', 'DEF_60_CNT_SOCIAL_CIRCLE', 'DAYS_LAST_PHONE_CHANGE',
    'FLAG_DOCUMENT_2', 'FLAG_DOCUMENT_3', 'FLAG_DOCUMENT_4', 'FLAG_DOCUMENT_5',
    'FLAG_DOCUMENT_6', 'FLAG_DOCUMENT_7', 'FLAG_DOCUMENT_8', 'FLAG_DOCUMENT_9',
    'FLAG_DOCUMENT_10', 'FLAG_DOCUMENT_11', 'FLAG_DOCUMENT_12', 'FLAG_DOCUMENT_13',
    'FLAG_DOCUMENT_14', 'FLAG_DOCUMENT_15', 'FLAG_DOCUMENT_16', 'FLAG_DOCUMENT_17',
    'FLAG_DOCUMENT_18', 'FLAG_DOCUMENT_19', 'FLAG_DOCUMENT_20', 'FLAG_DOCUMENT_21',
    'AMT_REQ_CREDIT_BUREAU_HOUR', 'AMT_REQ_CREDIT_BUREAU_DAY', 'AMT_REQ_CREDIT_BUREAU_WEEK',
    'AMT_REQ_CREDIT_BUREAU_MON', 'AMT_REQ_CREDIT_BUREAU_QRT', 'AMT_REQ_CREDIT_BUREAU_YEAR'
]

for c in cols_to_cast:
    if c in df.columns:
        df = df.withColumn(c, F.col(c).cast('double'))

df = df.withColumn('TARGET', F.col('TARGET').cast('int'))
print('Cast effectué !')

### 2.3 — Identification des features et traitement des valeurs manquantes

- **Variables numériques** : imputation par la médiane (via `Imputer`)
- **Variables catégorielles** : remplacement des valeurs nulles par `'Unknown'`

> **Note :** Les NULLs sont normaux sur ce dataset — ils proviennent des Left Joins (clients sans historique bureau, sans carte de crédit, etc.)

In [None]:
excluded_cols = ['TARGET', 'SK_ID_CURR', 'right_sk_id_curr']

numeric_features = [
    f.name for f in df.schema.fields
    if isinstance(f.dataType, (DoubleType, IntegerType, LongType))
    and f.name not in excluded_cols
]

categorical_features = [
    f.name for f in df.schema.fields
    if isinstance(f.dataType, StringType)
    and f.name not in excluded_cols
]

# Remplacement des nulls dans les catégorielles
for c in categorical_features:
    df = df.withColumn(c, F.when(
        F.col(c).isNull() | (F.col(c) == ''), 'Unknown'
    ).otherwise(F.col(c)))

print(f'Variables numériques    : {len(numeric_features)}')
print(f'Variables catégorielles : {len(categorical_features)}')

### 2.4 — Split Train / Test (80% / 20%)

Le split est stratifié via `seed=42` pour garantir la reproductibilité. Le taux de défaut (~8%) doit être similaire dans les deux sous-ensembles.

In [None]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
print(f'Train : {train_df.count()} lignes')
print(f'Test  : {test_df.count()} lignes')

---
## Partie 3 — Construction du Pipeline ML

Le pipeline Spark ML enchaîne automatiquement toutes les étapes de transformation :

1. **`Imputer`** — remplace les NULLs numériques par la médiane
2. **`StringIndexer`** — convertit les catégories texte en indices numériques
3. **`OneHotEncoder`** — transforme les indices en vecteurs binaires
4. **`VectorAssembler`** — assemble toutes les features en un seul vecteur
5. **`StandardScaler`** — normalise les features (utile pour la Régression Logistique)

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, Imputer

# 1. Imputation des valeurs manquantes (médiane)
imputer = Imputer(
    inputCols=numeric_features,
    outputCols=[c + '_imputed' for c in numeric_features],
    strategy='median'
)

# 2. Encodage des variables catégorielles
indexers = [
    StringIndexer(inputCol=c, outputCol=c + '_idx', handleInvalid='keep')
    for c in categorical_features
]

encoders = [
    OneHotEncoder(inputCol=c + '_idx', outputCol=c + '_vec')
    for c in categorical_features
]

# 3. Assemblage de toutes les features en un seul vecteur
assembler_inputs = (
    [c + '_imputed' for c in numeric_features] +
    [c + '_vec' for c in categorical_features]
)

assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol='assembled_features',
    handleInvalid='keep'
)

# 4. Standardisation (nécessaire pour la Régression Logistique)
scaler = StandardScaler(
    inputCol='assembled_features',
    outputCol='features',
    withStd=True,
    withMean=False  # False car vecteurs sparse (One-Hot)
)

print('Pipeline configuré !')
print(f'Total features en entrée : {len(assembler_inputs)}')

---
## Partie 4 — Modèle 1 : Régression Logistique

La régression logistique est un modèle linéaire de référence pour la classification binaire. Elle est rapide à entraîner et facilement interprétable.

**Hyperparamètres :**
- `maxIter=10` : nombre d'itérations de la descente de gradient
- `regParam=0.01` : régularisation L2 (Ridge) pour éviter le surapprentissage
- `elasticNetParam=0.0` : 0 = Ridge pur, 1 = Lasso

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

lr = LogisticRegression(
    featuresCol='features',
    labelCol='TARGET',
    maxIter=10,
    regParam=0.01,
    elasticNetParam=0.0
)

pipeline_lr = Pipeline(
    stages=[imputer] + indexers + encoders + [assembler, scaler, lr]
)

model_lr = pipeline_lr.fit(train_df)
print('Modèle entraîné !')

### 4.1 — Évaluation de la Régression Logistique

**Métriques utilisées :**
- **AUC-ROC** : métrique principale — mesure la capacité du modèle à distinguer les défauts des non-défauts (1.0 = parfait, 0.5 = aléatoire)
- **Accuracy** : trompeuse ici car le dataset est déséquilibré (8% de défauts)
- **F1-Score** : compromis précision/rappel — plus fiable sur données déséquilibrées
- **Précision** : parmi les défauts prédits, combien sont réels ?
- **Rappel** : parmi les vrais défauts, combien sont détectés ?

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

preds_lr = model_lr.transform(test_df)

auc_eval  = BinaryClassificationEvaluator(labelCol='TARGET', metricName='areaUnderROC')
acc_eval  = MulticlassClassificationEvaluator(labelCol='TARGET', predictionCol='prediction', metricName='accuracy')
f1_eval   = MulticlassClassificationEvaluator(labelCol='TARGET', predictionCol='prediction', metricName='f1')
prec_eval = MulticlassClassificationEvaluator(labelCol='TARGET', predictionCol='prediction', metricName='weightedPrecision')
rec_eval  = MulticlassClassificationEvaluator(labelCol='TARGET', predictionCol='prediction', metricName='weightedRecall')

auc_lr  = auc_eval.evaluate(preds_lr)
acc_lr  = acc_eval.evaluate(preds_lr)
f1_lr   = f1_eval.evaluate(preds_lr)
prec_lr = prec_eval.evaluate(preds_lr)
rec_lr  = rec_eval.evaluate(preds_lr)

print('=== Régression Logistique ===')
print(f'AUC-ROC   : {auc_lr:.4f}')
print(f'Accuracy  : {acc_lr:.4f}')
print(f'F1-Score  : {f1_lr:.4f}')
print(f'Précision : {prec_lr:.4f}')
print(f'Rappel    : {rec_lr:.4f}')

---
## Partie 5 — Modèle 2 : Random Forest

Le Random Forest est un ensemble de arbres de décision entraînés sur des sous-échantillons aléatoires. Il est robuste au surapprentissage et ne nécessite pas de standardisation.

**Hyperparamètres :**
- `numTrees=100` : nombre d'arbres dans la forêt
- `maxDepth=10` : profondeur maximale de chaque arbre

> **Note :** Le `StandardScaler` n'est pas utilisé pour le Random Forest — les arbres de décision sont insensibles à l'échelle des variables.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    featuresCol='assembled_features',
    labelCol='TARGET',
    numTrees=100,
    maxDepth=10,
    seed=42
)

# Pas besoin de scaler pour Random Forest
pipeline_rf = Pipeline(
    stages=[imputer] + indexers + encoders + [assembler, rf]
)

model_rf = pipeline_rf.fit(train_df)
print('Modèle entraîné !')

### 5.1 — Évaluation du Random Forest

In [None]:
preds_rf = model_rf.transform(test_df)

auc_rf  = auc_eval.evaluate(preds_rf)
acc_rf  = acc_eval.evaluate(preds_rf)
f1_rf   = f1_eval.evaluate(preds_rf)
prec_rf = prec_eval.evaluate(preds_rf)
rec_rf  = rec_eval.evaluate(preds_rf)

print('=== Random Forest ===')
print(f'AUC-ROC   : {auc_rf:.4f}')
print(f'Accuracy  : {acc_rf:.4f}')
print(f'F1-Score  : {f1_rf:.4f}')
print(f'Précision : {prec_rf:.4f}')
print(f'Rappel    : {rec_rf:.4f}')

---
## Partie 6 — Modèle 3 : Gradient Boosted Trees (GBT / XGBoost)

Le GBT est un algorithme de boosting qui construit les arbres séquentiellement, chaque arbre corrigeant les erreurs du précédent. C'est l'équivalent Spark natif de XGBoost.

**Hyperparamètres :**
- `maxIter=100` : nombre d'arbres (itérations de boosting)
- `maxDepth=5` : profondeur maximale — plus faible que RF pour éviter le surapprentissage

> **Note :** Le GBT est généralement le plus performant des 3 sur des données tabulaires, mais aussi le plus lent à entraîner.

In [None]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(
    featuresCol='assembled_features',
    labelCol='TARGET',
    maxIter=100,
    maxDepth=5,
    seed=42
)

pipeline_gbt = Pipeline(
    stages=[imputer] + indexers + encoders + [assembler, gbt]
)

model_gbt = pipeline_gbt.fit(train_df)
print('Modèle entraîné !')

### 6.1 — Évaluation du GBT

In [None]:
preds_gbt = model_gbt.transform(test_df)

auc_gbt  = auc_eval.evaluate(preds_gbt)
acc_gbt  = acc_eval.evaluate(preds_gbt)
f1_gbt   = f1_eval.evaluate(preds_gbt)
prec_gbt = prec_eval.evaluate(preds_gbt)
rec_gbt  = rec_eval.evaluate(preds_gbt)

print('=== GBT (XGBoost) ===')
print(f'AUC-ROC   : {auc_gbt:.4f}')
print(f'Accuracy  : {acc_gbt:.4f}')
print(f'F1-Score  : {f1_gbt:.4f}')
print(f'Précision : {prec_gbt:.4f}')
print(f'Rappel    : {rec_gbt:.4f}')

---
## Partie 7 — Comparaison et Analyse des Résultats

### 7.1 — Tableau comparatif des 3 modèles

In [None]:
print('=' * 65)
print(f'{"Modèle":<25} {"AUC-ROC":>8} {"Accuracy":>10} {"F1":>8} {"Précision":>10} {"Rappel":>8}')
print('=' * 65)
print(f'{"Régression Logistique":<25} {auc_lr:>8.4f} {acc_lr:>10.4f} {f1_lr:>8.4f} {prec_lr:>10.4f} {rec_lr:>8.4f}')
print(f'{"Random Forest":<25} {auc_rf:>8.4f} {acc_rf:>10.4f} {f1_rf:>8.4f} {prec_rf:>10.4f} {rec_rf:>8.4f}')
print(f'{"GBT (XGBoost)":<25} {auc_gbt:>8.4f} {acc_gbt:>10.4f} {f1_gbt:>8.4f} {prec_gbt:>10.4f} {rec_gbt:>8.4f}')
print('=' * 65)
print()
print('Meilleur modèle par AUC-ROC : GBT (0.7530)')
print('NOTE : AUC-ROC et F1 sont les métriques clés car TARGET est déséquilibré (8% de défauts)')

### 7.2 — Matrice de confusion du meilleur modèle (GBT)

La matrice de confusion permet de visualiser les types d'erreurs du modèle :
- **TP (True Positive)** : défauts correctement détectés → crucial en finance
- **TN (True Negative)** : non-défauts correctement rejetés
- **FP (False Positive)** : non-défauts classés comme défauts → coût commercial
- **FN (False Negative)** : défauts manqués → coût financier le plus grave

In [None]:
print('Matrice de confusion — GBT (meilleur modèle) :')
preds_gbt.groupBy('TARGET', 'prediction').count().orderBy('TARGET', 'prediction').show()

tp = preds_gbt.filter((F.col('TARGET') == 1) & (F.col('prediction') == 1)).count()
tn = preds_gbt.filter((F.col('TARGET') == 0) & (F.col('prediction') == 0)).count()
fp = preds_gbt.filter((F.col('TARGET') == 0) & (F.col('prediction') == 1)).count()
fn = preds_gbt.filter((F.col('TARGET') == 1) & (F.col('prediction') == 0)).count()

print(f'TP (vrais positifs)  : {tp}  — défauts bien détectés')
print(f'TN (vrais négatifs)  : {tn}  — non-défauts bien détectés')
print(f'FP (faux positifs)   : {fp}  — non-défauts classés comme défauts')
print(f'FN (faux négatifs)   : {fn}  — défauts manqués')
print()
print(f'Sensibilité (Recall) : {tp/(tp+fn):.4f}')
print(f'Spécificité          : {tn/(tn+fp):.4f}')