# install the mne and braindecode librarie 

In [1]:
# for Open-source Python software for exploring, visualizing, 
# and analyzing human neurophysiological data: MEG, EEG, sEEG, ECoG, and more.
!pip install mne 
# A deep learning toolbox to decode raw time-domain EEG.
!pip install https://github.com/TNTLFreiburg/braindecode/archive/master.zip 


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting https://github.com/TNTLFreiburg/braindecode/archive/master.zip
  Using cached https://github.com/TNTLFreiburg/braindecode/archive/master.zip


In [2]:
import mne
from mne.io import concatenate_raws
import numpy as np
import torch as th
from torch import nn
import torch.nn.functional as F
from braindecode.torch_ext.util import set_random_seeds
from braindecode.models.util import to_dense_prediction_model
from braindecode.datautil.iterators import CropsFromTrialsIterator
from braindecode.torch_ext.util import np_to_var, var_to_np
from braindecode.experiments.monitors import compute_preds_per_trial_from_crops
from braindecode.torch_ext.optimizers import AdamW
from braindecode.torch_ext.schedulers import ScheduledOptimizer, CosineAnnealing
from braindecode.datautil.iterators import get_balanced_batches
from braindecode.datautil.signal_target import SignalAndTarget
from braindecode.models.shallow_fbcsp import ShallowFBCSPNet
from numpy.random import RandomState


In [None]:
# (open and close the left or right fist)               ---> 3,7,11 

# (imagine opening and closing the left or right fist)  ---> 4,8,12 <--- I chose this

# (open and close both fists or both feet)              ---> 5,9,13

# (imagine opening and closing both fists or both feet) ---> 6,10,14

# subject_id = [] # carefully cherry-picked to give nice results on such limited data :)
# This will download the files if you don't have them yet,and then return the paths to the files.
physionet_paths = [mne.datasets.eegbci.load_data(sub_id,[4,8,12]) for sub_id in range(1,80)]
physionet_paths = np.concatenate(physionet_paths)

# Load each of the files
parts = [mne.io.read_raw_edf(path, preload=True,stim_channel='auto',verbose='WARNING')
         for path in physionet_paths]

# Concatenate them
raw = concatenate_raws(parts)

# Find the events in this dataset
events, _ = mne.events_from_annotations(raw) #<--- no use the event_id for that is the "_"

# Use only EEG channels
eeg_channel_inds = mne.pick_types(raw.info, meg=False, eeg=True, stim=False, eog=False,
                   exclude='bads')

# Each annotation includes one of three codes (T0=1, T1=2, or T2=3):

# T0 corresponds to rest
# T1 corresponds to onset of motion (real or imagined) of
# the left fist (in runs 3, 4, 7, 8, 11, and 12)
# both fists (in runs 5, 6, 9, 10, 13, and 14)
# T2 corresponds to onset of motion (real or imagined) of
# the right fist (in runs 3, 4, 7, 8, 11, and 12)
# both feet (in runs 5, 6, 9, 10, 13, and 14)

# Extract trials, only using EEG channels
epoched = mne.Epochs(raw, events, dict(left=2, right=3), tmin=1, tmax=4.1, proj=False, picks=eeg_channel_inds,
                baseline=None, preload=True)

In [4]:
# Convert data from volt to millivolt
# Pytorch expects float32 for input and int64 for labels.
X = (epoched.get_data() * 1e6).astype(np.float32)
y = (epoched.events[:,2] - 2).astype(np.int64) #2,3 -> 0,1 

In [None]:
epoched

In [None]:
epoched.events[:,2]-2 #---> the 0 represent the left and the 1 represent the right 

In [7]:
train_set = SignalAndTarget(X[:2500], y=y[:2500]) 
valid_set = SignalAndTarget(X[2700:3000], y=y[2700:3000]) 

In [None]:
train_set.X.shape[2]

In [None]:
train_set.X.shape[1]

In [10]:
# Set if you want to use GPU
# You can also use torch.cuda.is_available() to determine if cuda is available on your machine.

cuda = th.cuda.is_available()
set_random_seeds(seed=20170629, cuda=cuda)

# This will determine how many crops are processed in parallel
input_time_length = train_set.X.shape[2]
n_classes = 2
in_chans = train_set.X.shape[1]

# final_conv_length determines the size of the receptive field of the ConvNet
model = ShallowFBCSPNet(in_chans=in_chans, n_classes=n_classes, 
                        input_time_length=input_time_length,final_conv_length = "auto").create_network()
to_dense_prediction_model(model)

if cuda:
    model.cuda()

In [None]:
cuda

In [None]:
# determine output size
test_input = np_to_var(np.ones((2, in_chans, input_time_length, 1), dtype=np.float32))
if cuda:
    test_input = test_input.cuda()
out = model(test_input)
n_preds_per_input = out.cpu().data.numpy().shape[2]
print("{:d} predictions per input/trial".format(n_preds_per_input))

In [13]:
# The iterator has the method get_batches, which can be used to get randomly shuffled training batches 
# with shuffle=True or ordered batches (i.e. first from trial 1, then from trial 2, etc.) with shuffle=False
iterator = CropsFromTrialsIterator(batch_size=64,input_time_length=input_time_length,
                                  n_preds_per_input=n_preds_per_input)

In [14]:
# Diferents values for the optimizer, uncoment and try ;) 
#optimizer = AdamW(model.parameters(), lr=2*0.01, weight_decay=0.01) # these are good values for the deep model
#optimizer = AdamW(model.parameters(), lr=1*0.001, weight_decay=0.0001)
optimizer = AdamW(model.parameters(), lr=0.06255 * 0.015, weight_decay=0)
#optimizer = AdamW(model.parameters(), lr=1*0.01, weight_decay=0.5*0.001) # these are good values for the deep model
#optimizer = AdamW(model.parameters(), lr=0.0625 * 0.01, weight_decay=0)

# Need to determine number of batch passes per epoch for cosine annealing

n_updates_per_epoch = len([None for b in iterator.get_batches(train_set, True)])
n_epochs = 50
scheduler = CosineAnnealing(n_epochs * n_updates_per_epoch)

# schedule_weight_decay must be True for AdamW
optimizer = ScheduledOptimizer(scheduler, optimizer, schedule_weight_decay=True)

In [None]:
#lets go to train !!! 
for i_epoch in range(50):
    # Set model to training mode
    model.train()
    for batch_X, batch_y in iterator.get_batches(train_set, shuffle=True):
        net_in = np_to_var(batch_X)
        if cuda:
            net_in = net_in.cuda()
        net_target = np_to_var(batch_y)
        if cuda:
            net_target = net_target.cuda()
        # Remove gradients of last backward pass from all parameters
        optimizer.zero_grad()
        outputs = model(net_in)
        # Mean predictions across trial
        # Note that this will give identical gradients to computing
        # a per-prediction loss (at least for the combination of log softmax activation
        # and negative log likelihood loss which we are using here)
        outputs = th.mean(outputs, dim=2, keepdim=False)
        loss = F.nll_loss(outputs, net_target)
        loss.backward()
        optimizer.step()

    # Print some statistics each epoch
    model.eval()
    print("Epoch {:d}".format(i_epoch))
    for setname, dataset in (('Train', train_set),('Valid', valid_set)):
        # Collect all predictions and losses
        all_preds = []
        all_losses = []
        batch_sizes = []
        for batch_X, batch_y in iterator.get_batches(dataset, shuffle=False):
            net_in = np_to_var(batch_X)
            if cuda:
                net_in = net_in.cuda()
            net_target = np_to_var(batch_y)
            if cuda:
                net_target = net_target.cuda()
            outputs = model(net_in)
            all_preds.append(var_to_np(outputs))
            outputs = th.mean(outputs, dim=2, keepdim=False)
            loss = F.nll_loss(outputs, net_target)
            loss = float(var_to_np(loss))
            all_losses.append(loss)
            batch_sizes.append(len(batch_X))
        # Compute mean per-input loss
        loss = np.mean(np.array(all_losses) * np.array(batch_sizes) /
                       np.mean(batch_sizes))
        print("{:6s} Loss: {:.5f}".format(setname, loss))
        # Assign the predictions to the trials
        preds_per_trial = compute_preds_per_trial_from_crops(all_preds,
                                                          input_time_length,
                                                          dataset.X)
        # preds per trial are now trials x classes x timesteps/predictions
        # Now mean across timesteps for each trial to get per-trial predictions
        meaned_preds_per_trial = np.array([np.mean(p, axis=1) for p in preds_per_trial])
        predicted_labels = np.argmax(meaned_preds_per_trial, axis=1)
        accuracy = np.mean(predicted_labels == dataset.y)
        print("{:6s} Accuracy: {:.1f}%".format(
            setname, accuracy * 100))

In [None]:
# now we will test the model
test_set = SignalAndTarget(X[2500:2700], y=y[2500:2700])

model.eval()
# Collect all predictions and losses
all_preds = []
all_losses = []
batch_sizes = []
for batch_X, batch_y in iterator.get_batches(test_set, shuffle=False):
    net_in = np_to_var(batch_X)
    if cuda:
        net_in = net_in.cuda()
    net_target = np_to_var(batch_y)
    if cuda:
        net_target = net_target.cuda()
    outputs = model(net_in)
    all_preds.append(var_to_np(outputs))
    outputs = th.mean(outputs, dim=2, keepdim=False)
    loss = F.nll_loss(outputs, net_target)
    loss = float(var_to_np(loss))
    all_losses.append(loss)
    batch_sizes.append(len(batch_X))
# Compute mean per-input loss
loss = np.mean(np.array(all_losses) * np.array(batch_sizes) /
               np.mean(batch_sizes))
print("Test Loss: {:.5f}".format(loss))
# Assign the predictions to the trials
preds_per_trial = compute_preds_per_trial_from_crops(all_preds,
                                                  input_time_length,
                                                  test_set.X)
# preds per trial are now trials x classes x timesteps/predictions
# Now mean across timesteps for each trial to get per-trial predictions
meaned_preds_per_trial = np.array([np.mean(p, axis=1) for p in preds_per_trial])
predicted_labels = np.argmax(meaned_preds_per_trial, axis=1)
accuracy = np.mean(predicted_labels == test_set.y)
print("Test Accuracy: {:.1f}%".format(accuracy * 100))

In [None]:
# print the model arquitecture 
model

# re training?
### lets go!!! 


In [None]:
# load the data
physionet_paths_v2 = [mne.datasets.eegbci.load_data(sub_id,[4,8,12]) for sub_id in range(81,108)]
physionet_paths_v2 = np.concatenate(physionet_paths_v2)

In [None]:
# Load each of the files
parts_v2 = [mne.io.read_raw_edf(path, preload=True,stim_channel='auto',verbose='WARNING')
         for path in physionet_paths_v2]

# Concatenate them
raw_v2 = concatenate_raws(parts)

# Find the events in this dataset
events_v2, _ = mne.events_from_annotations(raw_v2) #<--- no use the event_id for that is the "_"

# Use only EEG channels
eeg_channel_inds_v2 = mne.pick_types(raw.info, meg=False, eeg=True, stim=False, eog=False,
                   exclude='bads')

#Each annotation includes one of three codes (T0=1, T1=2, or T2=3):

#T0 corresponds to rest
#T1 corresponds to onset of motion (real or imagined) of
#the left fist (in runs 3, 4, 7, 8, 11, and 12)
#both fists (in runs 5, 6, 9, 10, 13, and 14)
#T2 corresponds to onset of motion (real or imagined) of
#the right fist (in runs 3, 4, 7, 8, 11, and 12)
#both feet (in runs 5, 6, 9, 10, 13, and 14)

# Extract trials, only using EEG channels
epoched_2 = mne.Epochs(raw_v2, events_v2, dict(left=2, right=3), tmin=1, tmax=4.1, proj=False, picks=eeg_channel_inds_v2,
                baseline=None, preload=True)


In [None]:
# Convert data from volt to millivolt
# Pytorch expects float32 for input and int64 for labels.
X = (epoched_2.get_data() * 1e6).astype(np.float32)
y = (epoched_2.events[:,2] - 2).astype(np.int64) #2,3 -> 0,1

In [None]:
epoched_2

In [None]:
epoched_2.events[:,2]-2 #---> the 0 represent the left and the 1 represent the right 

In [None]:
train_set = SignalAndTarget(X[:2535], y=y[:2535]) # we have 3377 events and we will use 1636 for the training
valid_set = SignalAndTarget(X[2535:3100], y=y[2535:3100]) #and for valid we will use 2450-1636 = 814 

In [None]:
# Set if you want to use GPU
# You can also use torch.cuda.is_available() to determine if cuda is available on your machine.
cuda = torch.cuda.is_available()
set_random_seeds(seed=20170629, cuda=cuda)

# This will determine how many crops are processed in parallel
input_time_length = train_set.X.shape[2]
n_classes = 2
in_chans = train_set.X.shape[1]
# final_conv_length determines the size of the receptive field of the ConvNet
model = ShallowFBCSPNet(in_chans=in_chans, n_classes=n_classes, input_time_length=input_time_length,
                        final_conv_length=8).create_network()
to_dense_prediction_model(model)

if cuda:
    model.cuda()

In [None]:
# determine output size
test_input = np_to_var(np.ones((2, in_chans, input_time_length, 1), dtype=np.float32))
if cuda:
    test_input = test_input.cuda()
out = model(test_input)
n_preds_per_input = out.cpu().data.numpy().shape[2]
print("{:d} predictions per input/trial".format(n_preds_per_input))

In [None]:
iterator = CropsFromTrialsIterator(batch_size=64,input_time_length=input_time_length,
                                  n_preds_per_input=n_preds_per_input)

optimizer = AdamW(model.parameters(), lr=1*0.001, weight_decay=0.0001)


In [None]:
# Need to determine number of batch passes per epoch for cosine annealing

n_updates_per_epoch = len([None for b in iterator.get_batches(train_set, True)])
n_epochs = 50
scheduler = CosineAnnealing(n_epochs * n_updates_per_epoch)

# schedule_weight_decay must be True for AdamW
optimizer = ScheduledOptimizer(scheduler, optimizer, schedule_weight_decay=True)

In [None]:
# lets go to train again xD!!!
# but no with the same data  
for i_epoch in range(50):
    # Set model to training mode
    model.train()
    for batch_X, batch_y in iterator.get_batches(train_set, shuffle=True):
        net_in = np_to_var(batch_X)
        if cuda:
            net_in = net_in.cuda()
        net_target = np_to_var(batch_y)
        if cuda:
            net_target = net_target.cuda()
        # Remove gradients of last backward pass from all parameters
        optimizer.zero_grad()
        outputs = model(net_in)
        # Mean predictions across trial
        # Note that this will give identical gradients to computing
        # a per-prediction loss (at least for the combination of log softmax activation
        # and negative log likelihood loss which we are using here)
        outputs = th.mean(outputs, dim=2, keepdim=False)
        loss = F.nll_loss(outputs, net_target)
        loss.backward()
        optimizer.step()

    # Print some statistics each epoch
    model.eval()
    print("Epoch {:d}".format(i_epoch))
    for setname, dataset in (('Train', train_set),('Valid', valid_set)):
        # Collect all predictions and losses
        all_preds = []
        all_losses = []
        batch_sizes = []
        for batch_X, batch_y in iterator.get_batches(dataset, shuffle=False):
            net_in = np_to_var(batch_X)
            if cuda:
                net_in = net_in.cuda()
            net_target = np_to_var(batch_y)
            if cuda:
                net_target = net_target.cuda()
            outputs = model(net_in)
            all_preds.append(var_to_np(outputs))
            outputs = th.mean(outputs, dim=2, keepdim=False)
            loss = F.nll_loss(outputs, net_target)
            loss = float(var_to_np(loss))
            all_losses.append(loss)
            batch_sizes.append(len(batch_X))
        # Compute mean per-input loss
        loss = np.mean(np.array(all_losses) * np.array(batch_sizes) /
                       np.mean(batch_sizes))
        print("{:6s} Loss: {:.5f}".format(setname, loss))
        # Assign the predictions to the trials
        preds_per_trial = compute_preds_per_trial_from_crops(all_preds,
                                                          input_time_length,
                                                          dataset.X)
        # preds per trial are now trials x classes x timesteps/predictions
        # Now mean across timesteps for each trial to get per-trial predictions
        meaned_preds_per_trial = np.array([np.mean(p, axis=1) for p in preds_per_trial])
        predicted_labels = np.argmax(meaned_preds_per_trial, axis=1)
        accuracy = np.mean(predicted_labels == dataset.y)
        print("{:6s} Accuracy: {:.1f}%".format(
            setname, accuracy * 100))

In [None]:
# :( no improve much