# Credit Risk Classification (Risque)

**Objectif**: Construire un modèle de classification pour prédire la variable cible `Risque ∈ {Risque Elevé, Risque Faible}` à partir de 7 variables d’entrée `A1..A7`.

**Livrables couverts dans ce notebook**:
- Récupération et vérification des données
- Exploration des données
- Nettoyage et préparation
- Développement d’au moins 3 modèles
- Évaluation et comparaison
- Sauvegarde du meilleur modèle

> Dataset utilisé: `data/Risque_data.xlsx` (690 lignes, 8 colonnes), chargé depuis ce notebook via le chemin relatif `../data/Risque_data.xlsx`.


In [None]:
from __future__ import annotations

from pathlib import Path

import joblib
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)
from sklearn.model_selection import StratifiedKFold, cross_validate, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, HistGradientBoostingClassifier

# --- Notebook configuration ---
# Both paths are relative, so they depend on the current working directory;
# the resolved forms are printed below to make a wrong directory easy to spot.
TARGET_COL = 'Risque'
DATA_PATH = Path('..') / 'data' / 'Risque_data.xlsx'
# Same path/name as the Streamlit app expects
ARTIFACT_PATH = Path('..') / 'artifacts' / 'credit_risk_model.joblib'

# Seed NumPy's global RNG and raise pandas' column display limit.
np.random.seed(42)
pd.set_option('display.max_columns', 200)

for label, location in (('DATA_PATH:', DATA_PATH), ('ARTIFACT_PATH:', ARTIFACT_PATH)):
    print(label, location.resolve())


In [None]:
# 1) Data retrieval and verification

df = pd.read_excel(DATA_PATH)
print('shape:', df.shape)
print('columns:', df.columns.tolist())

# Expected schema: the seven inputs A1..A7 plus the target column.
expected_cols = [f'A{i}' for i in range(1, 8)] + [TARGET_COL]
present_cols = set(df.columns)
missing = [c for c in expected_cols if c not in present_cols]
assert not missing, f"Colonnes manquantes: {missing}"

# Each overview is shown under its heading, in a fixed order:
# dtypes, target distribution (NaN included), missing counts, first rows.
overviews = (
    ('\nDtypes:', df.dtypes),
    ('\nValeurs de la cible:', df[TARGET_COL].value_counts(dropna=False)),
    ('\nValeurs manquantes (par colonne):', df.isna().sum()),
    ('\nAperçu:', df.head()),
)
for heading, view in overviews:
    print(heading)
    display(view)


## 2) Exploration des données (EDA)

On explore:
- statistiques descriptives (numériques / catégorielles)
- duplications
- distribution de la cible
- cohérence des catégories



In [None]:
# Column roles used by the rest of the notebook.
numeric_cols = ['A1', 'A2', 'A3', 'A4']
cat_cols = ['A5', 'A6', 'A7']

# Number of fully identical rows in the raw frame.
n_duplicates = int(df.duplicated().sum())
print('Duplicates:', n_duplicates)

print('\nDescribe (numériques):')
numeric_summary = df[numeric_cols].describe().T
display(numeric_summary)

print('\nDescribe (catégorielles):')
categorical_summary = df[cat_cols].describe().T
display(categorical_summary)

# One row per categorical column, holding its count of distinct non-null values.
print('\nCardinalité catégories:')
cardinality = pd.Series({col: df[col].nunique(dropna=True) for col in cat_cols})
display(cardinality.to_frame('n_unique'))

# Show at most `max_shown` distinct values per column; ' ...' marks truncation.
print('\nQuelques valeurs par variable catégorielle:')
max_shown = 20
for col in cat_cols:
    distinct_values = df[col].dropna().astype(str).unique().tolist()
    shown = distinct_values[:max_shown]
    suffix = ' ...' if df[col].nunique(dropna=True) > max_shown else ''
    print(f"{col}: {shown}{suffix}")


## 3) Nettoyage et préparation des données

On applique un nettoyage simple et reproductible:
- suppression des doublons
- gestion des valeurs manquantes (imputation)
- (optionnel) filtrage d’outliers sur les variables numériques

Ensuite, on prépare les données pour le Machine Learning via une **pipeline sklearn**:
- imputation + standardisation pour les variables numériques
- imputation + one-hot encoding pour les variables catégorielles



In [None]:
# Basic cleaning (mirrors the Streamlit app defaults)

# Work on a copy so the loaded frame `df` is left untouched.
raw = df.copy()

# Flag rows that repeat an earlier row, count them, then keep the rest.
dup_flags = raw.duplicated()
n_dup = int(dup_flags.sum())
raw = raw.loc[~dup_flags]
print('duplicates removed:', n_dup)

# Optional z-score outlier filtering on the numeric columns (off by default).
Z_THRESHOLD = 3.0
USE_OUTLIER_FILTER = False


def zscore_keep_mask(s: pd.Series, thr: float = 3.0) -> pd.Series:
    """Return a boolean mask of the rows to keep under a z-score outlier filter.

    Values are coerced to numeric (non-numeric entries become NaN). The z-score
    is computed with the population standard deviation (``ddof=0``).

    Missing or non-numeric entries are kept (``True``): they are not outliers,
    and dropping them here would silently discard rows that a downstream
    imputer is meant to handle.

    Args:
        s: Column to screen.
        thr: Maximum absolute z-score that is still kept (inclusive).

    Returns:
        A boolean Series aligned on ``s.index``; ``True`` means "keep the row".
        If the standard deviation is zero or not finite (constant, empty or
        all-missing column), every row is kept.
    """
    x = pd.to_numeric(s, errors='coerce')
    mu = x.mean()
    sigma = x.std(ddof=0)
    if not np.isfinite(sigma) or sigma == 0:
        # No spread to measure against: nothing can be called an outlier.
        return pd.Series(True, index=s.index, dtype=bool)
    z = (x - mu) / sigma
    # `z.abs() <= thr` is False for NaN, so missing values are kept explicitly.
    return (z.abs() <= thr) | x.isna()


clean = raw.copy()
if USE_OUTLIER_FILTER:
    # A row survives only if every numeric column passes the z-score test.
    mask = pd.Series(np.ones(len(clean), dtype=bool), index=clean.index)
    for c in numeric_cols:
        mask = mask & zscore_keep_mask(clean[c], Z_THRESHOLD)
    clean = clean[mask].copy()

print('rows before:', len(df), 'rows after cleaning:', len(clean))

# Target check: the column must exist and hold only the two known labels
# (missing values are ignored by this check).
assert TARGET_COL in clean.columns
allowed_labels = {'Risque Elevé', 'Risque Faible'}
observed_labels = set(clean[TARGET_COL].dropna().unique())
assert observed_labels.issubset(allowed_labels)

clean.head()


In [None]:
# Encode target to binary (same convention as the app)
# Risque Elevé -> 0, Risque Faible -> 1
label_mapping = {'Risque Elevé': 0, 'Risque Faible': 1}

# Rows without a target cannot be used for supervised training, and NaN cannot
# be cast to int64, so such rows are dropped explicitly and reported.
labeled = clean.dropna(subset=[TARGET_COL])
n_unlabeled = len(clean) - len(labeled)
if n_unlabeled:
    print('rows dropped (missing target):', n_unlabeled)

# Any label outside the mapping becomes NaN after `map`; fail with a clear
# message instead of an opaque integer-cast error.
encoded = labeled[TARGET_COL].map(label_mapping)
unmapped = encoded.isna()
if unmapped.any():
    unknown_labels = sorted(labeled.loc[unmapped, TARGET_COL].astype(str).unique())
    raise ValueError(f"Unexpected target labels: {unknown_labels}")

y = encoded.astype('int64')
X = labeled.drop(columns=[TARGET_COL])

print('X shape:', X.shape)
print('y distribution:')
display(y.value_counts().rename(index={0:'Risque Elevé', 1:'Risque Faible'}))

# Preprocessor: one sub-pipeline per column family; any other column is dropped.
numeric_features = numeric_cols
categorical_features = cat_cols

preprocessor = ColumnTransformer(
    transformers=[
        (
            # Numeric columns: median imputation, then standardisation.
            'num',
            Pipeline(steps=[
                ('imputer', SimpleImputer(strategy='median')),
                ('scaler', StandardScaler()),
            ]),
            numeric_features,
        ),
        (
            # Categorical columns: most-frequent imputation, then one-hot
            # encoding; categories unseen during fit are ignored at predict time.
            'cat',
            Pipeline(steps=[
                ('imputer', SimpleImputer(strategy='most_frequent')),
                # Keep dense output so HistGradientBoostingClassifier can train.
                ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
            ]),
            categorical_features,
        ),
    ],
    remainder='drop',
)

preprocessor


## 4) Développement des modèles (au moins 3 algorithmes)

On entraîne 3 classifieurs via des pipelines complètes:
- Histogram Gradient Boosting (`HistGradientBoostingClassifier`)
- Random Forest
- Gradient Boosting

On évalue par validation croisée stratifiée, puis on fait un test final sur un holdout.



In [None]:
def make_pipeline(model) -> Pipeline:
    """Wrap ``model`` in a two-step pipeline: preprocessing, then the classifier.

    The steps are named ``'pre'`` (the module-level ``preprocessor``) and
    ``'clf'`` (``model``). Both objects are passed to the pipeline directly.
    """
    steps = [('pre', preprocessor), ('clf', model)]
    return Pipeline(steps=steps)

# Candidate classifiers, keyed by the short name used in the result tables.
models = {
    'HistGB': HistGradientBoostingClassifier(random_state=42),
    'RandomForest': RandomForestClassifier(n_estimators=400, random_state=42),
    'GradientBoosting': GradientBoostingClassifier(random_state=42),
}

# Each metric is requested from scikit-learn under its own scorer name.
scoring = {
    metric: metric
    for metric in ('accuracy', 'precision', 'recall', 'f1', 'roc_auc')
}

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

cv_rows = []     # one summary row (mean/std per metric) per model
cv_details = {}  # raw cross_validate output per model

for name, model in models.items():
    fold_scores = cross_validate(
        make_pipeline(model), X, y, cv=cv, scoring=scoring, n_jobs=None
    )
    cv_details[name] = fold_scores

    # Column order: model, all *_mean columns, then all *_std columns.
    row = {'model': name}
    for stat_name, stat_fn in (('mean', np.mean), ('std', np.std)):
        for k in scoring:
            row[f"cv_{k}_{stat_name}"] = float(stat_fn(fold_scores[f"test_{k}"]))
    cv_rows.append(row)

# Best mean F1 first.
cv_results = (
    pd.DataFrame(cv_rows)
    .sort_values('cv_f1_mean', ascending=False)
    .reset_index(drop=True)
)
cv_results


In [None]:
# Choose best model by CV F1 (you can change the criterion if needed).
# `cv_results` is sorted by cv_f1_mean in descending order, so row 0 wins.
best_name = str(cv_results.iloc[0]['model'])
print('Best by CV F1:', best_name)

best_model = models[best_name]
best_pipe = make_pipeline(best_model)

# Final evaluation on a stratified 70/30 holdout split.
# NOTE(review): the split is drawn from the same X, y that were used for
# cross-validation, so the model choice has already seen the holdout rows;
# treat the holdout metrics as optimistic.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.30,
    random_state=42,
    stratify=y,
)

# NOTE(review): from here on `best_pipe` is fitted on the training split only.
best_pipe.fit(X_train, y_train)
y_pred = best_pipe.predict(X_test)

# Binary metrics use scikit-learn's default positive label 1 (= 'Risque Faible').
metrics = {
    'model': best_name,
    'accuracy': float(accuracy_score(y_test, y_pred)),
    'precision': float(precision_score(y_test, y_pred, zero_division=0)),
    'recall': float(recall_score(y_test, y_pred, zero_division=0)),
    'f1': float(f1_score(y_test, y_pred, zero_division=0)),
}

# Optional AUC if available. This stays best-effort (a failure must not abort
# the evaluation), but the reason for a missing AUC is reported, not swallowed.
roc_auc = None
if hasattr(best_pipe, 'predict_proba'):
    try:
        proba = best_pipe.predict_proba(X_test)[:, 1]
        roc_auc = float(roc_auc_score(y_test, proba))
    except Exception as exc:
        print(f'ROC AUC skipped: {type(exc).__name__}: {exc}')
    else:
        metrics['roc_auc'] = roc_auc
else:
    print('ROC AUC skipped: model does not expose predict_proba')

print('Holdout metrics:')
display(pd.DataFrame([metrics]))

print('\nConfusion matrix (rows=true, cols=pred) [0=Elevé, 1=Faible]:')
display(pd.DataFrame(confusion_matrix(y_test, y_pred), index=['true_0','true_1'], columns=['pred_0','pred_1']))

print('\nClassification report:')
print(classification_report(y_test, y_pred, target_names=['Risque Elevé','Risque Faible'], zero_division=0))


## 5) Sauvegarde du meilleur modèle

On sauvegarde un bundle `joblib` qui contient:
- `pipeline`: la pipeline sklearn complète (prétraitement + modèle)
- `metadata`: infos utiles (colonnes, métriques, configuration)

Ce format est compatible avec l’application Streamlit (chargement via `joblib.load`).



In [None]:
# Save artifact (same layout as the Streamlit app expects)

# Numeric ranges (for sanity checks in the app): min/max of each numeric
# column of the cleaned data, ignoring NaN.
numeric_as_float = {c: pd.to_numeric(clean[c], errors='coerce') for c in numeric_cols}
numeric_ranges = {
    c: {'min': float(np.nanmin(vals)), 'max': float(np.nanmax(vals))}
    for c, vals in numeric_as_float.items()
}

# Cleaning options in effect when the model was trained.
cleaning_config = {
    'drop_duplicates': True,
    'use_outlier_filter': USE_OUTLIER_FILTER,
    'z_threshold': Z_THRESHOLD,
}

metadata = {
    'model_name': best_name,
    'target_col': TARGET_COL,
    'label_mapping': {'Risque Elevé': 0, 'Risque Faible': 1},
    'numeric_cols': numeric_cols,
    'categorical_cols': cat_cols,
    'cv_results': cv_results.to_dict(orient='records'),
    'holdout_metrics': metrics,
    'metrics': {'numeric_ranges': numeric_ranges},
    'cleaning': cleaning_config,
}

# Bundle = fitted pipeline + metadata, written as a single joblib file.
bundle = {'pipeline': best_pipe, 'metadata': metadata}
ARTIFACT_PATH.parent.mkdir(parents=True, exist_ok=True)
joblib.dump(bundle, ARTIFACT_PATH)

print('Saved artifact to:', ARTIFACT_PATH.resolve())
# Reload once to check that the file is readable and has the expected keys.
reloaded = joblib.load(ARTIFACT_PATH)
print('Bundle keys:', reloaded.keys())
