---
# EEG Stroke - BCI MI

## Processing pipeline of the EEG data
---

### Introduction & methodology

This notebook depicts the exploration of a pipeline for processing EEG data from elbow movement (extension/flexion) and perform a classification task. It is part of a series of notebooks developed with the objective to find a standard pipeline for EEG elbow movement classification.

This notebook specifically tackles the implementation of a spatial filtering based method, using CSP or xDAWN spatial filters. The goal is to establish the performance of the implemented pipeline. To that end, we present the steps that we will follow:

1. Preprocessing:
    * Load the data from each `.npy` file, using the `DataLoader` class.
    * Only use the recorded data of the arm opposite to the stroke side.
    * Only use the electrodes from the stroke side.
    * Label the raw data using the acceleration records (check `DataLoader` implementation).
    * Filter the data between 4Hz and 40Hz.
    * Epoch data around the movement onset (1=extension, 2=flexion, 0=remaining). *Note: data labelled 0 is taken using epoching from -4s to -1s.*
    * Split data into 'train'/'test' sets. Get balanced sets.
    * Label movement (1 and 2) as 1 and no onset as 0.
2. Feature extraction:
    * CSP (spatial filtering)
    * xDAWN (spatial filtering) + covariance
    * xDAWN (spatial filtering) + band power
3. Classification: (comparison of several classifiers)
    * Logistic Regression
    * LDA
    * SVM
    * Random Forest
4. Evaluation: Test A, B, C

NB: 
Additional pipeline has been implemented and tested. As suggested by the good separation data with CSP (see part on CSP visualization), we imagined a Voting Classifier with diverse elementary pipeline [CSP + classif] trained on one session each. The test is depicted in the last section (Pipeline n°4). The results are not great, mainly due to the fact that despite good separation in the patterns space after the CSP, the distribution of the data is not centered, nor rescaled... 

In [None]:
from preprocessing import DataLoader

import mne
import matplotlib.pyplot as plt
import numpy as np
import os
import tqdm

In [None]:
# Filtering bounds
FILTER_FMIN = 4
FILTER_FMAX = 40

# Epoching bounds
EPOCHS_TMIN = -1.5
EPOCHS_TMAX = +1.5
EPOCHS_TMIN_NO_ONSET = -5.0
EPOCHS_TMAX_NO_ONSET = EPOCHS_TMIN_NO_ONSET + EPOCHS_TMAX - EPOCHS_TMIN

# Set seed
RANDOM_STATE = 42

BINARY_CLASSIFICATION = True

In [None]:
NB_SESSIONS = 10

FOLDER_PATH = '../../data/raw/'
FILES_PATHS = [FOLDER_PATH + file_path for file_path in os.listdir(FOLDER_PATH) if file_path.endswith('.npy')]

# Show paths
FILES_PATHS

### Preprocessing

In [None]:
def get_channels(raws, side):
    endings = ('1', '3', '5', '7', '9') if side=='D' else ('2', '4', '6', '8', '10')
    channels_to_remove = [channel for channel in raws.ch_names if channel.endswith(endings)]
    channels = [channel for channel in raws.ch_names if channel not in channels_to_remove]
    return channels

In [None]:
def preproc(file_list, verbose=False, return_epochs=False):
    patients_data   = []
    patients_labels = []
    patients_id = []
    sessions_id = []
    patients_epochs = []

    for fid in tqdm.tqdm(file_list):

        # Load the data
        data_loader = DataLoader(fid)

        # Pick the arm session opposite to the stroke side
        stroke_side = data_loader.stroke_side
        side = 'G' if stroke_side == 'D' else 'D'
        raws = data_loader.get_raws(side)

        # if no data for the arm side, skip
        if raws is None:
            continue

        # Pick the channels of the stroke side
        raws.pick_channels(get_channels(raws, stroke_side))

        # Filter between 4Hz - 48Hz
        raws.filter(FILTER_FMIN, FILTER_FMAX, fir_design='firwin')

        # Epochs over flexion and extension
        events = mne.find_events(raws, stim_channel=['movement'])
        picks  = mne.pick_types(raws.info, eeg=True, stim=False)
        epochs = mne.Epochs(raws, events, tmin=EPOCHS_TMIN, tmax=EPOCHS_TMAX, picks=picks, baseline=None, preload=True)

        epochs_X = epochs.get_data()  # data
        epochs_y = events[:, -1]      # labels

        # Epochs of no movement artificially created
        epochs_no_onset = mne.Epochs(raws, events, tmin=EPOCHS_TMIN_NO_ONSET, tmax=EPOCHS_TMAX_NO_ONSET, picks=picks, baseline=None, preload=True)

        epochs_no_onset_X = epochs_no_onset.get_data()           # data
        epochs_no_onset_y = np.zeros(epochs_no_onset_X.shape[0]) # labels

        # Concatenate epochs
        epochs_X_session = np.concatenate([epochs_X, epochs_no_onset_X], axis=0)
        epochs_y_session = np.concatenate([epochs_y, epochs_no_onset_y], axis=0)

        # Create epochs object
        events_no_onset = events.copy()
        events_no_onset[:, 2] = 0
        events_no_onset[:, 0] = events_no_onset[:,0] - int((EPOCHS_TMAX-EPOCHS_TMIN) * raws.info['sfreq'])

        events_all = np.concatenate([events, events_no_onset], axis=0)
        epochs_all = mne.EpochsArray(
            epochs_X_session,
            epochs.info,
            events=events_all,
            tmin=epochs.tmin,
            )

        # Shuffle epochs
        rng = np.random.RandomState(RANDOM_STATE)
        idx = np.arange(epochs_X_session.shape[0])
        rng.shuffle(idx)
        epochs_X_session = epochs_X_session[idx]
        epochs_y_session = epochs_y_session[idx]

        # Merge flexion and extension if needed
        if BINARY_CLASSIFICATION:
            epochs_y_session[epochs_y_session > 0] = 1

        # Append to the list
        if data_loader.patient_id not in patients_id:
            patients_id.append(data_loader.patient_id)
            sessions_id.append([])
            patients_data.append([])
            patients_labels.append([])
            patients_epochs.append([])
        sessions_id[patients_id.index(data_loader.patient_id)].append(data_loader.session_id)
        patients_data[patients_id.index(data_loader.patient_id)].append(epochs_X_session)
        patients_labels[patients_id.index(data_loader.patient_id)].append(epochs_y_session)
        patients_epochs[patients_id.index(data_loader.patient_id)].append(epochs_all)

        if verbose:
            print(f'patient id: {data_loader.patient_id}')
            print(f'session id: {data_loader.session_id}')
            print(f'number of epochs: {epochs_X_session.shape[0]}')
            print(f'number of channels: {epochs_X_session.shape[1]}')
            print(f'number of time samples: {epochs_X_session.shape[2]}')

    if return_epochs:
        return patients_data, patients_labels, patients_id, sessions_id, patients_epochs
    else: 
        return patients_data, patients_labels, patients_id, sessions_id

In [None]:
%%time
X_patients, y_patients, patients_id, sessions_id, epochs = preproc(
    FILES_PATHS[:NB_SESSIONS], verbose=False, return_epochs=True)

### Visualization CSP patterns

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from mne.preprocessing import Xdawn

n_components = 5
n_patient_train = 0
n_session_train = 1

csp = mne.decoding.CSP(n_components=n_components)
sc  = StandardScaler()
pip = Pipeline([('csp', csp), ('sc2', sc)])

pip.fit(X_patients[n_patient_train][n_session_train],
        y_patients[n_patient_train][n_session_train]); 
# On fit avec le 1er patient et la première session

In [None]:
def plot_csp(csp_data, labels):
    # Définir la taille de chaque vignette
    figsize_per_subplot = 3  # Choisissez la taille appropriée en pouces

    # Calculer la taille totale de la figure en fonction du nombre de sous-plots
    figsize = (n_components * figsize_per_subplot, n_components * figsize_per_subplot)

    # Créer une figure avec une grille spécifique
    fig, axs = plt.subplots(n_components-1, 
                            n_components-1, 
                            figsize=figsize, gridspec_kw={'wspace': 0.4, 'hspace': 0.4})

    for i in range(n_components):
        for j in range(i+1, n_components):
            # Sélectionner la première composante CSP
            csp_component_y = csp_data[:, i]

            # Sélectionner la dernière composante CSP
            csp_component_x = csp_data[:, j]

            # Faire un graphe avec la première composante en ordonnées et la dernière en abscisses
            ax = axs[i, j-1]  # Utilisation de la grille spécifique
            ax.scatter(csp_component_y, csp_component_x, c=labels, cmap='viridis')
            ax.set_xlim(-2, 2)
            ax.set_ylim(-2, 2)
            ax.set_xlabel(f'Composante CSP n°{j}')
            ax.set_ylabel(f'Composante CSP n°{i}')

    # Ajuster automatiquement la disposition des sous-plots pour éviter le chevauchement
    # plt.tight_layout()

    plt.show()

In [None]:
# # Obtenir les données transformées par la CSP
n_patient = 0
n_session = 0
csp_data = pip.transform(X_patients[n_patient][n_session])
labels = y_patients[n_patient][n_session]
plot_csp(csp_data, labels)

# n_patient = 3
# n_session = 0
# csp_data = csp.transform(X_patients[n_patient][n_session])
# labels = y_patients[n_patient][n_session]
# plot_csp(csp_data, labels)

# n_patient = 4
# n_session = 0
# csp_data = csp.transform(X_patients[n_patient][n_session])
# labels = y_patients[n_patient][n_session]
# plot_csp(csp_data, labels)

# couple_patient_session = [(0, 0), (1, 0), (2, 0), (3, 0), (4, 0)]

# for n_patient, n_session in couple_patient_session:
#     csp_data = csp.transform(X_patients[n_patient][n_session])
#     labels = y_patients[n_patient][n_session]
#     plot_csp(csp_data, labels)


### Visualization xDAWN 

In [None]:
from mne.viz import plot_epochs_image

motor_electrods_d = [
    'FC2', 'FC4', 'FC6', 
     'C2',  'C4',  'C6',
    'CP2', 'CP4', 'CP6',]
motor_electrods_g = [
    'FC1', 'FC3', 'FC5',
     'C1',  'C3',  'C5',
    'CP1', 'CP3', 'CP5']

id_patient = 4
id_session = 0

motor_electrods = motor_electrods_d + motor_electrods_g
picked_channels = [e for e in motor_electrods if e in epochs[id_patient][id_session].ch_names]

plot_epochs_image(
    epochs=epochs[id_patient][id_session], 
    picks=picked_channels,
    vmin=-20,
    vmax=+20,
    colorbar=True, 
    show=True,
    cmap='interactive',);

In [None]:
xd = mne.preprocessing.Xdawn(n_components=2)
xd.fit(epochs[id_patient][id_session])

epochs_denoised = xd.apply(epochs[id_patient][:])

plot_epochs_image(
    epochs=epochs_denoised['0'], 
    picks=picked_channels,
    vmin=-2,
    vmax=+2,
    colorbar=True, 
    show=True,
    cmap='interactive',);

### Pipeline evaluation strategies

We will define 3 test strategies to verify the performance of our pipeline.

1. **[A]** test on the same patients and same sessions as view in the training. Of course, the epochs will not be the same, but the classifier will have already seen data from each patient and session considered.

2. **[B]** test on the same patients, but using different sessions. This is to check is the classifier is able to perform well on already seen patients but with data that can be quite different (improvement between sessions).

3. **[C]** test on completely new patients

In [None]:
from sklearn.metrics import (
    accuracy_score, 
    balanced_accuracy_score,)
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold

#### Splits

In [None]:
NB_SPLITS = 4

def split_folds_test_A(patients_data):
    """
    Split train/test on same patients, same sessions, just shuffling the epochs.

    split_folds     = [split_patient_1, split_patient_2, ...]
    split_patient_i = [split_session_1, split_session_2, ...]
    split_session_i = [train_idx, test_idx]
    """
    split_folds = []
    index = 0
    for patient in range(len(patients_data)):
        split_folds.append([])
        for session in range(len(patients_data[patient])):
            rs = RANDOM_STATE + index
            cv = KFold(n_splits=NB_SPLITS, shuffle=True, random_state=rs)
            split_folds[patient].append(cv.split(patients_data[patient][session]))
            index += 1 
    return split_folds

def split_folds_test_B(patients_data):
    """
    Split train/test on same patients, different sessions.

    split_folds     = [split_patient_1, split_patient_2, ...]
    split_patient_i = [all_sessions_1, all_sessions_2, ...]
    all_sessions_i  = [train_idx, test_idx]
    """
    split_folds = []
    index = 0
    for patient in range(len(patients_data)):
        split_folds.append([])
        for i_split in range(NB_SPLITS):
            # If only 1 session for the patient, go train
            if len(patients_data[patient]) < 2:
                test_idx  = []
                train_idx = [0]
            else:
                shuffle_i = np.arange(len(patients_data[patient]))
                rng = np.random.RandomState(RANDOM_STATE + index)
                rng.shuffle(shuffle_i)
                test_idx  = list(shuffle_i[-1:])
                train_idx = list(shuffle_i[:-1])
            split_folds[-1].append([train_idx, test_idx])
            index += 1
    return split_folds

def split_folds_test_C(patients_data):
    """
    Split train/test on different patients, different sessions.

    split_folds     = [split_fold_1, split_fold_2, ...]
    split_fold_i    = [train_idx, test_idx]
    """
    cv = KFold(n_splits=NB_SPLITS, shuffle=True, random_state=RANDOM_STATE)
    return cv.split(patients_data)

In [None]:
list(split_folds_test_A(X_patients)[0][0])

In [None]:
len(list(split_folds_test_B(X_patients)[0]))

In [None]:
list(split_folds_test_C(X_patients))

#### Evaluations

In [None]:
def evaluations_test_A(clf, data_X, data_y, verbose=False):
    split_folds = split_folds_test_A(data_X)
    scores = []

    for i in range(NB_SPLITS):
        X_train = []
        y_train = []
        X_test  = []
        y_test  = []
        for patient in range(len(data_X)):
            for session in range(len(data_X[patient])):
                train_idx, test_idx = next(split_folds[patient][session])
                X_train.append(data_X[patient][session][train_idx])
                y_train.append(data_y[patient][session][train_idx])
                X_test.append(data_X[patient][session][test_idx])
                y_test.append(data_y[patient][session][test_idx])
        X_train = np.concatenate(X_train, axis=0)
        y_train = np.concatenate(y_train, axis=0)
        X_test  = np.concatenate(X_test, axis=0)
        y_test  = np.concatenate(y_test, axis=0)

        print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)
        clf.fit(X_train, y_train)

        y_pred = clf.predict(X_test)
        scores.append(balanced_accuracy_score(y_test, y_pred))

        if verbose:
            print(f'Split {i} - balanced accuracy: {scores[-1]}')
    if verbose:
        print(f'Mean balanced accuracy: {np.mean(scores)}')

    return scores

In [None]:
def evaluations_test_B(clf, data_X, data_y, verbose=False):
    split_folds = split_folds_test_B(data_X)
    scores = []

    for i in range(NB_SPLITS):
        X_train = []
        y_train = []
        X_test  = []
        y_test  = []
        for patient in range(len(data_X)):
            for session in split_folds[patient][i][0]:
                X_train.append(data_X[patient][session])
                y_train.append(data_y[patient][session])
            for session in split_folds[patient][i][1]:
                X_test.append(data_X[patient][session])
                y_test.append(data_y[patient][session])
        X_train = np.concatenate(X_train, axis=0)
        y_train = np.concatenate(y_train, axis=0)
        X_test  = np.concatenate(X_test, axis=0)
        y_test  = np.concatenate(y_test, axis=0)

        clf.fit(X_train, y_train)

        y_pred = clf.predict(X_test)
        scores.append(balanced_accuracy_score(y_test, y_pred))

        if verbose:
            print(f'Split {i} - balanced accuracy: {scores[-1]}')
    if verbose:
        print(f'Mean balanced accuracy: {np.mean(scores)}')

    return scores   

In [None]:
def evaluations_test_C(clf, data_X, data_y, verbose=False):
    split_folds = split_folds_test_C(data_X)
    scores = []
    index = 0

    for train_patient_i, test_patient_i in split_folds:
        train_X = [data_X[i][j] for i in train_patient_i for j in range(len(data_X[i]))]
        train_y = [data_y[i][j] for i in train_patient_i for j in range(len(data_y[i]))]
        test_X  = [data_X[i][j] for i in test_patient_i for j in range(len(data_X[i]))]
        test_y  = [data_y[i][j] for i in test_patient_i for j in range(len(data_y[i]))]

        train_X = np.concatenate(train_X, axis=0)
        train_y = np.concatenate(train_y, axis=0)
        test_X  = np.concatenate(test_X, axis=0)
        test_y  = np.concatenate(test_y, axis=0)

        # Shuffle
        train_idx = np.arange(train_X.shape[0])
        rng = np.random.RandomState(RANDOM_STATE + index)
        rng.shuffle(train_idx)
        train_X = train_X[train_idx]
        train_y = train_y[train_idx]
        
        test_idx = np.arange(test_X.shape[0])
        rng = np.random.RandomState(RANDOM_STATE + index)
        rng.shuffle(test_idx)
        test_X = test_X[test_idx]
        test_y = test_y[test_idx]
        index += 1

        clf.fit(train_X, train_y)

        y_pred = clf.predict(test_X)
        scores.append(balanced_accuracy_score(test_y, y_pred))

        if verbose:
            print(f'Split {index} - balanced accuracy: {scores[-1]}')
    if verbose:
        print(f'Mean balanced accuracy: {np.mean(scores)}')

    return scores

### Pipeline testing

#### Test pipeline: [CSP+classif] 

Train on one session, test on the other session of the same patient

In [None]:
motor_electrods_d = [
    'FC2', 'FC4', 'FC6', 
     'C2',  'C4',  'C6',
    'CP2', 'CP4', 'CP6',]
motor_electrods_g = [
    'FC1', 'FC3', 'FC5',
     'C1',  'C3',  'C5',
    'CP1', 'CP3', 'CP5']

id_patient = 5
id_session = 0

motor_electrods = motor_electrods_d + motor_electrods_g
picked_channels = [e for e in motor_electrods if e in epochs[id_patient][id_session].ch_names]


epochs[id_patient][1].plot_image(picks=picked_channels, vmin=-20, vmax=20);
epochs[id_patient][0].plot_image(picks=picked_channels, vmin=-20, vmax=20);

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from mne.decoding import CSP
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import cross_val_score

# i_patient = 1
# i_session = 0

for i_patient in range(len(X_patients)):
    print(f'Patient {i_patient}')
    if len(X_patients[i_patient]) < 2:
        print(f"  Abort, not enough sessions")
        continue
    for i_session in range(len(X_patients[i_patient])):
        X_patient = X_patients[i_patient][i_session]
        y_patient = y_patients[i_patient][i_session]
        X_test = X_patients[i_patient][(1+i_session)%2]
        y_test = y_patients[i_patient][(1+i_session)%2]

        csp = CSP(n_components=4)
        sc  = StandardScaler()
        svc = SVC(kernel='linear', C=.001)
        # rf  = RandomForestClassifier()
        pip = Pipeline([('CSP', csp), ('sc', sc), ('SVC', svc)])

        pip.fit(X_patient, y_patient)
        score = pip.score(X_test, y_test)

        print(f'  Session {i_session} on train - score: {score} on session {(1+i_session)%2}')



#### **Pipeline 1**: CSP + classifier

##### 1. Test

In [None]:
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from mne.decoding import CSP
from sklearn.ensemble import (
    BaggingClassifier
)
from sklearn.preprocessing import StandardScaler

# Define pipeline
csp = CSP(n_components=4, reg=None, log=True, norm_trace=False)
clf = SVC(C=1, kernel='linear', class_weight='balanced', random_state=RANDOM_STATE)

# Define pipeline
pip = Pipeline([('CSP', csp), ('SVC', clf)])

# Ensemble classifier
ens = BaggingClassifier(base_estimator=pip, n_estimators=10, random_state=RANDOM_STATE)

In [None]:
# Test A scores
scores_A = evaluations_test_A(pip, X_patients, y_patients, verbose=True)

In [None]:
# Test B scores
scores_B = evaluations_test_B(clf, X_patients, y_patients, verbose=True)

In [None]:
# Test C scores
scores_C = evaluations_test_C(clf, X_patients, y_patients, verbose=True)

##### 2. Test on multiple classifiers

In [None]:
from sklearn.pipeline import make_pipeline, Pipeline

from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

from mne.decoding import CSP

# Define classifier
classifier = [
    ('SVC', SVC(kernel='linear')),
    ('RF', RandomForestClassifier(random_state=RANDOM_STATE)),
    ('LR', LogisticRegression(random_state=RANDOM_STATE)),
    ('LDA', LinearDiscriminantAnalysis())]

In [None]:
%%time
import pandas as pd

accuracies = []
for clf in classifier:
    print(f"Testing classifier: {clf[0]} ...")
    csp = CSP(n_components=4, reg=None, log=True, norm_trace=False)
    pip = Pipeline([('CSP', csp), clf])

    # Test A scores
    scores_A = evaluations_test_A(pip, X_patients, y_patients)
    accuracies.append(scores_A)

df = pd.DataFrame({'clf': ['SVC', 'RF', 'LR', 'LDA'], 'acc': np.mean(accuracies, axis=1)})
df

In [None]:
%%time
import pandas as pd

accuracies = []
for clf in classifier:
    print(f"Testing classifier: {clf[0]} ...")
    csp = CSP(n_components=4, reg=None, log=True, norm_trace=False)
    pip = Pipeline([('CSP', csp), clf])

    # Test B scores
    scores_B = evaluations_test_B(pip, X_patients, y_patients)
    accuracies.append(scores_B)

df = pd.DataFrame({'clf': ['SVC', 'RF', 'LR', 'LDA'], 'acc': np.mean(accuracies, axis=1)})
df

In [None]:
%%time
import pandas as pd

accuracies = []
for clf in classifier:
    print(f"Testing classifier: {clf[0]} ...")
    csp = CSP(n_components=4, reg=None, log=True, norm_trace=False)
    pip = Pipeline([('CSP', csp), clf])

    # Test C scores
    scores_C = evaluations_test_C(pip, X_patients, y_patients)
    accuracies.append(scores_C)

df = pd.DataFrame({'clf': ['SVC', 'RF', 'LR', 'LDA'], 'acc': np.mean(accuracies, axis=1)})
df

#### **Pipeline 2**: xDAWN + Vectorizer + Std_Scaler + Classifier

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin

class MyVectorizer(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        self.features_shape_ = X.shape[1:]
        return self

    def transform(self, X, y=None):
        return X.reshape(len(X), -1)

In [None]:
from pyriemann.estimation import XdawnCovariances
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

xd = XdawnCovariances()
sc = StandardScaler()
vc = MyVectorizer()

classifier = [
    ('SVC', SVC(kernel='linear')),
    ('RF', RandomForestClassifier(random_state=RANDOM_STATE)),
    ('LR', LogisticRegression(random_state=RANDOM_STATE, penalty='l1', solver='liblinear')),
    ('LDA', LinearDiscriminantAnalysis())]

In [None]:
%%time
import pandas as pd

accuracies = []
for clf in classifier:
    print(f"Testing classifier: {clf[0]} ...")
    pip = Pipeline([
        ('XD', xd),
        ('VC', vc),
        ('SC', sc), clf])

    # Test A scores
    scores_A = evaluations_test_A(pip, X_patients, y_patients)
    accuracies.append(scores_A)

df = pd.DataFrame({'clf': ['SVC', 'RF', 'LR', 'LDA'], 'acc': np.mean(accuracies, axis=1)})
df

In [None]:
%%time
import pandas as pd

accuracies = []
for clf in classifier:
    print(f"Testing classifier: {clf[0]} ...")
    pip = Pipeline([
        ('XD', xd),
        ('VC', vc),
        ('SC', sc),
        clf])

    # Test A scores
    scores_B = evaluations_test_B(pip, X_patients, y_patients)
    accuracies.append(scores_B)

df = pd.DataFrame({'clf': ['SVC', 'RF', 'LR', 'LDA'], 'acc': np.mean(accuracies, axis=1)})
df

In [None]:
%%time
import pandas as pd

accuracies = []
for clf in classifier:
    print(f"Testing classifier: {clf[0]} ...")
    pip = Pipeline([
        ('XD', xd),
        ('VC', vc),
        ('SC', sc),
        clf])

    # Test A scores
    scores_C = evaluations_test_C(pip, X_patients, y_patients)
    accuracies.append(scores_C)

df = pd.DataFrame({'clf': ['SVC', 'RF', 'LR', 'LDA'], 'acc': np.mean(accuracies, axis=1)})
df

#### **Pipeline 3**: Riemannian geom + tangent space + scaler

In [None]:
from pyriemann.estimation import Covariances
from pyriemann.tangentspace import TangentSpace
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold

from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.ensemble import AdaBoostClassifier

import pandas as pd

In [None]:
cov = Covariances(estimator='oas')
ts  = TangentSpace()
sc  = StandardScaler()
clf = SVC(kernel='linear')

pip = Pipeline([('Cov', cov), ('TS', ts), ('SC', sc), ('CLF', clf)])

scoreA = evaluations_test_A(pip, X_patients, y_patients, verbose=True)
scoreB = evaluations_test_B(pip, X_patients, y_patients, verbose=True)
scoreC = evaluations_test_C(pip, X_patients, y_patients, verbose=True)

In [None]:
cov = Covariances(estimator='oas')
ts  = TangentSpace()
sc  = StandardScaler()
clf = SVC()

pip = Pipeline([('Cov', cov), ('TS', ts), ('SC', sc), ('CLF', clf)])

scoreA = evaluations_test_A(pip, X_patients, y_patients, verbose=True)
scoreB = evaluations_test_B(pip, X_patients, y_patients, verbose=True)
scoreC = evaluations_test_C(pip, X_patients, y_patients, verbose=True)

#### **Pipeline 4**: CSP + Classifier (voting method)

In [None]:
from sklearn.model_selection import KFold
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.metrics import balanced_accuracy_score
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from mne.decoding import CSP

N_COMPONENTS = 5
N_SPLITS = 5


class VotingClassifier():
    def __init__(self) -> None:
        self.clfs = []
        self.y_true = None

    def fit_clf_session(self, X, y):
        """
        Fits the inner classifiers on sessions data. 
        NB: one session is a list of epochs.

        X: list of epochs
        y: list of labels
        """
        csp = CSP(n_components=N_COMPONENTS, reg=None, log=True, norm_trace=False)
        sc = StandardScaler()
        svc = SVC(kernel='linear', C=0.1)
        clf = Pipeline([('CSP', csp), ('SC', sc), ('SVC', svc)])

        clf.fit(X, y)
        return clf
        
    def fit(self, X, y):
        """
        Fits the outer classifier on sessions data.

        X: list of sessions, each session is a list of epochs
        y: list of sessions, each session is a list of labels
        """
        clfs = []
        for session in range(len(X)):
            clfs.append(self.fit_clf_session(X[session], 
                                             y[session]))
        self.clfs = clfs

    def predict(self, X, return_probas=False):
        """
        Predicts labels of data.

        X: list of epochs
        """
        X = [epoch for session in X for epoch in session]
        X = np.array(X)

        preds = np.zeros(X.shape[0])
        for i in range(len(self.clfs)):
            clf = self.clfs[i]
            y_pred = clf.predict(X)
            preds += y_pred


            # Affichage des résultats pour le classifieur n°i
            plt.figure(figsize=(5, 5))
            plt.scatter(np.arange(len(y_pred)), y_pred, c=self.y_true, cmap='viridis')
            plt.title(f'Result for classifier n°{i}')
            plt.show()


        preds = preds / len(self.clfs)

        if return_probas:
            return (preds > 0.5).astype(int), preds
        else:
            return (preds > 0.5).astype(int)
    
    def score(self, X, y, return_probas=False):
        """
        Scores the classifier.
        
        X: list of sessions, each session is a list of epochs
        y: list of sessions, each session is a list of labels
        """
        y_flat = np.array([label for session in y for label in session])
        y_pred = self.predict(X, return_probas=False)
        return balanced_accuracy_score(y_flat, y_pred)


def cv_test_alt(data_X, data_y, verbose=False, display=False):
    """
    data_X: list of sessions, each session is a list of epochs
    data_y: list of sessions, each session is a list of labels
    """
    # Define cross validation fold
    cv = KFold(n_splits=N_SPLITS, shuffle=True, random_state=RANDOM_STATE)
    scores = []

    for train_idx, test_idx in cv.split(data_X):
        # Split data
        X_train = [data_X[i] for i in train_idx]
        y_train = [data_y[i] for i in train_idx]
        X_test  = [data_X[i] for i in test_idx]
        y_test  = [data_y[i] for i in test_idx]

        # Define pipelines + voting classifier
        pipeline = VotingClassifier()

        # Fit
        pipeline.fit(X_train, y_train)

        # Score
        score = pipeline.score(X_test, y_test)
        scores.append(score)

        if display:

            y_true = np.array([label for session in y_test for label in session])
            pipeline.y_true = y_true
            y_pred, probas = pipeline.predict(X_test, return_probas=True)

            plt.figure(figsize=(5, 5))
            plt.scatter(np.arange(len(probas)), probas, c=y_true, cmap='viridis')
            plt.show()

        if verbose:
            print(f'Balanced accuracy: {score}')
    
    if verbose:
        print(f'Mean balanced accuracy: {np.mean(scores)}')
    
    return scores

In [None]:
X_sessions = [session for patient in X_patients for session in patient]
y_sessions = [session for patient in y_patients for session in patient]

cv_test_alt(X_sessions, y_sessions, verbose=True, display=True)