In [2]:
# ------------------------------------------
# 📂 Importations nécessaires
# ------------------------------------------

# Bibliothèques standard Python
import os
import json
from collections import Counter

# Bibliothèques de manipulation de données
import numpy as np
import pandas as pd
from joblib import dump

# Bibliothèques de visualisation
import matplotlib.pyplot as plt
import seaborn as sns

# Scikit-learn
from sklearn.preprocessing import StandardScaler, label_binarize
from sklearn.utils import shuffle
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import (
    confusion_matrix,
    classification_report,
    ConfusionMatrixDisplay,
    recall_score,
    roc_curve,
    auc
)

# TensorFlow et Keras
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import (
    EarlyStopping,
    ReduceLROnPlateau,
    Callback
)
from tensorflow.keras.layers import (
    GRU,
    Dense,
    Input,
    Dropout,
    LayerNormalization,
    Flatten,
    Attention,
    MultiHeadAttention,
    GlobalAveragePooling1D,
    Lambda
)

In [9]:
# ------------------------------------------
# 🚀 Chargement des données locales
# ------------------------------------------

# Fichiers d'entrée
train_file = "../Cleaned_data/mitbih_train_trimmed.csv"
test_file = "../Cleaned_data/mitbih_test_trimmed.csv"

In [10]:
# ------------------------------------------
# 📥 Chargement des données avec fonction
# ------------------------------------------

def load_data(file_path):
    print(f"Chargement des données depuis {file_path}...")
    df = pd.read_csv(file_path)  # Chargement du fichier CSV
    X = df.iloc[:, :-1].values  # Toutes les colonnes sauf la dernière (features)
    y = df.iloc[:, -1].astype(int).values  # Dernière colonne = labels
    X, y = shuffle(X, y, random_state=42)  # Mélange aléatoire des données
    print(f"Nombre d'échantillons : {len(X)}")
    print("Distribution des classes :", Counter(y))  # Distribution des classes
    return X, y

# Charger les données en utilisant la fonction
train_ecgs, train_labels = load_data(train_file)
test_ecgs, test_labels = load_data(test_file)

Chargement des données depuis ../Cleaned_data/mitbih_train_trimmed.csv...
Nombre d'échantillons : 87553
Distribution des classes : Counter({0: 72470, 4: 6431, 2: 5788, 1: 2223, 3: 641})
Chargement des données depuis ../Cleaned_data/mitbih_test_trimmed.csv...
Nombre d'échantillons : 21891
Distribution des classes : Counter({0: 18117, 4: 1608, 2: 1448, 1: 556, 3: 162})


In [None]:
# ------------------------------------------
# 🔄 Augmentation des données
# ------------------------------------------
def add_gaussian_noise(ecg, noise_level=0.01):
    noise = np.random.normal(0, noise_level, len(ecg))
    return ecg + noise

def shift_signal(ecg, shift=10):
    return np.roll(ecg, shift)

def combine_augmentations(ecg, noise_level=0.01, shift=10):
    return shift_signal(add_gaussian_noise(ecg, noise_level), shift)

def augment_class_diverse(ecgs, deficit):
    """
    Augmente une classe spécifique en utilisant plusieurs techniques.
    """
    augmented_ecgs = []
    for i in range(deficit):
        ecg = ecgs[i % len(ecgs)]  # Réutiliser les échantillons si nécessaire
        if i % 3 == 0:
            augmented_ecgs.append(shift_signal(ecg))
        elif i % 3 == 1:
            augmented_ecgs.append(add_gaussian_noise(ecg))
        else:
            augmented_ecgs.append(combine_augmentations(ecg))
    return augmented_ecgs


In [None]:
# ------------------------------------------
# ➗ Séparer les données par classes
# ------------------------------------------

def separate_sains_malades(ecgs, labels):
    """
    Sépare les données en deux catégories : Sains (0) et Malades (1).
    """
    sains = []
    malades = []
    for ecg, label in zip(ecgs, labels):
        if label == 0:
            sains.append(ecg)
        else:  # Regroupe les classes 1, 2, 3, 4 en "Malades"
            malades.append(ecg)
    return sains, malades

# Séparation des données d'entraînement
sains_train, malades_train = separate_sains_malades(train_ecgs, train_labels)

# Distribution initiale
print(f"Nombre de sujets sains : {len(sains_train)}")
print(f"Nombre de sujets malades : {len(malades_train)}")

# Étape 3 : Augmentation des données
# Ajuster la taille cible pour équilibrer les classes
final_group_size = max(len(sains_train), len(malades_train))  # Taille cible basée sur la classe majoritaire

# Augmentation pour les malades
deficit_malades = final_group_size - len(malades_train)
augmented_malades = augment_class_diverse(malades_train, deficit_malades)
malades_train += augmented_malades

# Augmentation pour les sains (si nécessaire)
deficit_sains = final_group_size - len(sains_train)
augmented_sains = augment_class_diverse(sains_train, deficit_sains)
sains_train += augmented_sains

print(f"Après augmentation : {len(sains_train)} sujets sains, {len(malades_train)} sujets malades")

# Combiner les deux catégories
final_ecgs_train = sains_train + malades_train
final_labels_train = [0] * len(sains_train) + [1] * len(malades_train)

# Mélanger les données augmentées
final_ecgs_train, final_labels_train = shuffle(final_ecgs_train, final_labels_train, random_state=42)

# Étape 4 : Répétez pour les données de test
sains_test, malades_test = separate_sains_malades(test_ecgs, test_labels)

final_ecgs_test = sains_test + malades_test
final_labels_test = [0] * len(sains_test) + [1] * len(malades_test)

final_ecgs_test, final_labels_test = shuffle(final_ecgs_test, final_labels_test, random_state=42)

# Étape 5 : Visualisation des nouvelles distributions
final_distribution_train = Counter(final_labels_train)
final_distribution_test = Counter(final_labels_test)

# Pie chart pour l'entraînement
plt.figure(figsize=(8, 6))
plt.pie(
   [final_distribution_train[0], final_distribution_train[1]],
   labels=["Sains", "Malades"],
   autopct='%1.1f%%',
   startangle=90,
   colors=['#C7E5D6', '#B8D8E8']
)
plt.title("Distribution des ECGs (Entraînement)")
plt.show()

# Pie chart pour le test
plt.figure(figsize=(8, 6))
plt.pie(
   [final_distribution_test[0], final_distribution_test[1]],
   labels=["Sains", "Malades"],
   autopct='%1.1f%%',
   startangle=90,
   colors=['#C7E5D6', '#B8D8E8']
)
plt.title("Distribution des ECGs (Test)")
plt.show()

In [None]:
# Vérification des shapes des données initiales
final_ecgs_train = np.array(final_ecgs_train)  # Conversion en array numpy
final_ecgs_test = np.array(final_ecgs_test)    # Conversion en array numpy

print("final_ecgs_train shape initial:", final_ecgs_train.shape)
print("final_ecgs_test shape initial:", final_ecgs_test.shape)

# 1. Normalisation des données (en s'assurant qu'elles sont en 2D)
scaler = StandardScaler()

# Reshape en 2D pour la normalisation
final_ecgs_train = final_ecgs_train.reshape(final_ecgs_train.shape[0], -1)
final_ecgs_test = final_ecgs_test.reshape(final_ecgs_test.shape[0], -1)

# Normalisation
final_ecgs_train = scaler.fit_transform(final_ecgs_train)
final_ecgs_test = scaler.transform(final_ecgs_test)

# Reshape pour compatibilité avec GRU (format 3D)
final_ecgs_train = final_ecgs_train.reshape(-1, 182, 1)
final_ecgs_test = final_ecgs_test.reshape(-1, 182, 1)

# Vérification des dimensions finales
print("final_ecgs_train shape final:", final_ecgs_train.shape)
print("final_ecgs_test shape final:", final_ecgs_test.shape)

# Conversion des labels en tableaux NumPy
final_labels_train = np.array(final_labels_train)
final_labels_test = np.array(final_labels_test)

In [None]:
# Callback pour suivre le recall
class RecallCallback(Callback):
    def __init__(self, validation_data=None, training_data=None):
        super(RecallCallback, self).__init__()
        self.validation_data = validation_data
        self.training_data = training_data
        self.train_recalls = []
        self.val_recalls = []

    def on_epoch_end(self, epoch, logs={}):
        # Calcul du recall sur les données d'entraînement
        y_pred = self.model.predict(self.training_data[0])
        y_pred_classes = (y_pred[:, 1] >= 0.5).astype(int)
        train_recall = recall_score(self.training_data[1], y_pred_classes)
        self.train_recalls.append(train_recall)
        logs['recall'] = train_recall

        # Calcul du recall sur les données de validation
        if self.validation_data:
            val_pred = self.model.predict(self.validation_data[0])
            val_pred_classes = (val_pred[:, 1] >= 0.5).astype(int)
            val_recall = recall_score(self.validation_data[1], val_pred_classes)
            self.val_recalls.append(val_recall)
            logs['val_recall'] = val_recall

In [None]:
# Paramètres du modèle
input_shape = final_ecgs_train.shape[1:]  # Ajusté selon les données reshaped
gru_units = 64
dropout_rate = 0.2
num_classes = 2

# Modification du bloc GRU avec attention
def gru_attention_block(inputs, gru_units, dropout_rate):
    # Première couche GRU
    x = GRU(gru_units, return_sequences=True)(inputs)
    x = LayerNormalization()(x)
    x = Dropout(dropout_rate)(x)

    # Deuxième couche GRU
    x = GRU(gru_units * 2, return_sequences=True)(x)
    x = LayerNormalization()(x)
    x = Dropout(dropout_rate)(x)

    # Attention
    attention_output = Attention()([x, x])
    return attention_output

In [None]:
# Définition des couches
inputs = Input(shape=input_shape)
x = gru_attention_block(inputs, gru_units, dropout_rate)
x = Flatten()(x)
x = Dense(128, activation='relu')(x)
x = LayerNormalization()(x)
x = Dropout(dropout_rate)(x)

outputs = Dense(num_classes, activation="softmax")(x)

# Création et compilation du modèle
model = Model(inputs=inputs, outputs=outputs)
optimizer = Adam(learning_rate=0.001)
model.compile(
    optimizer=optimizer,
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)

In [None]:
# Définition des callbacks
early_stopping = EarlyStopping(
    monitor="val_loss",
    patience=8,
    restore_best_weights=True,
    verbose=1
)

reduce_lr = ReduceLROnPlateau(
    monitor="val_loss",
    factor=0.2,     # Réduire à 0.5 pour une décroissance plus douce
    patience=4,     # Augmenter à 6-8 pour plus de stabilité
    verbose=1,
    min_lr=1e-7,    # Ajuster à 1e-6
    cooldown=2  
)

# Initialisation du callback de recall
recall_callback = RecallCallback(
    validation_data=(final_ecgs_test, final_labels_test),
    training_data=(final_ecgs_train, final_labels_train)
)

# Calculer la distribution originale (avant regroupement en 2 classes)
original_distribution = Counter([label for label in final_labels_train])
print("Distribution originale brute:", original_distribution)

# Calculer la somme des malades (classes 1-4)
nb_sains = original_distribution[0]  # classe 0
nb_malades = sum(original_distribution[i] for i in range(1, 5))  # somme des classes 1-4

binary_distribution = {0: nb_sains, 1: nb_malades}
print("Distribution binaire:", binary_distribution)

# Calculer les poids
max_samples = max(binary_distribution.values())
class_weights = {
    label: max_samples/count
    for label, count in binary_distribution.items()
}

print("Poids par classe:", class_weights)

In [None]:
# Préparation des ensembles d'entraînement et de validation
X_train, X_val, y_train, y_val = train_test_split(
    final_ecgs_train,
    final_labels_train,
    test_size=0.2,  # 20% pour validation
    random_state=42,
    stratify=final_labels_train
)

# Entraînement
history = model.fit(
    X_train,
    y_train,
    epochs=50,
    batch_size=128,
    validation_data=(X_val, y_val),  # Utilisation des données de validation
    callbacks=[early_stopping, reduce_lr, recall_callback],
    verbose=1,
    class_weight=class_weights
)

In [None]:
# Sauvegarder le modèle
dump(scaler, 'ecg_scaler_m1.joblib')
model.save('ecg_model_m1.h5')

# Sauvegarder l'historique d'entraînement
history_dict = history.history
np.save('training_history.npy', history_dict)

In [None]:
# Création des graphiques de loss et accuracy
plt.figure(figsize=(15, 5))

# Plot de la loss
plt.subplot(1, 3, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Évolution de la fonction de perte')
plt.xlabel('Époque')
plt.ylabel('Perte')
plt.legend()
plt.grid(True)

# Plot de l'accuracy
plt.subplot(1, 3, 2)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Évolution de l\'accuracy')
plt.xlabel('Époque')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)

# Plot du recall (nouveau)
plt.subplot(1, 3, 3)
plt.plot(recall_callback.train_recalls, label='Training Recall')
plt.plot(recall_callback.val_recalls, label='Validation Recall')
plt.title('Évolution du Recall')
plt.xlabel('Époque')
plt.ylabel('Recall')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

# Afficher les valeurs finales
print("\nRésumé des performances finales :")
print(f"Perte finale sur l'ensemble d'entraînement : {history.history['loss'][-1]:.4f}")
print(f"Perte finale sur l'ensemble de validation : {history.history['val_loss'][-1]:.4f}")
print(f"Accuracy finale sur l'ensemble d'entraînement : {history.history['accuracy'][-1]:.4f}")
print(f"Accuracy finale sur l'ensemble de validation : {history.history['val_accuracy'][-1]:.4f}")
print(f"Recall final sur l'ensemble d'entraînement : {recall_callback.train_recalls[-1]:.4f}")
print(f"Recall final sur l'ensemble de validation : {recall_callback.val_recalls[-1]:.4f}")



In [None]:
# Prédictions
y_pred_proba = model.predict(final_ecgs_test)[:, 1]
y_pred = np.argmax(model.predict(final_ecgs_test), axis=-1)

# Métriques de base
accuracy_test = accuracy_score(final_labels_test, y_pred)
print(f"\nAccuracy sur le dataset de test : {accuracy_test:.4f}")
print("\nRapport détaillé :")
print(classification_report(final_labels_test, y_pred, target_names=['Classe 0', 'Classe 1']))

In [None]:
# Matrice de confusion
plt.figure(figsize=(8, 6))
cm = confusion_matrix(final_labels_test, y_pred)
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=['Classe 0', 'Classe 1'],
    yticklabels=['Classe 0', 'Classe 1']
)
plt.xlabel('Prédictions')
plt.ylabel('Vrais Labels')
plt.title('Matrice de Confusion')
plt.show()

# % Pour la première matrice de confusion
plt.figure(figsize=(8, 6))
cm_percentage = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] * 100
sns.heatmap(
    cm_percentage,
    annot=True,
    fmt='.1f',  # Format avec 1 décimale
    cmap="Blues",
    xticklabels=['Classe 0', 'Classe 1'],
    yticklabels=['Classe 0', 'Classe 1']
)
plt.xlabel('Prédictions')
plt.ylabel('Vrais Labels')
plt.title('Matrice de Confusion (%)')
plt.show()

# % Pour la matrice de confusion (pour le total)
plt.figure(figsize=(8, 6))
total = cm.sum()
cm_percentage = (cm / total) * 100  # Division par le total global

sns.heatmap(
    cm_percentage,
    annot=True,
    fmt='.1f',  # Format avec 1 décimale
    cmap="Blues",
    xticklabels=['Classe 0', 'Classe 1'],
    yticklabels=['Classe 0', 'Classe 1']
)
plt.xlabel('Prédictions')
plt.ylabel('Vrais Labels')
plt.title('Matrice de Confusion (% du total)')
plt.show()

# Courbe ROC
fpr, tpr, thresholds = roc_curve(final_labels_test, y_pred_proba)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('Taux de faux positifs')
plt.ylabel('Taux de vrais positifs')
plt.title('Courbe ROC')
plt.legend(loc="lower right")
plt.show()

# Optimisation du seuil
precisions = []
recalls = []
specificities = []
f1_scores = []
thresholds_metrics = []

for threshold in thresholds:
    y_pred_threshold = (y_pred_proba >= threshold).astype(int)
    tn, fp, fn, tp = confusion_matrix(final_labels_test, y_pred_threshold).ravel()

    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    precisions.append(precision)
    recalls.append(recall)
    specificities.append(specificity)
    f1_scores.append(f1)
    thresholds_metrics.append(threshold)

# DataFrame des métriques
metrics_df = pd.DataFrame({
    'Threshold': thresholds_metrics,
    'Precision': precisions,
    'Recall': recalls,
    'Specificity': specificities,
    'F1': f1_scores
})

# Trouver le seuil optimal
min_specificity = 0.95  # Plus strict sur la spécificité (préserve classe 0)
min_recall_0 = 0.98    # Maintenir un bon recall pour la classe 0

valid_thresholds = metrics_df[
    (metrics_df['Specificity'] >= min_specificity) &
    ((1 - metrics_df['Precision']) <= 0.1)
]

if len(valid_thresholds) > 0:
    optimal_threshold = valid_thresholds.loc[valid_thresholds['Recall'].idxmax(), 'Threshold']
else:
    print("Aucun seuil ne satisfait les critères, utilisation du seuil par défaut 0.5")
    optimal_threshold = 0.5

print(f"\nSeuil optimal : {optimal_threshold:.3f}")

# Avant d'appliquer le seuil, affichons les métriques pour plusieurs seuils
test_thresholds = [0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, optimal_threshold]
print("\nComparaison des différents seuils :")
for threshold in test_thresholds:
    y_pred_test = (y_pred_proba >= threshold).astype(int)
    print(f"\nSeuil : {threshold:.3f}")
    print(classification_report(final_labels_test, y_pred_test, target_names=['Classe 0', 'Classe 1']))

# Appliquer le seuil optimal
y_pred_optimal = (y_pred_proba >= optimal_threshold).astype(int)

# Afficher les résultats avec le seuil optimal
print("\nRésultats avec le seuil optimal :")
print(classification_report(final_labels_test, y_pred_optimal, target_names=['Classe 0', 'Classe 1']))

# Nouvelle matrice de confusion avec le seuil optimal
plt.figure(figsize=(8, 6))
cm_optimal = confusion_matrix(final_labels_test, y_pred_optimal)
sns.heatmap(
    cm_optimal,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=['Classe 0', 'Classe 1'],
    yticklabels=['Classe 0', 'Classe 1']
)
plt.xlabel('Prédictions')
plt.ylabel('Vrais Labels')
plt.title('Matrice de Confusion (Seuil Optimal)')
plt.show()

# Matrice de confusion en pourcentage
plt.figure(figsize=(8, 6))
cm_optimal_percentage = cm_optimal.astype('float') / cm_optimal.sum(axis=1)[:, np.newaxis] * 100
sns.heatmap(
    cm_optimal_percentage,
    annot=True,
    fmt='.1f',
    cmap="Blues",
    xticklabels=['Classe 0', 'Classe 1'],
    yticklabels=['Classe 0', 'Classe 1']
)
plt.xlabel('Prédictions')
plt.ylabel('Vrais Labels')
plt.title('Matrice de Confusion avec Seuil Optimal (%)')
plt.show()

# Courbe d'évolution du recall
plt.figure(figsize=(10, 6))
plt.plot(thresholds_metrics, recalls, label='Recall', color='blue')
plt.axvline(x=optimal_threshold, color='red', linestyle='--', label='Seuil optimal')
plt.xlabel('Seuil')
plt.ylabel('Recall')
plt.title('Évolution du Recall en fonction du seuil')
plt.legend()
plt.grid(True)
plt.show()

# Courbe ROC avec seuil optimal
plt.figure(figsize=(8, 6))

# Tracer la courbe ROC
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')

# Trouver les coordonnées correspondantes au seuil optimal
optimal_idx = np.argmin(np.abs(thresholds - optimal_threshold))  # Index du seuil optimal
optimal_fpr = fpr[optimal_idx]
optimal_tpr = tpr[optimal_idx]

# Ajouter un point pour le seuil optimal
plt.scatter(optimal_fpr, optimal_tpr, color='red', label=f'Seuil optimal ({optimal_threshold:.2f})', zorder=5)

# Ajuster les limites et ajouter des labels
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('Taux de faux positifs')
plt.ylabel('Taux de vrais positifs')
plt.title('Courbe ROC avec seuil optimal')
plt.legend(loc="lower right")
plt.grid(True)
plt.show()
