In [1]:
import braindecode

In [2]:
import mne
from scipy.io import loadmat
import scipy
import sklearn
import numpy as np
import pandas as pd
import glob
from mne.decoding import CSP
import os

In [3]:
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import LinearSVC, SVC
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV, StratifiedShuffleSplit
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as lda

In [4]:
import warnings
warnings.filterwarnings('ignore') # to ignore warnings

In [5]:
verbose = False                    # global variable to suppress output display of MNE functions
mne.set_log_level(verbose=verbose) # to suppress large info outputs

In [6]:
n_jobs = None  # for multicore parallel processing, set it to 1 if cause memory issues, for full utilization set to -1

## Data Loading and Conversion to MNE Datatypes
[Mike Cohen Tutorials link for EEG Preprocessing](https://www.youtube.com/watch?v=uWB5tjhataY&list=PLn0OLiymPak2gDD-VDA90w9_iGDgOOb2o)

In [7]:
current_folder = globals()['_dh'][0]  # a hack to get path of current folder in which jupyter file is located
data_path = os.path.join(current_folder, r"C:\Users\User\Documents\GitHub\Frequency-Adaptive-Temporal-Kernel-EEGNet\Data")

In [8]:
training_files   = glob.glob(data_path + '/*T.mat')
len(training_files)     # if  return zero,then no file is loaded

10

In [9]:
# we have modified the labels values from [1, 2] to [0, 1] as pytorch 
# expects labels/classes to be in [0, n_classes-1] format
def get_mne_epochs(filepath, verbose=verbose, t_start=2, fs=512, mode='train'):
    '''
    This function reads the EEG data from .mat file and convert it to MNE-Python Compatible epochs
    data structure. It takes data from [0, 8] sec range and return it by setting t = 0 at cue onset
    i.e. 3 seconds and dropping first two seconds so the output data is in [-1.0, 5.0] sec range. The
    Details can be found in the preprocessing section of the attached document
    '''
    mat_data = loadmat(filepath) # read .mat file
    eeg_data= mat_data['RawEEGData']
    idx_start = fs*t_start      
    eeg_data = eeg_data[:, :, idx_start:]
    event_id = {'left-hand': 0, 'right-hand': 1} # pytorch expects labels in [0, n_classes-1]
    channel_names = ['F3', 'FC3', 'C3', 'CP3', 'P3', 'FCz', 'CPz', 'F4', 'FC4', 'C4', 'CP4', 'P4']
    info = mne.create_info(ch_names=channel_names, sfreq=fs, ch_types='eeg')
    epochs = mne.EpochsArray(eeg_data, info, verbose=verbose, tmin=t_start-3.0)
    epochs.set_montage('standard_1020')
    epochs.filter(1., None) 
    epochs.apply_baseline(baseline=(-.250, 0)) # linear baseline correction
    
    if mode == 'train': # this in only applicable for training data
        epochs.event_id = event_id 
        epochs.events[:,2] = mat_data['Labels'].ravel() - 1    
    return epochs 

def get_labels(filepath):
    mat_data = loadmat(filepath) # read .mat file
    return mat_data['Labels'].ravel() - 1

In [10]:
epochs, labels = get_mne_epochs(training_files[0], verbose=verbose), get_labels(training_files[0])
data = epochs.get_data()
print('Shape of EEG Data: ', data.shape, '\t Shape of Labels: ', labels.shape) 

Shape of EEG Data:  (80, 12, 3072) 	 Shape of Labels:  (80,)


### Training Data

In [11]:
# loading original data
epochs_list_train = []
for i in training_files:
    epochs_list_train.append(get_mne_epochs(i, verbose=verbose))

## Deep Learning with Braindecode 

### It's Training Time with [0.5, 4.5] sec and 2sec window with 125ms stride

In [12]:
from braindecode.datautil import create_from_mne_epochs

window_size = 1024   # 2 sec windows
window_stride = 64   # 125 ms stride

windows_datasets_list = []

for epoch in epochs_list_train:
    # Create windows per subject
    windows_dataset = create_from_mne_epochs(
        [epoch.crop(tmin=0.5, tmax=4.5, include_tmax=False)],
        window_size_samples=window_size,
        window_stride_samples=window_stride,
        drop_last_window=False
    )
    # Add labels as a separate attribute
    windows_dataset.update_description = pd.DataFrame(
        data=np.concatenate([d.y for d in windows_dataset.datasets]),
        columns=['labels']
    )
    windows_datasets_list.append(windows_dataset)

print("Datasets per subject:", len(windows_datasets_list))
print("Total windows for first subject:", len(windows_datasets_list[0]))


Datasets per subject: 10
Total windows for first subject: 1360


In [14]:
from braindecode.preprocessing import exponential_moving_standardize

low_cut_hz = 8.   # low cut frequency for filtering
high_cut_hz = 32. # high cut frequency for filtering
# Parameters for exponential moving standardization
factor_new = 1e-3
init_block_size = 1000

def custom_exp_moving_std_fn(epochs, factor_new=factor_new, init_block_size=init_block_size):
    """Apply exponential moving standardization to MNE epochs inplace."""
    data = epochs.get_data()
    for i in range(len(data)):
        data[i] = exponential_moving_standardize(
            data[i], factor_new=factor_new, init_block_size=init_block_size
        )
    epochs._data = data
    return epochs

# Apply preprocessing to each dataset
for windows_dataset in windows_datasets_list:
    # Extract the underlying MNE Epochs object
    epochs = windows_dataset.datasets[0].windows
    epochs.load_data()  # Ensure data is loaded into memory

    # 1) Keep only EEG channels
    epochs.pick_types(eeg=True)

    # 2) Bandpass filter
    epochs.filter(l_freq=low_cut_hz, h_freq=high_cut_hz)

    # 3) Exponential moving standardization
    custom_exp_moving_std_fn(epochs, factor_new=factor_new, init_block_size=init_block_size)


In [15]:
'''from braindecode.preprocessing import exponential_moving_standardize
from braindecode.preprocessing import MNEPreproc, NumpyPreproc, preprocess

low_cut_hz = 8.  # low cut frequency for filtering
high_cut_hz = 32.  # high cut frequency for filtering
# Parameters for exponential moving standardization
factor_new = 1e-3
init_block_size = 1000

def custom_exp_moving_std_fn(epochs, factor_new=factor_new, init_block_size=init_block_size):
    data = epochs.get_data()
    for i in range(len(data)):
        epochs._data[i] = exponential_moving_standardize(data[i], 
                        factor_new=factor_new, init_block_size=init_block_size)
    return epochs

preprocessors = [
    # keep only EEG sensors
    MNEPreproc(fn='pick_types', eeg=True, meg=False, stim=False),
    # bandpass filter
    MNEPreproc(fn='filter', l_freq=low_cut_hz, h_freq=high_cut_hz),
    # exponential moving standardization
    MNEPreproc(fn=custom_exp_moving_std_fn, factor_new=factor_new,
        init_block_size=init_block_size)
]'''

"from braindecode.preprocessing import exponential_moving_standardize\nfrom braindecode.preprocessing import MNEPreproc, NumpyPreproc, preprocess\n\nlow_cut_hz = 8.  # low cut frequency for filtering\nhigh_cut_hz = 32.  # high cut frequency for filtering\n# Parameters for exponential moving standardization\nfactor_new = 1e-3\ninit_block_size = 1000\n\ndef custom_exp_moving_std_fn(epochs, factor_new=factor_new, init_block_size=init_block_size):\n    data = epochs.get_data()\n    for i in range(len(data)):\n        epochs._data[i] = exponential_moving_standardize(data[i], \n                        factor_new=factor_new, init_block_size=init_block_size)\n    return epochs\n\npreprocessors = [\n    # keep only EEG sensors\n    MNEPreproc(fn='pick_types', eeg=True, meg=False, stim=False),\n    # bandpass filter\n    MNEPreproc(fn='filter', l_freq=low_cut_hz, h_freq=high_cut_hz),\n    # exponential moving standardization\n    MNEPreproc(fn=custom_exp_moving_std_fn, factor_new=factor_new,\n  

In [16]:
'''for windows_dataset in windows_datasets_list: 
    preprocess(windows_dataset, preprocessors)'''

'for windows_dataset in windows_datasets_list: \n    preprocess(windows_dataset, preprocessors)'

In [17]:
batch_size = 32 #64
n_epochs = 25 #25 #20 #25 use few epochs for quick verification

In [18]:
# Creating a model
import torch
from braindecode.util import set_random_seeds
from braindecode.models import ShallowFBCSPNet, EEGNetv4

cuda = torch.cuda.is_available()  # check if GPU is available, if True chooses to use it
device = 'cuda' if cuda else 'cpu'
if cuda:
    torch.backends.cudnn.benchmark = True
seed = 20200220  # random seed to make results reproducible
# Set random seed to be able to reproduce results
set_random_seeds(seed=seed, cuda=cuda)

n_classes=2
# Extract number of chans and time steps from dataset
n_chans = windows_datasets_list[0][0][0].shape[0]
input_window_samples = windows_datasets_list[0][0][0].shape[1]

model = EEGNetv4(
    n_chans,
    n_classes,
    n_times = window_size, #input_window_samples,
    final_conv_length='auto',
)

# Send model to GPU
if cuda:
    model.cuda()

In [19]:
# Training time
from skorch.callbacks import LRScheduler
from skorch.helper import predefined_split
from braindecode import EEGClassifier

lr = 1 * 0.02 #0.01 
weight_decay = 0.5 * 0.001

clfs_list = []
for i in range(len(epochs_list_train)):
    clfs_list.append(
        EEGClassifier(
                    model,
                    criterion = torch.nn.CrossEntropyLoss(),
                    optimizer=torch.optim.AdamW,
                    #train_split=predefined_split(train_set),  # using valid_set for validation
                    optimizer__lr=lr,
                    optimizer__weight_decay=weight_decay,
                    batch_size=batch_size,
                    callbacks=[
                        "accuracy", ("lr_scheduler", LRScheduler('CosineAnnealingLR', T_max=n_epochs - 1)),
                    ],
                    device=device,
                    )
                )

In [20]:
'''def training_function(subject_index=0):
    print('\n', '#'*25, 'Training for Subject:', subject_index+1, '#'*25, '\n')
    dataset = windows_datasets_list[subject_index]
    clfs_list[subject_index].fit(dataset, y=dataset.description.labels, epochs=n_epochs);
    best_validation_acc = clfs_list[subject_index].callbacks_[4][1].best_score_ # a hack to get best validation accuracy
    best_validation_kappa = (2*best_validation_acc)-1
    print("Best Cross Validation Kappa Score: {:.2f}".format(best_validation_kappa))'''

'def training_function(subject_index=0):\n    print(\'\n\', \'#\'*25, \'Training for Subject:\', subject_index+1, \'#\'*25, \'\n\')\n    dataset = windows_datasets_list[subject_index]\n    clfs_list[subject_index].fit(dataset, y=dataset.description.labels, epochs=n_epochs);\n    best_validation_acc = clfs_list[subject_index].callbacks_[4][1].best_score_ # a hack to get best validation accuracy\n    best_validation_kappa = (2*best_validation_acc)-1\n    print("Best Cross Validation Kappa Score: {:.2f}".format(best_validation_kappa))'

In [21]:
def training_function(subject_index=0):
    print('\n', '#'*25, 'Training for Subject:', subject_index+1, '#'*25, '\n')
    
    dataset = windows_datasets_list[subject_index]
    
    # Get labels directly from the dataset
    y = np.concatenate([d.y for d in dataset.datasets])
    
    # Train the model
    clfs_list[subject_index].fit(dataset, y=y, epochs=n_epochs)
    
    # Get validation accuracy from history
    history = clfs_list[subject_index].history
    # Convert to a list of dicts
    valid_accs = [entry['valid_accuracy'] for entry in history if 'valid_accuracy' in entry]
    
    if valid_accs:
        best_val_acc = max(valid_accs)
        best_val_kappa = (2 * best_val_acc) - 1
        print(f"Best Cross Validation Kappa Score: {best_val_kappa:.2f}")
    else:
        print("Validation accuracy not available in history.")




In [22]:
print(clfs_list[0].module)        # show model architecture
print(clfs_list[0].criterion)     # show loss function


Layer (type (var_name):depth-idx)                            Input Shape               Output Shape              Param #                   Kernel Shape
EEGNetv4 (EEGNetv4)                                          [1, 12, 1024]             [1, 2]                    --                        --
├─Ensure4d (ensuredims): 1-1                                 [1, 12, 1024]             [1, 12, 1024, 1]          --                        --
├─Rearrange (dimshuffle): 1-2                                [1, 12, 1024, 1]          [1, 1, 12, 1024]          --                        --
├─Conv2d (conv_temporal): 1-3                                [1, 1, 12, 1024]          [1, 8, 12, 1025]          512                       [1, 64]
├─BatchNorm2d (bnorm_temporal): 1-4                          [1, 8, 12, 1025]          [1, 8, 12, 1025]          16                        --
├─ParametrizedConv2dWithConstraint (conv_spatial): 1-5       [1, 8, 12, 1025]          [1, 16, 1, 1025]          --                  

In [23]:
for subject_index in range(len(training_files)):
    training_function(subject_index)



 ######################### Training for Subject: 1 ######################### 

  epoch    train_accuracy    train_loss    valid_acc    valid_accuracy    valid_loss      lr     dur
-------  ----------------  ------------  -----------  ----------------  ------------  ------  ------
      1            [36m0.5000[0m        [32m0.7370[0m       [35m0.4375[0m            [31m0.4375[0m       [94m15.7079[0m  0.0200  1.3855
      2            0.5000        [32m0.5471[0m       0.4375            0.4375       44.3179  0.0199  1.4112
      3            0.5000        [32m0.4531[0m       0.4375            0.4375       35.3165  0.0197  1.3290
      4            0.5000        [32m0.4174[0m       0.4375            0.4375       51.6970  0.0192  1.3217
      5            0.5000        0.4339       0.4375            0.4375       29.9006  0.0187  1.2852
      6            0.5000        0.5354       0.4375            0.4375       [94m14.9388[0m  0.0179  1.2438
      7            0.5000      