<a href="https://colab.research.google.com/github/Guo-Weiqiang/Master-Project/blob/main/SEED.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
from tqdm import tqdm
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchsummary import summary

import matplotlib.pyplot as plt

# Prefer the GPU when CUDA is usable; otherwise everything runs on the CPU.
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [2]:
class SeedDataset(Dataset):
    """Map-style dataset backed by a single ``.npy`` file.

    The whole array is read with ``np.load`` when the dataset is created;
    indexing afterwards only slices that in-memory array. The first axis of
    the array enumerates the samples.

    Args:
        file_path: Path of the ``.npy`` file to load.
        labels: Optional sequence with one class index per sample. When
            omitted, the built-in label pattern is used (see
            ``_BASE_LABELS``). Custom labels are used as given, without the
            +1 shift applied to the built-in ones.
        sample_shape: Shape every individual sample must have, i.e. the
            array shape without its first axis.

    Raises:
        ValueError: If the per-sample shape differs from ``sample_shape`` or
            the number of samples differs from the number of labels.
    """

    # Built-in labels are written as -1/0/1 and shifted by +1 on use so they
    # become class indices 0..2, the range nn.CrossEntropyLoss expects.
    # NOTE(review): the 15-entry pattern repeated 3 times presumably mirrors
    # the recording layout of one subject file - confirm against the data.
    _BASE_LABELS = (1, 0, -1, -1, 0, 1, -1, 0, 1, 1, 0, -1, 0, 1, -1)
    _REPEATS = 3
    DEFAULT_SAMPLE_SHAPE = (1, 62, 37001)

    def __init__(self, file_path, labels=None, sample_shape=DEFAULT_SAMPLE_SHAPE):
        self.data = np.load(file_path)
        if labels is None:
            self.labels = [value + 1 for value in self._BASE_LABELS] * self._REPEATS
        else:
            self.labels = list(labels)

        # Explicit exceptions instead of ``assert``: asserts are removed when
        # Python runs with -O, which would let malformed data through.
        expected_shape = tuple(sample_shape)
        actual_shape = tuple(self.data.shape[1:])
        if actual_shape != expected_shape:
            raise ValueError(
                f"Invalid shape of data: expected samples of shape "
                f"{expected_shape}, got {actual_shape}"
            )

        # __len__ is driven by the labels, so a count mismatch would either
        # raise IndexError in the middle of an epoch or silently hide samples.
        if self.data.shape[0] != len(self.labels):
            raise ValueError(
                f"Number of samples ({self.data.shape[0]}) does not match "
                f"number of labels ({len(self.labels)})"
            )

    def __len__(self):
        """Return the number of (sample, label) pairs."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(sample, label)`` for position ``idx``.

        ``sample`` is a float32 tensor holding ``self.data[idx]``; ``label``
        is the matching entry of ``self.labels``.
        """
        # Slice the already-loaded array and cast to float32 for the model.
        sample = torch.from_numpy(self.data[idx]).float()
        label = self.labels[idx]
        return sample, label

In [3]:
# train_features, train_labels = next(iter(train_dataloader))
# print(f"Feature batch shape: {train_features.size()}")
# print(f"Labels batch shape: {train_labels.size()}")

In [8]:
class EEGNet_ReLU(torch.nn.Module):
    """EEGNet-style convolutional classifier using ReLU activations.

    Three convolutional stages (``firstConv``, ``depthwiseConv``,
    ``separableConv``) feed a two-layer fully connected head (``classify``).
    The input size of the head is derived from ``input_shape`` rather than
    being hard-coded, so the network can be built for other input sizes.

    Args:
        n_output: Number of values produced per sample by the last linear
            layer (one per class).
        input_shape: ``(channels, height, width)`` of one input sample,
            without the batch axis. The default yields the original
            flattened feature length of 2256512.

    Raises:
        ValueError: If ``input_shape`` is so small that no features would be
            left after the convolution and pooling stages.
    """

    _STAGE_CHANNELS = 32   # output channels of the second and third stage
    _POOL_WIDTHS = (4, 8)  # widths of the two average-pooling windows

    def __init__(self, n_output, input_shape=(1, 62, 37001)):
        super().__init__()
        # Validate and size the head first so a bad shape fails before any
        # layer is allocated.
        n_features = self._flat_features(input_shape)
        in_channels = input_shape[0]
        first_pool, second_pool = self._POOL_WIDTHS

        self.firstConv = nn.Sequential(
            # Odd kernel width with matching padding keeps the width unchanged.
            nn.Conv2d(in_channels, 16, kernel_size=(1,51), stride=(1,1), padding=(0,25),bias=False),
            nn.BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
        self.depthwiseConv = nn.Sequential(
            # Unpadded (2,1) kernel: height shrinks by one. With 16 input
            # channels and groups=8, each group sees two input channels.
            nn.Conv2d(16, self._STAGE_CHANNELS, kernel_size=(2,1), stride=(1,1), groups=8,bias=False),
            nn.BatchNorm2d(self._STAGE_CHANNELS, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(1,first_pool), stride=(1,first_pool),padding=0),
            nn.Dropout(p=0.35)
        )
        self.separableConv = nn.Sequential(
            nn.Conv2d(self._STAGE_CHANNELS, self._STAGE_CHANNELS, kernel_size=(1,15), stride=(1,1), padding=(0,7),bias=False),
            nn.BatchNorm2d(self._STAGE_CHANNELS, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(1,second_pool), stride=(1,second_pool),padding=0),
            nn.Dropout(p=0.35),
            nn.Flatten()
        )

        self.classify = nn.Sequential(
            nn.Linear(n_features, 256, bias=True),
            nn.ReLU(),
            nn.Linear(256, n_output, bias=True)
        )

    @classmethod
    def _flat_features(cls, input_shape):
        """Return the length of the flattened feature vector for ``input_shape``."""
        channels, height, width = input_shape
        # Only the unpadded (2,1) convolution changes the height.
        out_height = height - 1
        # Both width-wise convolutions preserve the width; each non-overlapping
        # average pool floor-divides it by its window width.
        out_width = width
        for pool in cls._POOL_WIDTHS:
            out_width //= pool
        if channels < 1 or out_height < 1 or out_width < 1:
            raise ValueError(
                f"input_shape {tuple(input_shape)} is too small for this network"
            )
        return cls._STAGE_CHANNELS * out_height * out_width

    def forward(self, x):
        """Run the network on a batch ``x``.

        Returns:
            A tuple ``(out, features)``: the output of the classification
            head and the flattened features that were fed into it.
        """
        out = self.firstConv(x)
        out = self.depthwiseConv(out)
        features = self.separableConv(out)
        out = self.classify(features)
        return out, features




def train(dataloader, model, loss_fn, optimizer):
    """Train ``model`` for one pass over ``dataloader`` and print progress.

    After every batch the batch loss and the running accuracy (correct
    predictions over all samples seen so far in this pass) are printed.

    Args:
        dataloader: Iterable yielding ``(X, y)`` batches; its ``dataset``
            must support ``len()``.
        model: Module whose forward pass returns ``(predictions, features)``.
        loss_fn: Callable mapping ``(predictions, targets)`` to a scalar loss.
        optimizer: Optimizer updating the parameters of ``model``.
    """
    size = len(dataloader.dataset)
    # Training mode is required here: the model contains batch-normalisation
    # and dropout layers, which behave differently in eval mode.
    model.train()

    seen = 0
    correct = 0
    for X, y in dataloader:
        X, y = X.to(DEVICE), y.to(DEVICE)

        # Compute prediction and loss
        pred, _ = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation. Gradients are cleared first so that nothing left
        # over from earlier backward passes leaks into this step.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Count with the real batch length so a short final batch is
        # reported correctly.
        seen += len(X)
        correct += (pred.argmax(1) == y).sum().item()
        accuracy = correct / seen
        batch_loss = loss.item()
        print(f"Train Error: \n Accuracy: {(100*accuracy):>0.1f}%, loss: {batch_loss:>7f}  [{seen:>5d}/{size:>5d}]")


def test(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and print accuracy and mean loss.

    The model is switched to evaluation mode and gradients are disabled.
    Accuracy is the share of correct predictions over the whole dataset; the
    loss is averaged over the number of batches.

    Args:
        dataloader: Iterable yielding ``(X, y)`` batches; both it and its
            ``dataset`` must support ``len()``.
        model: Module whose forward pass returns ``(predictions, features)``.
        loss_fn: Callable mapping ``(predictions, targets)`` to a scalar loss.
    """
    model.eval()
    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)
    loss_sum = 0
    hits = 0

    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(DEVICE)
            targets = targets.to(DEVICE)
            logits, _ = model(inputs)
            loss_sum += loss_fn(logits, targets).item()
            matches = logits.argmax(1) == targets
            hits += matches.type(torch.float).sum().item()

    mean_loss = loss_sum / n_batches
    accuracy = hits / n_samples
    print(f"Test Error: \n Accuracy: {(100*accuracy):>0.1f}%, Avg loss: {mean_loss:>8f} \n")

def main():
    """Build the network, then train on subject 1 and evaluate on subject 2.

    A model summary is printed first. Each of the 100 epochs runs one
    training pass followed by one evaluation pass.
    """
    model = EEGNet_ReLU(3).to(DEVICE)
    summary(model,(1, 62, 37001))

    train_set = SeedDataset('drive/MyDrive/EEGNet/processed_seed_dataset/subject1.npy')
    test_set = SeedDataset('drive/MyDrive/EEGNet/processed_seed_dataset/subject2.npy')

    train_loader = DataLoader(train_set, batch_size=5, shuffle=True)
    test_loader = DataLoader(test_set, batch_size=5, shuffle=True)

    # nn.CrossEntropyLoss expects integer targets in [0, C-1], where C is the
    # number of classes.
    criterion = nn.CrossEntropyLoss()
    sgd = optim.SGD(model.parameters(), lr=1, momentum=0.5, weight_decay=5e-4)

    n_epochs = 100
    for epoch in tqdm(range(1, n_epochs + 1)):
        print(f"Epoch {epoch}\n-------------------------------")
        train(train_loader, model, criterion, sgd)
        test(test_loader, model, criterion)
    print("Done!")

In [10]:
# Cell entry point: build the model, load both subject files and run the
# train/evaluate epochs defined in main().
main()