In [17]:
import os
# List files in the specified directory
dataset_path = '/scratch/connectome/dyhan316/ECOG_PILOT/data_rearranged/subject-aggregated'
files = os.listdir(dataset_path)
print(files)

['stft', 'raw_timeseries_label.npy', 'raw_timeseries_data.npy', 'normalized_timeseries', 'masks']


In [18]:
import numpy as np
mask_path = os.path.join(dataset_path, 'masks/mask_0.npy')
mask = np.load(mask_path)
print(mask.shape)

(360,)


In [19]:
label_path=os.path.join(dataset_path, 'raw_timeseries_label.npy')
labels=np.load(label_path)
print(labels.shape)

(360,)


In [20]:
data_path=os.path.join(dataset_path, 'normalized_timeseries/whole_trial_denormed_timeseries_data.npy')
data=np.load(data_path)
print(data.shape)

(360, 10000)


In [24]:
import torch
import numpy as np
import os
from torch.utils.data import Dataset, DataLoader
#from torchvision import transforms, utils
import pandas as pd

class MyDataset(Dataset):
    def __init__(self, root_dir, data, label, transform=None):
        self.root_dir = root_dir
        self.data = np.load(os.path.join(root_dir, data))
        self.label = np.load(os.path.join(root_dir, label))
        self.transform = transform
        
    def init_io(self):
        self.info = pd.DataFrame({
                "subject_id": [1],
                "trial_id": [1],
                "duration": [10000],
                "_record_id": ["_record_0"]
            })
        
    def __len__(self):
        return self.data.shape[0]

    def __getitem__(self, idx):
        sample_data = torch.tensor(self.data[idx], dtype=torch.float32)  # Ensure data is float32
        sample_label = torch.tensor(self.label[idx], dtype=torch.float32)  # Ensure label is float32
        
        if self.transform:
            sample_data = self.transform(sample_data)
        
        return sample_data, sample_label



In [25]:
#from torch import float32
from torchvision import transforms

transform = transforms.Compose([
    transforms.Lambda(lambda x: x.to(torch.float32))  # Ensure data is float32
])

dataset = MyDataset(dataset_path,  'normalized_timeseries/whole_trial_denormed_timeseries_data.npy', 'raw_timeseries_label.npy', transform=transform)
dataset.init_io()

In [23]:
data_tensor, label_tensor = dataset[0]
print(f"Data tensor type: {data_tensor.dtype}") #should be float32 
print(f"Label tensor type: {label_tensor.dtype}")

Data tensor type: torch.float32
Label tensor type: torch.float32


In [37]:
from torch.utils.data import Subset

mask_path=os.path.join(dataset_path, 'masks/mask_0.npy')
mask = np.load(mask_path)

# Create train and test indices based on the mask
train_indices = np.where(mask == 0)[0]
test_indices = np.where(mask == 1)[0]

def collate_fn(batch):
    data, label = zip(*batch)
    data = torch.stack([torch.tensor(d, dtype=torch.float32) for d in data]).unsqueeze(1)  # Add electrode dimension
    #data = data.unsqueeze(1) #only for EEGNet
    #data = data[:,:,:,::4]
    data = data.unsqueeze(-1) #only for Conformer ; [32,1,10000] -> [32,1,10000,1]
    label = torch.stack([torch.tensor(l, dtype=torch.long) for l in label])
    return data, label


# Create train and test subsets
train_dataset = Subset(dataset, train_indices)
test_dataset = Subset(dataset, test_indices)

# Create DataLoader for train and test sets
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

# Print the sizes of the train and test sets
print(f"Train set size: {len(train_loader.dataset)}")
print(f"Test set size: {len(test_loader.dataset)}")

Train set size: 252
Test set size: 108


In [38]:
#checking the first batch size
for batch in train_loader:
    inputs, targets = batch
    print(f"Inputs shape: {inputs.shape}")
    print(f"Targets shape: {targets.shape}")
    break  # Inspect only the first batch

Inputs shape: torch.Size([32, 1, 10000, 1])
Targets shape: torch.Size([32])


  data = torch.stack([torch.tensor(d, dtype=torch.float32) for d in data]).unsqueeze(1)  # Add electrode dimension
  label = torch.stack([torch.tensor(l, dtype=torch.long) for l in label])


# SELECT MODEL
- EEGNet, Conformer, ... 

In [10]:
# model definition, make sure to start from here when new training is needed
from torcheeg.models import EEGNet

model = EEGNet(chunk_size=2500,
               num_electrodes=1,
               dropout=0.5,
               kernel_1=64,
               kernel_2=16,
               F1=8,
               F2=16,
               D=2,
               num_classes=2)
x, y = next(iter(train_loader))
model(x)

  data = torch.stack([torch.tensor(d, dtype=torch.float32) for d in data]).unsqueeze(1)  # Add electrode dimension
  label = torch.stack([torch.tensor(l, dtype=torch.long) for l in label])


tensor([[ 1.0586, -0.4181],
        [ 0.5104, -0.2978],
        [ 0.1750,  0.4061],
        [ 0.1509, -0.3642],
        [-0.1279,  0.3282],
        [-0.4743, -0.3656],
        [ 0.3800,  0.7158],
        [ 0.3142, -0.1223],
        [ 0.1418,  0.0324],
        [ 0.3105, -0.0989],
        [ 0.1427, -0.7733],
        [-0.0926,  0.5425],
        [ 0.0385, -0.2724],
        [-0.1617,  0.1390],
        [-0.2566, -0.2032],
        [ 0.7706,  0.2829],
        [ 0.3506,  0.0988],
        [ 0.3781, -0.1606],
        [ 0.6032, -0.1246],
        [-0.1617, -0.3197],
        [-0.0868, -0.8267],
        [-1.3394, -0.2059],
        [-0.1805,  0.0027],
        [ 0.2835,  0.2025],
        [ 0.2394, -0.2079],
        [ 0.1609, -0.2330],
        [-0.0209,  0.0512],
        [-0.2421,  0.3475],
        [ 0.4854, -1.0294],
        [-0.0884,  0.3530],
        [ 0.1291, -0.0427],
        [ 0.4217, -0.1320]], grad_fn=<MmBackward0>)

In [42]:
# model definition, make sure to start from here when new training is needed
from torcheeg.models import Conformer

model = Conformer(num_electrodes=1,
                  sampling_rate=501,
                  hid_channels=150,
                  depth=6,
                  heads=10,
                  dropout=0.5,
                  forward_expansion=4,
                  forward_dropout=0.5,
                  num_classes=2)

x, y = next(iter(train_loader))
model(x)

  data = torch.stack([torch.tensor(d, dtype=torch.float32) for d in data]).unsqueeze(1)  # Add electrode dimension
  label = torch.stack([torch.tensor(l, dtype=torch.long) for l in label])


RuntimeError: Calculated padded input size per channel: (10000 x 1). Kernel size: (1 x 25). Kernel size can't be greater than actual input size

In [11]:
# Training the model
import torch

import torch.nn as nn
import torch.optim as optim

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 100
for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0

    for inputs, labels in train_loader:
        inputs, labels = inputs.float(), labels.long()  # Ensure proper types

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Print statistics
        running_loss += loss.item()

    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader):.4f}")

print("Finished Training")

  data = torch.stack([torch.tensor(d, dtype=torch.float32) for d in data]).unsqueeze(1)  # Add electrode dimension
  label = torch.stack([torch.tensor(l, dtype=torch.long) for l in label])


Epoch [1/100], Loss: 0.7409
Epoch [2/100], Loss: 0.6786
Epoch [3/100], Loss: 0.6606
Epoch [4/100], Loss: 0.6333
Epoch [5/100], Loss: 0.6164
Epoch [6/100], Loss: 0.5875
Epoch [7/100], Loss: 0.5636
Epoch [8/100], Loss: 0.5966
Epoch [9/100], Loss: 0.5670
Epoch [10/100], Loss: 0.5413
Epoch [11/100], Loss: 0.5223
Epoch [12/100], Loss: 0.5200
Epoch [13/100], Loss: 0.5221
Epoch [14/100], Loss: 0.4936
Epoch [15/100], Loss: 0.5120
Epoch [16/100], Loss: 0.4839
Epoch [17/100], Loss: 0.5066
Epoch [18/100], Loss: 0.5162
Epoch [19/100], Loss: 0.5030
Epoch [20/100], Loss: 0.4640
Epoch [21/100], Loss: 0.4450
Epoch [22/100], Loss: 0.4775
Epoch [23/100], Loss: 0.4360
Epoch [24/100], Loss: 0.4755
Epoch [25/100], Loss: 0.4700
Epoch [26/100], Loss: 0.4535
Epoch [27/100], Loss: 0.4674
Epoch [28/100], Loss: 0.4753
Epoch [29/100], Loss: 0.4393
Epoch [30/100], Loss: 0.4676
Epoch [31/100], Loss: 0.4593
Epoch [32/100], Loss: 0.4486
Epoch [33/100], Loss: 0.4271
Epoch [34/100], Loss: 0.4883
Epoch [35/100], Loss: 0

In [12]:
import torch
from torch.nn.functional import softmax

# Function to evaluate the model
def evaluate_model(model, test_loader):
    model.eval()  # Set model to evaluation mode
    correct = 0
    total = 0
    label_sum=0

    with torch.no_grad():  # No gradients needed for evaluation
        for inputs, labels in test_loader:
            inputs, labels = inputs.float(), labels.long()  # Ensure proper types
            outputs = model(inputs)  # Forward pass
            predictions = torch.argmax(outputs, dim=1)  # Get predicted class
            correct += (predictions == labels).sum().item()  # Count correct predictions
            total += labels.size(0)  # Total samples
            label_sum += labels.sum().item()

    accuracy = correct / total
    print(label_sum/len(test_loader))
    print(labels)
    return accuracy

# Calculate accuracy
accuracy = evaluate_model(model, test_loader)
print(f"Test Accuracy: {accuracy:.2%}")


14.75
tensor([1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1, 1])
Test Accuracy: 39.81%


  data = torch.stack([torch.tensor(d, dtype=torch.float32) for d in data]).unsqueeze(1)  # Add electrode dimension
  label = torch.stack([torch.tensor(l, dtype=torch.long) for l in label])
