In [1]:
from __future__ import print_function

import glob
from itertools import chain
import os
import random
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from linformer import Linformer
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from tqdm.notebook import tqdm
from util import data_creator
from vit_pytorch.efficient import ViT


In [2]:
inpath='data_in/train/positive'
outpath='data_out/train/positive'
#outpath=os.path.join("data_out", "train")
data_creator(inpath=inpath,outpath=outpath,interval=1250,aug=3,num_file=2)

(1048575, 8)
(1048575, 8)


1

In [3]:
class EEGDataset(Dataset):

    class_to_id = {"positive": 0, "negative": 1}
    
    def __init__(self, data_root, transform=None):
        super().__init__()
        self.data_root = data_root
        self.transform = transform
        self.eeg_files = glob.glob(os.path.join(self.data_root, "**", "*.npy"))
        # print(self.eeg_files)
 
    def __getitem__(self, index):
        eeg_file = self.eeg_files[index]
        eeg_data = np.load(eeg_file)
        label = os.path.basename(os.path.dirname(eeg_file))

        if self.transform is not None:
            eeg_data = self.transform(eeg_data)

        return eeg_data, self.class_to_id[label]
 
    def __len__(self):
        return len(self.eeg_files)

In [5]:
train_transforms = transforms.Compose([
        transforms.ToTensor()
    ])
eeg_dataset = EEGDataset(os.path.join("raw_data_full_chop", "train"), transform=train_transforms)

train_loader = DataLoader(dataset=eeg_dataset, batch_size=1, shuffle=True)



In [6]:
train_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
    ]
)

val_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
    ]
)


test_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
    ]
)

In [7]:
data_train = EEGDataset(os.path.join("raw_data_full_chop", "train"), transform=train_transforms)

train_loader = DataLoader(dataset=data_train, batch_size=64, shuffle=True)
data_test= EEGDataset(os.path.join("raw_data_full_chop", "test"), transform=train_transforms)

test_loader = DataLoader(dataset=data_test, batch_size=64, shuffle=True)

In [17]:
batch_size = 64
epochs = 5
lr = 3e-5
gamma = 0.7
seed = 42

In [9]:
efficient_transformer = Linformer(
    dim=128,
    seq_len=49+1,  
    depth=12,
    heads=8,
    k=64
)

In [15]:
def get_device():
    if torch.cuda.is_available():
        device = torch.device('cuda:0')
    else:
        device = torch.device('cpu') # don't have GPU 
    return device
device = get_device()

In [16]:
model = ViT(
    dim=128,
    image_size=224,
    patch_size=32,
    num_classes=2,
    transformer=efficient_transformer,
    channels=3,
).to(device)

In [18]:
# loss function
criterion = nn.CrossEntropyLoss()
# optimizer
optimizer = optim.Adam(model.parameters(), lr=lr)
# scheduler
scheduler = StepLR(optimizer, step_size=1, gamma=gamma)

In [21]:
for epoch in range(epochs):
    epoch_loss = 0
    epoch_accuracy = 0

    for data, label in train_loader:
        data = data.to(device)
        label = label.to(device)

        output = model(data)
        loss = criterion(output, label)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        acc = (output.argmax(dim=1) == label).float().mean()
        epoch_accuracy += acc / len(train_loader)
        epoch_loss += loss / len(train_loader)

    with torch.no_grad():
        epoch_val_accuracy = 0
        epoch_val_loss = 0
        for data, label in test_loader:
            data = data.to(device)
            label = label.to(device)

            val_output = model(data)
            val_loss = criterion(val_output, label)

            acc = (val_output.argmax(dim=1) == label).float().mean()
            epoch_val_accuracy += acc / len(test_loader)
            epoch_val_loss += val_loss / len(test_loader)

    print(
        f"Epoch : {epoch+1} - loss : {epoch_loss:.4f} - acc: {epoch_accuracy:.4f} - val_loss : {epoch_val_loss:.4f} - val_acc: {epoch_val_accuracy:.4f}\n"
    )


TypeError: Unexpected type <class 'numpy.ndarray'>