In [1]:
import numpy as np 
import torch
import mne
import matplotlib as plt 
import os 
import mne
from torch.utils.data import DataLoader
from torch.autograd import Variable
import torch.autograd as autograd
from torchvision.models import vgg19

from torch import nn
from torch import Tensor
from PIL import Image
from torchvision.transforms import Compose, Resize, ToTensor
from einops import rearrange, reduce, repeat
from einops.layers.torch import Rearrange, Reduce

## Importation & Pre-processing

In [None]:


# Chemin vers le dossier parent
dossier_parent = "C:/Users/MAREZ10/OneDrive - Université Laval/Bureau/Projet Transformers/eeg_data"

# Dictionnaire pour stocker les données MNE par sous-dossier
donnees_par_sous_dossier = {}

# Parcours des sous-dossiers
for nom_sous_dossier in os.listdir(dossier_parent):
    
    chemin_sous_dossier = os.path.join(dossier_parent, nom_sous_dossier)
    
    if os.path.isdir(chemin_sous_dossier):
        
        donnees_par_sous_dossier[nom_sous_dossier] = []
        
        for nom_fichier in os.listdir(chemin_sous_dossier):
            
            if nom_fichier.endswith('.edf'):
                
                chemin_fichier = os.path.join(chemin_sous_dossier, nom_fichier)
                donnees_mne = mne.io.read_raw_edf(chemin_fichier)
                donnees_par_sous_dossier[nom_sous_dossier].append(donnees_mne)

# Affichage des informations sur les données MNE
for nom_sous_dossier, donnees_mne in donnees_par_sous_dossier.items():
    print(f"Sujet {nom_sous_dossier} : {len(donnees_mne)} fichiers .edf")




In [None]:
X = list(donnees_par_sous_dossier.keys())
Y = list(len(elem) for elem in donnees_par_sous_dossier.values())

# Création de l'histogramme
plt.figure(figsize=(10, 6))
plt.bar(X,Y, color='skyblue')
plt.xlabel('Sujets')
plt.ylabel('Nombre d\'enregistrements')
plt.title('Nombre d\'enregistrements par sujet')
plt.xticks(rotation=45, ha='right')  # Rotation des étiquettes sur l'axe des x pour une meilleure lisibilité
plt.tight_layout()  # Ajustement automatique du tracé pour éviter les chevauchements
plt.show()





### Temps d'échantillonnage - Nombre d'enregistrements n'ayant pas une durée de 1 heure par sujet 

In [None]:
# Dictionnaire pour stocker le nombre de fichiers par sous-dossier
nombre_par_sous_dossier = {}

# Parcours des sous-dossiers
for nom_sous_dossier, donnees_mne_liste in donnees_par_sous_dossier.items():
    # Initialisation du compteur pour ce sous-dossier
    cpt = 0
    # Parcours des données MNE dans ce sous-dossier
    for donnees_mne in donnees_mne_liste:
        # Vérification si la durée de la donnée MNE est inférieure à la valeur donnée
        if donnees_mne.times[-1] < 3599.99609375:
            # Incrémentation du compteur
            cpt += 1
    # Stockage du nombre dans le dictionnaire
    nombre_par_sous_dossier[nom_sous_dossier] = cpt

# Affichage du nombre de fichiers qui satisfont la condition pour chaque sous-dossier
for nom_sous_dossier, nombre in nombre_par_sous_dossier.items():
    print(f"Sous-dossier {nom_sous_dossier} : {nombre} fichiers")


### Preprocessing

In [2]:

# Chemin d'accès au fichier
file_path = "C:/Users/MAREZ10/OneDrive - Université Laval/Bureau/Projet Transformers/eeg_data"


In [3]:
def read_summary(file_path):
    # Initialisation des listes pour stocker les informations
    seizure_presence = {}
    
    # Chemin d'accès au fichier
    #file_path = "chb04-summary.txt"
    
    # Ouvrir et lire le fichier
    is_seizure = False
    all_files_str = []
    seizure_start = 0
    with open(file_path, "r") as file:
        # Lire chaque ligne du fichier
        for line in file:
            # Traiter la ligne actuelle
            if line.strip():
                if "File Name:" in line:
                  start = len("File Name: ")
                  all_files_str.append(str(line[start:len(line)-1]))
                elif ("Seizure" in line) and ("Start Time: " in line):
                  if line[len("Seizure S")-1] == "S":
                    start = len("Seizure Start Time: ")
                  else:
                    start = len("Seizure 1 Start Time: ")
                  end = len(" seconds")+1
                  seizure_start = int(line[start:len(line)-end])
                elif ("Seizure"in line) and ("End Time: " in line):
                  if line[len("Seizure E")-1] == "E":
                    start = len("Seizure End Time: ")
                  else:
                    start = len("Seizure 1 End Time: ")
                  end = len(" seconds")+1
                  seizure_end = int(line[start:len(line)-end])
                  if not all_files_str[len(all_files_str)-1] in seizure_presence.keys():
                    seizure_presence[all_files_str[len(all_files_str)-1]] = []
                  seizure_presence[all_files_str[len(all_files_str)-1]].append((seizure_start, seizure_end))
    print("Seizure presence init" ,seizure_presence)
    return seizure_presence, all_files_str


def separate_data_intervals(file_str, seizure_presence,path):
  # Durée de chaque intervalle en secondes (2 minutes)
  interval_duration = 60

  raw = mne.io.read_raw_edf(path + file_str)
  #data, times = raw[:, :]
  total_duration = raw.times[-1] # en secondes

  # Nombre total d'intervalle de 10 minutes
  num_intervals = int(total_duration / interval_duration)
  labels = []
  data = []
  
  # print("Seizure presence: ", seizure_presence) 
  
  # Diviser les données en intervalles de 10 minutes
  for i in range(num_intervals):
      # Calculer le temps de début et de fin de chaque intervalle
      start_time = i * interval_duration
      end_time = (i + 1) * interval_duration
            

      # Convertir le temps en indice
      start_idx = raw.time_as_index(start_time)
      end_idx = raw.time_as_index(end_time)

      # Extraire les données de l'intervalle
      interval_data, interval_times = raw[:, start_idx:end_idx]
      data.append(interval_data)

      
      if file_str in seizure_presence.keys():
        is_seizure = False
        for start_seizure, end_seizure in seizure_presence[file_str]:


          if (start_seizure >= start_time and start_seizure <= end_time) or (end_seizure >= start_time and end_seizure <= end_time) or (start_seizure <= start_time and end_seizure >= end_time):
                
            is_seizure = True
  
            break
        if is_seizure:
          labels.append(1)
        else:
          labels.append(0)
      else:
        labels.append(0)
  return data, labels



In [4]:
def load_data_for_patient(file_path, patient):
      
    path = file_path +"/"+ patient + "/"
    seizure_presence, all_files_str = read_summary(path+patient+"-summary.txt")
    data = []
    labels = []
    for file in all_files_str:
      print("Reading file : ", file)
      interval_data, label = separate_data_intervals(file, seizure_presence,path)
      print("End of file")
      for i in range(len(interval_data)):
        data.append(interval_data[i])
        labels.append(label[i])
    return data, labels

In [None]:
data, labels = load_data_for_patient(file_path, "chb01")

In [6]:
oc={}
for elem in labels:
    oc[elem] = labels.count(elem)

print(oc)

{0: 2377, 1: 15}


In [None]:
for item in data : 
    print(item.shape)

In [7]:
from sklearn.model_selection import train_test_split

data = np.array(data)
labels = np.array(labels)

train_data, test_data, train_labels, test_labels = train_test_split(data, labels, test_size=0.2, random_state=42)

In [8]:
print(train_data.shape)
print(train_labels.shape)
print(test_data.shape)
print(test_labels.shape)

(1913, 23, 15360)
(1913,)
(479, 23, 15360)
(479,)


## Training 

In [9]:
import transformers
import convolution
from importlib import reload
reload(transformers)
reload(convolution)

<module 'convolution' from 'c:\\Users\\MAREZ10\\OneDrive - Université Laval\\Documents\\GitHub\\EEG_Seizures_Transformers-\\convolution.py'>

In [19]:

class Conformer(nn.Sequential):
    def __init__(self, emb_size=40, nb_channels =23, depth=6, n_classes=2, **kwargs):
        super().__init__(

            convolution.PatchEmbedding(emb_size, nb_channels),
            transformers.TransformerEncoder(depth, emb_size),
            transformers.ClassificationHead(emb_size, n_classes)
        )

batch_size = 4
n_epochs = 100

torch.cuda.empty_cache()
model = Conformer().cuda()

Tensor = torch.cuda.FloatTensor
LongTensor = torch.cuda.LongTensor

criterion = torch.nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, betas=(0.5, 0.999))


In [18]:
def train(train_data, train_label, test_data, test_label):

        
        train_data = torch.from_numpy(train_data)
        train_label = torch.from_numpy(train_label)
        dataset = torch.utils.data.TensorDataset(train_data, train_label)
        dataloader = torch.utils.data.DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)

        test_data = torch.from_numpy(test_data)
        test_label = torch.from_numpy(test_label)
        #test_dataset = torch.utils.data.TensorDataset(test_data, test_label)
        #test_dataloader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=True)


        test_data = Variable(test_data.type(Tensor))
        test_label = Variable(test_label.type(Tensor))

        bestAcc = 0
        averAcc = 0
        num = 0
        Y_true = 0
        Y_pred = 0



        for e in range(n_epochs):
            model.train()
            for i, (train_data, train_label) in enumerate(dataloader):

                train_data = Variable(train_data.cuda().type(Tensor))
                train_label = Variable(train_label.cuda().type(Tensor))

                tok, outputs = model(train_data)

                loss = criterion(torch.argmax(outputs,dim=1), train_label) 

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()




            # test process
            if (e + 1) % 1 == 0:
                model.eval()
                Tok, Cls = model(test_data)


                loss_test = criterion(Cls, test_label)
                y_pred = torch.max(Cls, 1)[1]
                acc = float((y_pred == test_label).cpu().numpy().astype(int).sum()) / float(test_label.size(0))
                train_pred = torch.max(outputs, 1)[1]
                train_acc = float((train_pred == train_label).cpu().numpy().astype(int).sum()) / float(train_label.size(0))

                print('Epoch:', e,
                      '  Train loss: %.6f' % loss.detach().cpu().numpy(),
                      '  Test loss: %.6f' % loss_test.detach().cpu().numpy(),
                      '  Train accuracy %.6f' % train_acc,
                      '  Test accuracy is %.6f' % acc)

                
                num = num + 1
                averAcc = averAcc + acc
                if acc > bestAcc:
                    bestAcc = acc
                    Y_true = test_label
                    Y_pred = y_pred


        torch.save(model.module.state_dict(), 'model.pth')
        averAcc = averAcc / num
        print('The average accuracy is:', averAcc)
        print('The best accuracy is:', bestAcc)
        

        return bestAcc, averAcc, Y_true, Y_pred

In [20]:
train(train_data, train_labels, test_data, test_labels)

RuntimeError: CUDA error: out of memory
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
