In [1]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import torch
from torch import nn
import torch.nn.functional as F
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.transforms as transforms
import pandas as pd
from brainflow.data_filter import DataFilter, FilterTypes, DetrendOperations, WindowOperations

In [2]:
class ToTensor(object):
    def __call__(self, sample):
        window, labels = sample['window'], sample['labels']
        return {'window': torch.tensor(window.values).unsqueeze(0).to(torch.float32), 
                'labels': torch.tensor(labels).to(torch.float32)}#.values)}

class AbsoluteValue(object):
    def __call__(self, sample):
        window, labels = sample['window'], sample['labels']
        return {'window': torch.abs(window), 
                'labels': labels}

class Normalize(object):
    def __call__(self, sample):
        window, labels = sample['window'], sample['labels']
        return {'window': F.normalize(window), 
                'labels': labels}

class MinNormalize(object):
    def __call__(self, sample):
        window, labels = sample['window'], sample['labels']
        min = torch.min(window)
        window = window - min
        window = window / torch.max(window)
        return {'window': (window), 
                'labels': labels}

class EEGDataset(Dataset):
    def __init__(self, csv_file, transform=None):
        self.csv_file = pd.read_csv(csv_file, delimiter='\t', header=None)
        self.transform = transform
    
    def __len__(self):
        return len(self.csv_file)
    
    def __getitem__(self, idx):
        # if idx >= len(self.csv_file) / 1250:
        #     return

        if torch.is_tensor(idx):
            idx = idx.tolist()
        
        window = self.csv_file.iloc[idx*1250:idx*1250+1250, 7]
        labels = [self.csv_file.iloc[idx*1250, 32]]#:idx*1250+1250, 32] # remove brackets for timestep prediction
        sample = {'window': window, 'labels': labels}

        if self.transform:
            sample = self.transform(sample)
        
        return sample

In [3]:
eeg_dataset = EEGDataset(csv_file='./fullData.csv', transform=transforms.Compose([ToTensor(),
                                                                                  AbsoluteValue(),
                                                                                  MinNormalize()]))

kuba_dataset = EEGDataset(csv_file='./data/kuba.csv', transform=transforms.Compose([ToTensor(),
                                                                                  AbsoluteValue(),
                                                                                  MinNormalize()]))

eeg_dataset = torch.utils.data.Subset(eeg_dataset, range(0, 104))
kuba_dataset = torch.utils.data.Subset(eeg_dataset, range(0, 48))

torch.manual_seed(80)
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(eeg_dataset, [0.6, 0.2, 0.2])

print(len(train_dataset), len(val_dataset), len(test_dataset))

zerocount = 0
onecount = 0

for i in range(len(train_dataset)):
    if train_dataset[i]['labels'][0] == 1:
        onecount += 1
    else:
        zerocount += 1

print(onecount, zerocount)

zerocount = 0
onecount = 0

for i in range(len(val_dataset)):
    if val_dataset[i]['labels'][0] == 1:
        onecount += 1
    else:
        zerocount += 1

print(onecount, zerocount)

zerocount = 0
onecount = 0

for i in range(len(test_dataset)):
    if test_dataset[i]['labels'][0] == 1:
        onecount += 1
    else:
        zerocount += 1

print(onecount, zerocount)

63 21 20
33 30
8 13
11 9


In [None]:
class EEGEyesOpenCloseClassifier(nn.Module):
    def __init__(self):
        super(EEGEyesOpenCloseClassifier, self).__init__()
        self.conv1 = nn.Conv1d(1, 32, 5, stride=5)
        self.conv2 = nn.Conv1d(32, 64, 5, stride=5)
        self.pool1 = nn.AdaptiveAvgPool1d(1)
        self.fc1 = nn.Linear(64, 32)
        self.fc2 = nn.Linear(32, 1)

    def forward(self, x):
        print(x.shape)
        x = self.conv1(x)
        print(x.shape)
        x = F.relu(x)
        x = self.conv2(x)
        print(x.shape)
        x = F.relu(x)
        x = self.pool1(x)
        print(x.shape)
        x = x.squeeze(1)
        x = self.fc1(x)
        print(x.shape)
        x = F.relu(x)
        print(x.shape)
        x = self.fc2(x)
        print(x.shape)
        x = F.sigmoid(x)
        return x


model = EEGEyesOpenCloseClassifier()
model(torch.randn((1250,)).unsqueeze(0))

torch.Size([1250])


RuntimeError: Expected 2D (unbatched) or 3D (batched) input to conv1d, but got input of size: [1250]

In [5]:
optimizer = torch.optim.SGD(model.parameters())
criterion = nn.BCELoss()

In [6]:
# lossi = []

# model.train()
# for epoch in tqdm(range(10000)):
#     optimizer.zero_grad()
#     y_pred = model(train_dataset[0]['window'])

#     # if epoch == 0:
#     #     print(y_pred)

#     loss = criterion(y_pred, train_dataset[0]['labels'])
#     lossi.append(loss.item())

#     loss.backward()
#     optimizer.step()
    
#     # if epoch <= 1:
#     #     for p in model.parameters():
#     #         print(p.grad)
    

# plt.plot(lossi)

# print(lossi)

In [7]:
losses = []
lossi = []
vallosses = []
vallossi = []

In [8]:
minValLoss = 9999

for epoch in tqdm(range(10000)):
    for i in range(len(train_dataset)):
        optimizer.zero_grad()
        model.train()
        y_pred = model(train_dataset[i]['window'])
        # print(y_pred)

        # if epoch == 0:
        #     print(y_pred)

        loss = criterion(y_pred, train_dataset[i]['labels'])
        lossi.append(loss.item())
        loss.backward()
        optimizer.step()

        model.eval()
        for i in range(len(val_dataset)):
            val_pred = model(val_dataset[i]['window'])
            loss = criterion(val_pred, val_dataset[i]['labels'])
            vallossi.append(loss.item())
        
        # if epoch <= 1:
        #     for p in model.parameters():
        #         print(p.grad)
    
    losses.append(np.mean(lossi))
    avgvalloss = np.mean(vallossi)
    if avgvalloss < minValLoss:
        minValLoss = avgvalloss
        torch.save(model.state_dict(), 'bestvalmodel.pt')
    vallosses.append(avgvalloss)
    if epoch % 2 == 0:
        plt.plot(losses, "b")
        plt.plot(vallosses, "g")
        plt.savefig('./valloss.png')
        plt.close()
    lossi = []
    vallossi = []
    

plt.plot(losses)
plt.plot(vallosses)

print(losses)

print(losses[-1])
torch.save(model.state_dict(), 'finaltrainmodel.pt')



 41%|████      | 4108/10000 [20:29<29:23,  3.34it/s]  


KeyboardInterrupt: 

In [None]:
torch.argmin(torch.tensor(vallosses))

In [14]:
modelWeights = torch.load('./bestvalmodel.pt', weights_only=True)
model = EEGEyesOpenCloseClassifier()
model.load_state_dict(modelWeights)
model.eval()
truepreds = 0
falsepreds = 0
truepositives = 0
falsepositives = 0
truenegatives = 0
falsenegatives = 0

with torch.no_grad():
    for i in range(len(test_dataset)):
        if i < 3:
            print(test_dataset[i]['window'])
        y_pred = model(test_dataset[i]['window'])
        # print(y_pred, test_dataset[i]['labels'])
        pred = (y_pred >= 0.5)
        if pred == test_dataset[i]['labels']:
            truepreds += 1
            if pred == 1:
                truepositives += 1
            else:
                truenegatives += 1
        else:
            falsepreds += 1
            if pred == 1:
                falsepositives += 1
            else:
                falsenegatives += 1

print(truepreds / (truepreds + falsepreds))
print(truepositives, truenegatives, falsepositives, falsenegatives)

f1Score = 2 * truepositives / (2 * truepositives + falsepositives + falsenegatives)
print(f1Score)
        

tensor([[0.5527, 0.3384, 0.5840,  ..., 0.2091, 0.4370, 0.3190]])
tensor([[0.7282, 0.5178, 0.8467,  ..., 0.3269, 0.3479, 0.2564]])
tensor([[0.4265, 0.5154, 0.4710,  ..., 0.4580, 0.1503, 0.4008]])
0.8
9 7 2 2
0.8181818181818182


In [None]:
print((eeg_dataset[0]['window']))
#print(torch.min((no_normalize[0]['window'] - torch.min(no_normalize[0]['window']))/(torch.max(no_normalize[0]['window']))))

In [None]:
# torch.save(model, 'oct7model.pt')