# Step 2: Supervised Learning & Deep Learning (1D ECG)

This notebook covers **Step 2** of the project:
- Supervised classification of ECG signals
- Based on the data actually available: **1D ECG beats in CSV format (PTBDB / MIT-BIH)**
- Models: **CNN1D** and **LSTM**
- Evaluation: Accuracy, F1-macro, ROC-AUC (if applicable)

⚠️ Assumptions:
- `X = all columns except the last`
- `y = last column` (labels)
- No ECG images or clinical text here (pipeline consistent with Step 1)


In [1]:

# Cell 1: Imports
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score, classification_report, roc_auc_score

import tensorflow as tf
from tensorflow.keras import layers as L, Model


## 1) Loading the data (ECG beat CSVs)

In [4]:

# Actual root = parent of the project folder
PROJECT_ROOT = Path.cwd().parent

DATA_DIR = PROJECT_ROOT / "Dataset Projet"

print("PROJECT_ROOT:", PROJECT_ROOT)
print("DATA_DIR:", DATA_DIR)
print("Exists?", DATA_DIR.exists())

FILES = {
    "ptbdb_normal": "ptbdb_normal.csv",
    "ptbdb_abnormal": "ptbdb_abnormal.csv",
    "mitbih_train": "mitbih_train.csv",
    "mitbih_test": "mitbih_test.csv",
}

def load_csv(path):
    # The files have no header row; non-numeric cells become NaN
    df = pd.read_csv(path, header=None)
    df = df.apply(pd.to_numeric, errors="coerce")
    return df

dfs = {}
for k, f in FILES.items():
    p = DATA_DIR / f
    if p.exists():
        dfs[k] = load_csv(p)
        print(k, dfs[k].shape)
    else:
        # Report missing files here rather than failing later with a KeyError
        print("Missing file:", p)


PROJECT_ROOT: /Users/raphc/Documents/Cours M2/algorithmique supervisé
DATA_DIR: /Users/raphc/Documents/Cours M2/algorithmique supervisé/Dataset Projet
Exists? True
ptbdb_normal (4046, 188)
ptbdb_abnormal (10506, 188)
mitbih_train (87554, 188)
mitbih_test (21892, 188)


## 2) Building X / y

By default:
- PTBDB normal + abnormal
- Otherwise MIT-BIH (train + test)


In [3]:

# Cell 3: Build dataset

def split_xy(df):
    return df.iloc[:, :-1], df.iloc[:, -1]

USE = "PTBDB"  # or "MITBIH"

if USE == "PTBDB":
    Xn, yn = split_xy(dfs["ptbdb_normal"])
    Xa, ya = split_xy(dfs["ptbdb_abnormal"])
    X = pd.concat([Xn, Xa], ignore_index=True)
    y = pd.concat([yn, ya], ignore_index=True)
else:
    Xt, yt = split_xy(dfs["mitbih_train"])
    Xv, yv = split_xy(dfs["mitbih_test"])
    X = pd.concat([Xt, Xv], ignore_index=True)
    y = pd.concat([yt, yv], ignore_index=True)

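# Drop rows with a NaN in the features or in the label
# (a NaN label would make the int64 cast below fail)
mask = ~(np.isnan(X.values).any(axis=1) | y.isna().values)
X = X.loc[mask].values.astype("float32")
y = y.loc[mask].values.astype("int64")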

print("X shape:", X.shape)
print("y distribution:", dict(pd.Series(y).value_counts()))


KeyError: 'ptbdb_normal'

## 3) Train / test split + normalization

In [None]:

# Cell 4: Split + scaling

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# reshape for 1D networks: (N, T, 1)
X_train_nn = X_train[..., None]
X_test_nn = X_test[..., None]

print("Train:", X_train_nn.shape, "Test:", X_test_nn.shape)
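The split, scaling and reshape steps above can be checked on synthetic data. The sketch below is only an illustration: the array `X_demo`, the 30/70 class ratio and the `_demo` names are assumptions, not the project data. It shows that `stratify` keeps the class ratio in both subsets, that the scaler is fitted on the training part only, and that `[..., None]` adds the channel axis expected by `Conv1D` and `LSTM`.

```python
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)

# Synthetic stand-in for the beat matrix: 1000 beats x 187 samples,
# with a 30/70 class imbalance (random values, not real ECG).
X_demo = rng.random((1000, 187)).astype("float32")
y_demo = np.array([0] * 300 + [1] * 700)

Xtr, Xte, ytr, yte = train_test_split(
    X_demo, y_demo, test_size=0.2, random_state=42, stratify=y_demo
)

# With stratification, the share of class 1 is the same in both subsets.
train_ratio = ytr.mean()
test_ratio = yte.mean()

# Fit the scaler on the training set only, then reuse it on the test set,
# so that no test statistics leak into training.
scaler_demo = StandardScaler()
Xtr_s = scaler_demo.fit_transform(Xtr)
Xte_s = scaler_demo.transform(Xte)

# Add a trailing channel axis: (N, T) -> (N, T, 1).
Xtr_nn = Xtr_s[..., None]
Xte_nn = Xte_s[..., None]

print("class-1 share:", train_ratio, test_ratio)
print("shapes:", Xtr_nn.shape, Xte_nn.shape)
```

Note that `StandardScaler` standardizes each time step (column) across beats, which is what the cell above does as well.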


## 4) 1D CNN model

In [None]:

# Cell 5: CNN1D

def build_cnn1d(input_shape, n_classes):
    inp = L.Input(shape=input_shape)
    x = L.Conv1D(32, 5, padding="same", activation="relu")(inp)
    x = L.MaxPool1D(2)(x)
    x = L.Conv1D(64, 5, padding="same", activation="relu")(x)
    x = L.MaxPool1D(2)(x)
    x = L.Conv1D(128, 3, padding="same", activation="relu")(x)
    x = L.GlobalAveragePooling1D()(x)
    x = L.Dense(64, activation="relu")(x)
    out = L.Dense(n_classes, activation="softmax")(x)

    model = Model(inp, out)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-3),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )
    return model

n_classes = len(np.unique(y))
cnn = build_cnn1d(X_train_nn.shape[1:], n_classes)
cnn.summary()
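As a cross-check for `cnn.summary()`, the sequence lengths and parameter counts of this architecture can be worked out by hand. The sketch below assumes 187 samples per beat (188 CSV columns minus the label) and 2 classes (the PTBDB case). The helper names are illustrative and not part of Keras.

```python
def conv1d_params(kernel_size, in_channels, filters):
    # One kernel of shape (kernel_size, in_channels) per filter, plus one bias each.
    return kernel_size * in_channels * filters + filters

def dense_params(in_features, units):
    # Weight matrix plus one bias per unit.
    return in_features * units + units

def maxpool1d_len(length, pool_size=2):
    # Keras MaxPool1D defaults: strides = pool_size, padding = "valid".
    return (length - pool_size) // pool_size + 1

T, n_classes_demo = 187, 2  # assumed: PTBDB beats, binary labels

# Conv1D with padding="same" keeps the length; only pooling shortens it.
len_after_pool1 = maxpool1d_len(T)
len_after_pool2 = maxpool1d_len(len_after_pool1)

total_params = (
    conv1d_params(5, 1, 32)
    + conv1d_params(5, 32, 64)
    + conv1d_params(3, 64, 128)
    + dense_params(128, 64)   # GlobalAveragePooling1D outputs 128 features
    + dense_params(64, n_classes_demo)
)

print(len_after_pool1, len_after_pool2, total_params)  # 93 46 43586
```

For MIT-BIH, only the last `Dense` layer changes, since its size follows `n_classes`.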


In [None]:

# Cell 6: CNN training

hist_cnn = cnn.fit(
    X_train_nn, y_train,
    validation_split=0.2,
    epochs=15,
    batch_size=256,
    verbose=1
)
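To make the effect of `validation_split=0.2` concrete, the sample counts can be estimated by hand. This is a rough sketch under two assumptions: the PTBDB case with the row counts printed in section 1, and no rows dropped by the NaN cleaning. The rounding rules reflect my understanding of scikit-learn (test set rounded up) and Keras (training part rounded down, validation taken from the last samples before shuffling).

```python
import math

# Assumed: PTBDB, no rows removed during NaN cleaning.
n_samples = 4046 + 10506
test_size, validation_split, batch_size = 0.2, 0.2, 256

# train_test_split: the test set size is rounded up.
n_test = math.ceil(test_size * n_samples)
n_fit = n_samples - n_test          # rows passed to fit()

# fit(validation_split=...): the last fraction of the rows is held out.
n_train = math.floor(n_fit * (1.0 - validation_split))
n_val = n_fit - n_train

# Number of gradient updates per epoch.
steps_per_epoch = math.ceil(n_train / batch_size)

print(n_test, n_fit, n_train, n_val, steps_per_epoch)
```

Under these assumptions the network is effectively trained on about 64% of the data, which is worth stating in the report next to the 80/20 split.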


In [None]:

# Cell 7: CNN evaluation

y_pred = np.argmax(cnn.predict(X_test_nn), axis=1)

print("Accuracy:", accuracy_score(y_test, y_pred))
print("F1 macro:", f1_score(y_test, y_pred, average="macro"))
print(classification_report(y_test, y_pred))
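The introduction lists ROC-AUC among the metrics, and it needs the class probabilities rather than the `argmax` labels. The helper below is a possible sketch. `roc_auc_from_proba` is a hypothetical name and the toy probabilities are made up for illustration. It handles the binary case (PTBDB) and the multiclass case (MIT-BIH) with macro-averaged one-vs-rest AUC.

```python
import numpy as np
from sklearn.metrics import roc_auc_score

def roc_auc_from_proba(y_true, proba):
    """ROC-AUC from an (N, n_classes) array of class probabilities."""
    proba = np.asarray(proba)
    if proba.shape[1] == 2:
        # Binary case: score with the probability of the positive class (label 1).
        return roc_auc_score(y_true, proba[:, 1])
    # Multiclass case: macro-averaged one-vs-rest AUC.
    return roc_auc_score(y_true, proba, multi_class="ovr", average="macro")

# Toy probabilities (hypothetical values, not model outputs).
y_toy = np.array([0, 0, 1, 1])
proba_toy = np.array([[0.9, 0.1], [0.6, 0.4], [0.65, 0.35], [0.2, 0.8]])
auc_toy = roc_auc_from_proba(y_toy, proba_toy)
print("ROC-AUC:", auc_toy)  # 3 of 4 positive/negative pairs are ranked correctly
```

In this notebook the call would look like `roc_auc_from_proba(y_test, cnn.predict(X_test_nn))`, since the softmax output already has one column per class.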


## 5) 1D LSTM model

In [None]:

# Cell 8: LSTM

def build_lstm(input_shape, n_classes):
    inp = L.Input(shape=input_shape)
    x = L.LSTM(64, return_sequences=True)(inp)
    x = L.LSTM(64)(x)
    x = L.Dense(64, activation="relu")(x)
    out = L.Dense(n_classes, activation="softmax")(x)

    model = Model(inp, out)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-3),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )
    return model

lstm = build_lstm(X_train_nn.shape[1:], n_classes)
lstm.summary()
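The figures reported by `lstm.summary()` can be reproduced with the standard LSTM parameter formula. The sketch below assumes the default Keras LSTM layer (four gates, one bias vector per gate) and 2 classes. The helper names are illustrative.

```python
def lstm_params(input_dim, units):
    # Four gates (input, forget, cell, output), each with an input kernel,
    # a recurrent kernel and a bias vector.
    return 4 * (units * (input_dim + units) + units)

def dense_params(in_features, units):
    # Weight matrix plus one bias per unit.
    return in_features * units + units

n_classes_demo = 2  # assumed: binary PTBDB labels

layer_params = [
    lstm_params(1, 64),    # first LSTM reads 1 feature per time step
    lstm_params(64, 64),   # second LSTM reads the 64-dim sequence
    dense_params(64, 64),
    dense_params(64, n_classes_demo),
]
total_lstm_params = sum(layer_params)

print(layer_params, total_lstm_params)  # [16896, 33024, 4160, 130] 54210
```

The parameter count does not depend on the sequence length, but the run time does: each beat is processed as 187 sequential steps, which is why this model usually trains more slowly than the CNN.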


In [None]:

# Cell 9: LSTM training

hist_lstm = lstm.fit(
    X_train_nn, y_train,
    validation_split=0.2,
    epochs=15,
    batch_size=256,
    verbose=1
)


In [None]:

# Cell 10: LSTM evaluation

y_pred_lstm = np.argmax(lstm.predict(X_test_nn), axis=1)

print("Accuracy:", accuracy_score(y_test, y_pred_lstm))
print("F1 macro:", f1_score(y_test, y_pred_lstm, average="macro"))
print(classification_report(y_test, y_pred_lstm))
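Both evaluations report accuracy and F1-macro side by side. The small sketch below, with made-up labels and a hypothetical `f1_macro_np` helper written in plain NumPy, shows why the two can diverge on imbalanced classes: a classifier that always predicts the majority class scores well on accuracy but poorly on F1-macro.

```python
import numpy as np

def f1_macro_np(y_true, y_pred):
    """Unweighted mean of the per-class F1 scores."""
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    scores = []
    for c in np.unique(np.concatenate([y_true, y_pred])):
        tp = np.sum((y_pred == c) & (y_true == c))
        fp = np.sum((y_pred == c) & (y_true != c))
        fn = np.sum((y_pred != c) & (y_true == c))
        denom = 2 * tp + fp + fn
        # A class that is never predicted correctly gets an F1 of 0.
        scores.append(2 * tp / denom if denom else 0.0)
    return float(np.mean(scores))

# Toy labels: 2 samples of class 0, 8 of class 1 (illustrative only).
y_true_toy = np.array([0, 0, 1, 1, 1, 1, 1, 1, 1, 1])
y_pred_toy = np.ones(10, dtype=int)  # always predicts the majority class

acc_toy = float(np.mean(y_true_toy == y_pred_toy))
f1m_toy = f1_macro_np(y_true_toy, y_pred_toy)
print("accuracy:", acc_toy, "F1 macro:", round(f1m_toy, 4))
```

Here accuracy is 0.8 while F1-macro is 4/9, because the minority class contributes an F1 of 0 to the average.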


## 6) Model comparison

In [None]:

# Cell 11: Summary table

results = pd.DataFrame([
    {
        "model": "CNN1D",
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_macro": f1_score(y_test, y_pred, average="macro"),
    },
    {
        "model": "LSTM",
        "accuracy": accuracy_score(y_test, y_pred_lstm),
        "f1_macro": f1_score(y_test, y_pred_lstm, average="macro"),
    },
])

results


## 7) Conclusion (to carry over into the report)

- Models tested: CNN1D, LSTM
- Data: ECG beats (CSV)
- Split: 80/20 stratified
- Metrics: Accuracy, F1-macro
- Model selected: the one that maximizes F1-macro
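
The selection rule in the last bullet can be applied directly to a table shaped like `results`. The sketch below uses placeholder scores that are purely illustrative and are not results from this notebook. `results_demo` and `best_model` are hypothetical names.

```python
import pandas as pd

# Placeholder scores for illustration only; NOT results from this notebook.
results_demo = pd.DataFrame([
    {"model": "CNN1D", "accuracy": 0.90, "f1_macro": 0.85},
    {"model": "LSTM", "accuracy": 0.91, "f1_macro": 0.80},
])

# Pick the row with the highest F1-macro, regardless of accuracy.
best_model = results_demo.loc[results_demo["f1_macro"].idxmax(), "model"]
print("Selected model:", best_model)
```

With these placeholder values the rule selects `CNN1D` even though `LSTM` has the higher accuracy, which is the intended behavior when F1-macro is the selection criterion.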
