In [16]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader, Dataset, random_split
import numpy as np
from os.path import join
import mat73
import matplotlib.pyplot as plt
import optuna

In [17]:
# Load data into dictionary
DataPath = join("neuro_data","dataSubj10.mat")
data_dict = mat73.loadmat(DataPath, use_attrdict=True)

RAWDATA = data_dict["data"]
BATCH_SIZE=16
## CROPPING for data, 2.6s-5.6s, region of interest with audio
FS = 512
LO = int(2.6*FS) #1331
HI = int(5.5*FS) #2816
# Dif : 1485 = 3 * 3 * 3 * 5 * 11

In [18]:
class CreateDataset(Dataset):
    """Creates dataset for EEGNET, meaning move all relevant channel data into one matrix for x, and results into y

    Args:
        Dataset (_type_): Takes in data loaded from Matlab and formats appropriately.
    """
    
    def __init__(self, data, channels, crop=None):
        # Associates channel names with channel data in a dictionary
        datadicts = [dict(zip(np.squeeze(data["label"]),dat)) for dat in data["trial"]]

        x, y = [0]*len(data["trialinfo"]), np.array([0]*len(data["trialinfo"]))
        
        # Extract the y-values, i.e. which side the audio was played
        for i, trialinfo in enumerate(data["trialinfo"]):
            # side : left = 1, right = 0
            y[i] = int(trialinfo[0]["side"])==1
        
        self.y = y
        
        # Extract only information from relevant channels
        for i, dat in enumerate(datadicts):
            x[i] = [dat[ch] for ch in channels]
        x = np.array(x)
        
        # Include only specific section of time-series.
        if crop:
            x=x[:,:,crop[0]:crop[1]]
        
        self.x = x

    def __getitem__(self, index):
        feature = torch.tensor([self.x[index]], dtype=torch.float32)
        label = torch.tensor([self.y[index]], dtype=torch.float32)

        return feature, label
    
    def __len__(self):
        return len(self.x)
        

In [19]:
# https://towardsdatascience.com/convolutional-neural-networks-for-eeg-brain-computer-interfaces-9ee9f3dd2b81
class Flatten(nn.Module):
    def forward(self, input):
        return input.view(input.size(0), -1)

class EEGNET(nn.Module):
    ''' Dimensions of key layers.
    in:
    [BATCH_SIZE,1,N_CHANNELS,HI-LO]
    temporal:
    [BATCH_SIZE,filter_sizing,N_CHANNELS,HI-LO]
    spatial:
    [BATCH_SIZE,filter_sizing*D,1,HI-LO]
    avgpool1:
    [BATCH_SIZE,filter_sizing*D,1,(HI-LO-mean_pool)/mean_pool + 1]
    avgpool2:
    [BATCH_SIZE,filter_sizing*D,1,floor((floor((HI-LO-mean_pool)/mean_pool + 1)-mean_pool)/mean_pool + 1)]
    '''
    def __init__(self, filter_sizing, dropout, D, channel_amount, receptive_field=512, mean_pool=15):
        super(EEGNET,self).__init__()
        self.temporal=nn.Sequential(
            nn.Conv2d(1,filter_sizing,kernel_size=[1,receptive_field],stride=1, bias=False,\
                padding='same'), 
            nn.BatchNorm2d(filter_sizing),
        )
        self.spatial=nn.Sequential(
            nn.Conv2d(filter_sizing,filter_sizing*D,kernel_size=[channel_amount,1],bias=False,\
                groups=filter_sizing),
            nn.BatchNorm2d(filter_sizing*D),
            nn.ELU(True),
        )

        self.seperable=nn.Sequential(
            nn.Conv2d(filter_sizing*D,filter_sizing*D,kernel_size=[1,16],\
                padding='same',groups=filter_sizing*D, bias=False),
            nn.Conv2d(filter_sizing*D,filter_sizing*D,kernel_size=[1,1], padding='same',groups=1, bias=False),
            nn.BatchNorm2d(filter_sizing*D),
            nn.ELU(True),
        )

        self.avgpool1 = nn.AvgPool2d([1, mean_pool], stride=[1, mean_pool], padding=0)   
        self.avgpool2 = nn.AvgPool2d([1, mean_pool], stride=[1, mean_pool], padding=0)
        self.dropout = nn.Dropout(dropout)
        self.view = nn.Sequential(Flatten())

        # Endsize calculated from dimensions given in documentation for Conv2d, AvgPool2d and Flatten.
        endsize = filter_sizing*D*np.floor((np.floor((HI-LO-mean_pool)/mean_pool + 1)-mean_pool)/mean_pool + 1)
        self.fc2 = nn.Linear(int(endsize), 1)

    def forward(self,x):
        out = self.temporal(x)
        out = self.spatial(out)
        out = self.avgpool1(out)
        out = self.dropout(out)
        out = self.seperable(out)
        out = self.avgpool2(out)
        out = self.dropout(out)
        out = out.view(out.size(0), -1)
        prediction = self.fc2(out)
        return torch.sigmoid(prediction)

In [20]:
def evaluate_loss(model, criterion, dataloader):
    model.eval()
    total_loss = 0.0
    for batch_X, batch_y in dataloader:
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        total_loss += loss.item()
        
    return total_loss / len(dataloader)

In [21]:
def evaluate_acc(model, dataloader):
    model.eval()
    total_acc = 0.0
    for batch_X, batch_y in dataloader:
        outputs = model(batch_X)
        predictions = 1.0*(outputs>0.5)
        total_acc += (predictions==batch_y).sum()
        
    return total_acc / len(dataloader.dataset)

In [22]:
def train(model, criterion, optimizer, train_loader, valid_loader, n_epochs):
    train_loss_list = []
    valid_loss_list = []
    train_acc_list = []
    valid_acc_list = []
    for epoch in range(1, n_epochs):
        model.train()
        for batch_X, batch_y in train_loader:
            ypred = model.forward(batch_X)
            loss = criterion(ypred, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
        train_loss = evaluate_loss(model, criterion, train_loader)
        valid_loss = evaluate_loss(model, criterion, valid_loader)
        train_acc = evaluate_acc(model, train_loader)
        valid_acc = evaluate_acc(model, valid_loader)
        train_loss_list.append(train_loss)
        valid_loss_list.append(valid_loss)
        train_acc_list.append(train_acc)
        valid_acc_list.append(valid_acc)

        print(f"| epoch {epoch:2d} | train loss {train_loss:.6f} | train acc {train_acc:.6f} | valid loss {valid_loss:.6f} | valid acc {valid_acc:.6f} |")

    return train_loss_list, valid_loss_list, train_acc_list, valid_acc_list

# Training

In [23]:
# fix random seed
np.random.seed(293210931)
torch.manual_seed(293210931)

channels = ["T7","FT7","TP7","TP8","FT8","T8"]
N_CHANNELS = len(channels)

DATASET = CreateDataset(RAWDATA, channels, [LO, HI])


dat_train, dat_val, dat_test = random_split(DATASET, [0.7,0.1,0.2])

train_loader = DataLoader(dat_train, batch_size=BATCH_SIZE)
val_loader = DataLoader(dat_val, batch_size=BATCH_SIZE)
test_loader = DataLoader(dat_test, batch_size=BATCH_SIZE)

# Set up elements
model = EEGNET(4,0.33,2,6,mean_pool=15)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 8.5e-5)

# Train network
#train(model, criterion, optimizer, train_loader, val_loader, 100) #Should be val instead of test



# Hyperparam optimization Optuna

Here I attempt to optimize hyper parameter choice to maximize accuracy on validation data.

I save some test data that is untouched during hyperparameter tuning, this is used for evaluation.

In [24]:
# Partition into training and test data
dat_hyper, dat_test = random_split(DATASET, [0.8,0.2])
# Test data, cannot be used for training or tuning!
test_loader = DataLoader(dat_test, batch_size=BATCH_SIZE)

In [25]:
def build_model(params):
    fs = params["filter_sizing"]
    do = params["dropout"]
    D = params["D"]
    rf = params["receptive_field"]
    return EEGNET(fs, do, D, N_CHANNELS, receptive_field=rf, mean_pool=15)

def train_and_eval(params, model, n_epochs=2):

    # Load in the data
    dat_train, dat_val = random_split(dat_hyper, [0.8,0.2])
    train_loader = DataLoader(dat_train, batch_size=BATCH_SIZE)
    valid_loader = DataLoader(dat_val, batch_size=BATCH_SIZE)
    
    # Set Loss criterion and extract optimizer
    criterion = nn.BCELoss()
    #optimizer_options = params["optimizer"]
    #optimizer = getattr(torch.optim, optimizer_options)(model.parameters(), lr = params["learning_rate"])
    optimizer = torch.optim.Adam(model.parameters(), lr = params["learning_rate"])

    for epoch in range(1, n_epochs):
        model.train()
        for batch_X, batch_y in train_loader:
            ypred = model.forward(batch_X)
            loss = criterion(ypred, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
        train_loss = evaluate_loss(model, criterion, train_loader)
        valid_loss = evaluate_loss(model, criterion, valid_loader)
        train_acc = evaluate_acc(model, train_loader)
        valid_acc = evaluate_acc(model, valid_loader)

        print(f"| epoch {epoch:2d} | train loss {train_loss:.6f} | train acc {train_acc:.6f} | valid loss {valid_loss:.6f} | valid acc {valid_acc:.6f} |")

    return valid_acc

def objective(trial):

    params = {
        "filter_sizing" : trial.suggest_int("filter_sizing", 2, 4), 
        "dropout" : trial.suggest_uniform("dropout", 0.2, 0.8), 
        "D" : trial.suggest_int("Depth Parameter", 1, 4), 
        "receptive_field" : trial.suggest_int("receptive_field", 256, 640, step=64),
        #mean_pool=15  #Potentially add if expression for end size found in EEGNET class
        "learning_rate" : trial.suggest_loguniform("learning_rate", 1e-6, 1e-1)
        #"optimizer" : trial.suggest_categorical("optimizer", ["Adam", "RMSprop", "SGD"])
    }

    model = build_model(params)

    accuracy = train_and_eval(params, model)

    return accuracy



In [26]:
# Run Optuna
study = optuna.create_study(direction="maximize", sampler=optuna.samplers.TPESampler())
study.optimize(objective, n_trials=2)

[32m[I 2022-11-01 18:23:44,139][0m A new study created in memory with name: no-name-86c1efc9-5dfa-4b43-a9f9-917e380ea63c[0m

suggest_uniform has been deprecated in v3.0.0. This feature will be removed in v6.0.0. See https://github.com/optuna/optuna/releases/tag/v3.0.0. Use :func:`~optuna.trial.Trial.suggest_float` instead.


suggest_loguniform has been deprecated in v3.0.0. This feature will be removed in v6.0.0. See https://github.com/optuna/optuna/releases/tag/v3.0.0. Use :func:`~optuna.trial.Trial.suggest_float` instead.

[32m[I 2022-11-01 18:23:52,032][0m Trial 0 finished with value: 0.5 and parameters: {'filter_sizing': 4, 'dropout': 0.7377226879422212, 'Depth Parameter': 2, 'receptive_field': 576, 'learning_rate': 1.4387039350964286e-06}. Best is trial 0 with value: 0.5.[0m


| epoch  1 | train loss 0.693864 | train acc 0.535398 | valid loss 0.695826 | valid acc 0.500000 |


[32m[I 2022-11-01 18:23:59,248][0m Trial 1 finished with value: 0.4821428656578064 and parameters: {'filter_sizing': 4, 'dropout': 0.5250544582300809, 'Depth Parameter': 3, 'receptive_field': 512, 'learning_rate': 0.00028789113473168466}. Best is trial 0 with value: 0.5.[0m


| epoch  1 | train loss 0.708698 | train acc 0.464602 | valid loss 0.721034 | valid acc 0.482143 |


In [29]:
optuna.visualization.plot_param_importances(study)

ValueError: Mime type rendering requires nbformat>=4.2.0 but it is not installed