# Modèle IA réseau de neurones #
Notebook pour tester différents réseaux de neurones pour traiter des signaux.

#### Chargement des données

In [58]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from scipy.signal import butter, lfilter
import csv
import matplotlib.pyplot as plt

# -------------------- 1. Paramètres --------------------
data_folder = 'EEG-EyeBlinks/EEG-IO'  # Dataset EEG
fs = 250.0  # Fréquence d'échantillonnage
window_size = int(fs * 1)  # Fenêtre de 1 seconde = 250 échantillons

# Fonction de filtrage passe-bas
def lowpass(sig, fc, fs, order=4):
    B, A = butter(order, np.array(fc) / (fs / 2), btype='low')
    return lfilter(B, A, sig, axis=0)

# Fonction pour lire les labels (blinks)
def decode_stim(data_path, file_stim):
    blinks = []
    with open(os.path.join(data_path, file_stim)) as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        for row in reader:
            if row[0]=="corrupt":
                pass
            elif row[0]=="blinks":
                pass
            elif row[1]=='1':
                blinks.append(float(row[0]))
    return np.array(blinks)

# -------------------- 2. Chargement des données EEG --------------------
list_of_files = [f for f in os.listdir(data_folder) if '_data' in f]
file_sig = list_of_files[0]
file_stim = file_sig.replace('_data', '_labels')

print(f"Lecture de : {file_sig} et {file_stim}")

# Charger EEG (time, fp1, fp2)
data_sig = np.loadtxt(os.path.join(data_folder, file_sig), delimiter=";", skiprows=1, usecols=(0, 1, 2))
data_sig[:, 1] = lowpass(data_sig[:, 1], 10, fs)  # Filtrage
data_sig[:, 2] = lowpass(data_sig[:, 2], 10, fs)

# Charger les labels de clignements
blinks = decode_stim(data_folder, file_stim)

# -------------------- 3. Synchronisation des labels --------------------
labels = np.zeros(len(data_sig))
for blink_time in blinks:
    index = np.argmax(data_sig[:, 0] >= blink_time)  # Trouver l'index le plus proche du temps de clignement
    labels[index] = 1  # Marquer le clignement

# -------------------- 4. Création des fenêtres temporelles --------------------
X, Y = [], []
for i in range(len(data_sig) - window_size):
    X.append(data_sig[i : i + window_size, 1:])  # fp1, fp2
    # Vérifier si la fenêtre contient un '1' dans la colonne des labels
    if 1 in labels[i+1 : i + window_size-1]:
        Y.append(1)
    else:
        Y.append(0)

X, Y = np.array(X), np.array(Y)


# -------------------- 5. Création du Dataset PyTorch --------------------
class EEGDataset(Dataset):
    def __init__(self, X, Y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.Y = torch.tensor(Y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.Y[idx]

# Création du DataLoader
dataset = EEGDataset(X, Y)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

print(f"Dataset : {len(dataset)} séquences chargées (taille {X.shape[1]} échantillons par séquence).")


Lecture de : S00_data.csv et S00_labels.csv
Dataset : 25318 séquences chargées (taille 250 échantillons par séquence).


In [59]:
for X_batch, Y_batch in dataloader:
    # Y_batch contient les labels pour chaque batch
    print(Y_batch)  # 

tensor([0., 0., 0., 0., 0., 0., 1., 0., 0., 1., 1., 0., 0., 0., 1., 1., 1., 0.,
        0., 0., 0., 1., 0., 1., 0., 0., 1., 1., 0., 0., 1., 0.])
tensor([0., 1., 0., 1., 0., 1., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 1., 0., 0., 1., 0., 1., 0., 0., 1., 0., 1., 1.])
tensor([0., 0., 1., 0., 0., 0., 1., 0., 1., 0., 1., 0., 0., 0., 0., 0., 0., 0.,
        1., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 1.])
tensor([1., 0., 0., 0., 0., 1., 1., 0., 0., 0., 0., 0., 1., 0., 1., 0., 0., 0.,
        0., 1., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0.])
tensor([0., 0., 1., 0., 0., 0., 0., 0., 1., 1., 1., 0., 0., 0., 1., 0., 0., 0.,
        1., 0., 1., 0., 0., 0., 0., 1., 0., 1., 0., 0., 0., 1.])
tensor([0., 0., 0., 1., 0., 0., 0., 0., 0., 1., 0., 0., 1., 0., 0., 0., 0., 0.,
        0., 1., 1., 0., 0., 1., 1., 0., 0., 0., 1., 1., 0., 0.])
tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.,
        1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split

# Définir l'architecture du modèle CNN+LSTM avec les ajustements
class CNN_LSTM_Model(nn.Module):
    def __init__(self, input_size, cnn_filters, lstm_units, num_classes, dropout_prob=0.5):
        super(CNN_LSTM_Model, self).__init__()
        
        # Partie CNN
        self.conv1 = nn.Conv1d(in_channels=input_size, out_channels=cnn_filters, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm1d(cnn_filters)
        self.conv2 = nn.Conv1d(in_channels=cnn_filters, out_channels=cnn_filters * 2, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm1d(cnn_filters * 2)
        self.pool = nn.MaxPool1d(2)
        
        # Partie LSTM
        self.lstm = nn.LSTM(input_size=cnn_filters * 2, hidden_size=lstm_units, batch_first=True)
        
        # Partie Fully Connected
        self.fc1 = nn.Linear(lstm_units, 128)
        self.dropout = nn.Dropout(p=dropout_prob)
        self.fc2 = nn.Linear(128, num_classes)
        
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)
        
    def forward(self, x):
        # Passer par les couches CNN
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        
        # Appliquer le pooling
        x = self.pool(x)
        
        # Passer par LSTM
        x, (h_n, c_n) = self.lstm(x.transpose(1, 2))  # LSTM attend un input (batch, seq_len, input_size)
        
        # Prendre la dernière sortie du LSTM
        x = h_n[-1]
        
        # Passer par les couches fully connected
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)  # Appliquer le dropout après la couche FC
        x = self.fc2(x)
        x = self.softmax(x)
        
        return x


# Hyperparamètres
input_size = 2  # Nombre de canaux (fp1, fp2)
cnn_filters = 32  # Nombre de filtres CNN
lstm_units = 64  # Unités LSTM
num_classes = 2  # 2 classes : clignement ou non
learning_rate = 0.001
batch_size = 32
num_epochs = 20
dropout_prob = 0.5

# Initialiser le modèle
model = CNN_LSTM_Model(input_size, cnn_filters, lstm_units, num_classes, dropout_prob)

# Définir la fonction de perte et l'optimiseur
criterion = nn.CrossEntropyLoss()  # Pour la classification binaire
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Scheduler pour réduire le taux d'apprentissage
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5)

# ------------------- 2. Entraînement du modèle -------------------
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs):
    model.train()  # Mettre le modèle en mode entraînement
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        for inputs, labels in train_loader:
            # Redimensionner les entrées pour le CNN : (batch_size, input_size, sequence_length)
            inputs = inputs.transpose(1, 2)  # (batch_size, input_size, sequence_length)
            
            # Mettre les labels en long pour CrossEntropyLoss
            labels = labels.long()
            
            # Zero gradients des optimizers
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(inputs)
            
            # Calculer la perte
            loss = criterion(outputs, labels)
            running_loss += loss.item()
            
            # Backward pass et optimisation
            loss.backward()
            optimizer.step()
            
            # Calculer les statistiques de performance
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        
        epoch_loss = running_loss / len(train_loader)
        epoch_acc = correct / total
        
        # Validation après chaque époque
        val_acc = evaluate_model(model, val_loader)
        
        # Ajuster le taux d'apprentissage
        scheduler.step(epoch_loss)
        
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_acc:.4f}, Validation Accuracy: {val_acc:.4f}")

# ------------------- 3. Évaluation du modèle -------------------
def evaluate_model(model, val_loader):
    model.eval()  # Mettre le modèle en mode évaluation
    correct = 0
    total = 0
    y_true = []
    y_pred = []
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.transpose(1, 2)
            labels = labels.long()
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())
    
    val_acc = correct / total
    # Calculer le F1-score
    f1 = f1_score(y_true, y_pred, average='weighted')
    print(f'Validation F1-score: {f1:.4f}')
    
    return val_acc

# ------------------- 4. Séparation des données et création des DataLoaders -------------------
# Utilisez un DataLoader pour l'entraînement et la validation (séparation de vos données)
# Exemple de séparation des données (adapté selon vos données)
train_data, val_data = train_test_split(dataset, test_size=0.2)  # Utilisez la méthode qui convient
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)

# ------------------- 5. Entraînement -------------------
train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs)
