<a href="https://colab.research.google.com/github/NoahIslam/ShinozakiLabEEGMLModels/blob/main/LSTM_RawEEGDeliriumClassifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import glob
import pandas as pd
import os
import datetime
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split, WeightedRandomSampler
import torchvision.models as models
from torchvision import transforms
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import pickle
dtype = torch.float32

In [None]:
with open('/content/drive/MyDrive/data_cropped.pkl', 'rb') as f:
    data = pickle.load(f)

with open('/content/drive/MyDrive/labels_cropped.pkl', 'rb') as f:
    labels = pickle.load(f)

In [None]:
for i in range(len(data)):
  data[i] = data[i][:64001]

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.utils import resample

X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=42, stratify=labels)

# Combine the data and labels for easier resampling
combined_train = list(zip(X_train, y_train))
combined_test = list(zip(X_test, y_test))

# Separate classes
class_0_train = [item for item in combined_train if item[1] == 0]
class_1_train = [item for item in combined_train if item[1] == 1]

class_0_test = [item for item in combined_test if item[1] == 0]
class_1_test = [item for item in combined_test if item[1] == 1]

# Upsample the minority (this assumes class_1 is the minority. If not, switch them around.)
class_1_upsampled_train = resample(class_1_train, replace=True, n_samples=len(class_0_train), random_state=42)
class_1_upsampled_test = resample(class_1_test, replace=True, n_samples=len(class_0_test), random_state=42)

# Combine and shuffle
balanced_data_train = class_0_train + class_1_upsampled_train
balanced_data_test = class_0_test + class_1_upsampled_test
np.random.shuffle(balanced_data_train)
np.random.shuffle(balanced_data_test)

X_train_balanced = [item[0] for item in balanced_data_train]
y_train_balanced = [item[1] for item in balanced_data_train]

X_test_balanced = [item[0] for item in balanced_data_test]
y_test_balanced = [item[1] for item in balanced_data_test]

In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset

# Convert to PyTorch tensors
X_train_tensors = [torch.tensor(d, dtype=torch.float32).unsqueeze(1) for d in X_train_balanced]  # Adding channel dimension
y_train_tensors = torch.tensor(y_train_balanced, dtype=torch.long)


X_test_tensors = [torch.tensor(d, dtype=torch.float32).unsqueeze(1) for d in X_test_balanced]  # Adding channel dimension
y_test_tensors = torch.tensor(y_test_balanced, dtype=torch.long)

# Create datasets
train_dataset = list(zip(X_train_tensors, y_train_tensors))
test_dataset = list(zip(X_test_tensors, y_test_tensors))

# Create dataloaders
BATCH_SIZE = 16
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=BATCH_SIZE)


In [None]:
class EEG_LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(EEG_LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out


In [None]:
class GRU(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(GRU, self).__init__()

        self.gru_layer = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            bias=True,
            batch_first=True,       # input & output will has batch size as 1s dimension. e.g. (batch, segment_length, no_feature)
        )

        self.out = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        r_out, (h_n, h_c) = self.gru_layer(x.float(), None)
        r_out = F.dropout(r_out, 0.3)
        test_output = self.out(r_out[:, -1, :]) # choose r_out at the last time step
        return test_output

In [None]:
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm_layer = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,         # LSTM hidden unit
            num_layers=num_layers,           # number of LSTM layer
            bias=True,
            batch_first=True,       # input & output will has batch size as 1s dimension. e.g. (batch, segment_length, no_feature)
        )

        self.out = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        r_out, (h_n, h_c) = self.lstm_layer(x.float(), None)
        r_out = F.dropout(r_out, 0.3)

        test_output = self.out(r_out[:, -1, :]) # choose r_out at the last time step
        return test_output

In [None]:
class TestModel(nn.Module):
  def __init__(self, in_features, num_channels):
    super().__init__()
    self.resnet = models.resnet18(pretrained=True)
    self.resnet.conv1 = nn.Conv2d(num_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)

    self.l1 = nn.Sequential(
      nn.Dropout(p=0.2),
      nn.Linear(in_features=1000, out_features=32),
      nn.ReLU(),
      nn.Linear(in_features=32, out_features=1),
      nn.Sigmoid()
    )

  def forward(self, x):
    x = self.resnet(x)
    x = torch.flatten(x, 1)

    return self.l1(x)


In [None]:
def train(model, train_loader, test_loader, criterion, optimizer, num_epochs):
    best_accuracy = 0
    for epoch in range(num_epochs):
        model.train() # Ensure the model is in training mode
        correct_train = 0
        total_train = 0
        for i, (data, labels) in enumerate(train_loader):
            data, labels = data.to(device), labels.to(device)
            print(data.shape)
            data = data.permute(1, 0, 2, 3)
            print(data.shape)

            # Forward pass
            outputs = model(data)
            loss = criterion(outputs, labels.long())

            _, predicted = torch.max(outputs.data, 1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Evaluate on validation set
        model.eval()
        with torch.no_grad():
            correct_valid = 0
            total_valid = 0
            for data, labels in test_loader:
                data, labels = data.to(device), labels.to(device)
                data = data.permute(1, 0, 2, 3)

                outputs = model(data)
                _, predicted = torch.max(outputs.data, 1)
                total_valid += labels.size(0)
                correct_valid += (predicted == labels).sum().item()

        val_accuracy = 100 * correct_valid / total_valid
        train_accuracy = 100 * correct_train / total_train
        best_accuracy = max(best_accuracy, val_accuracy)

        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}, Train Accuracy: {train_accuracy:.2f}%, Validation Accuracy: {val_accuracy:.2f}%")
    return best_accuracy


In [None]:
from pandas.core.groupby.generic import GroupByApply

input_size = 1  # since you have one channel
hidden_size = 64
num_layers = 3
num_classes = 2
num_epochs = 5

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# model = LSTM(input_size, hidden_size, num_layers, num_classes).to(device)
# model = GRU(input_size, hidden_size, num_layers, num_classes).to(device)
model = TestModel(64001, 1).to(device)
# model = EEG_LSTM(input_size, hidden_size, num_layers, num_classes).to(device)
# model = torcheeg.models.LSTM(num_electrodes=1, hid_channels=hidden_size, num_classes=num_classes).to(device)
# model = torcheeg.models.GRU(num_electrodes=1, hid_channels=hidden_size, num_classes=num_classes).to(device)

lrs = [.01, .001, 1e-4, .1e-5, 1e-6, 1e-7]
accuracies = {}
for lr in lrs:
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    accuracy = train(model, train_loader, test_loader, criterion, optimizer, num_epochs)
    accuracies[lr] = accuracy

In [None]:
print(accuracies)