In [2]:
import os
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
from scipy.signal import butter, lfilter

### Data Preprocessing

In [None]:
def bandpass_filter(data, lowcut=1.0, highcut=40.0, fs=256, order=5):
    """Applique un filtre passe-bande pour ne garder que les fréquences utiles (1-40 Hz)."""
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    return lfilter(b, a, data, axis=0)

def normalize_eeg(data):
    """Normalise les EEG en appliquant un Z-score pour stabiliser les valeurs."""
    return (data - np.mean(data, axis=0)) / np.std(data, axis=0)

def preprocess_eeg(data, fs=256):
    """Pipeline complet de prétraitement des signaux EEG."""
    data = bandpass_filter(data, lowcut=1.0, highcut=40.0, fs=fs)  # Filtrage
    data = normalize_eeg(data)  # Normalisation
    return data

### EEG Data Loading

In [None]:
class EEGDataset(Dataset):
    def __init__(self, data_dir, fs=256):
        """Dataset PyTorch pour charger et prétraiter les EEG.
        
        Args:
            data_dir (str): Chemin vers le dossier contenant les fichiers CSV.
            fs (int, optional): Fréquence d'échantillonnage des EEG. Default: 256 Hz.
        """
        self.data_dir = data_dir
        self.files = os.listdir(data_dir)
        self.fs = fs  # Fréquence d'échantillonnage
    
    def __len__(self):
        return len(self.files)
    
    def __getitem__(self, idx):
        """Charge un fichier EEG, applique le prétraitement et retourne les données avec le label."""
        file_name = self.files[idx]
        file_path = os.path.join(self.data_dir, file_name)

        # Chargement du fichier CSV
        df = pd.read_csv(file_path)
        eeg_data = df.iloc[:, :-1].values  # Exclure la dernière colonne (timestamp ou label)
        
        # Appliquer le prétraitement
        eeg_data = preprocess_eeg(eeg_data, fs=self.fs)
        
        # Convertir en tenseur PyTorch et ajouter une dimension pour la convolution
        eeg_data = torch.tensor(eeg_data, dtype=torch.float32).unsqueeze(0)  # [1, channels, time]

        # Extraire le label à partir du nom du fichier
        label_name, _ = file_name.split("_")
        label_map = {"stop": 0, "avant": 1, "arrière": 2, "gauche": 3, "droite": 4}
        label = label_map[label_name]
        label_tensor = torch.tensor(label, dtype=torch.long)

        return eeg_data, label_tensor

In [None]:
data_dir = os.path.join(os.getcwd(), "data")  # data directory
dataset = EEGDataset(data_dir)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

### Define EEGNet model and Pytorch

In [None]:
class EEGNet(nn.Module):
    def __init__(self, num_classes=5):
        super(EEGNet, self).__init__()
        
        # Première Convolution : Extraction initiale des caractéristiques EEG
        self.conv1 = nn.Conv2d(1, 16, (1, 64), padding=(0, 32))  
        self.batchnorm1 = nn.BatchNorm2d(16)  # Normalisation des features
        
        # Convolution Spatiale : Applique un filtre sur plusieurs électrodes
        self.conv2 = nn.Conv2d(16, 32, (2, 1))  
        
        # Depthwise Convolution : Analyse indépendante des canaux EEG
        self.conv3 = nn.Conv2d(32, 32, (1, 16), groups=32, padding=(0, 8))  
        
        # Pointwise Convolution : Mélange des informations entre les canaux EEG
        self.conv4 = nn.Conv2d(32, 32, (1, 1))  
        self.batchnorm2 = nn.BatchNorm2d(32)  # Deuxième normalisation
        
        # Couche Fully Connected : Transformation en représentation de haut niveau
        self.fc1 = nn.Linear(32 * 2 * 50, 64)
        self.dropout = nn.Dropout(0.5)  # Régularisation pour éviter le sur-apprentissage
        
        # Couche de sortie : Prédiction d'une des 5 classes (stop, avant, arrière, gauche, droite)
        self.fc_output = nn.Linear(64, num_classes)
    
    def forward(self, x):
        x = F.elu(self.batchnorm1(self.conv1(x)))  # Activation après normalisation
        x = F.elu(self.conv2(x))  
        x = F.elu(self.batchnorm2(self.conv3(x)))  
        x = F.elu(self.conv4(x))  
        
        x = x.view(x.size(0), -1)  # Flatten pour la couche fully connected
        x = F.elu(self.fc1(x))
        x = self.dropout(x)  # Appliquer Dropout pour réduire l'overfitting
        
        output = self.fc_output(x)  # Sortie finale du modèle
        return output

### Model Training

In [None]:
# Sélection de l'appareil (GPU si disponible, sinon CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialisation du modèle et transfert sur l'appareil choisi
model = EEGNet().to(device)

# Définition de l'optimiseur (Adam) et de la fonction de perte (CrossEntropy)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Boucle d'entraînement
num_epochs = 30
for epoch in range(num_epochs):
    total_loss = 0  # Initialisation de la perte totale pour l'époque
    
    for eeg, label in dataloader:
        # Transférer les données sur l'appareil utilisé (GPU/CPU)
        eeg, label = eeg.to(device), label.to(device)
        
        # Réinitialisation des gradients
        optimizer.zero_grad()
        
        # Passage avant (forward) : prédiction du modèle
        output = model(eeg)
        
        # Calcul de la perte entre la prédiction et la vérité terrain
        loss = criterion(output, label)
        
        # Rétropropagation (backpropagation) pour calculer les gradients
        loss.backward()
        
        # Mise à jour des poids du modèle
        optimizer.step()
        
        # Accumulation de la perte pour l'époque
        total_loss += loss.item()
    
    # Affichage de la perte moyenne par époque
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss:.4f}")

### Model Save

In [None]:
torch.save(model.state_dict(), "eegnet_model.pth")
print("✅ Modèle Entraîné et Sauvegardé !")

### Test and analysis

In [None]:
def evaluate_model(model, dataloader):
    model.eval()
    all_true, all_pred = [], []
    
    with torch.no_grad():
        for eeg, label in dataloader:
            eeg, label = eeg.to(device), label.to(device)
            output = model(eeg)
            pred = output.argmax(dim=1)
            all_true.extend(label.cpu().numpy())
            all_pred.extend(pred.cpu().numpy())
    
    return all_true, all_pred

def plot_confusion_matrix(true_labels, pred_labels, classes, title):
    cm = confusion_matrix(true_labels, pred_labels)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=classes, yticklabels=classes)
    plt.title(f"Matrice de Confusion - {title}")
    plt.show()

def show_classification_report(true_labels, pred_labels, classes, title):
    print(f"\n📊 Rapport de Classification - {title}")
    print(classification_report(true_labels, pred_labels, target_names=classes))

def analyze_errors(true_labels, pred_labels, classes, title):
    errors = [(t, p) for t, p in zip(true_labels, pred_labels) if t != p]
    unique_errors, counts = np.unique(errors, axis=0, return_counts=True)
    print(f"\n🔍 Pires Erreurs - {title}")
    for (true, pred), count in zip(unique_errors, counts):
        print(f"→ Vrai: {classes[true]} | Prédit: {classes[pred]} | Fois: {count}")

In [None]:
# Exécuter les analyses séparément
all_true, all_pred = evaluate_model(model, dataloader)
classes = ["stop", "avant", "arrière", "gauche", "droite"]
plot_confusion_matrix(all_true, all_pred, classes, "Mouvements")
show_classification_report(all_true, all_pred, classes, "Mouvements")
analyze_errors(all_true, all_pred, classes, "Mouvements")