# Instructions

**Run the cells one at a time, in order from top to bottom.**

# Read data

In [1]:
from google.colab import drive
drive.mount('/content/drive')

!pip install mne



import numpy as np
import scipy.io as sio
import mne
def indexget(total,select):
  """Return the positions in `total` whose elements are members of `select`.

  Positions are reported in the order they occur in `total`.
  """
  positions = []
  for position, element in enumerate(total):
    if element in select:
      positions.append(position)
  return positions



def inputmat(fp1,fp2,fp3):
    """Read one subject's recording file and label file into a plain dict.

    fp1: path of the .mat file providing the 'cnt', 'nfo' and 'mrk' variables.
    fp2: path of the .mat file providing the 'true_y' and 'test_idx' variables.
    fp3: collection of channel names to keep.

    Returns a dict with the keys 'ch_names', 'data', 'freq', 'electrode_x',
    'electrode_y', 'cue', 'labels', 'test_idx' and 'n_trials'.
    """
    recording = sio.loadmat(fp1, squeeze_me=True)
    truth = sio.loadmat(fp2, squeeze_me=True)

    def field(struct_name, key):
        # [True][0] pulls the stored value out of the struct field
        # (presumably a 0-d object array under squeeze_me=True -- TODO confirm).
        return recording[struct_name][key][True][0]

    all_names = field('nfo', 'clab')
    # Positions of the requested channels within the file's channel list.
    keep = indexget(all_names, fp3)

    labels = truth['true_y']
    m = {
        # Channel names are required later to build the mne info object.
        'ch_names': all_names[keep],
        # 'cnt' is transposed and then restricted to the selected channel rows.
        'data': recording['cnt'].T[keep],
        # Sampling frequency.
        'freq': field('nfo', 'fs'),
        # Electrode positions; note these are NOT restricted to the selected channels.
        'electrode_x': field('nfo', 'xpos'),
        'electrode_y': field('nfo', 'ypos'),
        # Cue positions of the trials.
        'cue': field('mrk', 'pos'),
        'labels': labels,
        # Despite the key name this stores the NUMBER of entries in 'test_idx'.
        'test_idx': len(truth['test_idx']),
        # One trial per entry of 'labels'.
        'n_trials': len(labels),
    }
    return m


def creatEventsArray(fp1,fp2,fp3):
    """Build the integer events array of shape (n_trials, 3) for one subject.

    Column 0 receives the cue positions, column 1 is left at zero and column 2
    receives the labels. Returns the tuple (events, labels), where `labels` is
    the complete 'labels' entry produced by inputmat.
    """
    subject = inputmat(fp1,fp2,fp3)
    n = subject['n_trials']
    events = np.zeros((n, 3), int)
    # Fill the sample-position column and the event-value column; column 1 stays 0.
    for column, key in ((0, 'cue'), (2, 'labels')):
        events[:, column] = subject[key][:n]
    return events, subject['labels']


def creatRawArray(fp1,fp2,fp3):
    """Load one subject via inputmat and wrap the selected channels in an mne.io.RawArray.

    The info object is created from the selected channel names, the file's
    sampling frequency and the channel type 'eeg'.
    """
    subject = inputmat(fp1,fp2,fp3)
    names = subject['ch_names'].tolist()
    sampling_rate = subject['freq']
    info = mne.create_info(names, sampling_rate, 'eeg')
    return mne.io.RawArray(subject['data'], info, first_samp=0, copy='auto', verbose=None)

Mounted at /content/drive
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mne
  Downloading mne-1.0.3-py3-none-any.whl (7.5 MB)
[K     |████████████████████████████████| 7.5 MB 5.3 MB/s 
Installing collected packages: mne
Successfully installed mne-1.0.3


In [2]:
# Folder on the mounted Drive that holds the data_set_IVa_*.mat and true_labels_*.mat files.
# Change this one constant if the dataset is stored somewhere else.
DATA_DIR = '/content/drive/MyDrive/fewshotonlineBCI/CompetitionIII_IVa'

# Subject identifiers; every dict below is keyed by these, in this order.
SUBJECTS = ['aa', 'al', 'av', 'aw', 'ay']

# Recording file per subject.
fp = {s: f'{DATA_DIR}/data_set_IVa_{s}.mat' for s in SUBJECTS}

# Label file per subject.
fplabel = {s: f'{DATA_DIR}/true_labels_{s}.mat' for s in SUBJECTS}

# Channels kept for every subject.
channel_set=['C1',  'C3',  'Cz',  'C2',  'C4',  'CFC3',  'CFC4','CFC5',  'CFC6',  'CCP3',
             'CCP4',  'CCP5',  'CCP6',  'T7',  'T8',  'P3',  'Pz',  'P4']
# Per-subject channel selection; all subjects currently share the same list object.
pick_chan = {s: channel_set for s in SUBJECTS}

# Band-pass edges passed to raw.filter.
low_freq, high_freq = 7., 30.
# Epoch window passed to mne.Epochs.
tmin, tmax = 0., 3.5
# Event name -> event code used when epoching.
event_id = {'right': 1, 'foot': 2}

Automatically created module for IPython interactive environment


# Processing the data

In [3]:
from sklearn.model_selection import ShuffleSplit, cross_val_score
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.pipeline import Pipeline

from mne.decoding import CSP
from mne.channels import read_layout

def cal_acc(pred,real):
  """Return a per-item hit list: 1 where pred[j] equals real[j], otherwise 0.

  The result has one entry for every element of `real`.
  """
  hits = []
  for j in range(len(real)):
    if pred[j] == real[j]:
      hits.append(1)
    else:
      hits.append(0)
  return hits

total_length=280  # nominal trial count; the split below uses the actual number of epochs instead
train_num=10      # training trials drawn per class
test_num=10       # not used in this cell

# Seeded generator so the few-shot split is the same on every Restart & Run All.
SPLIT_SEED = 0
split_rng = np.random.default_rng(SPLIT_SEED)

dic={}
# fp, fplabel and pick_chan share the same subject keys, so index all three by that
# key rather than relying on the three dicts iterating in the same order.
for f in fp:
    raw = creatRawArray(fp[f],fplabel[f],pick_chan[f])
    # Only the events array is needed; labels are re-read from the epochs below.
    events = creatEventsArray(fp[f],fplabel[f],pick_chan[f])[0]

    # Apply band-pass filter
    raw.filter(low_freq, high_freq, fir_design='firwin', skip_by_annotation='edge')

    epochs = mne.Epochs(raw, events=events, event_id=event_id, tmin=tmin, tmax=tmax, baseline=None, preload=True,
                        verbose=False)

    # Keep the 0.5-2.5 window of every epoch for the classifier.
    epochs_train = epochs.copy().crop(tmin=0.5, tmax=2.5)
    labels = epochs.events[:, -1]

    # Draw train_num trials per class without replacement for the training set.
    index1=np.where(labels == 1)[0]
    index2=np.where(labels == 2)[0]
    choice1=split_rng.choice(index1,train_num,replace=False)
    choice2=split_rng.choice(index2,train_num,replace=False)
    choice_total=np.hstack((choice1,choice2))

    epochs_data= epochs_train.get_data()
    # Every epoch not drawn above goes to the test set. The count is taken from the
    # labels actually present, in case mne.Epochs kept fewer than total_length events.
    test_index=np.delete(np.arange(len(labels)),choice_total)
    train_x,train_y=epochs_data[choice_total],labels[choice_total]
    test_x,test_y=epochs_data[test_index],labels[test_index]
    temp_xy=[train_x,train_y,test_x,test_y]
    dic[f]=temp_xy

Creating RawArray with float64 data, n_channels=18, n_times=298458
    Range : 0 ... 298457 =      0.000 ...  2984.570 secs
Ready.
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 7 - 30 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 7.00
- Lower transition bandwidth: 2.00 Hz (-6 dB cutoff frequency: 6.00 Hz)
- Upper passband edge: 30.00 Hz
- Upper transition bandwidth: 7.50 Hz (-6 dB cutoff frequency: 33.75 Hz)
- Filter length: 165 samples (1.650 sec)

Creating RawArray with float64 data, n_channels=18, n_times=283574
    Range : 0 ... 283573 =      0.000 ...  2835.730 secs
Ready.
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 7 - 30 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal 

In [4]:
# Sanity check: for every subject print the shapes of the four stored arrays
# (train_x, train_y, test_x, test_y), in that order.
for i in dic:
  print(*(dic[i][k].shape for k in range(4)))


(20, 18, 201) (20,) (260, 18, 201) (260,)
(20, 18, 201) (20,) (260, 18, 201) (260,)
(20, 18, 201) (20,) (260, 18, 201) (260,)
(20, 18, 201) (20,) (260, 18, 201) (260,)
(20, 18, 201) (20,) (260, 18, 201) (260,)


# Network

In [5]:
import numpy as np
import scipy.io as sio
from tensorflow.keras.utils import to_categorical
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd.function import Function
from torch.autograd import Variable
import torch.nn.functional as F
from torch.utils.data import Dataset,DataLoader,TensorDataset
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plot
from sklearn.decomposition import PCA
import torch.utils.data as Data
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from torchvision import models
from torchsummary import summary
import torch.optim.lr_scheduler as lr_scheduler


In [6]:
class Constrainedlinear(nn.Linear):
    """Linear layer that applies a max-norm limit (0.25 by default) to its weight rows at forward time.

    The stored weight tensor is not modified; a rescaled copy is used for the matmul.
    """

    def _max_norm(self,w,max_val=0.25,eps=1e-8):
        """Return `w` with each row rescaled so its L2 norm is at most `max_val`.

        `eps` is added to the row norm in the denominator to avoid division by zero.
        """
        row_norm = torch.norm(w, p=2, dim=1, keepdim=True)
        target = row_norm.clamp(min=0, max=max_val)
        scale = target / (eps + row_norm)
        return w * scale

    def forward(self, input):
        """Apply the linear map using the norm-limited weights and the unmodified bias."""
        limited = self._max_norm(self.weight)
        return F.linear(input, limited, self.bias)


class Constrainedlinear111(nn.Linear):
    """Linear layer that applies a max-norm limit (0.5 by default) to its weight rows at forward time.

    The stored weight tensor is not modified; a rescaled copy is used for the matmul.
    """

    def _max_norm(self,w,max_val=0.5,eps=1e-8):
        """Return `w` with each row rescaled so its L2 norm is at most `max_val`.

        `eps` is added to the row norm in the denominator to avoid division by zero.
        """
        row_norm = torch.norm(w, p=2, dim=1, keepdim=True)
        target = row_norm.clamp(min=0, max=max_val)
        scale = target / (eps + row_norm)
        return w * scale

    def forward(self, input):
        """Apply the linear map using the norm-limited weights and the unmodified bias."""
        limited = self._max_norm(self.weight)
        return F.linear(input, limited, self.bias)


class EEGNet(nn.Module):
    """EEGNet-style CNN: temporal conv -> depthwise spatial conv -> separable conv -> linear classifier.

    Expects input of shape (batch, 1, n_channels, n_samples) and returns
    log-probabilities of shape (batch, classes_num).

    Args:
        classes_num: number of output classes.
        n_channels: number of EEG channels C (height of the spatial filter). Default 18.
        n_samples: number of time points T per trial. Default 201; must be at least 64
            because the two average-pooling stages each divide the time axis by 8.

    Raises:
        ValueError: if n_samples is too short to survive both pooling stages.
    """

    def __init__(self, classes_num, n_channels=18, n_samples=201):
        super(EEGNet, self).__init__()
        self.numC = n_channels
        self.numT = n_samples

        self.drop_out = 0.2

        # Time length after the two AvgPool2d((1, 8)) stages; the padded convolutions
        # keep the time length unchanged, so only the pooling shrinks it.
        pooled_len = (n_samples // 8) // 8
        if pooled_len < 1:
            raise ValueError(
                "n_samples=%d is too short: at least 64 time points are required" % n_samples)

        self.block_1 = nn.Sequential(
            # Pads the input tensor boundaries with zero
            # left, right, up, bottom
            # (15 + 16 = 31 zeros so the width-32 kernel keeps the time length T)
            nn.ZeroPad2d((15, 16, 0, 0)),
            nn.Conv2d(
                in_channels=1,          # input shape (1, C, T)
                out_channels=8,         # num_filters
                kernel_size=(1, 32),    # filter size
                bias=False
            ),                          # output shape (8, C, T)
            nn.BatchNorm2d(8)           # output shape (8, C, T)
        )

        # block 2 and 3 are implementations of Depthwise Convolution and Separable Convolution
        self.block_2 = nn.Sequential(
            nn.Conv2d(
                in_channels=8,          # input shape (8, C, T)
                out_channels=16,        # num_filters (2 per input map, since groups=8)
                kernel_size=(self.numC, 1),    # filter size: spans all C channels
                groups=8,
                bias=False
            ),                          # output shape (16, 1, T)
            nn.BatchNorm2d(16),         # output shape (16, 1, T)
            nn.ELU(),
            nn.AvgPool2d((1, 8)),       # output shape (16, 1, T//8)
            nn.Dropout(self.drop_out)   # output shape (16, 1, T//8)
        )

        self.block_3 = nn.Sequential(
            # 7 + 8 = 15 zeros so the width-16 kernel keeps the time length
            nn.ZeroPad2d((7, 8, 0, 0)),
            nn.Conv2d(
               in_channels=16,          # input shape (16, 1, T//8)
               out_channels=16,         # num_filters
               kernel_size=(1, 16),     # filter size
               groups=16,
               bias=False
            ),                          # output shape (16, 1, T//8)
            nn.Conv2d(
                in_channels=16,         # input shape (16, 1, T//8)
                out_channels=16,        # num_filters
                kernel_size=(1, 1),     # filter size
                bias=False
            ),                          # output shape (16, 1, T//8)
            nn.BatchNorm2d(16),         # output shape (16, 1, T//8)
            nn.ELU(),
            nn.AvgPool2d((1, 8)),       # output shape (16, 1, T//64)
            nn.Dropout(self.drop_out)
        )

        # Not used in forward(); kept so the module's parameter list and
        # state_dict keys stay the same as before.
        self.preluip1 = nn.PReLU()
        # 16 feature maps x pooled time length (16 * 3 = 48 for the default T=201).
        self.out = Constrainedlinear(16 * pooled_len, classes_num)

    def forward(self, x):
        """Run the three conv blocks, flatten, classify and return log-softmax scores."""
        x = self.block_1(x)
        x = self.block_2(x)
        x = self.block_3(x)

        x = x.view(x.size(0), -1)
        x = self.out(x)
        return F.log_softmax(x, dim=1) #x   # return x for visualization



# Training

In [7]:
def train(subject_string, epochs=100, batch_size=10, lr=0.001, eval_every=50):
  """Train a fresh 2-class EEGNet on one subject's split and return its test accuracy.

  The split is read from the global `dic` as [train_x, train_y, test_x, test_y].
  Train and test accuracy are printed every `eval_every` epochs and always after
  the final epoch.

  Args:
    subject_string: key into `dic` (e.g. 'aa').
    epochs: number of passes over the training set.
    batch_size: mini-batch size of the training loader.
    lr: Adam learning rate.
    eval_every: evaluation interval in epochs; a falsy value evaluates only at the end.

  Returns:
    Test accuracy from the last evaluation (NaN if `epochs` is 0).
  """
  train_x, train_y, test_x, test_y = dic[subject_string]

  # Insert a singleton axis so the data has the (trials, 1, C, T) layout EEGNet takes.
  x_train = torch.from_numpy(train_x).float().unsqueeze(1)
  x_test = torch.from_numpy(test_x).float().unsqueeze(1)
  # Labels are stored as 1/2; NLLLoss needs class indices starting at 0.
  y_train = torch.from_numpy(train_y - 1).long()
  y_test = torch.from_numpy(test_y - 1).long()

  # Construct the training loader (the test set is evaluated in one full batch below).
  train_dataset = Data.TensorDataset(x_train, y_train)
  train_loader = Data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)

  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  eeg_net = EEGNet(classes_num=2).to(device)

  nllloss = nn.NLLLoss().to(device) #CrossEntropyLoss = log_softmax + NLLLoss
  optimizer4nn = optim.Adam(eeg_net.parameters(), lr=lr)

  # Defined up front so the return value exists even if no evaluation runs.
  accuracy = float('nan')

  for epoch in range(epochs):
    loss_total = []

    # One pass of mini-batch gradient descent over the training set.
    for b_x, b_y in train_loader:
      b_x, b_y = b_x.to(device), b_y.to(device)
      output = eeg_net(b_x)
      loss = nllloss(output, b_y)
      loss_total.append(loss.item())
      optimizer4nn.zero_grad()
      loss.backward()
      optimizer4nn.step()

    is_last_epoch = (epoch + 1 == epochs)
    is_eval_epoch = bool(eval_every) and (epoch + 1) % eval_every == 0
    if is_eval_epoch or is_last_epoch:
      print('Epoch: ', epoch, '| train loss: %.4f' % (np.mean(loss_total)),len(y_test))

      eeg_net.eval()
      # No gradients are needed for evaluation; skipping autograd saves memory.
      with torch.no_grad():
        train_out = eeg_net(x_train.to(device))
        test_out = eeg_net(x_test.to(device))

      trpre_y = train_out.argmax(dim=1).cpu().numpy()
      tra_acc = float((trpre_y == y_train.numpy()).astype(int).sum()) / float(y_train.size(0))
      print("Subject "+str(subject_string)+' train accuracy: %.4f' % tra_acc,len(train_loader))

      pred_y = test_out.argmax(dim=1).cpu().numpy()
      accuracy = float((pred_y == y_test.numpy()).astype(int).sum()) / float(y_test.size(0))
      print("Subject "+str(subject_string)+' test accuracy: %.4f' % accuracy,len(y_test))
      print('')
      eeg_net.train()
  return accuracy


# Run

In [8]:
# Pick one subject by its position in str_list, show which one was chosen,
# then train on it and keep the returned test accuracy in `acc`.
str_list = ['aa', 'al', 'av', 'aw', 'ay']
i = 2
print(str_list[i])
acc = train(str_list[i])


av
Epoch:  49 | train loss: 0.3393 260
Subject av train accuracy: 0.8500 2
Subject av test accuracy: 0.6154 260

Epoch:  99 | train loss: 0.1168 260
Subject av train accuracy: 1.0000 2
Subject av test accuracy: 0.5962 260

