In [1]:
%matplotlib qt

# Preamble on MNE

For this assignment we'll be using MNE (https://mne.tools). MNE is the go-to library for working with EEG or MEG data in Python. 
It has many things already implemented for us, but it also has some quirks that make it a bit difficult. 
The main quirk that is relevant for us: MNE generally modifies data in-place, and returns the modified version. This means that when you call `cleaner_eeg = my_raw_eeg.drop_channels(['T7', 'T8'])` that these channels are now removed in both `cleaner_eeg` as well as in `my_raw_eeg`. 

Please have a look at the documentation provided by MNE here https://mne.tools/stable/api/python_reference.html whenever you get stuck, confused or curious. 

## Part 1: Questions 1 & 2

We offer some code to help you with this assignment. The first cell we include loads the EEG data and manages the metadata from the EEGBCI dataset.

It downloads the EEG recordings of subject 7, and selects the trials where this subject did left and right hand motor imagery/execution. 
All EEG data is concatenated resulting in one object with the raw EEG data called `raw`. 

For question 1. you need to inspect this raw data. 
For question 2. you'll need to apply a band-pass filter to this raw data.


In [2]:
import mne

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning) # Suppresses an annoying warning
mne.set_log_level('warning')

from mne.io import concatenate_raws, read_raw_edf
from mne.datasets import eegbci
from mne.channels import make_standard_montage


subject = 7 # Here we pick an individual subject with good performance. In total there are 109 subjects.
runs = [3, 4, 7, 8, 11, 12] # We're actually selecting both Motor Imagery and Motor Execution trials here!
raw_fnames = eegbci.load_data(subject, runs)
raws = [read_raw_edf(f, preload=True) for f in raw_fnames]
original_raw = concatenate_raws(raws)

eegbci.standardize(original_raw)  # set channel names
montage = make_standard_montage("standard_1005") # Defines position of electrodes on head
original_raw.set_montage(montage)

raw = original_raw.copy()

In [28]:
raw.plot() # This will visualise the EEG data. T0 = Rest, T1 = Left, T2 = Right
# Tip: Look under Help to see the controls

KeyboardInterrupt: 

In [3]:
# This cell extracts epochs containing the EEG data while the subject is performing the task.
# The epochs object can be inspected with epochs.plot()
# X and y are taken here to analyse the EEG data.


from mne import Epochs
from mne import events_from_annotations

event_id = dict(left=0, right=1)
events, _ = events_from_annotations(raw, event_id=dict(T1=0, T2=1))


epochs = Epochs(
    raw,
    events,
    event_id,
    tmin=0,
    tmax=4,
    baseline=None
)


X = epochs.get_data() # X is a numpy array of trials x channels x samples
y = epochs.events[:, -1]
print(y) # 0 = left, 1 = right

[1 0 1 0 0 1 0 1 0 1 0 1 1 0 1 0 1 1 0 1 0 0 1 1 0 0 1 0 1 0 1 0 1 0 1 0 1
 0 1 0 1 0 1 0 0 0 1 0 1 1 0 0 1 1 0 1 0 0 1 0 0 1 0 1 0 1 1 0 1 0 1 0 0 1
 0 0 1 0 1 1 0 1 0 0 1 0 1 0 1 1]


## Part 1: Questions 2+3

The cell below shows a first attempt at a Machine Learning model using CSP and LDA.
Before solving question 2. you may observe the model performance printed here.
After solving implementing the temporal filter you may see a change in performance here.

In question 3 here you may also modify the number of CSP filters, and inspect the learned filters. 

In [4]:
# This cell provides a first attempt at making a CSP-LDA classifier. First CSP is applied to the data, then the results of CSP are passed on to LDA.
# The data is split into a train and test section as cross-validation. 
from mne.decoding import CSP
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.pipeline import make_pipeline
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

csp = CSP(n_components=1)
lda = LDA()

csp.fit(X_train, y_train)

features = csp.transform(X_train)

lda.fit(features, y_train)

y_prediction = lda.predict(csp.transform(X_test))
print("AUROC:", roc_auc_score(y_test, y_prediction))

AUROC: 0.474025974025974


**Filter raw data between 13 and 30 Hz**

In [5]:
### Filter between 8 and 25 Hz ###
raw_filtered = raw.copy()
raw_filtered.filter(8, 25)

Unnamed: 0,General,General.1
,Filename(s),S007R03.edf  S007R04.edf  S007R07.edf  S007R08.edf  S007R11.edf  S007R12.edf
,MNE object type,RawEDF
,Measurement date,2009-08-12 at 16:15:00 UTC
,Participant,X
,Experimenter,Unknown
,Acquisition,Acquisition
,Duration,00:12:30 (HH:MM:SS)
,Sampling frequency,160.00 Hz
,Time points,120000
,Channels,Channels


In [6]:
### Create dataset ###
event_id = dict(left=0, right=1)
events, _ = events_from_annotations(raw_filtered, event_id=dict(T1=0, T2=1))

epochs = Epochs(
    raw_filtered,
    events,
    event_id,
    tmin=0,
    tmax=4,
    baseline=None
)

X = epochs.get_data() # X is a numpy array of trials x channels x samples
y = epochs.events[:, -1]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

### Train model ###
csp = CSP(n_components=1)
lda = LDA()

csp.fit(X_train, y_train)

csp.plot_filters(info=raw.info)

features = csp.transform(X_train)

lda.fit(features, y_train)

y_prediction = lda.predict(csp.transform(X_test))
print("AUROC:", roc_auc_score(y_test, y_prediction))

AUROC: 0.8376623376623378


## Part 1: Question 4
For question 4 you will need to do hyperparameter optimisation. This requires quite a bit more coding than the previous questions.
We offer a condensed `train_test_loop` that you may use as a starting point. 

In [None]:
# This is code we offer for 1.4 
# You don't need to look at this too much before then
from sklearn.model_selection import cross_val_score
from tqdm import tqdm # tqdm gives us these nice progress bars
import numpy as np
import matplotlib.pyplot as plt

def plot_acc_over_param(results, param_range, title, x_label):
    plt.plot(param_range, results)
    plt.title(title)
    plt.xlabel(x_label)
    plt.ylabel("Mean CV Accuracy")
    plt.savefig(title)
    # plt.show()
    plt.close()

def train_test_loop(freq_min = 8, freq_max = 25, num_csp_filters = 2): # add parameters here
    raw = original_raw.copy()

    # Add filtering here
    raw.filter(freq_min, freq_max)
    
    epochs = Epochs(
        raw,
        events,
        event_id,
        tmin=0,
        tmax=4,
        baseline=None
    )
    X = epochs.get_data()
    y = epochs.events[:, -1]

    pipeline = make_pipeline(CSP(n_components=int(num_csp_filters)), LDA())
    
    return cross_val_score(pipeline, X, y, scoring='accuracy', cv=5)

### Perform hyperparameter optimisation here ###

# Min Frequency Filter Tuning
# freq_min_range = np.linspace(5, 15, 11)
# freq_min_avg_acc = np.mean([train_test_loop(freq_min=freq_min_range[i]) for i in tqdm(range(11))], axis=1)
# plot_acc_over_param(freq_min_avg_acc, freq_min_range, "Cross Validation Accuracy for Minimum Frequency Filter", "Minimum Frequency (Hz)")

# # Max Frequency Filter Tuning
# freq_max_range = np.linspace(15, 30, 16)
# freq_max_avg_acc = np.mean([train_test_loop(freq_max=freq_max_range[i]) for i in tqdm(range(16))], axis=1)
# plot_acc_over_param(freq_max_avg_acc, freq_max_range, "Cross Validation Accuracy for Maximum Frequency Filter", "Maximum Frequency (Hz)")

csp_filter_range = np.linspace(1, 10, 10, dtype=int)
csp_filter_avg_acc = np.mean([train_test_loop(num_csp_filters=csp_filter_range[i]) for i in tqdm(range(10))], axis=1)
plot_acc_over_param(csp_filter_avg_acc, csp_filter_range, "Cross Validation Accuracy for the Number of CSP Filters", "Number of Filters")






[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




[A[A[A[A[A




100%|██████████| 10/10 [00:53<00:00,  5.34s/it]


: 

# Part 2

For Part 2 you'll need to write all the code yourself. 
The only thing that we'll give you is a function that determines the distance between 2 covariance matrices.

```
riemannian_distance = covariance_distance(your_matrix_a, your_matrix_b, metric='riemann')
euclidean_distance = covariance_distance(your_matrix_a, your_matrix_b, metric='euclid')
```


With this distance function you will implement a custom K-Nearest Neighbours classifier.

In [None]:
from pyriemann.utils.distance import distance as covariance_distance

