In [1]:
import numpy as np
import os
import torch
from torch import Tensor
from torch.utils.data import TensorDataset, DataLoader
import torch.nn as nn
from torch.optim.lr_scheduler import StepLR
from torch.optim.lr_scheduler import ExponentialLR
import torch.optim as optim
import matplotlib.pyplot as plt
def setup_seed(seed):
    """Seed every random number generator this notebook can draw from.

    Seeds Python's ``random`` module, NumPy's global RNG and PyTorch (CPU and
    all CUDA devices), then configures cuDNN for deterministic execution.

    Args:
        seed: Integer seed shared by all generators.
    """
    # Function-local import: ``random`` is not part of the notebook's import
    # cell, which is why the original call was left commented out.
    import random

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Safe to call without a GPU; PyTorch ignores it when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Autotuning can select different kernels from run to run, which would
    # undermine the deterministic flag set above.
    torch.backends.cudnn.benchmark = False

def read_bci_data(data_dir='.'):
    """Load the four ``.npz`` recordings and return train/test arrays.

    For each split (train, test) the ``signal`` and ``label`` arrays of the
    ``S4b`` and ``X11b`` archives are concatenated along axis 0, in that
    order. Labels are shifted down by one. Signals gain a singleton axis at
    position 1 and have their last two axes swapped. NaN entries are replaced
    by the NaN-ignoring mean of the whole split they belong to.

    Args:
        data_dir: Directory containing ``S4b_train.npz``, ``X11b_train.npz``,
            ``S4b_test.npz`` and ``X11b_test.npz``. Defaults to the current
            working directory, matching the previously hard-coded paths.

    Returns:
        Tuple ``(train_data, train_label, test_data, test_label)`` of NumPy
        arrays. The four shapes are also printed as a quick sanity check.
    """
    def _load(stem):
        # The context manager closes the archive's file handle; indexing
        # materialises each array in memory before that happens.
        with np.load(os.path.join(data_dir, stem + '.npz')) as archive:
            return archive['signal'], archive['label']

    def _build_split(stems):
        signals, labels = zip(*(_load(stem) for stem in stems))
        data = np.concatenate(signals, axis=0)
        label = np.concatenate(labels, axis=0) - 1
        # (N, A, B) -> (N, 1, A, B) -> (N, 1, B, A)
        data = np.transpose(np.expand_dims(data, axis=1), (0, 1, 3, 2))
        # Impute missing samples with the mean of the valid ones in this split.
        missing = np.isnan(data)
        data[missing] = np.nanmean(data)
        return data, label

    train_data, train_label = _build_split(('S4b_train', 'X11b_train'))
    test_data, test_label = _build_split(('S4b_test', 'X11b_test'))

    print(train_data.shape, train_label.shape, test_data.shape, test_label.shape)

    return train_data, train_label, test_data, test_label

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class EEGNet(nn.Module):
    """Three convolutional stages followed by a two-way linear classifier.

    The final linear layer expects 736 flattened features, which is what the
    stages produce for inputs shaped ``(N, 1, 2, 750)``.

    Args:
        activation: ``'relu'`` or ``'lrelu'`` select ReLU / LeakyReLU; any
            other value falls back to ELU with ``alpha=1.0``.
        dropout: Drop probability used after both pooling layers.
    """

    def __init__(self, activation='relu', dropout=0.25):
        super().__init__()
        self.act_type = activation
        # A single activation instance is shared by both stages that use it.
        self.activation = self._pick_activation(activation)

        # Attribute names and layer order are kept as-is: they determine the
        # state-dict keys of saved checkpoints.
        self.conv2d = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=(1, 51), stride=(1, 1), padding=(0, 25), bias=False),
            nn.BatchNorm2d(16),
        )
        self.depthwiseConv2d = nn.Sequential(
            # groups=16 with a (2, 1) kernel and no padding: collapses a
            # height of 2 down to 1.
            nn.Conv2d(16, 32, kernel_size=(2, 1), stride=(1, 1), groups=16, bias=False),
            nn.BatchNorm2d(32),
            self.activation,
            nn.AvgPool2d(kernel_size=(1, 4), stride=(1, 4)),
            nn.Dropout(p=dropout),
        )
        self.separableConv2d = nn.Sequential(
            # Despite the attribute name this is an ordinary (ungrouped) conv.
            nn.Conv2d(32, 32, kernel_size=(1, 15), stride=(1, 1), padding=(0, 7), bias=False),
            nn.BatchNorm2d(32),
            self.activation,
            nn.AvgPool2d(kernel_size=(1, 8), stride=(1, 8)),
            nn.Dropout(p=dropout),
        )
        self.classify = nn.Sequential(
            nn.Flatten(),
            nn.Linear(736, 2),
        )

    @staticmethod
    def _pick_activation(name):
        """Map an activation name to a fresh module (ELU for unknown names)."""
        if name == 'relu':
            return nn.ReLU()
        if name == 'lrelu':
            return nn.LeakyReLU()
        return nn.ELU(alpha=1.0)

    def info(self):
        """Print which activation this network was configured with."""
        print(f"training EEGNet with {self.act_type}")

    def forward(self, x):
        """Run ``x`` through the three conv stages and the classifier."""
        stages = (self.conv2d, self.depthwiseConv2d, self.separableConv2d, self.classify)
        for stage in stages:
            x = stage(x)
        return x

In [3]:
class DeepConvNet(nn.Module):
    """Four conv/pool stages (25-50-100-200 channels) plus a linear classifier.

    The classifier expects ``43 * 200`` flattened features, which is what the
    stages produce for inputs shaped ``(N, 1, 2, 750)``.

    Args:
        activation: ``'relu'`` or ``'lrelu'`` select ReLU / LeakyReLU; any
            other value falls back to ELU with ``alpha=1.0``.
        dropout: Drop probability applied at the end of every conv stage.
            Defaults to 0.5, the value that used to be hard-coded, so
            existing callers are unaffected.
    """

    def __init__(self, activation='relu', dropout=0.5):
        super().__init__()
        self.act_type = activation
        if activation == 'relu':
            self.activation = nn.ReLU()
        elif activation == 'lrelu':
            self.activation = nn.LeakyReLU()
        else:
            self.activation = nn.ELU(alpha=1.0)

        # Attribute names and the position of each layer inside its
        # Sequential are unchanged, so state-dict keys of saved checkpoints
        # (layer0.0.weight, layer1.1.running_mean, ...) still match.
        self.layer0 = nn.Sequential(
            nn.Conv2d(1, 25, kernel_size=(1, 5), stride=(1, 1), padding=(0, 0), bias=False),
            # (2, 1) kernel without padding collapses a height of 2 down to 1.
            nn.Conv2d(25, 25, kernel_size=(2, 1), stride=(1, 1), padding=(0, 0), bias=False),
            *self._norm_act_pool_drop(25, dropout))
        self.layer1 = self._conv_stage(25, 50, dropout)
        self.layer2 = self._conv_stage(50, 100, dropout)
        self.layer3 = self._conv_stage(100, 200, dropout)

        self.layer4 = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=43*200, out_features=2, bias=True))

    def _norm_act_pool_drop(self, channels, dropout):
        """Return the BatchNorm -> activation -> MaxPool -> Dropout tail of a stage."""
        return [
            nn.BatchNorm2d(channels),
            self.activation,  # one shared instance across all stages
            nn.MaxPool2d(kernel_size=(1, 2)),
            nn.Dropout(p=dropout),
        ]

    def _conv_stage(self, in_channels, out_channels, dropout):
        """Build one (1, 5) convolution followed by the common stage tail."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=(1, 5), stride=(1, 1),
                      padding=(0, 0), bias=False),
            *self._norm_act_pool_drop(out_channels, dropout))

    def info(self):
        """Print which activation this network was configured with."""
        print(f"training DeepConvNet with {self.act_type}")

    def forward(self, x):
        """Run ``x`` through the four conv stages and the classifier."""
        out = self.layer0(x)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        return out

In [4]:
def test(model, device, test_loader):
    model.eval()
    correct = 0
    test_loss = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            out = model.forward(data)
            out = torch.argmax(out, dim=1)
            correct += (out==target).sum().cpu().item()
    acc = 100. * correct / len(test_loader.dataset)
    return acc

In [5]:
# Evaluate every saved checkpoint (2 architectures x 3 activations) on the
# test split and print the resulting accuracy.
setup_seed(87)
train_x, train_y, test_x, test_y = read_bci_data()

use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

batch_size = 32
train_kwargs = {'batch_size': batch_size}
# Evaluate the whole test split in one batch. This is sized from the test
# labels; sizing it from train_y only worked while both splits were equal.
test_kwargs = {'batch_size': len(test_y)}
if use_cuda:
    print("Use GPU for testing...")
    cuda_kwargs = {'num_workers': 1,
                   'pin_memory': True}
    train_kwargs.update(cuda_kwargs, shuffle=True)
    # No shuffling for evaluation: accuracy does not depend on sample order.
    test_kwargs.update(cuda_kwargs)
else:
    print("Use CPU to testing...")

train_dataset = TensorDataset(torch.tensor(train_x, dtype=torch.float32),
                              torch.tensor(train_y, dtype=torch.int64))
test_dataset = TensorDataset(torch.tensor(test_x, dtype=torch.float32),
                             torch.tensor(test_y, dtype=torch.int64))
train_loader = DataLoader(train_dataset, **train_kwargs)
test_loader = DataLoader(test_dataset, **test_kwargs)

act_funcs = ['lrelu', 'relu', 'elu']
# (model class, name used in the log line, checkpoint path prefix)
architectures = [
    (EEGNet, "EEGNet", "models/EEG_"),
    (DeepConvNet, "DeepConvNet", "models/DeepConv_"),
]

for net_cls, net_name, ckpt_prefix in architectures:
    for a in act_funcs:
        model = net_cls(activation=a).to(device)
        # map_location lets checkpoints saved from a GPU run load on a
        # CPU-only machine, which the device fallback above is meant to allow.
        state = torch.load(ckpt_prefix + a + ".cpt", map_location=device)
        model.load_state_dict(state)
        print(f"Testing {net_name} with {a}")
        print(f"Final testing accuracy: {test(model,device,test_loader)}")
        print(" ")

(1080, 1, 2, 750) (1080,) (1080, 1, 2, 750) (1080,)
Use GPU for testing...
Testing EEGNet with lrelu
Final testing accuracy: 87.03703703703704
 
Testing EEGNet with relu
Final testing accuracy: 86.66666666666667
 
Testing EEGNet with elu
Final testing accuracy: 82.4074074074074
 
Testing DeepConvNet with lrelu
Final testing accuracy: 81.29629629629629
 
Testing DeepConvNet with relu
Final testing accuracy: 81.01851851851852
 
Testing DeepConvNet with elu
Final testing accuracy: 80.64814814814815
 
