<a href="https://colab.research.google.com/github/Kiarashmo/Stroke-Patient-Motor-Imagery-Classification-System-by-Utilizing-Transfer-Learning/blob/main/Customized_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install git+https://github.com/tczhangzhi/torcheeg.git

Collecting git+https://github.com/tczhangzhi/torcheeg.git
  Cloning https://github.com/tczhangzhi/torcheeg.git to /tmp/pip-req-build-bhrv2l5e
  Running command git clone --filter=blob:none --quiet https://github.com/tczhangzhi/torcheeg.git /tmp/pip-req-build-bhrv2l5e
  Resolved https://github.com/tczhangzhi/torcheeg.git to commit b6def3ab295fc9df9fc5dce495634e639def398b
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting lmdb>=1.3.0 (from torcheeg==1.0.11)
  Downloading lmdb-1.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (299 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m299.2/299.2 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting einops>=0.4.1 (from torcheeg==1.0.11)
  Downloading einops-0.6.1-py3-none-any.whl (42 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.2/42.2 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting mne>=1.0.3 (from torcheeg==1.0.11)
  Downloading mne-1.4.2-py3-none-a

In [None]:
from google.colab import drive
drive.mount('/content/drive')

base_path = '/content/drive/MyDrive/NMA2023_DL/Healthy_Dataset'
data_path = f'{base_path}/mi_channels/'

Mounted at /content/drive


In [None]:
import os
import pandas as pd
import torch
from torch.utils.data import Dataset
import numpy as np

class EEGDataset(Dataset):
    def __init__(self, data_folder_path: str, subjects: list, shuffle: bool = True):
      """
      `data_folder_path` should have following structure:
        - subject001.npz
        - subject001.npz
        ...

      Each npz file contains both "data" and "labels" for one subject.
      """

      print("----")
      print(f"Creating dataset for {len(subjects)} subjects...")
      data_list = []
      labels_list = []

      for i in subjects:
        data_and_labels = np.load(f"{data_folder_path}/subject{i:03}.npz")

        data = data_and_labels['data']
        data = data.reshape((data.shape[0], 1, *data.shape[1:])) # N C W -> N 1 C W

        labels = data_and_labels['labels']
        data_list.append(data)
        labels_list.append(labels)

      self.data = torch.tensor(np.concatenate(data_list).astype(np.float32))
      self.labels = torch.tensor(np.concatenate(labels_list).astype(np.long))

      if shuffle:
        idx = torch.randperm(self.data.shape[0])
        self.data = self.data[idx]
        self.labels = self.labels[idx]

      print("Dataset created:")
      print("Data shape:", self.data.shape)
      print("Labels shape:", self.labels.shape)
      print("----")


    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]


In [None]:
from torch.utils.data import random_split

NUMBER_OF_SUBJECTS = len([name for name in os.listdir(data_path) if name.endswith("npz")])
print("NUMBER_OF_SUBJECTS:", NUMBER_OF_SUBJECTS)

subjects = range(1, NUMBER_OF_SUBJECTS)

split_size = [0.6, 0.2, 0.2]

splits = random_split(subjects, split_size)
train_subjects = list(splits[0])
val_subjects = list(splits[1])
test_subjects = list(splits[2])

NUMBER_OF_SUBJECTS: 109


In [None]:
train_data = EEGDataset(data_path, train_subjects, shuffle=True)
val_data = EEGDataset(data_path, val_subjects, shuffle=True)
test_data = EEGDataset(data_path, test_subjects, shuffle=True)

----
Creating dataset for 65 subjects...


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  self.labels = torch.tensor(np.concatenate(labels_list).astype(np.long))


Dataset created:
Data shape: torch.Size([2937, 1, 19, 640])
Labels shape: torch.Size([2937])
----
----
Creating dataset for 22 subjects...
Dataset created:
Data shape: torch.Size([990, 1, 19, 640])
Labels shape: torch.Size([990])
----
----
Creating dataset for 21 subjects...
Dataset created:
Data shape: torch.Size([945, 1, 19, 640])
Labels shape: torch.Size([945])
----


In [None]:
from torch.utils.data import DataLoader

BATCH_SIZE = 16

train_loader = DataLoader(train_data, batch_size=BATCH_SIZE)
val_loader = DataLoader(val_data, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_data, batch_size=BATCH_SIZE)

In [None]:
# @title Set random seed

# @markdown Executing `set_seed(seed=seed)` you are setting the seed

# For DL its critical to set the random seed so that students can have a
# baseline to compare their results to expected results.
# Read more here: https://pytorch.org/docs/stable/notes/randomness.html

# Call `set_seed` function in the exercises to ensure reproducibility.
import random
import torch

def set_seed(seed=None, seed_torch=True):
  """
  Function that controls randomness. NumPy and random modules must be imported.

  Args:
    seed : Integer
      A non-negative integer that defines the random state. Default is `None`.
    seed_torch : Boolean
      If `True` sets the random seed for pytorch tensors, so pytorch module
      must be imported. Default is `True`.

  Returns:
    Nothing.
  """
  if seed is None:
    seed = np.random.choice(2 ** 32)
  random.seed(seed)
  np.random.seed(seed)
  if seed_torch:
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

  print(f'Random seed {seed} has been set.')


# In case that `DataLoader` is used
def seed_worker(worker_id):
  """
  DataLoader will reseed workers following randomness in
  multi-process data loading algorithm.

  Args:
    worker_id: integer
      ID of subprocess to seed. 0 means that
      the data will be loaded in the main process
      Refer: https://pytorch.org/docs/stable/data.html#data-loading-randomness for more details

  Returns:
    Nothing
  """
  worker_seed = torch.initial_seed() % 2**32
  np.random.seed(worker_seed)
  random.seed(worker_seed)

In [None]:
SEED = 0
set_seed(seed=SEED)

Random seed 0 has been set.


#Model Costumization

In [None]:
import torch.nn as nn
from torcheeg.models import EEGNet

class MyEEGNet(EEGNet):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.FC3 = nn.Linear(80, 32)
        self.relu3 = nn.ReLU()
        self.FC4 = nn.Linear(32, 2)
        self.soft = nn.Softmax()

    def forward(self, x):
        x = self.block1(x)
        x = self.block2(x)
        x = self.FC3(x)
        x = self.relu3(x)
        x = self.FC4(x)
        x = self.soft(x)
        return x

In [None]:
from torch.types import Device
NUM_ELECTRODES = 19
NUM_CLASSES = 2
DATA_POINTS = 640

model = MyEEGNet(chunk_size=DATA_POINTS, # nu/mber of time points
      num_electrodes=NUM_ELECTRODES, # number of channels
      dropout=0.5,
      kernel_1=150,
      kernel_2=19,
      F1=8, # number of temporal filters
      F2=40, # number of pointwise filters
      D=40, # depth multiplier (number of spatial filters)
      num_classes=NUM_CLASSES)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

In [None]:
n_epochs = 100
for epoch in range(n_epochs):
    for i, (x, y) in enumerate(train_loader):

        # Forward pass
        y_pred = model(x)
        loss = criterion(y_pred, y)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f'Epoch {epoch+1}/{n_epochs}: Loss = {loss.item()}')

RuntimeError: ignored