In [12]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import confusion_matrix, roc_auc_score, classification_report
import matplotlib.pyplot as plt
import mlflow
import mlflow.pytorch

In [19]:
ARCH = 'baseline'  # 'baseline', 'wide', 'deep'
IMAGE_SIZE = 224   # doit correspondre à preprocessing.py et training
BATCH_SIZE = 32
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

BASE_DIR = os.path.dirname(os.getcwd())  # racine du projet
PROCESSED_DIR = os.path.join(BASE_DIR, "data", "processed")
MODEL_PATH = f"baseline_{ARCH}.pth"

In [20]:
images = np.load(os.path.join(PROCESSED_DIR, "images.npy"))
labels = np.load(os.path.join(PROCESSED_DIR, "labels.npy"))

# Ajouter le canal
if len(images.shape) == 3:  # si grayscale
    images = images[:, np.newaxis, :, :]
    
images_tensor = torch.tensor(images, dtype=torch.float32)
labels_tensor = torch.tensor(labels, dtype=torch.long)

dataset = TensorDataset(images_tensor, labels_tensor)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
_, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

print(f"Nombre d’échantillons test : {test_size}")

Nombre d’échantillons test : 186


In [21]:
class SimpleCNN(nn.Module):
    def __init__(self, num_classes=4, arch='baseline', image_size=224):
        super(SimpleCNN, self).__init__()
        self.arch = arch
        self.image_size = image_size
        
        if arch == 'baseline':
            self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
            self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        elif arch == 'wide':
            self.conv1 = nn.Conv2d(1, 32, 3, padding=1)
            self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        elif arch == 'deep':
            self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
            self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
            self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        
        self.pool = nn.MaxPool2d(2,2)
        
        # Calcul automatique de la taille après convolutions + poolings
        def get_flatten_size():
            size = image_size
            size = size // 2  # pool1
            size = size // 2  # pool2
            if arch == 'deep':
                size = size // 2  # pool3
                return 64 * size * size
            else:
                return (32 if arch != 'wide' else 64) * size * size
        
        flatten_size = get_flatten_size()
        self.fc1 = nn.Linear(flatten_size, 128)
        self.fc2 = nn.Linear(128, num_classes)
    
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        if self.arch == 'deep':
            x = F.relu(self.conv3(x))
            x = self.pool(x)
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [22]:
full_model_path = os.path.join(BASE_DIR, MODEL_PATH)
if not os.path.exists(full_model_path):
    raise FileNotFoundError(f"Le fichier du modèle n'existe pas : {full_model_path}")

model = SimpleCNN(num_classes=4, arch=ARCH, image_size=IMAGE_SIZE).to(DEVICE)
model.load_state_dict(torch.load(full_model_path, map_location=DEVICE))
model.eval()
print(f"Modèle chargé avec succès ✅ : {full_model_path}")

Modèle chargé avec succès ✅ : c:\Users\USER\Desktop\ecg-classification\baseline_baseline.pth


  model.load_state_dict(torch.load(full_model_path, map_location=DEVICE))


In [None]:
all_targets, all_preds = [], []

with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(DEVICE), targets.to(DEVICE)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        all_targets.extend(targets.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())

all_targets = np.array(all_targets)
all_preds = np.array(all_preds)

# Matrice de confusion
cm = confusion_matrix(all_targets, all_preds)
test_acc = np.mean(all_preds == all_targets)

# Sensibilité et spécificité
TP = np.diag(cm)
FP = cm.sum(axis=0) - TP
FN = cm.sum(axis=1) - TP
TN = cm.sum() - (TP + FP + FN)

sensitivity = TP / (TP + FN + 1e-8)
specificity = TN / (TN + FP + 1e-8)

# AUC
try:
    from sklearn.preprocessing import label_binarize
    all_targets_bin = label_binarize(all_targets, classes=np.arange(4))
    all_preds_bin = label_binarize(all_preds, classes=np.arange(4))
    auc = roc_auc_score(all_targets_bin, all_preds_bin, multi_class='ovr')
except:
    auc = 0.0

# Affichage résultats
print(f"Test Accuracy: {test_acc:.4f}")
print(f"Sensibilité: {sensitivity}")
print(f"Spécificité: {specificity}")
print(f"AUC: {auc:.4f}")
print("Matrice de confusion:\n", cm)

Modèle chargé avec succès ✅


In [23]:
# -------------------------
# 8️⃣ Courbes Loss & Accuracy depuis MLflow
# -------------------------
import mlflow

# Connexion à l'expérience MLflow utilisée pendant l'entraînement
experiment_name = "ECG_Classification_Tuning"
mlflow.set_experiment(experiment_name)

# Récupérer les runs
client = mlflow.tracking.MlflowClient()
experiment = client.get_experiment_by_name(experiment_name)
runs = client.search_runs(experiment.experiment_id)

# Pour ce notebook, on prend le dernier run
if len(runs) == 0:
    print("Aucun run MLflow trouvé pour cet experiment")
else:
    last_run = runs[0]
    run_id = last_run.info.run_id

    # Récupérer les métriques par epoch
    import pandas as pd
    train_loss = client.get_metric_history(run_id, "train_loss")
    train_acc = client.get_metric_history(run_id, "train_accuracy")

    # Convertir en listes triées par step
    train_loss = sorted(train_loss, key=lambda x: x.step)
    train_acc = sorted(train_acc, key=lambda x: x.step)

    loss_values = [m.value for m in train_loss]
    acc_values = [m.value for m in train_acc]
    epochs = [m.step + 1 for m in train_loss]

    # Tracer
    plt.figure(figsize=(12,5))
    plt.subplot(1,2,1)
    plt.plot(epochs, loss_values, marker='o')
    plt.title("Train Loss depuis MLflow")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")

    plt.subplot(1,2,2)
    plt.plot(epochs, acc_values, marker='o', color='orange')
    plt.title("Train Accuracy depuis MLflow")
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.show()


  return FileStore(store_uri, store_uri)
2025/12/09 21:33:14 INFO mlflow.tracking.fluent: Experiment with name 'ECG_Classification_Tuning' does not exist. Creating a new experiment.


Aucun run MLflow trouvé pour cet experiment
