## Env Setup

In [1]:
!pip install mne



You should consider upgrading via the 'c:\users\rock_\appdata\local\programs\python\python39\python.exe -m pip install --upgrade pip' command.


In [2]:
import os
import matplotlib
matplotlib.use('Qt5Agg')
import matplotlib.pyplot as plt
import numpy as np

import mne
from mne.datasets import eegbci
mne.__version__

'0.23.0'

## Get Data

In [34]:
data_dir = "data"
subjects = [1] # 1 indexed
#runs = [5, 9, 13] # hands versus foots imaginary
runs = [6, 10, 14] # hands versus foots real


raw_fnames = {}
for i, d in enumerate(os.listdir(data_dir)):
    if os.path.isdir(os.path.join(data_dir, d)) and i + 1 in subjects:
        raw_fnames[d] = os.listdir(os.path.join(data_dir, d))

dataset = []
sfreq = None
for d in raw_fnames:
    subject = []
    b = False
    for i, f in enumerate(raw_fnames[d]):
        if f.endswith(".edf") and int(f.split('R')[1].split(".")[0]) in runs:
            subject_data = mne.io.read_raw_edf(os.path.join(data_dir, d, f), preload=True)
            if sfreq == None:
                sfreq = subject_data.info["sfreq"]
            if subject_data.info["sfreq"] == sfreq:
                subject.append(subject_data)
            else:
                b = True
                break
    if b:
        continue
    dataset.append(mne.concatenate_raws(subject))
dataset = mne.concatenate_raws(dataset)

Extracting EDF parameters from C:\Users\Rock_\Desktop\Projects\Total_perspective_vortex\data\S001\S001R06.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 19999  =      0.000 ...   124.994 secs...
Extracting EDF parameters from C:\Users\Rock_\Desktop\Projects\Total_perspective_vortex\data\S001\S001R10.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 19999  =      0.000 ...   124.994 secs...
Extracting EDF parameters from C:\Users\Rock_\Desktop\Projects\Total_perspective_vortex\data\S001\S001R14.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 19999  =      0.000 ...   124.994 secs...


In [35]:
print(dataset)
print(dataset.info)
print(dataset.info["ch_names"])
# events
print(dataset.annotations)

<RawEDF | S001R06.edf, 64 x 60000 (375.0 s), ~29.4 MB, data loaded>
<Info | 7 non-empty values
 bads: []
 ch_names: Fc5., Fc3., Fc1., Fcz., Fc2., Fc4., Fc6., C5.., C3.., C1.., ...
 chs: 64 EEG
 custom_ref_applied: False
 highpass: 0.0 Hz
 lowpass: 80.0 Hz
 meas_date: 2009-08-12 16:15:00 UTC
 nchan: 64
 projs: []
 sfreq: 160.0 Hz
>
['Fc5.', 'Fc3.', 'Fc1.', 'Fcz.', 'Fc2.', 'Fc4.', 'Fc6.', 'C5..', 'C3..', 'C1..', 'Cz..', 'C2..', 'C4..', 'C6..', 'Cp5.', 'Cp3.', 'Cp1.', 'Cpz.', 'Cp2.', 'Cp4.', 'Cp6.', 'Fp1.', 'Fpz.', 'Fp2.', 'Af7.', 'Af3.', 'Afz.', 'Af4.', 'Af8.', 'F7..', 'F5..', 'F3..', 'F1..', 'Fz..', 'F2..', 'F4..', 'F6..', 'F8..', 'Ft7.', 'Ft8.', 'T7..', 'T8..', 'T9..', 'T10.', 'Tp7.', 'Tp8.', 'P7..', 'P5..', 'P3..', 'P1..', 'Pz..', 'P2..', 'P4..', 'P6..', 'P8..', 'Po7.', 'Po3.', 'Poz.', 'Po4.', 'Po8.', 'O1..', 'Oz..', 'O2..', 'Iz..']
<Annotations | 94 segments: BAD boundary (2), EDGE boundary (2), T0 (45), ...>


In [36]:
# remove dots from channel's names
dataset = dataset.rename_channels(lambda s: s.strip("."))

In [37]:
# set montage
montage = mne.channels.make_standard_montage("standard_1020")

eegbci.standardize(dataset)  # set channel names
dataset.set_montage(montage)

0,1
Measurement date,"August 12, 2009 16:15:00 GMT"
Experimenter,Unknown
Digitized points,67 points
Good channels,"0 magnetometer, 0 gradiometer,  and 64 EEG channels"
Bad channels,
EOG channels,Not available
ECG channels,Not available
Sampling frequency,160.00 Hz
Highpass,0.00 Hz
Lowpass,80.00 Hz


In [38]:
montage = dataset.get_montage()
p = montage.plot()

Creating RawArray with float64 data, n_channels=64, n_times=1
    Range : 0 ... 0 =      0.000 ...     0.000 secs
Ready.


In [39]:
# plot data
p = mne.viz.plot_raw(dataset, scalings={"eeg": 75e-6})

Channels marked as bad: none


## Filter

In [40]:
# FirWin filter
dataset_tmp = dataset.copy()
# set montage again since its is not copyed
dataset_tmp.set_montage(montage)
dataset_tmp.filter(7, 30, fir_design='firwin', skip_by_annotation='edge')
filtered_dataset = dataset_tmp

Filtering raw data in 3 contiguous segments
Setting up band-pass filter from 7 - 30 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 7.00
- Lower transition bandwidth: 2.00 Hz (-6 dB cutoff frequency: 6.00 Hz)
- Upper passband edge: 30.00 Hz
- Upper transition bandwidth: 7.50 Hz (-6 dB cutoff frequency: 33.75 Hz)
- Filter length: 265 samples (1.656 sec)



In [41]:
# plot data
p = mne.viz.plot_raw(filtered_dataset, scalings={"eeg": 75e-6})

Channels marked as bad: none


## Set Labels

In [42]:
event_id = dict(T1=0, T2=1)
# avoid classification of evoked responses by using epochs that start 1s after cue onset.
tmin, tmax = -1., 4.

events, _ = mne.events_from_annotations(filtered_dataset, event_id=event_id)
picks = mne.pick_types(filtered_dataset.info, meg=False, eeg=True, stim=False, eog=False, exclude='bads')
epochs = mne.Epochs(filtered_dataset, events, event_id, tmin, tmax, proj=True, picks=picks, baseline=None, preload=True)   
labels = epochs.events[:, -1]
epochs_train = epochs.copy().crop(tmin=1., tmax=2.)
labels_train = epochs_train.events[:, -1]
epochs.get_data().shape, labels

Used Annotations descriptions: ['T1', 'T2']
Not setting metadata
Not setting metadata
45 matching events found
No baseline correction applied
0 projection items activated
Loading data for 45 events and 801 original time points ...
0 bad epochs dropped


((45, 64, 801),
 array([1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 0, 0,
        1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 0, 1, 1, 0, 1, 0,
        1]))

## Pipelines

In [16]:
#from mne.decoding import CSP
from CSP import CSP
from sklearn.pipeline import Pipeline
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import ShuffleSplit, StratifiedKFold, cross_val_score
from mne.decoding import UnsupervisedSpatialFilter, Vectorizer, Scaler

https://mne.tools/stable/auto_examples/decoding/decoding_csp_eeg.html#sphx-glr-auto-examples-decoding-decoding-csp-eeg-py

Subject 1, runs 6, 10, 14

In [33]:
# Define a cross-validation generator (reduce variance):
epochs_data = epochs.get_data()
epochs_data_train = epochs_train.get_data()

print(epochs_data.shape, labels.shape)

n_splits = 5  # how many folds to use for cross-validation
cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
#cv = ShuffleSplit(10, test_size=0.2, random_state=42)

# Assemble a classifier
lda = LinearDiscriminantAnalysis()
#csp = CSP(n_components=4, reg=None, log=True, norm_trace=False)
csp = CSP(4)
sca = Scaler(epochs.info)

# Use scikit-learn Pipeline with cross_val_score function
clf = Pipeline([('SCA', sca), ('CSP', csp), ('LDA', lda)])
clf = clf.fit(epochs_data_train, labels)
scores = cross_val_score(clf, epochs_data_train, labels, cv=cv, n_jobs=1)

# plot CSP patterns estimated on full data for visualization
#csp.fit_transform(epochs_data, labels)
#p = csp.plot_patterns(epochs.info, ch_type='eeg', units='Patterns (AU)', size=1.5)

# Printing the results
class_balance = np.mean(labels == 0)
class_balance = max(class_balance, 1. - class_balance)
print("Classification accuracy: %f / Chance level: %f" % (np.mean(scores), class_balance))

(45, 64, 801) (45,)
Classification accuracy: 0.933333 / Chance level: 0.533333


## Running Classification

In [28]:
w_length = int(sfreq * 0.5)   # running classifier: window length
w_step = int(sfreq * 0.1)  # running classifier: window step size
w_start = np.arange(0, epochs_data.shape[2] - w_length, w_step)

print("window settings= ", w_length, w_step, w_start)

scores_windows = []

print(epochs_data.shape)

for train_idx, test_idx in cv.split(epochs_data_train, labels_train):
    # get batch labels
    y_train, y_test = labels_train[train_idx], labels_train[test_idx]
    # fit the csp and get transformed data for test and train
    X_train = csp.fit_transform(epochs_data_train[train_idx], y_train)
    X_test = csp.transform(epochs_data_train[test_idx])
    
    # fit classifier on transformed data
    lda.fit(X_train, y_train)
    # running classifier: test classifier on sliding window
    score_this_window = []
    for n in w_start:
        print(epochs_data[test_idx][:, :, n:(n + w_length)].shape)
        X_test = csp.transform(epochs_data[test_idx][:, :, n:(n + w_length)])
        score_this_window.append(lda.score(X_test, y_test))
        print(clf.predict(epochs_data[test_idx][:, :, n:(n + w_length)]))
    scores_windows.append(score_this_window)

# Plot scores over time
w_times = (w_start + w_length / 2.) / sfreq + epochs.tmin

fig = plt.figure()
plt.plot(w_times, np.mean(scores_windows, 0), label='Score')
plt.axvline(0, linestyle='--', color='k', label='Onset')
plt.axhline(0.5, linestyle='-', color='k', label='Chance')
plt.xlabel('time (s)')
plt.ylabel('classification accuracy')
plt.title('Classification score over time')
plt.legend(loc='lower right')
fig.show()

window settings=  80 16 [  0  16  32  48  64  80  96 112 128 144 160 176 192 208 224 240 256 272
 288 304 320 336 352 368 384 400 416 432 448 464 480 496 512 528 544 560
 576 592 608 624 640 656 672 688 704 720]
(45, 64, 801)
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64, 80)
[1 1 1 1 1 1 1 1 1]
(9, 64

In [15]:
stop

NameError: name 'stop' is not defined

## Simpler Classifier

https://mne.tools/0.19/auto_tutorials/machine-learning/plot_sensors_decoding.html

In [None]:
from sklearn.decomposition import PCA, FastICA
from sklearn.linear_model import LogisticRegression

In [None]:
# Define a cross-validation generator (reduce variance):
X = epochs.get_data()
X_train = epochs_train.get_data()
y = labels

print(epochs_data.shape)

n_splits = 5  # how many folds to use for cross-validation
cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
#cv = ShuffleSplit(10, test_size=0.2, random_state=42)

# Assemble a classifier
sca = Scaler(epochs.info)
pca = UnsupervisedSpatialFilter(PCA(n_components=4), average=False)
vec = Vectorizer()
lr = LogisticRegression(solver='lbfgs')

# Use scikit-learn Pipeline with cross_val_score function
clf = Pipeline([("SCA", sca), ("PCA", pca), ("VEC", vec), ('LR', lr)])
scores = cross_val_score(clf, X_train, y, cv=cv, n_jobs=1)
print(scores)
# Printing the results
class_balance = np.mean(labels == 0)
class_balance = max(class_balance, 1. - class_balance)
print("Classification accuracy: %f / Chance level: %f" % (np.mean(scores), class_balance))

## Sliding classifier

In [None]:
from sklearn.preprocessing import StandardScaler
from mne.decoding import SlidingEstimator, cross_val_multiscore

In [None]:
X = epochs.get_data()
X_train = epochs_train.get_data()
y = labels

# We will train the classifier on all left visual vs auditory trials on MEG

clf = Pipeline([("SCA", StandardScaler()), ("LR", LogisticRegression(solver='lbfgs'))])

time_decod = SlidingEstimator(clf, n_jobs=1, scoring='roc_auc', verbose=True)
scores = cross_val_multiscore(time_decod, X, y, cv=5, n_jobs=1)

# Mean scores across cross-validation splits
scores = np.mean(scores, axis=0)

# Plot
fig, ax = plt.subplots()
ax.plot(epochs.times, scores, label='score')
ax.axhline(.5, color='k', linestyle='--', label='chance')
ax.set_xlabel('Times')
ax.set_ylabel('AUC')  # Area Under the Curve
ax.legend()
ax.axvline(.0, color='k', linestyle='-')
ax.set_title('Sensor space decoding')
fig.show()

Channels marked as bad: none
Channels marked as bad: none
