In [None]:
import os
import torch
import numpy as np
from torch import nn
from torch.utils.data import DataLoader
from dataset import EEGDataset
from torch.utils.data import random_split

In [None]:
# load in the dataset

raw_data_dir = '//uni.au.dk/dfs/Tech_EarEEG/Students/RD2022_Artefact_AkselStark/data/1A/study_1A_mat_simple'

ds1 = EEGDataset(raw_data_dir,4, 250) #Instantiate a dataset using the directory of data, amount of night to include and amount of samples in a segment

In [41]:
#Calculate class imbalance
balancing_dataloader = DataLoader(ds1, drop_last = True) # Drop_last, to avoid incomplete batches, which won't work with weighted loss
artefacts = 0
good_samples = 0
for batch, (X, y) in enumerate(balancing_dataloader):
    if y == 1:
        artefacts += 1
    else:
        good_samples += 1
    if batch > 100000:
        break

class_ratio = int(good_samples/artefacts)



In [48]:
device = 'cuda' if torch.cuda.is_available() else 'cpu' #Check for cuda 
print(f'Using {device} device')

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        #self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.BatchNorm1d(250),
            nn.Linear(250, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        #x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

Using cuda device


In [49]:
learning_rate = 1e-2
batch_size = 64
epochs = 5

model = NeuralNetwork()

#Weighting artefacts 10x
#loss = nn.BCELoss(weight = torch.tensor([0.1,0.9]))

#weights = [1,10]
#class_weights = torch.FloatTensor(weights)#.cuda()

#loss = nn.CrossEntropyLoss(weight=class_weights) # Using cross entropy and not binary cross entropy because weight implementation is better :/
#loss = nn.CrossEntropyLoss(class_weights)

#pos_weight: amount of positive examples compared to negative examples. Calculate as: negative_examples/positive_examples
loss = nn.BCEWithLogitsLoss(pos_weight = class_ratio*torch.ones([batch_size])) 

optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

In [50]:
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        pred = model(X).reshape(-1)# Reshape to 1 dimension if using binary classification, otherwise keep dimensions from model output
        yFloat = y.type(torch.FloatTensor)

        try:
            loss = loss_fn(pred, yFloat)
        except:
            print("loss function failed")
            break


        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 1000 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
            print("Predicted values:")
            print(pred)
            print("Actual values:")
            print(yFloat)



def test_loop(dataloader_test, model, loss_fn, test_set = True):
    size = len(dataloader_test.dataset)
    num_batches = len(dataloader_test)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader_test:
            pred = model(X).reshape(-1) # Reshape to 1 dimension if using binary classification, otherwise keep dimensions from model output
            test_loss += loss_fn(pred, y.type(torch.FloatTensor)).item()
            correct += (pred.round() == y).type(torch.float).sum().item()
            

    test_loss /= num_batches
    correct /= size
    if test_set:
        print(f"Test set Error: \n Test Set Accuracy: {(100*correct):>0.5f}%, Avg Test Set loss: {test_loss:>8f} \n")
    else:
        print(f"Training Set Error: \n Training Set Accuracy: {(100*correct):>0.5f}%, Avg Training Set loss: {test_loss:>8f} \n")


In [51]:
# Split data into train and test data
trainSamples = int(ds1.__len__()*0.7)
testSamples = int(ds1.__len__() - trainSamples)
training_data, test_data = random_split(ds1, (trainSamples,testSamples), generator=torch.Generator().manual_seed(42))

train_dataloader = DataLoader(training_data, batch_size=64, drop_last = True) # Drop_last, to avoid incomplete batches, which won't work with weighted loss
test_dataloader = DataLoader(test_data, batch_size=64, drop_last = True) # Drop_last, to avoid incomplete batches, which won't work with weighted loss

In [52]:
epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    
    train_loop(train_dataloader, model, loss, optimizer)    
    test_loop(test_dataloader, model, loss)
    test_loop(train_dataloader, model, loss, test_set = False)

print("Done!")

Epoch 1
-------------------------------
loss: 0.973649  [    0/1906271]
Predicted values:
tensor([0.5015, 0.4999, 0.4972, 0.4983, 0.4975, 0.4999, 0.4992, 0.4973, 0.4964,
        0.4971, 0.4978, 0.5004, 0.4987, 0.4983, 0.4986, 0.4985, 0.5003, 0.4971,
        0.5018, 0.4980, 0.4976, 0.4988, 0.5015, 0.5015, 0.5006, 0.4987, 0.5007,
        0.5004, 0.5016, 0.5015, 0.4999, 0.5020, 0.5009, 0.5003, 0.4980, 0.4969,
        0.4992, 0.4985, 0.4983, 0.5005, 0.4988, 0.4996, 0.4988, 0.4987, 0.5016,
        0.5013, 0.4998, 0.4974, 0.4982, 0.5017, 0.4987, 0.4983, 0.5010, 0.5003,
        0.4965, 0.5004, 0.4970, 0.4970, 0.4999, 0.4999, 0.5002, 0.5004, 0.5015,
        0.4980], grad_fn=<ReshapeAliasBackward0>)
Actual values:
tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
loss

KeyboardInterrupt: 

In [None]:
print('debug')

debug
