# Imports

In [1]:
import numpy as np
import matplotlib.pyplot as plt

from sklearn.pipeline import Pipeline
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import ShuffleSplit, cross_val_score

from mne import Epochs, pick_types, events_from_annotations, set_log_level
from mne.channels import make_standard_montage
from mne.io import concatenate_raws, read_raw_edf
from mne.datasets import eegbci
from mne.decoding import CSP
from tqdm.notebook import tqdm

set_log_level("WARNING")


# Get Data (1 subject)

In [2]:
epochs_list = []
for subject in range(1, 28):
    tmin, tmax = -1., 4.
    event_id = dict(hands=2, feet=3)
    runs = [6, 10, 14]  # motor imagery: hands vs feet

    noisy_channels = ['AF7', 'AF3', 'AFz', 'AF4', 'AF8', 'Fp1', 'Fpz', 'Fp2', 'P7', 'P5', 'P3', 'P1', 'P2', 'P4', 'P6',
                        'P8', 'PO7', 'PO3', 'POz', 'PO4', 'PO8', 'Iz']

    raw_fnames = eegbci.load_data(subject, runs)
    raw = concatenate_raws([read_raw_edf(f, preload=True) for f in raw_fnames])
    eegbci.standardize(raw)  # set channel names
    montage = make_standard_montage('standard_1005')
    raw.set_montage(montage)

    # Apply band-pass filter
    raw.filter(7., 30., fir_design='firwin', skip_by_annotation='edge')

    events, _ = events_from_annotations(raw, event_id=dict(T1=2, T2=3))

    picks = pick_types(raw.info, meg=False, eeg=True, stim=False, eog=False,
                    exclude=noisy_channels)
    # Read epochs (train will be done only between 1 and 2s)
    # Testing will be done with a running classifier
    epochs = Epochs(raw, events, event_id, tmin, tmax, proj=True, picks=picks, baseline=None, preload=True)
    # epochs_train = epochs.copy().crop(tmin=1., tmax=2.)
    # epochs_data = epochs.get_data()
    # # epochs_data_train = epochs_train.get_data()
    # # labels = epochs.events[:, -1] - 2
    # labels = epochs.events[:, -1] - 2
    # print(epochs_data.shape)
    epochs_list.append(epochs)

In [37]:
import numpy as np

epochs_data_list = []
labels_list = []
for epochs in epochs_list:
    epochs_data = epochs.get_data()
    labels = epochs.events[:, -1]
    epochs_data_list.append(epochs_data)
    labels_list.append(labels)

epochs_data_list = np.array(epochs_data_list)
labels_list = np.array(labels_list)

# TODO THIS BREAKS TIMESTAMPLES
epochs_data_stak = np.vstack(epochs_data_list)
print(f"{epochs_data_list.shape = }\n{epochs_data_stak.shape = }")
print()
# TODO verify label to epoch correspondance
labels_stak = np.hstack(labels_list)

print(f"{labels_list.shape = }\n{labels_stak.shape = }")


epochs_data_list.shape = (27, 45, 42, 801)
epochs_data_stak.shape = (1215, 42, 801)

labels_list.shape = (27, 45)
labels_stak.shape = (1215,)


# Define Pipeline & Cross validation

In [44]:

# Define a monte-carlo cross-validation generator (reduce variance):
scores = []
cv = ShuffleSplit(10, test_size=0.2, random_state=42)
cv_split = cv.split(epochs_data)

# Assemble a classifier
lda = LinearDiscriminantAnalysis()
csp = CSP(n_components=4, reg=None, log=True, norm_trace=False)

# Use scikit-learn Pipeline with cross_val_score function
clf = Pipeline([('CSP', csp), ('LDA', lda)])
# scores = cross_val_score(clf, epochs_data, labels, cv=cv, n_jobs=None)
scores = cross_val_score(clf, epochs_data_stak, labels_stak, cv=cv, n_jobs=None)

# Printing the results
class_balance = np.mean(labels == labels[0])
class_balance = max(class_balance, 1. - class_balance)
print("Classification accuracy: %f / Chance level: %f" % (np.mean(scores), class_balance))


Classification accuracy: 0.355556 / Chance level: 0.511111


# Training

In [45]:
lda = LinearDiscriminantAnalysis()
csp = CSP(n_components=4, reg=None, log=True, norm_trace=False)
epochs_data = epochs.get_data()
epochs_data.shape

(45, 42, 801)

In [49]:
from tqdm import tqdm

sfreq = raw.info['sfreq']
w_length = int(sfreq * 0.5)   # running classifier: window length
w_step = int(sfreq * 0.1)  # running classifier: window step size
w_start = np.arange(0, epochs_data.shape[2] - w_length, w_step)

scores_windows = []

cv_split = cv.split(epochs_data_stak)
train_indexes, test_indexes = next(cv_split)


for i in tqdm(range(10)):
    # y_train, y_test = labels[train_idx], labels[test_idx]

    X_train = csp.fit_transform(epochs_data_stak[train_indexes], labels_stak[train_indexes])
    X_test = csp.transform(epochs_data_stak[test_indexes])

    # fit classifier
    lda.fit(X_train, labels_stak[train_indexes])

    # running classifier: test classifier on sliding window
    score_this_window = []
    for n in w_start:
        X_test = csp.transform(epochs_data_stak[test_indexes][:, :, n:(n + w_length)])
        score_this_window.append(lda.score(X_test, labels_stak[test_indexes]))
    scores_windows.append(score_this_window)

# Plot scores over time
w_times = (w_start + w_length / 2.) / sfreq + epochs.tmin

plt.figure()
plt.plot(w_times, np.mean(scores_windows, 0), label='Score')
plt.axvline(0, linestyle='--', color='k', label='Onset')
plt.axhline(0.5, linestyle='-', color='k', label='Chance')
plt.xlabel('time (s)')
plt.ylabel('classification accuracy')
plt.title('Classification score over time')
plt.legend(loc='lower right')
plt.show()

  3%|▎         | 3/100 [00:09<05:06,  3.16s/it]