In [13]:
import numpy as np

import mne
import asrpy
from mne.preprocessing import ICA

from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.pipeline import make_pipeline
from sklearn.metrics import classification_report
from sklearn.model_selection import StratifiedKFold, cross_val_score, cross_val_predict, LeaveOneOut, LeavePOut
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
# import asrpy
import os

In [14]:
def extract_temporal_features(X):
    """
    Extact mean, std and higher order moments
    """
    features = []
    for epoch in X:
        mean = np.mean(epoch, axis=-1)
        std = np.std(epoch, axis=-1)
        m3 = (np.mean(epoch**3,axis = -1) - mean**3)/(std**2)
        m4 = (np.mean(epoch**4,axis = -1) - mean**4)/(std**3)
        m5 = (np.mean(epoch**5,axis = -1) - mean**5)/(std**4)
        features.append(np.concatenate(np.concatenate([mean,std])))
    print(np.shape(features))
    return np.array(features)


def find_gaming_set_files(directory):
    # Liste pour stocker les chemins des fichiers correspondants
    matching_files = []

    # Parcourir tous les fichiers dans le répertoire donné
    for root, _, files in os.walk(directory):
        for file in files:
            # Vérifier si le fichier commence par "GAMING" et se termine par ".set"
            if file.startswith("GAMING") and file.endswith(".set"):
                # Ajouter le chemin complet du fichier à la liste
                matching_files.append(os.path.join(root, file))
    
    return matching_files

In [15]:

# Chemins des fichiers EEG
directory_path = "C:/recordings/Game_recordings_test/TOUT_4_second_apple"

raw_fnames = find_gaming_set_files(directory_path)


print(raw_fnames)

# Initialiser les listes pour les données

X_all_runs = []
y_all_runs = []
epochs_right = []
epochs_left = []




tmin = -0.5  # début de chaque époque (1,0 secondes avant le déclencheur)
tmax = 1.5  # fin de chaque époque (au déclencheur)
baseline = (-0.5, 0)  # baseline du début à t = 0
reject = dict(eeg=150e-3)  # seuil de rejet pour les électrodes EEG
# L_H_freq = [(8,13)],(13,30)
L_H_freq = [(8,13),(13,30)]
Notchs_freq = (50,60)
f_low = 1
# %%
# Charger les fichiers et créer des époques pour chaque run

for raw_fname in raw_fnames:
    raw = mne.io.read_raw_eeglab(raw_fname,preload=True)
    raw.filter( f_low, None)
    raw.notch_filter(Notchs_freq)
    # Appliquer ASR pour nettoyer les artefacts
    # asr = asrpy.ASR(sfreq=raw.info["sfreq"], cutoff=20)
    # asr.fit(raw)
    # raw = asr.transform(raw)
    ica = ICA()
    ica.fit(raw)
    raw= ica.apply(raw)
    X_run = []
    
    for l_freq, h_freq in L_H_freq:
        raw2 = raw.copy()
        raw2.filter(l_freq, h_freq)
        events , event_id= mne.events_from_annotations(raw2)
        event_id_RL= {'right' : event_id['right'] , 'left' : event_id['left']}

        epochs = mne.Epochs(raw2, events, event_id_RL, tmin, tmax, proj=True, baseline=baseline, reject=reject, preload=True)

        epochs_right.append(epochs['right'])
        epochs_left.append(epochs['left'])
        X_run += [epochs.get_data(copy=True)]
    X_run = np.moveaxis(X_run, 0, 1)
    y_run = epochs.events[:, -1] == event_id['right']


    X_all_runs.append(X_run)
    y_all_runs.append(y_run)







['C:/recordings/Game_recordings_test/TOUT_4_second_apple\\GAMING_APPLE_1_noc.set', 'C:/recordings/Game_recordings_test/TOUT_4_second_apple\\GAMING_APPLE_2_cheat.set', 'C:/recordings/Game_recordings_test/TOUT_4_second_apple\\GAMING_APPLE_2_cheat_2.set', 'C:/recordings/Game_recordings_test/TOUT_4_second_apple\\GAMING_APPLE_2_noc.set', 'C:/recordings/Game_recordings_test/TOUT_4_second_apple\\GAMING_APPLE_2_noc_2.set', 'C:/recordings/Game_recordings_test/TOUT_4_second_apple\\GAMING_APPLE_3_1_noc.set', 'C:/recordings/Game_recordings_test/TOUT_4_second_apple\\GAMING_APPLE_3_2_noc.set', 'C:/recordings/Game_recordings_test/TOUT_4_second_apple\\GAMING_APPLE_3_noc.set', 'C:/recordings/Game_recordings_test/TOUT_4_second_apple\\GAMING_APPLE_4_noc.set', 'C:/recordings/Game_recordings_test/TOUT_4_second_apple\\GAMING_APPLE_5_noc.set', 'C:/recordings/Game_recordings_test/TOUT_4_second_apple\\GAMING_APPLE_8_3_noc.set', 'C:/recordings/Game_recordings_test/TOUT_4_second_apple\\GAMING_APPLE_8_4_noc.set',

In [16]:

# N_epochs,N_bank, N_channel,N_time_stamp = len(epochs.events) , len(L_H_freq), len(raw.ch_names), raw.n_times
X = np.concatenate(X_all_runs, axis=0)
y = np.concatenate(y_all_runs, axis=0)

X_features = extract_temporal_features( X )

# Séparation des données en ensembles d'entraînement et de test





# Extraction des caractéristiques temporelles

# X_train_features = extract_temporal_features( X_train )

# X_test_features = extract_temporal_features(X_test )



X_train, X_test, y_train, y_test = train_test_split(X_features, y, test_size=0.2, random_state=23)

# Création d'un pipeline avec normalisation, PCA, et SVM

pipeline = make_pipeline(StandardScaler(), SVC(kernel='linear'))
cv = StratifiedKFold(20, shuffle=True)
# cv = LeavePOut(5)

# score = pipeline.fit(X_train, y_train).score(X_test, y_test)

scores = cross_val_score(pipeline, X_features, y, cv = cv)
print(f"Cross validation scores: {scores}")
print(f"Mean: {scores.mean()}")
# y_pred = cross_val_predict(pipeline,X_features,y, cv=cv)
# fig,ax = plt.subplots(1,1)
# ax.plot(scores)

(705, 32)
Cross validation scores: [0.55555556 0.61111111 0.63888889 0.58333333 0.69444444 0.65714286
 0.65714286 0.6        0.51428571 0.54285714 0.6        0.68571429
 0.62857143 0.71428571 0.62857143 0.6        0.68571429 0.71428571
 0.74285714 0.6       ]
Mean: 0.6327380952380952


In [17]:
for i in range(len(X_all_runs)):

    print(np.shape(X_all_runs[i]))


(41, 2, 8, 501)
(41, 2, 8, 501)
(40, 2, 8, 501)
(41, 2, 8, 501)
(41, 2, 8, 501)
(64, 2, 8, 501)
(64, 2, 8, 501)
(59, 2, 8, 501)
(46, 2, 8, 501)
(61, 2, 8, 501)
(67, 2, 8, 501)
(100, 2, 8, 501)
(40, 2, 8, 501)
