# Tous les modèles

Modèle de base (LSTM)

In [None]:
# Création du modèle
def create_model():
    model = Sequential([
        LSTM(64, input_shape=(187, 1), return_sequences=False),
        Dense(5, activation='softmax')
    ])

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.0005),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

In [None]:
# Création du modèle
def create_model2():
    model = Sequential([
        LSTM(64, input_shape=(187, 1), return_sequences=False),
        Dense(32, activation='relu'),
        Dense(5, activation='softmax')
    ])

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

In [None]:
# Création du modèle
def create_model3():
    model = Sequential([
        LSTM(128, input_shape=(187, 1), return_sequences=True),
        LSTM(64),
        Dense(32, activation='relu'),
        Dense(5, activation='softmax')
    ])

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

Introduction de la data augmentation et deux modèle

In [None]:
def augment_data(X_data, y_data, target_class, augmentation_factor):
    """Augmentation améliorée des données"""
    class_indices = np.where(y_data == target_class)[0]
    X_class = X_data[class_indices]
    augmented_X = []
    augmented_y = []

    for signal in X_class:
        for _ in range(augmentation_factor):
            # Combinaison de plusieurs techniques
            augmented_signal = signal.copy()

            # Bruit gaussien avec amplitude variable
            noise_level = np.random.uniform(0.01, 0.03)
            augmented_signal += np.random.normal(0, noise_level, signal.shape)

            # Décalage temporel aléatoire
            shift = np.random.randint(-10, 10)
            augmented_signal = np.roll(augmented_signal, shift, axis=0)

            # Scaling amplitude
            scale = np.random.uniform(0.9, 1.1)
            augmented_signal *= scale

            # Inversion temporelle aléatoire
            if np.random.random() > 0.5:
                augmented_signal = augmented_signal[::-1]

            augmented_X.append(augmented_signal)
            augmented_y.append(target_class)

    return np.array(augmented_X), np.array(augmented_y)

def balance_classes(X_data, y_data):
    """Version améliorée de l'équilibrage des classes"""
    class_counts = np.bincount(y_data)
    target_count = np.max(class_counts)

    # Stockage des données augmentées
    all_augmented_X = []
    all_augmented_y = []

    for class_label in range(len(class_counts)):
        if class_counts[class_label] < target_count:
            needed = target_count - class_counts[class_label]

            # Calculer le facteur d'augmentation
            base_count = class_counts[class_label]
            augmentation_factor = int(np.ceil(needed / base_count))

            # Générer plus de données que nécessaire
            X_aug, y_aug = augment_data(
                X_data, y_data,
                class_label, augmentation_factor
            )

            # Sélectionner le nombre exact nécessaire
            indices = np.random.choice(len(X_aug), needed, replace=False)
            all_augmented_X.append(X_aug[indices])
            all_augmented_y.append(y_aug[indices])

    if len(all_augmented_X) > 0:
        X_balanced = np.concatenate([X_data] + all_augmented_X)
        y_balanced = np.concatenate([y_data] + all_augmented_y)

        # Mélanger les données
        indices = np.random.permutation(len(X_balanced))
        X_balanced = X_balanced[indices]
        y_balanced = y_balanced[indices]

        return X_balanced, y_balanced

    return X_data, y_data

In [None]:
def train_binary_model(X_train_binary, y_train_binary):
    model = Sequential([
        LSTM(100, input_shape=(187, 1), return_sequences=False),
        Dense(32, activation='relu'),
        Dense(1, activation='sigmoid')
    ])

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )

    # Callbacks
    early_stopping = EarlyStopping(
        monitor='val_loss',
        patience=5,
        restore_best_weights=True
    )

    reduce_lr = ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.2,
        patience=3,
        min_lr=0.00001,
        verbose=1
    )

    history = model.fit(
        X_train_binary, y_train_binary,
        validation_split=0.15,
        epochs=100,
        batch_size=32,
        callbacks=[early_stopping, reduce_lr],
        verbose=1
    )

    return model, history

In [None]:
def train_anomaly_model(X_train_anomaly, y_train_anomaly):
    model = Sequential([
        LSTM(100, input_shape=(187, 1), return_sequences=False),
        Dense(32, activation='relu'),
        Dense(4, activation='softmax')
    ])

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
        # Ajouter :
        loss_weights=None,
        weighted_metrics=None
    )

    # Callbacks
    early_stopping = EarlyStopping(
        monitor='val_loss',
        patience=5,
        restore_best_weights=True
    )

    reduce_lr = ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.2,
        patience=3,
        min_lr=0.00001,
        verbose=1
    )

    history = model.fit(
        X_train_anomaly, y_train_anomaly,
        validation_split=0.15,
        epochs=100,
        batch_size=32,
        callbacks=[early_stopping, reduce_lr],
        verbose=1
    )

    return model, history

LSTM avec attention

In [None]:
def create_binary_model():
    """Crée le modèle binaire avec attention et régularisation"""
    inputs = Input(shape=(187, 1))
    lstm_out = LSTM(64, return_sequences=True,
                   kernel_regularizer=l2(0.01))(inputs)

    attention = Dense(1, activation='tanh')(lstm_out)
    attention_weights = Activation('softmax')(attention)
    context = Multiply()([lstm_out, attention_weights])
    context = Lambda(lambda x: K.sum(x, axis=1))(context)

    dense = Dense(32, activation='relu')(context)
    outputs = Dense(1, activation='sigmoid')(dense)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    return model

In [None]:
def create_anomaly_model():
    """Crée le modèle d'anomalies avec attention et régularisation"""
    inputs = Input(shape=(187, 1))
    lstm_out = LSTM(64, return_sequences=True,
                   kernel_regularizer=l2(0.01))(inputs)

    attention = Dense(1, activation='tanh')(lstm_out)
    attention_weights = Activation('softmax')(attention)
    context = Multiply()([lstm_out, attention_weights])
    context = Lambda(lambda x: K.sum(x, axis=1))(context)

    dense = Dense(32, activation='relu')(context)
    outputs = Dense(4, activation='softmax')(dense)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

Introduction du GRU

In [None]:
def create_binary_model():
    """Version simplifiée sans attention"""
    inputs = Input(shape=(187, 1))

    # GRU simple avec dropout
    x = GRU(64, return_sequences=False,
            kernel_regularizer=l2(0.01),
            recurrent_dropout=0.2)(inputs)

    # Couches denses avec dropout
    x = Dense(32, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    return model

def create_anomaly_model():
    """Version simplifiée sans attention"""
    inputs = Input(shape=(187, 1))

    # GRU simple avec dropout
    x = GRU(64, return_sequences=False,
            kernel_regularizer=l2(0.01),
            recurrent_dropout=0.2)(inputs)

    # Couches denses avec dropout
    x = Dense(32, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(4, activation='softmax')(x)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

GRU avec attention

In [None]:
def create_binary_model():
    """Modèle binaire avec GRU et Transformer"""
    inputs = Input(shape=(187, 1))

    # GRU Layer
    gru_out = GRU(64, return_sequences=True,
                  kernel_regularizer=l2(0.01))(inputs)

    # Transformer Encoder Layer
    attention_out = MultiHeadAttention(num_heads=4, key_dim=64)(gru_out, gru_out)
    attention_out = Add()([gru_out, attention_out])
    attention_out = LayerNormalization()(attention_out)

    # FFN
    ffn_out = Dense(64, activation='relu', kernel_regularizer=l2(0.01))(attention_out)
    ffn_out = Dropout(0.3)(ffn_out)
    ffn_out = Dense(64, activation='relu', kernel_regularizer=l2(0.01))(ffn_out)
    ffn_out = Add()([attention_out, ffn_out])
    ffn_out = LayerNormalization()(ffn_out)

    # Pooling
    global_pooling = Lambda(lambda x: tf.reduce_mean(x, axis=1))(ffn_out)

    # Dense layers
    dense = Dense(32, activation='relu', kernel_regularizer=l2(0.005))(global_pooling)
    dense = Dropout(0.3)(dense)
    outputs = Dense(1, activation='sigmoid')(dense)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    return model

def create_anomaly_model():
    inputs = Input(shape=(187, 1))

    # GRU avec optimisations
    gru_out = GRU(64, return_sequences=True,
                  kernel_regularizer=l2(0.01),
                  implementation=2,  # Implémentation plus rapide
                  reset_after=True)(inputs)

    # Réduire la complexité du réseau
    dense = Dense(32, activation='relu')(gru_out)
    outputs = Dense(4, activation='softmax')(dense)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

Optimisation du GRU et transformers

In [None]:
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

In [None]:
# Pipeline optimisé pour les données
def prepare_tf_data(X, y, batch_size=128):
    dataset = tf.data.Dataset.from_tensor_slices((X, y))
    dataset = dataset.shuffle(buffer_size=1024).batch(batch_size).cache().prefetch(buffer_size=tf.data.AUTOTUNE)
    return dataset

In [None]:
# Préparation des datasets optimisés
train_dataset = prepare_tf_data(X_train, y_train)
val_dataset = prepare_tf_data(X_test, y_test)

Modèle Transformers

In [None]:
def create_binary_model():
    """Modèle binaire avec Transformer"""
    inputs = Input(shape=(187, 1))

    # Projection initiale
    x = Dense(32)(inputs)

    # Transformer avec une seule couche d'attention
    attention = MultiHeadAttention(num_heads=4, key_dim=32)(x, x)
    attention = Add()([x, attention])
    attention = LayerNormalization()(attention)

    # Feed Forward Network
    ffn = Dense(64, activation='relu')(attention)
    ffn = Dropout(0.2)(ffn)
    ffn = Dense(32)(ffn)
    ffn = Add()([attention, ffn])
    ffn = LayerNormalization()(ffn)

    # Pooling et classification
    x = tf.reduce_mean(ffn, axis=1)
    x = Dense(32, activation='relu')(x)
    outputs = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    return model

def create_anomaly_model():
    """Modèle anomalies avec Transformer"""
    inputs = Input(shape=(187, 1))

    # Projection initiale
    x = Dense(32)(inputs)

    # Premier bloc Transformer
    attention = MultiHeadAttention(num_heads=4, key_dim=32)(x, x)
    attention = Add()([x, attention])
    x = LayerNormalization()(attention)

    # Feed Forward Network
    ffn = Dense(64, activation='relu')(x)
    ffn = Dropout(0.2)(ffn)
    ffn = Dense(32)(ffn)
    x = Add()([x, ffn])
    x = LayerNormalization()(x)

    # Pooling et classification
    x = tf.reduce_mean(x, axis=1)
    x = Dense(32, activation='relu')(x)
    outputs = Dense(4, activation='softmax')(x)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

Modèle transformers et gru

In [None]:
def transformer_encoder(inputs, head_size=64, num_heads=4, ff_dim=128, dropout=0.1):
    """
    Bloc Transformer Encoder avec attention multi-têtes et connexion résiduelle.
    """
    attention = MultiHeadAttention(num_heads=num_heads, key_dim=head_size)(inputs, inputs)
    attention = Dropout(dropout)(attention)
    x = Add()([attention, inputs])  # Connexion résiduelle
    x = LayerNormalization(epsilon=1e-6)(x)

    ff = Dense(ff_dim, activation="relu")(x)
    ff = Dropout(dropout)(ff)
    ff = Dense(inputs.shape[-1])(ff)
    x = Add()([ff, x])  # Connexion résiduelle
    x = LayerNormalization(epsilon=1e-6)(x)

    return x

def attention_layer(inputs):
    """
    Mécanisme d'attention classique.
    """
    attention_weights = Dense(1, activation="tanh")(inputs)
    attention_weights = Dense(1, activation="softmax", name="attention_weights")(attention_weights)
    context_vector = Lambda(lambda x: K.sum(x, axis=1))(inputs * attention_weights)
    return context_vector

def create_combined_model(input_shape, num_classes, head_size=64, num_heads=4, ff_dim=128, num_transformer_blocks=2, dropout=0.1):
    """
    Modèle combiné Transformers et Attention.
    """
    inputs = Input(shape=input_shape)

    # Blocs Transformers
    x = inputs
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

    # Mécanisme d'attention
    attention_output = attention_layer(x)

    # Dense Layers
    dense = Dense(64, activation="relu")(attention_output)
    dense = Dropout(0.2)(dense)
    outputs = Dense(num_classes, activation="softmax" if num_classes > 1 else "sigmoid")(dense)

    model = Model(inputs, outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss="sparse_categorical_crossentropy" if num_classes > 1 else "binary_crossentropy",
        metrics=["accuracy"]
    )
    return model

# Code Utile

Retirer les 0 et normaliser le tous

In [None]:
def trim_trailing_zeros_from_signal(signal):
    """Retire les zéros à la fin d'un signal ECG"""
    signal = np.array(signal)
    last_nonzero = -1
    for i in reversed(range(len(signal))):
        if abs(signal[i]) > 1e-10:
            last_nonzero = i
            break
    return signal[:last_nonzero + 1]

def normalize_signal_length(signal, target_length=187):
    current_length = len(signal)
    if current_length < target_length:
        # Utiliser l'interpolation au lieu du padding avec des zéros
        x_current = np.linspace(0, 1, current_length)
        x_target = np.linspace(0, 1, target_length)
        return np.interp(x_target, x_current, signal)
    return signal[:target_length]

Code pour faire la séparation, pour les deux modèles

In [None]:
def generate_binary_data(df):
    """Génère les données pour classification binaire"""
    ecgs, labels = [], []
    for _, row in tqdm(df.iterrows(), total=len(df)):
        # Nettoyage du signal
        signal = row.iloc[0:187].values
        cleaned_signal = trim_trailing_zeros_from_signal(signal)
        normalized_signal = normalize_signal_length(cleaned_signal)

        ecgs.append(normalized_signal)
        labels.append(1 if int(row.iloc[187]) != 0 else 0)
    return np.array(ecgs), np.array(labels)

def prepare_anomaly_data(train_df, test_df):
    """Prépare les données pour le modèle d'anomalies"""
    X_train_anomaly, y_train_anomaly = [], []
    X_test_anomaly, y_test_anomaly = [], []

    # Traitement des données d'entraînement
    for _, row in tqdm(train_df.iterrows(), total=len(train_df)):
        if int(row.iloc[187]) != 0:
            # Nettoyage du signal
            signal = row.iloc[0:187].values
            cleaned_signal = trim_trailing_zeros_from_signal(signal)
            normalized_signal = normalize_signal_length(cleaned_signal)

            X_train_anomaly.append(normalized_signal)
            label = int(row.iloc[187]) - 1 if int(row.iloc[187]) != 4 else 3
            y_train_anomaly.append(label)

    # Traitement des données de test
    for _, row in tqdm(test_df.iterrows(), total=len(test_df)):
        if int(row.iloc[187]) != 0:
            signal = row.iloc[0:187].values
            cleaned_signal = trim_trailing_zeros_from_signal(signal)
            normalized_signal = normalize_signal_length(cleaned_signal)

            X_test_anomaly.append(normalized_signal)
            label = int(row.iloc[187]) - 1 if int(row.iloc[187]) != 4 else 3
            y_test_anomaly.append(label)

    # Conversion et normalisation
    X_train_anomaly = np.array(X_train_anomaly)
    X_test_anomaly = np.array(X_test_anomaly)
    y_train_anomaly = np.array(y_train_anomaly)
    y_test_anomaly = np.array(y_test_anomaly)

    # Normalisation
    scaler = StandardScaler()
    X_train_anomaly = scaler.fit_transform(X_train_anomaly)
    X_test_anomaly = scaler.transform(X_test_anomaly)

    # Reshape pour LSTM
    X_train_anomaly = X_train_anomaly.reshape(X_train_anomaly.shape[0], X_train_anomaly.shape[1], 1)
    X_test_anomaly = X_test_anomaly.reshape(X_test_anomaly.shape[0], X_test_anomaly.shape[1], 1)

    return X_train_anomaly, X_test_anomaly, y_train_anomaly, y_test_anomaly

Vérification que l'augmentation fonctionne

In [None]:
def verify_augmentation():
    """Vérifie la data augmentation et sa distribution"""
    # Afficher la distribution avant augmentation
    print("=== Distribution avant augmentation ===")
    unique, counts = np.unique(y_train_anomaly, return_counts=True)
    for u, c in zip(unique, counts):
        print(f"Classe {u}: {c} échantillons")

    # Afficher la distribution après augmentation
    print("\n=== Distribution après augmentation ===")
    unique, counts = np.unique(y_train_anomaly_balanced, return_counts=True)
    for u, c in zip(unique, counts):
        print(f"Classe {u}: {c} échantillons")

    # Visualiser un exemple de signal original et augmenté pour la classe 2
    plt.figure(figsize=(15, 5))

    # Signal original de la classe 2
    idx_original = np.where(y_train_anomaly == 2)[0][0]
    plt.subplot(121)
    plt.plot(X_train_anomaly[idx_original])
    plt.title("Signal original - Classe 2")

    # Signal augmenté de la classe 2
    idx_augmented = np.where(y_train_anomaly_balanced == 2)[0][-1]
    plt.subplot(122)
    plt.plot(X_train_anomaly_balanced[idx_augmented])
    plt.title("Signal augmenté - Classe 2")
    plt.show()

Vérification que les 0 des signaux ont bien été supprimé

In [None]:
def verify_cleaning():
    """Vérifie le nettoyage des signaux"""
    train_df = pd.read_csv(train_path)

    # Exemple avec un signal
    original_signal = train_df.iloc[0, 0:187].values
    cleaned_signal = trim_trailing_zeros_from_signal(original_signal)
    normalized_signal = normalize_signal_length(cleaned_signal)

    # Visualisation
    plt.figure(figsize=(15, 5))

    plt.subplot(131)
    plt.plot(original_signal)
    plt.title("Signal original")

    plt.subplot(132)
    plt.plot(cleaned_signal)
    plt.title("Signal sans zéros trailing")

    plt.subplot(133)
    plt.plot(normalized_signal)
    plt.title("Signal normalisé")

    plt.tight_layout()
    plt.show()

    print(f"Longueur originale: {len(original_signal)}")
    print(f"Longueur après nettoyage: {len(cleaned_signal)}")
    print(f"Longueur après normalisation: {len(normalized_signal)}")

Voir la distribution des classes

In [None]:
def plot_class_distribution(original_labels, augmented_labels, label_names=None):
    """
    Compare la distribution des classes avant et après augmentation.

    :param original_labels: Labels originaux (avant augmentation)
    :param augmented_labels: Labels après augmentation
    :param label_names: Liste des noms des classes (optionnel)
    """
    original_counts = np.bincount(original_labels)
    augmented_counts = np.bincount(augmented_labels)

    x = np.arange(len(original_counts))

    plt.figure(figsize=(12, 6))

    # Distribution originale
    plt.bar(x - 0.2, original_counts, width=0.4, label='Original', color='blue', alpha=0.7)

    # Distribution après augmentation
    plt.bar(x + 0.2, augmented_counts, width=0.4, label='Augmented', color='orange', alpha=0.7)

    # Ajout des labels
    if label_names:
        plt.xticks(x, label_names, rotation=45)
    else:
        plt.xticks(x, [f'Classe {i}' for i in x])

    plt.title("Distribution des classes avant et après augmentation")
    plt.xlabel("Classes")
    plt.ylabel("Nombre d'échantillons")
    plt.legend()
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.show()

In [None]:
def plot_anomaly_class_distribution(original_labels, augmented_labels, label_names=None):
    """
    Compare la distribution des classes d'anomalies (malades) avant et après augmentation.

    :param original_labels: Labels originaux (avant augmentation)
    :param augmented_labels: Labels après augmentation
    :param label_names: Liste des noms des classes (optionnel)
    """
    original_counts = np.bincount(original_labels)
    augmented_counts = np.bincount(augmented_labels)

    x = np.arange(len(original_counts))

    plt.figure(figsize=(12, 6))

    # Distribution originale
    plt.bar(x - 0.2, original_counts, width=0.4, label='Original', color='blue', alpha=0.7)

    # Distribution après augmentation
    plt.bar(x + 0.2, augmented_counts, width=0.4, label='Augmented', color='orange', alpha=0.7)

    # Ajout des labels
    if label_names:
        plt.xticks(x, label_names, rotation=45)
    else:
        plt.xticks(x, [f'Classe {i+1}' for i in x])  # Les classes des anomalies

    plt.title("Distribution des classes d'anomalies avant et après augmentation")
    plt.xlabel("Classes d'anomalies")
    plt.ylabel("Nombre d'échantillons")
    plt.legend()
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.show()

Évaluer le modèle

In [None]:
def evaluate_model(model, X_test, y_test, is_binary=True):
    """Évalue les performances du modèle"""
    y_pred = model.predict(X_test)

    if is_binary:
        y_pred_classes = (y_pred > 0.5).astype(int)
    else:
        y_pred_classes = np.argmax(y_pred, axis=1)

    print("\nRapport de classification:")
    print(classification_report(y_test, y_pred_classes))

    # Matrice de confusion
    plt.figure(figsize=(10, 8))
    cm = confusion_matrix(y_test, y_pred_classes)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('Matrice de confusion')
    plt.ylabel('Vraie classe')
    plt.xlabel('Classe prédite')
    plt.show()

ROC

In [None]:
# Fonction pour tracer la courbe ROC
def plot_roc_curve(y_test, y_pred_proba):
    n_classes = 5
    fpr = dict()
    tpr = dict()
    roc_auc = dict()

    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve((y_test == i).astype(int), y_pred_proba[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    plt.figure(figsize=(10, 8))
    for i in range(n_classes):
        plt.plot(fpr[i], tpr[i], label=f'Classe {i} (AUC = {roc_auc[i]:.2f})')

    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('Taux de faux positifs')
    plt.ylabel('Taux de vrais positifs')
    plt.title('Courbe ROC')
    plt.legend()
    plt.show()

Voir les résultats détaillés



In [None]:
# Métriques détaillées sur le jeu de test
label_mapping = {
    0: "Normal",
    1: "Artial Premature",
    2: "Premature ventricular contraction",
    3: "Fusion of ventricular and normal",
    4: "Fusion of paced and normal"
}
cm = confusion_matrix(y_test, y_pred_classes)
roc_scores = {}
for i in range(5):
    roc_scores[i] = roc_auc_score((y_test == i).astype(int), y_pred[:, i])

# Affichage des résultats
print("\nRésultats sur le jeu de test:")
print("\nMatrice de Confusion:")
print(pd.DataFrame(
    cm,
    index=[label_mapping[i] for i in range(5)],
    columns=[label_mapping[i] for i in range(5)]
))

print("\nScores ROC AUC par classe:")
print(pd.DataFrame({
    'Classe': [label_mapping[i] for i in range(5)],
    'Score ROC AUC': [roc_scores[i] for i in range(5)]
}))

print("\nRapport de classification détaillé:")
print(classification_report(y_test, y_pred_classes,
                          target_names=[label_mapping[i] for i in range(5)]))

# Code final implémentant toutes les fonctionnalités

In [None]:
# Importations nécessaires
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import GRU, Dense, Dropout, Input, Lambda, LayerNormalization
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras import mixed_precision
from tensorflow.keras.layers import MultiHeadAttention, Dense, Dropout, LayerNormalization, Add, Input, Flatten
from tensorflow.keras.models import Model

# Configuration GPU et mixed precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Fonctions pour préparer et nettoyer les données
def trim_trailing_zeros_from_signal(signal):
    """Retire les zéros à la fin d'un signal ECG."""
    signal = np.array(signal)
    last_nonzero = -1
    for i in reversed(range(len(signal))):
        if abs(signal[i]) > 1e-10:
            last_nonzero = i
            break
    return signal[:last_nonzero + 1]

def normalize_signal_length(signal, target_length=187):
    """Normalise la longueur d'un signal."""
    current_length = len(signal)
    if current_length < target_length:
        x_current = np.linspace(0, 1, current_length)
        x_target = np.linspace(0, 1, target_length)
        return np.interp(x_target, x_current, signal)
    return signal[:target_length]

def prepare_data(train_path, test_path):
    """Charge et prépare les données d'entraînement et de test."""
    train_df = pd.read_csv(train_path)
    test_df = pd.read_csv(test_path)

    X_train, y_train = generate_binary_data(train_df)
    X_test, y_test = generate_binary_data(test_df)

    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

    return X_train, X_test, y_train, y_test

def generate_binary_data(df):
    """Génère les données pour classification binaire."""
    ecgs, labels = [], []
    for _, row in tqdm(df.iterrows(), total=len(df)):
        signal = row.iloc[0:187].values
        cleaned_signal = trim_trailing_zeros_from_signal(signal)
        normalized_signal = normalize_signal_length(cleaned_signal)
        ecgs.append(normalized_signal)
        labels.append(1 if int(row.iloc[187]) != 0 else 0)
    return np.array(ecgs), np.array(labels)

def prepare_anomaly_data(train_df, test_df):
    """Prépare les données pour le modèle d'anomalies"""
    X_train_anomaly, y_train_anomaly = [], []
    X_test_anomaly, y_test_anomaly = [], []

    # Traitement des données d'entraînement
    for _, row in tqdm(train_df.iterrows(), total=len(train_df)):
        if int(row.iloc[187]) != 0:
            # Nettoyage du signal
            signal = row.iloc[0:187].values
            cleaned_signal = trim_trailing_zeros_from_signal(signal)
            normalized_signal = normalize_signal_length(cleaned_signal)

            X_train_anomaly.append(normalized_signal)
            label = int(row.iloc[187]) - 1 if int(row.iloc[187]) != 4 else 3
            y_train_anomaly.append(label)

    # Traitement des données de test
    for _, row in tqdm(test_df.iterrows(), total=len(test_df)):
        if int(row.iloc[187]) != 0:
            signal = row.iloc[0:187].values
            cleaned_signal = trim_trailing_zeros_from_signal(signal)
            normalized_signal = normalize_signal_length(cleaned_signal)

            X_test_anomaly.append(normalized_signal)
            label = int(row.iloc[187]) - 1 if int(row.iloc[187]) != 4 else 3
            y_test_anomaly.append(label)

    # Conversion et normalisation
    X_train_anomaly = np.array(X_train_anomaly)
    X_test_anomaly = np.array(X_test_anomaly)
    y_train_anomaly = np.array(y_train_anomaly)
    y_test_anomaly = np.array(y_test_anomaly)

    # Normalisation
    scaler = StandardScaler()
    X_train_anomaly = scaler.fit_transform(X_train_anomaly)
    X_test_anomaly = scaler.transform(X_test_anomaly)

    # Reshape pour LSTM
    X_train_anomaly = X_train_anomaly.reshape(X_train_anomaly.shape[0], X_train_anomaly.shape[1], 1)
    X_test_anomaly = X_test_anomaly.reshape(X_test_anomaly.shape[0], X_test_anomaly.shape[1], 1)

    return X_train_anomaly, X_test_anomaly, y_train_anomaly, y_test_anomaly

In [None]:
def transformer_encoder(inputs, head_size=64, num_heads=4, ff_dim=128, dropout=0.1):
    """
    Implémente un encodeur Transformer.

    Args:
        inputs: Entrée du modèle.
        head_size: Dimension des têtes d'attention.
        num_heads: Nombre de têtes d'attention.
        ff_dim: Dimension du réseau feed-forward.
        dropout: Taux de dropout.

    Returns:
        La sortie de l'encodeur Transformer.
    """
    # Attention multi-têtes
    attention = MultiHeadAttention(num_heads=num_heads, key_dim=head_size)(inputs, inputs)
    attention = Dropout(dropout)(attention)
    x = Add()([attention, inputs])  # Connexion résiduelle
    x = LayerNormalization(epsilon=1e-6)(x)

    # Réseau feed-forward
    ff = Dense(ff_dim, activation="relu")(x)
    ff = Dropout(dropout)(ff)
    ff = Dense(inputs.shape[-1])(ff)
    x = Add()([ff, x])  # Connexion résiduelle
    x = LayerNormalization(epsilon=1e-6)(x)

    return x

def create_transformer_model(input_shape, num_classes, head_size=64, num_heads=2, ff_dim=128, num_transformer_blocks=2, dropout=0.1):
    """
    Crée un modèle Transformers pour la classification.

    Args:
        input_shape: Forme des données d'entrée.
        num_classes: Nombre de classes en sortie.
        head_size: Dimension des têtes d'attention.
        num_heads: Nombre de têtes d'attention.
        ff_dim: Dimension du réseau feed-forward.
        num_transformer_blocks: Nombre de blocs Transformers.
        dropout: Taux de dropout.

    Returns:
        Modèle Keras.
    """
    inputs = Input(shape=input_shape)
    x = inputs

    # Ajouter plusieurs blocs Transformers
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

    # Pooling global (moyenne des features)
    x = Flatten()(x)

    # Couche dense finale
    outputs = Dense(num_classes, activation="softmax" if num_classes > 1 else "sigmoid")(x)

    model = Model(inputs, outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss="sparse_categorical_crossentropy" if num_classes > 1 else "binary_crossentropy",
        metrics=["accuracy"]
    )
    return model

In [None]:
def augment_data(X_data, y_data, target_class, augmentation_factor):
    """Augmentation améliorée des données"""
    class_indices = np.where(y_data == target_class)[0]
    X_class = X_data[class_indices]
    augmented_X = []
    augmented_y = []

    for signal in X_class:
        for _ in range(augmentation_factor):
            # Combinaison de plusieurs techniques
            augmented_signal = signal.copy()

            # Bruit gaussien avec amplitude variable
            noise_level = np.random.uniform(0.01, 0.03)
            augmented_signal += np.random.normal(0, noise_level, signal.shape)

            # Décalage temporel aléatoire
            shift = np.random.randint(-10, 10)
            augmented_signal = np.roll(augmented_signal, shift, axis=0)

            # Scaling amplitude
            scale = np.random.uniform(0.9, 1.1)
            augmented_signal *= scale

            # Inversion temporelle aléatoire
            if np.random.random() > 0.5:
                augmented_signal = augmented_signal[::-1]

            augmented_X.append(augmented_signal)
            augmented_y.append(target_class)

    return np.array(augmented_X), np.array(augmented_y)

def balance_classes(X_data, y_data):
    """Version améliorée de l'équilibrage des classes"""
    class_counts = np.bincount(y_data)
    target_count = np.max(class_counts)

    # Stockage des données augmentées
    all_augmented_X = []
    all_augmented_y = []

    for class_label in range(len(class_counts)):
        if class_counts[class_label] < target_count:
            needed = target_count - class_counts[class_label]

            # Calculer le facteur d'augmentation
            base_count = class_counts[class_label]
            augmentation_factor = int(np.ceil(needed / base_count))

            # Générer plus de données que nécessaire
            X_aug, y_aug = augment_data(
                X_data, y_data,
                class_label, augmentation_factor
            )

            # Sélectionner le nombre exact nécessaire
            indices = np.random.choice(len(X_aug), needed, replace=False)
            all_augmented_X.append(X_aug[indices])
            all_augmented_y.append(y_aug[indices])

    if len(all_augmented_X) > 0:
        X_balanced = np.concatenate([X_data] + all_augmented_X)
        y_balanced = np.concatenate([y_data] + all_augmented_y)

        # Mélanger les données
        indices = np.random.permutation(len(X_balanced))
        X_balanced = X_balanced[indices]
        y_balanced = y_balanced[indices]

        return X_balanced, y_balanced

    return X_data, y_data

In [None]:
# Pipeline optimisé pour les données
def prepare_tf_data(X, y, batch_size=128):
    dataset = tf.data.Dataset.from_tensor_slices((X, y))
    dataset = dataset.shuffle(buffer_size=1024).batch(batch_size).cache().prefetch(buffer_size=tf.data.AUTOTUNE)
    return dataset

In [None]:
# Fonction d'entraînement avec callbacks
def train_model(model, train_dataset, val_dataset, epochs=20):
    callbacks = [
        EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True),
        ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=2, verbose=1)
    ]
    history = model.fit(
        train_dataset,
        validation_data=val_dataset,
        epochs=epochs,
        callbacks=callbacks,
        verbose=1
    )
    return history

In [None]:
# Évaluation et rapport des résultats
def evaluate_model(model, test_dataset, is_binary=True):
    y_pred = []
    y_true = []

    for X_batch, y_batch in test_dataset:
        y_pred.extend(model.predict(X_batch))
        y_true.extend(y_batch.numpy())

    y_pred_classes = (np.array(y_pred) > 0.5).astype(int) if is_binary else np.argmax(y_pred, axis=1)
    print("\nRapport de classification:")
    print(classification_report(y_true, y_pred_classes))

    cm = confusion_matrix(y_true, y_pred_classes)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('Matrice de confusion')
    plt.ylabel('Vraie classe')
    plt.xlabel('Classe prédite')
    plt.show()

In [None]:
# Modèle Transformers pour la classification binaire
transformer_binary_model = create_transformer_model(
    input_shape=(187, 1),
    num_classes=1,
    head_size=32,
    num_heads=2,
    ff_dim=64,
    num_transformer_blocks=2,
    dropout=0.1
)

# Entraînement du modèle binaire
binary_transformer_history = train_model(
    transformer_binary_model,
    train_dataset,
    val_dataset,
    epochs=20
)

# Évaluation du modèle Transformers (binaire)
print("\nÉvaluation du modèle Transformers (binaire) :")
evaluate_model(transformer_binary_model, val_dataset, is_binary=True)

In [None]:
# Modèle Transformers pour les anomalies
transformer_anomaly_model = create_transformer_model(
    input_shape=(187, 1),
    num_classes=4,
    head_size=32,
    num_heads=2,
    ff_dim=64,
    num_transformer_blocks=2,
    dropout=0.1
)

# Entraînement du modèle pour les anomalies
anomaly_transformer_history = train_model(
    transformer_anomaly_model,
    anomaly_dataset,
    val_anomaly_dataset,
    epochs=20
)

# Évaluation du modèle Transformers (anomalies)
print("\nÉvaluation du modèle Transformers (anomalies) :")
evaluate_model(transformer_anomaly_model, val_anomaly_dataset, is_binary=False)
