In [2]:
from dataloader import read_bci_data
from  torch.utils.data import DataLoader, Dataset, TensorDataset
from torch import tensor
import torch
import torch.nn as nn
import numpy as np

In [3]:
hp = {
    'bs': 64,
    'lr':1e-2,
    'epochs':500,
    'optimizer': 'sgd', # optimizer  
    'NN':'EEGNet',
    'act':'ELU'
}

def hp_2_str(hp):
    return f"{hp['NN']}|{hp['act']}|lr={hp['lr']}|opt={hp['optimizer']}|ep={hp['epochs']}"

In [4]:
def gen_loader(hp):
    batch_size = hp['bs']
    #read data from file
    train_x, train_y, test_x, test_y =  read_bci_data()
    # Dataloader 
    train_dl = DataLoader(TensorDataset(tensor(train_x,dtype=torch.float), tensor(train_y,dtype=torch.long)),         batch_size=batch_size, shuffle= True)
    test_dl = DataLoader(TensorDataset(tensor(test_x,dtype=torch.float), tensor(test_y,dtype=torch.long)),            batch_size=batch_size, shuffle = False)
    return train_dl, test_dl

def get_device():
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print("Using {} device".format(device))
    return device

In [5]:
def get_activation(act_name):
    if act_name == 'ELU':
        return nn.ELU()
    elif act_name == 'ReLU':
        return nn.ReLU()
    else:
        return nn.LeakyReLU()

class EEGNet(nn.Module):
    def __init__(self,hp):
        super(EEGNet, self).__init__()
        self.firstConv = nn.Sequential(
            nn.Conv2d(1,16,kernel_size=(1,51), stride=(1,1), padding=(0,25), bias=False),
            nn.BatchNorm2d(16,eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
        self.depthwiseConv = nn.Sequential(
            nn.Conv2d(16,32, kernel_size=(2,1), stride=(1,1), groups=16, bias=False),
            nn.BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
            get_activation(hp['act']),
            nn.AvgPool2d(kernel_size=(1,4), stride=(1,4), padding=0),
            nn.Dropout(p=0.25)
        )
        self.separableConv = nn.Sequential(
            nn.Conv2d(32,32,kernel_size=(1,15), stride=(1,1), padding=(0,7), bias=False),
            nn.BatchNorm2d(32,eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
            get_activation(hp['act']),
            nn.AvgPool2d(kernel_size=(1,8), stride=(1,8),padding=0),
            nn.Dropout(p=0.25),
            nn.Flatten()
        )
        self.classify = nn.Sequential(
            nn.Linear(32*23, 2, bias=False)
        )
    def forward(self,x):
        x = self.firstConv(x)
        x = self.depthwiseConv(x)
        x = self.separableConv(x)
        x = self.classify(x)
        return x


In [6]:
def get_loss(hp):
    return nn.CrossEntropyLoss()

def get_optimizer(hp,model):
    return torch.optim.SGD(model.parameters(),lr=hp['lr'])

def train(dataloader, model, loss_fn, optimizer, device):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    # turn model into training mode
    model.train()
    epoch_loss = 0.0
    correct = 0.0
    for batch, (X,y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        pred = model(X)
        loss = loss_fn(pred,y)
        epoch_loss += loss
        correct += (pred.argmax(1) == y).type(torch.float).sum().item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        """
        if batch %10 == 0:
            loss, current = loss.item(), batch*len(X)
            print(f" train loss: {loss:>7f} \t\t [{current:>5d}/{size:>5d}]")
        """
    epoch_loss /= num_batches
    correct/=size
    #print(f"epoch loss: {epoch_loss:>7f}")
    return correct, epoch_loss

In [7]:
def test(dataloader, model, loss_fn, device):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    # turn model into eval mode
    model.eval()
    test_loss, correct = 0,0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            #print(y.dtype)
            pred = model(X)
            #print(pred.dtype)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    #print(f'Test Accuracy: {(100 * correct):>0.1f}%, test loss:{test_loss:>8f} \n')
    return correct, test_loss

In [8]:
def train_with_hp(hp,save_result=False):
    
   
    # get dataloader
    train_dl, test_dl = gen_loader(hp)
    # get device
    device = get_device()
    # generate model
    if hp['NN'] == "EEGNet":
        model = EEGNet(hp).to(device)
    else:
        model = DeepCNN(hp).to(device)
    print(model)
    
    loss_fn = get_loss(hp)
    optimizer = get_optimizer(hp,model)

    test_acc_history = []
    test_loss_history = []
    train_acc_history = []
    train_loss_history = []
    
    epochs = hp['epochs']
    for t in range(epochs):
        print(f"Epoch {t+1}  ",end='')
        acc, loss = train(train_dl, model, loss_fn, optimizer,device)
        train_acc_history.append(acc)
        train_loss_history.append(loss)
        print(f"train_acc: {acc:<5.4f}  ",end='')

        acc, loss = test(test_dl, model, loss_fn, device)
        test_acc_history.append(acc)
        test_loss_history.append(loss)
        print(f"test_acc: {acc:<5.4f}  ",end='')

        print('\n',end='')

    # save model parameters
    if save_result:
        pth_name = 'checkpoints/'+hp_2_str(hp)+'model.pth'
        torch.save(model.state_dict(), pth_name)
        print(f"Save PyTorch Model State to {pth_name}")

        npz_name = 'learning_curves/'+hp_2_str(hp)+'curves.npz'
        np.savez_compressed(
            npz_name,
            train_acc=np.array(train_acc_history),
            train_loss=np.array(train_loss_history),
            test_acc_hisory=np.array(test_acc_history),
            test_loss_history=np.array(test_loss_history)
        )
        print(f"Save learning curves to {npz_name}")
    print("Done!")
    history = {
        'train_acc' : np.array(train_acc_history),
        'train_loss' : np.array(train_loss_history),
        'test_acc_hisory' : np.array(test_acc_history),
        'test_loss_history' : np.array(test_loss_history)

    }
    return model, history

In [11]:
hp = {
    'bs': 32,
    'lr':1e-3,
    'epochs':1000,
    'optimizer': 'sgd', # optimizer  
    'NN':'EEGNet',
    'act':'ReLU'
}
model , hist = train_with_hp(hp,save_result=True)

0.8241  test_acc: 0.7491  
Epoch 297  train_acc: 0.8083  test_acc: 0.7481  
Epoch 298  train_acc: 0.8083  test_acc: 0.7500  
Epoch 299  train_acc: 0.8130  test_acc: 0.7519  
Epoch 300  train_acc: 0.8306  test_acc: 0.7481  
Epoch 301  train_acc: 0.8148  test_acc: 0.7500  
Epoch 302  train_acc: 0.8148  test_acc: 0.7528  
Epoch 303  train_acc: 0.8157  test_acc: 0.7519  
Epoch 304  train_acc: 0.8130  test_acc: 0.7500  
Epoch 305  train_acc: 0.8120  test_acc: 0.7509  
Epoch 306  train_acc: 0.8083  test_acc: 0.7528  
Epoch 307  train_acc: 0.8130  test_acc: 0.7491  
Epoch 308  train_acc: 0.8204  test_acc: 0.7556  
Epoch 309  train_acc: 0.8120  test_acc: 0.7519  
Epoch 310  train_acc: 0.8102  test_acc: 0.7537  
Epoch 311  train_acc: 0.8167  test_acc: 0.7491  
Epoch 312  train_acc: 0.8315  test_acc: 0.7481  
Epoch 313  train_acc: 0.8222  test_acc: 0.7528  
Epoch 314  train_acc: 0.8083  test_acc: 0.7491  
Epoch 315  train_acc: 0.8185  test_acc: 0.7481  
Epoch 316  train_acc: 0.8185  test_acc: 0.

In [10]:
hp = {
    'bs': 64,
    'lr':1e-2,
    'epochs':300,
    'optimizer': 'sgd', # optimizer  
    'NN':'EEGNet',
    'act':'ReLU'
}
def read_history(hp):
    

SyntaxError: unexpected EOF while parsing (<ipython-input-10-d87d6fbfe091>, line 10)