# TP noté 2 – Évaluation des classificateurs binaires et régression logistique (HW2)
## Notebook starter (hybride) — à compléter en groupe

### Organisation attendue
Ce notebook suppose l’arborescence suivante (après décompression de l’archive fournie par l’enseignant) :

```
HW2_Binary_Classification/
├── data/
│   ├── x_train.csv
│   ├── y_train.csv
│   ├── x_valid.csv
│   ├── y_valid.csv
│   ├── x_test.csv
│   └── y_test.csv
├── HW2_logistic_regression.ipynb   ← ce notebook
```

### Important – Nature de l’évaluation
L’évaluation porte prioritairement sur la compréhension, la capacité à expliquer, justifier, défendre vos choix, et à adapter votre raisonnement.
Un code fonctionnel sans compréhension démontrée ne garantit pas une bonne note.

### Règles de travail
- Complétez uniquement les cellules marquées TODO.
- N’effacez pas les cellules de test/figure.
- Les réponses aux questions se font dans le rapport PDF (sans code), mais vous trouverez ici des rappels en Markdown.

In [None]:
# Imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve, roc_auc_score, log_loss
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

np.set_printoptions(precision=4, suppress=True)

## 0) Chargement des données (NE PAS MODIFIER)

In [None]:
DATA_DIR = "data"

x_train = pd.read_csv(f"{DATA_DIR}/x_train.csv")
y_train = pd.read_csv(f"{DATA_DIR}/y_train.csv")

x_valid = pd.read_csv(f"{DATA_DIR}/x_valid.csv")
y_valid = pd.read_csv(f"{DATA_DIR}/y_valid.csv")

x_test = pd.read_csv(f"{DATA_DIR}/x_test.csv")
y_test = pd.read_csv(f"{DATA_DIR}/y_test.csv")

# y_* peut être une colonne unique ; on convertit en vecteur 1D
def _to_1d(y_df):
    if isinstance(y_df, pd.DataFrame):
        if y_df.shape[1] != 1:
            raise ValueError("y should have exactly one column")
        return y_df.iloc[:, 0].astype(int).values
    return np.asarray(y_df).astype(int).reshape(-1)

ytr = _to_1d(y_train)
yva = _to_1d(y_valid)
yte = _to_1d(y_test)

print("x_train:", x_train.shape, "y_train:", ytr.shape)
print("x_valid:", x_valid.shape, "y_valid:", yva.shape)
print("x_test :", x_test.shape,  "y_test :", yte.shape)

display(x_train.head())

## A2) Table A1 (Rapport) — descriptif des ensembles
À produire dans le rapport PDF : pour train/valid/test
- total count
- positive label count
- fraction positive

In [None]:
# TODO (optionnel) : construisez ici votre Table A1 pour faciliter le copier-coller dans le rapport.
# Indication : utilisez len(y), y.sum(), y.mean()

def summarize_split(y):
    y = np.asarray(y).reshape(-1)
    return {
        "total_count": int(len(y)),
        "positive_count": int(y.sum()),
        "fraction_positive": float(y.mean())
    }

summary = pd.DataFrame({
    "train": summarize_split(ytr),
    "valid": summarize_split(yva),
    "test":  summarize_split(yte),
})

summary

# Bloc A — Métriques pour prédictions binaires (NumPy)

Vous devez implémenter les fonctions suivantes from scratch.
Elles seront ensuite utilisées dans les analyses (baseline, choix de seuil, matrices de confusion).

Rappel : y_true et y_pred sont des vecteurs de 0/1 de même taille.

In [None]:
# =========================
# TODO A1(a): TP, TN, FP, FN
# =========================
def calc_TP_TN_FP_FN(y_true, y_pred):
    '''
    Return (TP, TN, FP, FN) as integers.
    Convention:
    - Positive class = 1 (cancer)
    - Negative class = 0 (no cancer)
    '''
    # TODO: implement
    raise NotImplementedError


# =========================
# TODO A1(b): Accuracy
# =========================
def calc_ACC(y_true, y_pred):
    '''Return accuracy in [0,1].'''
    # TODO: implement
    raise NotImplementedError


# =========================
# TODO A1(c): TPR (Sensitivity) and TNR (Specificity)
# =========================
def calc_TPR(y_true, y_pred):
    '''TPR = TP / (TP + FN). If denominator is 0, return np.nan.'''
    # TODO: implement
    raise NotImplementedError

def calc_TNR(y_true, y_pred):
    '''TNR = TN / (TN + FP). If denominator is 0, return np.nan.'''
    # TODO: implement
    raise NotImplementedError


# =========================
# TODO A1(d): PPV and NPV
# =========================
def calc_PPV(y_true, y_pred):
    '''PPV = TP / (TP + FP). If denominator is 0, return np.nan.'''
    # TODO: implement
    raise NotImplementedError

def calc_NPV(y_true, y_pred):
    '''NPV = TN / (TN + FN). If denominator is 0, return np.nan.'''
    # TODO: implement
    raise NotImplementedError

In [None]:
# Tests unitaires rapides (NE PAS MODIFIER)
yt = np.array([1,1,0,0,1,0])
yp = np.array([1,0,0,0,1,1])

TP, TN, FP, FN = calc_TP_TN_FP_FN(yt, yp)
print("TP,TN,FP,FN =", TP, TN, FP, FN)

print("ACC =", calc_ACC(yt, yp))
print("TPR =", calc_TPR(yt, yp))
print("TNR =", calc_TNR(yt, yp))
print("PPV =", calc_PPV(yt, yp))
print("NPV =", calc_NPV(yt, yp))

# Vérifications attendues: TP=2, TN=2, FP=1, FN=1
assert (TP, TN, FP, FN) == (2, 2, 1, 1)
assert abs(calc_ACC(yt, yp) - (4/6)) < 1e-12
assert abs(calc_TPR(yt, yp) - (2/3)) < 1e-12
assert abs(calc_TNR(yt, yp) - (2/3)) < 1e-12
assert abs(calc_PPV(yt, yp) - (2/3)) < 1e-12
assert abs(calc_NPV(yt, yp) - (2/3)) < 1e-12

print("OK: tests métriques binaires")

# Bloc B — Baseline "predict-0-always" et coûts des erreurs

À faire dans le notebook : matrice de confusion + métriques sur le jeu de test.
À discuter dans le rapport : pourquoi l’accuracy peut être trompeuse, coûts FP vs FN, métriques prioritaires.

In [None]:
def confusion_matrix_2x2(y_true, y_pred):
    '''
    Return a 2x2 numpy array with orientation:
    rows = true class (0 then 1)
    cols = predicted class (0 then 1)

    [[TN, FP],
     [FN, TP]]
    '''
    TP, TN, FP, FN = calc_TP_TN_FP_FN(y_true, y_pred)
    return np.array([[TN, FP],
                     [FN, TP]], dtype=int)

# Baseline: always predict 0
y_pred0_test = np.zeros_like(yte, dtype=int)

cm0 = confusion_matrix_2x2(yte, y_pred0_test)
acc0 = calc_ACC(yte, y_pred0_test)
tpr0 = calc_TPR(yte, y_pred0_test)
ppv0 = calc_PPV(yte, y_pred0_test)

cm0, acc0, tpr0, ppv0

Questions (Rapport PDF) — Bloc B
1. Quelle accuracy obtient la baseline sur le test ? Est-ce un bon classificateur en pratique ?
2. Décrivez les erreurs possibles (FP/FN) en termes cliniques et leurs coûts.
3. Quelles métriques doivent être prioritaires pour cette application ? Justifiez.

# Bloc C — Régression logistique (sklearn) + sélection de C (validation)

Vous allez entraîner 2 modèles :
- F=2 : age, famhistory
- F=3 : marker, age, famhistory

Vous choisirez C en minimisant la log-loss sur validation, en explorant:
C_grid = np.logspace(-9, 6, 31)

Important : normalisez les features via StandardScaler (Pipeline).

In [None]:
# Sélection des colonnes (NE PAS MODIFIER)
cols_F2 = ["age", "famhistory"]
cols_F3 = ["marker", "age", "famhistory"]

Xtr_F2 = x_train[cols_F2].values
Xva_F2 = x_valid[cols_F2].values
Xte_F2 = x_test[cols_F2].values

Xtr_F3 = x_train[cols_F3].values
Xva_F3 = x_valid[cols_F3].values
Xte_F3 = x_test[cols_F3].values

In [None]:
C_grid = np.logspace(-9, 6, 31)

def fit_lr_select_C_by_valid_logloss(Xtr, ytr, Xva, yva, C_grid, random_state=0):
    '''
    Train LR models across C_grid and return:
    - best_model (Pipeline)
    - best_C
    - table (DataFrame) with columns: C, train_logloss, valid_logloss
    '''
    rows = []
    best_C = None
    best_valid = np.inf
    best_model = None

    for C in C_grid:
        model = Pipeline(steps=[
            ("scaler", StandardScaler()),
            ("lr", LogisticRegression(
                solver="lbfgs",
                penalty="l2",
                C=float(C),
                max_iter=5000,
                random_state=random_state
            ))
        ])
        model.fit(Xtr, ytr)

        ptr = model.predict_proba(Xtr)[:, 1]
        pva = model.predict_proba(Xva)[:, 1]

        tr_ll = log_loss(ytr, ptr, labels=[0,1])
        va_ll = log_loss(yva, pva, labels=[0,1])

        rows.append({"C": float(C), "train_logloss": tr_ll, "valid_logloss": va_ll})

        if va_ll < best_valid:
            best_valid = va_ll
            best_C = float(C)
            best_model = model

    table = pd.DataFrame(rows)
    return best_model, best_C, table

# Entraînez et sélectionnez C pour F=2 puis F=3
best_F2_model, best_C_F2, table_F2 = fit_lr_select_C_by_valid_logloss(Xtr_F2, ytr, Xva_F2, yva, C_grid, random_state=0)
best_F3_model, best_C_F3, table_F3 = fit_lr_select_C_by_valid_logloss(Xtr_F3, ytr, Xva_F3, yva, C_grid, random_state=0)

best_C_F2, best_C_F3

In [None]:
# Visualisation (optionnelle) : log-loss vs C
plt.figure()
plt.semilogx(table_F2["C"], table_F2["valid_logloss"], label="F=2 valid logloss")
plt.semilogx(table_F3["C"], table_F3["valid_logloss"], label="F=3 valid logloss")
plt.xlabel("C (log scale)")
plt.ylabel("Validation log-loss")
plt.title("Sélection de C (validation)")
plt.legend()
plt.show()

## C2) ROC sur le jeu de validation — Figure C1 (obligatoire)

Tracer une figure unique contenant 2 courbes ROC (validation) :
- modèle F=2 (bleu, style '.-')
- modèle F=3 (rouge, style '.-')

In [None]:
# Probabilités sur validation
pva_F2 = best_F2_model.predict_proba(Xva_F2)[:, 1]
pva_F3 = best_F3_model.predict_proba(Xva_F3)[:, 1]

fpr2, tpr2, _ = roc_curve(yva, pva_F2)
fpr3, tpr3, _ = roc_curve(yva, pva_F3)

auc2 = roc_auc_score(yva, pva_F2)
auc3 = roc_auc_score(yva, pva_F3)

plt.figure()
plt.plot(fpr2, tpr2, "b.-", label=f"F=2 (AUC={auc2:.3f})")
plt.plot(fpr3, tpr3, "r.-", label=f"F=3 (AUC={auc3:.3f})")
plt.xlabel("False Positive Rate (FPR)")
plt.ylabel("True Positive Rate (TPR)")
plt.title("Figure C1 — ROC (validation)")
plt.legend()
plt.grid(True, alpha=0.2)
plt.show()

print("best_C_F2:", best_C_F2, "best_C_F3:", best_C_F3)

# Bloc D — Sélection du seuil de décision (modèle F=3)

Comparer 3 stratégies (sur le test) :
1. seuil fixe 0.5
2. seuil choisi sur validation : maximiser TPR sous contrainte PPV ≥ 0.98
3. seuil choisi sur validation : maximiser PPV sous contrainte TPR ≥ 0.98

Important : seuil choisi sur validation puis évalué sur test.

In [None]:
# Helpers (NE PAS MODIFIER)
def predict_from_threshold(probas, threshold):
    return (np.asarray(probas) >= float(threshold)).astype(int)

def compute_perf_metrics_across_thresholds(y_true, probas, thresholds):
    rows = []
    for t in thresholds:
        y_pred = predict_from_threshold(probas, t)
        TP, TN, FP, FN = calc_TP_TN_FP_FN(y_true, y_pred)
        rows.append({
            "threshold": float(t),
            "TP": TP, "TN": TN, "FP": FP, "FN": FN,
            "ACC": calc_ACC(y_true, y_pred),
            "TPR": calc_TPR(y_true, y_pred),
            "TNR": calc_TNR(y_true, y_pred),
            "PPV": calc_PPV(y_true, y_pred),
            "NPV": calc_NPV(y_true, y_pred),
        })
    return pd.DataFrame(rows)

def candidate_thresholds_from_probas(probas, n=200):
    p = np.asarray(probas).reshape(-1)
    lo, hi = float(np.min(p)), float(np.max(p))
    grid = np.linspace(lo, hi, int(n))
    extra = np.array([0.0, 0.5, 1.0, lo, hi])
    thr = np.unique(np.clip(np.concatenate([grid, extra]), 0.0, 1.0))
    return thr

In [None]:
# Probabilités F=3 sur validation et test
pva = best_F3_model.predict_proba(Xva_F3)[:, 1]
pte = best_F3_model.predict_proba(Xte_F3)[:, 1]

thresholds = candidate_thresholds_from_probas(pva, n=400)
metrics_va = compute_perf_metrics_across_thresholds(yva, pva, thresholds)

metrics_va.head()

In [None]:
# TODO D1: sélectionner 2 seuils sur validation
# Stratégie 2: maximiser TPR sous contrainte PPV >= 0.98
# Stratégie 3: maximiser PPV sous contrainte TPR >= 0.98

# Résultat attendu: définir deux scalaires float:
# - thr_best_TPR_under_PPV
# - thr_best_PPV_under_TPR

# --- TODO: implement selection logic ---
raise NotImplementedError

In [None]:
# Évaluation sur test (NE PAS MODIFIER, dépend de vos seuils)
thr_05 = 0.5
yhat_05 = predict_from_threshold(pte, thr_05)

yhat_TPR = predict_from_threshold(pte, thr_best_TPR_under_PPV)
yhat_PPV = predict_from_threshold(pte, thr_best_PPV_under_TPR)

cms = [
    confusion_matrix_2x2(yte, yhat_05),
    confusion_matrix_2x2(yte, yhat_TPR),
    confusion_matrix_2x2(yte, yhat_PPV),
]

tprs = [calc_TPR(yte, yhat_05), calc_TPR(yte, yhat_TPR), calc_TPR(yte, yhat_PPV)]
ppvs = [calc_PPV(yte, yhat_05), calc_PPV(yte, yhat_TPR), calc_PPV(yte, yhat_PPV)]

cms, tprs, ppvs

In [None]:
# Figure D1 — matrices de confusion + (TPR, PPV) sur test (OBLIGATOIRE)
titles = [
    "F=3 LR — thr=0.5",
    f"F=3 LR — thr(TPR max | PPV>=0.98) = {thr_best_TPR_under_PPV:.3f}",
    f"F=3 LR — thr(PPV max | TPR>=0.98) = {thr_best_PPV_under_TPR:.3f}",
]

plt.figure(figsize=(14, 7))

for j in range(3):
    ax = plt.subplot(2, 3, j+1)
    ax.imshow(cms[j], cmap="Blues")
    ax.set_title(titles[j])
    ax.set_xticks([0, 1]); ax.set_yticks([0, 1])
    ax.set_xticklabels(["pred 0", "pred 1"])
    ax.set_yticklabels(["true 0", "true 1"])
    for (i, k), val in np.ndenumerate(cms[j]):
        ax.text(k, i, str(val), ha="center", va="center", color="black")
    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")

    ax2 = plt.subplot(2, 3, j+4)
    ax2.bar([0, 1], [tprs[j], ppvs[j]])
    ax2.set_xticks([0, 1])
    ax2.set_xticklabels(["TPR", "PPV"])
    ax2.set_ylim([0, 1.05])
    ax2.set_title("Test metrics")
    for idx, v in enumerate([tprs[j], ppvs[j]]):
        ax2.text(idx, min(1.02, v + 0.03), f"{v:.3f}", ha="center")

plt.suptitle("Figure D1 — Performance test (confusion matrices, TPR & PPV)", y=1.02)
plt.tight_layout()
plt.show()

Questions (Rapport PDF) — Bloc D
1. Comparez les 3 matrices de confusion. Quelle stratégie respecte le mieux la priorité clinique (éviter FN à tout prix) tout en réduisant les biopsies inutiles ?
2. En supposant que la pratique actuelle est de biopsier tout le monde : combien de biopsies inutiles sont évitées sur le test ? Quel pourcentage de biopsies actuelles cela représente ?

# Bloc E — Perte logistique et stabilité numérique (NumPy)

Implémenter :
- BCE moyenne à partir de probabilités (avec clipping 1e-14)
- logsumexp stable (base e)
- BCE moyenne à partir de scores (avec conversion base 2)

In [None]:
# TODO E(a): mean BCE from probas (base-2 logs)
def calc_mean_binary_cross_entropy_from_probas(y_true, p_pred, eps=1e-14):
    '''
    y_true: array of 0/1
    p_pred: array of probabilities in (0,1)
    Must clip: eps <= p <= 1-eps
    BCE(y,p) = - y log2(p) - (1-y) log2(1-p)
    Return mean over samples.
    '''
    # TODO: implement
    raise NotImplementedError


# TODO E(b): my_logsumexp (natural log base-e)
def my_logsumexp(a):
    '''
    a: 1D array-like of floats.
    Return log(sum(exp(a))) computed stably, using the logsumexp trick.
    Natural log (np.log).
    '''
    # TODO: implement
    raise NotImplementedError


# TODO E(c): mean BCE from scores via logsumexp
def calc_mean_binary_cross_entropy_from_scores(y_true, s_pred):
    '''
    y_true: 0/1
    s_pred: real-valued scores in (-inf, inf)

    scoreBCE(y,s) = log2(1 + exp(-s)) if y=1
                  = log2(1 + exp( s)) if y=0
    Use: log2(1+exp(flip(y)*s)) = logsumexp([0, flip(y)*s]) / log(2)

    flip(y)= -1 if y=1 else +1
    Use my_logsumexp (base-e) and convert to base-2.
    '''
    # TODO: implement
    raise NotImplementedError

In [None]:
# Tests numériques (NE PAS MODIFIER)
yt = np.array([1, 0, 1, 0], dtype=int)
p  = np.array([0.9, 0.1, 0.8, 0.2], dtype=float)

eps = 1e-14
pc = np.clip(p, eps, 1-eps)
bce_direct = np.mean(-yt*np.log2(pc) - (1-yt)*np.log2(1-pc))

bce_fn = calc_mean_binary_cross_entropy_from_probas(yt, p)
print("BCE probas direct:", bce_direct, "BCE probas fn:", bce_fn)
assert abs(bce_direct - bce_fn) < 1e-10

lse = my_logsumexp(np.array([1000.0, -999.0]))
print("logsumexp([1000,-999]) =", lse)
assert abs(lse - 1000.0) < 1e-6

s = np.array([2.0, -2.0, 1.0, -1.0])
sig = 1.0/(1.0 + np.exp(-s))
bce_p = calc_mean_binary_cross_entropy_from_probas(yt, sig)
bce_s = calc_mean_binary_cross_entropy_from_scores(yt, s)

print("BCE probas(sigmoid(scores)):", bce_p)
print("BCE scores:", bce_s)
assert abs(bce_p - bce_s) < 1e-8

print("OK: tests stabilité numérique")

# À rendre
1. Notebook .ipynb complété (Figures C1 et D1 obligatoires).
2. Rapport PDF (≤ 6 pages), sans code, avec références explicites à vos figures/valeurs.