## CNN (Convolutional Neural Networks)

Dans cette partie je vais essayer d'entrainer un CNN pour mon probleme de classification.

Dans un premier temps il va falloir créer un code pour arriver à utiliser correctement le dataset donnée

Pour standardisé les enregistrement ECG j'ai choisie d'utilisé un zéro padding (ajouté des 0 à la fin de l'enregistrement) pour éviter de supprimer des informations des ECG et pour ne pas rajouter des info qui pourrait fausser les résultats. Le désavantage de cette méthode étant que je me retrouve avec des enregistrement assez gros (18286).

In [12]:
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(device)

cpu


In [13]:
import pandas as pd
import numpy as np
import torch
import scipy.io
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader

# Charger les labels
labels_df = pd.read_csv('REFERENCE-v3.csv', header=None)
labels_df.columns = ['filename', 'label']

# Créer un mapping pour convertir les labels en entiers
label_mapping = {'N': 0, 'A': 1, 'O': 2, '~': 3}

# Appliquer la conversion des labels
labels_df['label'] = labels_df['label'].map(label_mapping)

if labels_df['label'].isnull().any():
    raise ValueError("Certains labels n'ont pas été convertis correctement.")


# Fonction pour charger un fichier ECG au format .mat et appliquer un zero padding si nécessaire
# 18286 correspondant a la taille maximale d'un enregistrement
def load_ecg(filename, target_length=18286):

    mat = scipy.io.loadmat(f'training2017/{filename}.mat')
    signal = mat['val'][0]
    
    # Appliquer le zero padding ou tronquer
    if len(signal) < target_length:
        pad_size = target_length - len(signal)
        signal = np.pad(signal, (0, pad_size), 'constant')
    else:
        signal = signal[:target_length]
    
    return signal


# Charger tous les signaux ECG et appliquer le zero padding
padded_signals = []
for filename in labels_df['filename']:
    signal = load_ecg(filename)
    padded_signals.append(signal)


# Convertir en tenseurs PyTorch
padded_signals = torch.tensor(padded_signals, dtype=torch.float32)
labels = torch.tensor(labels_df['label'].values, dtype=torch.long)

# Diviser les données en ensembles d'entraînement (80%) et de test (20%)
signals_train, signals_test, labels_train, labels_test = train_test_split(padded_signals, labels, test_size=0.2, random_state=42)


In [14]:
class ECGDataset(Dataset):
    def __init__(self, signals, labels):
        self.signals = signals
        self.labels = labels
    
    def __len__(self):
        return len(self.signals)
    
    def __getitem__(self, idx):
        signal = self.signals[idx].unsqueeze(0)  # Ajouter une dimension de canal pour CNN
        label = self.labels[idx]
        return signal, label

# Créer les datasets pour l'entraînement et le test
train_dataset = ECGDataset(signals_train, labels_train)
test_dataset = ECGDataset(signals_test, labels_test)

# Créer les DataLoaders pour l'entraînement et le test
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


On va ensuite devoir établir une architecture pour notre CNN, ici je suis partie sur quelque chose de simple avec 3 couche de convolution entrecoupé de couche de pooling et de reLu pour l'activation ainsi que 2 couches totalement connecté (Linear) à la fin, elles aussi entrecoupé de ReLu.

In [15]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ECGCNN(nn.Module):
    def __init__(self, num_classes=4):
        super(ECGCNN, self).__init__()
        
        # Première couche convolutionnelle
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=16, kernel_size=7, stride=1, padding=3)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        
        # Deuxième couche convolutionnelle
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=5, stride=1, padding=2)
        
        # Troisième couche convolutionnelle
        self.conv3 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        
        # Fully connected layers (couches denses)
        self.fc1 = nn.Linear(64 * (18286 // 8), 128)  # La taille d'entrée dépend du nombre de couches de pooling
        self.fc2 = nn.Linear(128, num_classes)
    
    def forward(self, x):
        # Appliquer la première couche convolutionnelle avec activation et pooling
        x = self.pool(F.relu(self.conv1(x)))
        
        # Appliquer la deuxième couche convolutionnelle avec activation et pooling
        x = self.pool(F.relu(self.conv2(x)))
        
        # Appliquer la troisième couche convolutionnelle avec activation et pooling
        x = self.pool(F.relu(self.conv3(x)))
        
        # Aplatir les sorties de la dernière couche convolutionnelle pour les fully connected layers
        x = x.view(x.size(0), -1)
        
        # Appliquer la première couche fully connected avec ReLU
        x = F.relu(self.fc1(x))
        
        # Appliquer la couche fully connected finale pour la classification
        x = self.fc2(x)
        
        return x

# Initialiser le modèle
model = ECGCNN(num_classes=4)

# Afficher l'architecture du modèle
print(model)


ECGCNN(
  (conv1): Conv1d(1, 16, kernel_size=(7,), stride=(1,), padding=(3,))
  (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv1d(16, 32, kernel_size=(5,), stride=(1,), padding=(2,))
  (conv3): Conv1d(32, 64, kernel_size=(3,), stride=(1,), padding=(1,))
  (fc1): Linear(in_features=146240, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=4, bias=True)
)


In [16]:
import torch.optim as optim

def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    model.train()  # Mettre le modèle en mode "entraînement"
    
    for epoch in range(num_epochs):
        running_loss = 0.0
        for signals, labels in train_loader:
            # Envoyer les données au bon appareil (GPU si disponible)
            signals = signals.to(device)
            labels = labels.to(device)

            # Réinitialiser les gradients
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(signals)
            loss = criterion(outputs, labels)
            
            # Backward pass (calculer les gradients)
            loss.backward()
            
            # Mise à jour des poids
            optimizer.step()
            
            # Accumuler la perte
            running_loss += loss.item()
        
        # Afficher la perte moyenne pour chaque époque
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')
    
    print('Finished Training')


In [17]:
def evaluate_model(model, test_loader, device):
    model.eval()  # Mettre le modèle en mode "évaluation"
    correct = 0
    total = 0
    
    with torch.no_grad():  # Désactiver la calcul des gradients (inutiles en mode évaluation)
        for signals, labels in test_loader:
            signals = signals.to(device)
            labels = labels.to(device)
            
            # Forward pass (prédictions)
            outputs = model(signals)
            _, predicted = torch.max(outputs.data, 1)
            
            # Calculer le nombre de prédictions correctes
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    # Afficher la précision (accuracy)
    accuracy = 100 * correct / total
    print(f'Accuracy: {accuracy:.2f}%')


In [18]:
# Initialiser le modèle, la fonction de perte et l'optimiseur
# Initialiser le modèle, la fonction de perte et l'optimiseur
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = ECGCNN(num_classes=4).to(device)

criterion = nn.CrossEntropyLoss()  # Fonction de perte pour la classification
optimizer = optim.Adam(model.parameters(), lr=0.001)  # Optimiseur Adam

# Entraîner le modèle
train_model(model, train_loader, criterion, optimizer, num_epochs=10)





Epoch [1/10], Loss: 6.5024
Epoch [2/10], Loss: 0.8938
Epoch [3/10], Loss: 0.6780
Epoch [4/10], Loss: 0.4289
Epoch [5/10], Loss: 0.2320
Epoch [6/10], Loss: 0.1188
Epoch [7/10], Loss: 0.0794
Epoch [8/10], Loss: 0.0644
Epoch [9/10], Loss: 0.0679
Epoch [10/10], Loss: 0.0551
Finished Training


TypeError: evaluate_model() missing 1 required positional argument: 'device'

In [19]:
# Évaluer le modèle
evaluate_model(model, test_loader, device)

Accuracy: 52.34%
