In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torch.nn.functional as F
import os
import scipy
import random
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader as DL
from torch.utils.data import TensorDataset as TData
from tqdm import tqdm
import re
from sklearn.model_selection import train_test_split as tts
import pickle

import zipfile
import pandas as pd
from sklearn.preprocessing import LabelEncoder

In [2]:
!unzip "LHNT_EEG.zip"

Archive:  LHNT_EEG.zip
   creating: LHNT EEG/
   creating: LHNT EEG/Nandini_Senthilkumar_Session6/
  inflating: LHNT EEG/Nandini_Senthilkumar_Session6/right_2.pkl  
  inflating: LHNT EEG/Nandini_Senthilkumar_Session6/right_20.pkl  
  inflating: LHNT EEG/Nandini_Senthilkumar_Session6/left_9.pkl  
  inflating: LHNT EEG/Nandini_Senthilkumar_Session6/left_1.pkl  
  inflating: LHNT EEG/Nandini_Senthilkumar_Session6/right_16.pkl  
  inflating: LHNT EEG/Nandini_Senthilkumar_Session6/left_7.pkl  
  inflating: LHNT EEG/Nandini_Senthilkumar_Session6/left_11.pkl  
  inflating: LHNT EEG/Nandini_Senthilkumar_Session6/right_14.pkl  
  inflating: LHNT EEG/Nandini_Senthilkumar_Session6/right_10.pkl  
  inflating: LHNT EEG/Nandini_Senthilkumar_Session6/right_4.pkl  
  inflating: LHNT EEG/Nandini_Senthilkumar_Session6/left_19.pkl  
  inflating: LHNT EEG/Nandini_Senthilkumar_Session6/left_3.pkl  
  inflating: LHNT EEG/Nandini_Senthilkumar_Session6/right_8.pkl  
  inflating: LHNT EEG/Nandini_Senthilkumar_

In [3]:
def getAllPickles(directory="LHNT EEG"):
    folders = [drctry for drctry in os.listdir(directory) if os.path.isdir(os.path.join(directory, drctry))]
    files = []
    for folder in folders:
        folder_files = os.listdir(os.path.join(directory, folder))
        for file in folder_files:
            if ".pkl" in file:
                files.append(os.path.join(directory, folder, file))
    return files

def npFromPickle(pickle_files):
    np_data = []
    labels = [] # 0 is left, 1 is right
    for file in pickle_files:
        with open(file, "rb") as f:
            data1 = pickle.load(f)
            np_data.append(data1[0])
        if 'right' in file.split('/')[-1]:
            labels.append(1)
        else:
            labels.append(0)
    return np_data, labels

np_data, labels = npFromPickle(getAllPickles())
print(len(np_data), len(labels))

380 380


In [4]:
# applying a bandpass filter
def bandpass_filter(signal, crit_freq = [1, 40], sampling_freq = 125, plot = False, channel = 0):
  order = 4

  b, a = scipy.signal.butter(order, crit_freq, btype = 'bandpass', fs = sampling_freq)
  processed_signal = scipy.signal.filtfilt(b, a, signal, 1)

  if plot == True:
    plt.figure()
    plt.xlabel('Time')
    plt.ylabel(f'Normalized amplitude of channel {channel}')
    plt.title(f'{crit_freq[0]}-{crit_freq[1]}Hz bandpass filter')
    signal_min = np.full((signal.shape[1], signal.shape[0]), np.min(signal, 1)).transpose()
    signal_max = np.full((signal.shape[1], signal.shape[0]), np.max(signal, 1)).transpose()
    normed_signal = (signal - signal_min) / (signal_max - signal_min)
    filtered_min = np.full((processed_signal.shape[1], processed_signal.shape[0]), np.min(processed_signal, 1)).transpose()
    filtered_max = np.full((processed_signal.shape[1], processed_signal.shape[0]), np.max(processed_signal, 1)).transpose()
    normed_filt = (processed_signal - filtered_min) / (filtered_max - filtered_min)
    plt.plot(np.arange(normed_signal[channel].size), normed_signal[channel], label = 'Input')
    plt.plot(np.arange(normed_filt[channel].size), normed_filt[channel], label = 'Transformed')
    plt.legend()

  return processed_signal


# function to segment eeg data based on sampling freq(Hz), window_size(s), and window_shift(s)
def segmentation(signal, sampling_freq=125, window_size=1, window_shift=0.016):
  w_size = int(sampling_freq * window_size)
  w_shift = int(sampling_freq * window_shift)
  segments = []
  i = 0
  while i + w_size <= signal.shape[1]:
    segments.append(signal[:, i: i + w_size])
    i += w_shift
  return segments

def channel_rearrangment(sig, channel_order):
    channel_order = [channel - 1 for channel in channel_order]
    reindexed = np.zeros_like(sig)
    for i, ind in enumerate(channel_order):
        reindexed[i] = sig[ind]
    return reindexed

ordered_channels = [1, 9, 11, 3, 2, 12, 10, 4, 13, 5, 15, 7, 14, 16, 6, 8]

In [5]:
train_x, test_x, train_y, test_y = tts(np_data, labels, test_size = 0.25)
val_x, test_x = test_x[:len(test_x)//2], test_x[len(test_x)//2:]
val_y, test_y = test_y[:len(test_y)//2], test_y[len(test_y)//2:]

In [6]:
# applying all preprocessing steps to create train and test data
train_eeg = []
train_labels = []
valid_eeg = []
valid_labels = []
test_eeg = []
test_labels = []
for sig, label in zip(train_x, train_y):
  if sig.shape[1] == 0: # excluding empty sample elements
    #print(name)
    continue
  reindexed_signal = channel_rearrangment(sig, ordered_channels)
  filtered_sig = bandpass_filter(reindexed_signal, [5, 40], 125) # bandpass filter
  normed_sig = (filtered_sig - np.mean(filtered_sig, 1, keepdims=True)) / np.std(filtered_sig, 1, keepdims=True) # standard scaling
  if np.isnan(normed_sig).any(): # excluding sample elements with nans
    print("nan")
    continue
  signals = segmentation(normed_sig, 125, window_size = 1.5, window_shift = 0.0175) # segmentation
  labels = [label] * len(signals)
  train_eeg.extend(signals)
  train_labels.extend(labels)

for sig, label in zip(val_x, val_y):
  if sig.shape[1] == 0: # excluding empty sample elements
    #print(name)
    continue
  reindexed_signal = channel_rearrangment(sig, ordered_channels)
  filtered_sig = bandpass_filter(reindexed_signal, [5, 40], 125) # bandpass filter
  normed_sig = (filtered_sig - np.mean(filtered_sig, 1, keepdims=True)) / np.std(filtered_sig, 1, keepdims=True) # standard scaling
  if np.isnan(normed_sig).any(): # excluding sample elements with nans
    print("nan")
    continue
  signals = segmentation(normed_sig, 125, window_size = 1.5, window_shift = 0.0175) # segmentation
  labels = [label] * len(signals)
  valid_eeg.extend(signals)
  valid_labels.extend(labels)

for sig, label in zip(test_x, test_y):
  if sig.shape[1] == 0: # excluding empty sample elements
    #print(name)
    continue
  reindexed_signal = channel_rearrangment(sig, ordered_channels)
  filtered_sig = bandpass_filter(reindexed_signal, [5, 40], 125) # bandpass filter
  normed_sig = (filtered_sig - np.mean(filtered_sig, 1, keepdims=True)) / np.std(filtered_sig, 1, keepdims=True) # standard scaling
  if np.isnan(normed_sig).any(): # excluding sample elements with nans
    print("nan")
    continue
  signals = segmentation(normed_sig, 125, window_size = 1.5, window_shift = 0.0175) # segmentation, changed to 125
  labels = [label] * len(signals)
  test_eeg.extend(signals)
  test_labels.extend(labels)

In [7]:
columns_to_remove = [1, 2, 7, 8]
modified_train_eeg = []
modified_valid_eeg = []
modified_test_eeg = []
for array in train_eeg:
  modified_array = np.delete(array, columns_to_remove, axis=1)
  modified_train_eeg.append(modified_array)
for array in valid_eeg:
  modified_array = np.delete(array, columns_to_remove, axis=1)
  modified_valid_eeg.append(modified_array)
for array in test_eeg:
  modified_array = np.delete(array, columns_to_remove, axis=1)
  modified_test_eeg.append(modified_array)
train_eeg = modified_train_eeg
valid_eeg = modified_valid_eeg
test_eeg = modified_test_eeg

train_eeg_tensor = torch.zeros((len(train_eeg), train_eeg[0].shape[0], train_eeg[0].shape[1])) # untransposed dimensions 1 and 2
valid_eeg_tensor = torch.zeros((len(valid_eeg), valid_eeg[0].shape[0], valid_eeg[0].shape[1]))
test_eeg_tensor = torch.zeros((len(test_eeg), test_eeg[0].shape[0], test_eeg[0].shape[1]))
for i in range(len(train_eeg)):
  tens = torch.from_numpy(train_eeg[i].copy()) # no longer transposing before conversion to tensor
  train_eeg_tensor[i] = tens
for i in range(len(valid_eeg)):
  tens = torch.from_numpy(valid_eeg[i].copy())
  valid_eeg_tensor[i] = tens
for i in range(len(test_eeg)):
  tens = torch.from_numpy(test_eeg[i].copy())
  test_eeg_tensor[i] = tens
train_label_tensor = torch.zeros(len(train_labels), 2)
valid_label_tensor = torch.zeros(len(valid_labels), 2)
test_label_tensor = torch.zeros(len(test_labels), 2)
for i, val in enumerate(train_labels):
  train_label_tensor[i][val] = 1
for i, val in enumerate(valid_labels):
  valid_label_tensor[i][val] = 1
for i, val in enumerate(test_labels):
  test_label_tensor[i][val] = 1

train_ds = TData(train_eeg_tensor, train_label_tensor)
valid_ds = TData(valid_eeg_tensor, valid_label_tensor)
test_ds = TData(test_eeg_tensor, test_label_tensor)
train_dl = DL(train_ds, batch_size=64, shuffle= True, drop_last = True)
valid_dl = DL(valid_ds, batch_size=64, shuffle= True, drop_last = True)
test_dl = DL(test_ds, batch_size=64, shuffle = True, drop_last = True)

In [8]:
print(len(train_dl), len(valid_dl), len(test_dl))

1741 294 286


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import time
import numpy as np
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import KFold
from sklearn.cluster import KMeans

# -----------------------------
# Fixed Gamma RBF Layer with Trainable Weight Parameter
# -----------------------------
class FixedGammaRBFLayer(nn.Module):
    def __init__(self, in_features, out_features, gamma=0.1):
        super(FixedGammaRBFLayer, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.gamma = gamma  # You can tune this value if needed.
        self.centers = nn.Parameter(torch.Tensor(out_features, in_features))
        self.weights = nn.Parameter(torch.ones(out_features))  # Trainable weight per RBF unit.
        self.reset_parameters()

    def reset_parameters(self):
        nn.init.xavier_uniform_(self.centers)
        nn.init.constant_(self.weights, 1.0)

    def forward(self, x):
        # x: (batch, in_features)
        x_expanded = x.unsqueeze(1)                 # (batch, 1, in_features)
        centers_expanded = self.centers.unsqueeze(0)  # (1, out_features, in_features)
        diff = x_expanded - centers_expanded
        dist_sq = torch.sum(diff ** 2, dim=2)         # (batch, out_features)
        # Compute RBF activations and scale by trainable weights.
        rbf_out = torch.exp(-self.gamma * dist_sq)    # (batch, out_features)
        rbf_out = rbf_out * self.weights              # Elementwise multiplication.
        return rbf_out

# -----------------------------
# Pure RBF Network Model
# -----------------------------
class RBFNetwork(nn.Module):
    def __init__(self, input_dim, num_rbf_units, num_classes,
                 gamma=0.1, hidden_dim=512, dropout_prob=0.5):
        super(RBFNetwork, self).__init__()
        # RBF layer: transforms the flattened input.
        self.rbf = FixedGammaRBFLayer(input_dim, num_rbf_units, gamma=gamma)
        self.dropout = nn.Dropout(dropout_prob)
        # Classifier: maps RBF activations to class scores.
        self.fc = nn.Sequential(
            nn.Linear(num_rbf_units, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout_prob),
            nn.Linear(hidden_dim, num_classes)
        )

    def forward(self, x):
        # x: (batch, channels, sequence_length)
        x = x.view(x.size(0), -1)  # Flatten the input.
        x_rbf = self.rbf(x)        # (batch, num_rbf_units)
        out = self.fc(x_rbf)       # (batch, num_classes)
        return out

# -----------------------------
# (Optional) Initialize RBF Centers via KMeans on Training Data
# -----------------------------
def initialize_rbf_centers(model, data_loader, num_samples=1000):
    model.eval()
    all_features = []
    total_samples = 0
    for features, _ in data_loader:
        features = features.to(device)
        # Flatten input to match the RBF layer's expected input dimension.
        features = features.view(features.size(0), -1)
        with torch.no_grad():
            all_features.append(features.cpu())
        total_samples += features.size(0)
        if total_samples >= num_samples:
            break
    all_features = torch.cat(all_features, dim=0)
    kmeans = KMeans(n_clusters=model.rbf.out_features, random_state=0)
    kmeans.fit(all_features.numpy())
    centers = torch.tensor(kmeans.cluster_centers_, dtype=torch.float32, device=device)
    model.rbf.centers.data.copy_(centers)
    print("RBF centers initialized using KMeans on input features.")

# -----------------------------
# Training and Evaluation Functions
# -----------------------------
def train_model(model, train_loader, valid_loader, criterion, optimizer, num_epochs=20):
    print("Starting training...")
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        start_time = time.time()
        for features, labels in train_loader:
            features = features.to(device)
            # Convert one-hot labels to class indices.
            labels = labels.argmax(dim=1).to(device)
            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        # Validation phase.
        model.eval()
        valid_loss = 0.0
        with torch.no_grad():
            for features, labels in valid_loader:
                features = features.to(device)
                labels = labels.argmax(dim=1).to(device)
                outputs = model(features)
                loss = criterion(outputs, labels)
                valid_loss += loss.item()
        elapsed = time.time() - start_time
        print(f"Epoch {epoch+1}/{num_epochs} | Time: {elapsed:.2f}s | "
              f"Train Loss: {train_loss/len(train_loader):.4f} | "
              f"Val Loss: {valid_loss/len(valid_loader):.4f}")

def evaluate_model(model, data_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for features, labels in data_loader:
            features = features.to(device)
            labels = labels.argmax(dim=1).to(device)
            outputs = model(features)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    print(f"Accuracy: {accuracy:.2f}%")
    return accuracy

# -----------------------------
# K-Fold Cross Validation on EEG Training Data
# -----------------------------
# Assumes you have already created `train_ds` (a TensorDataset with EEG tensors and one-hot labels)
# For example, from your preprocessing steps:
#    train_ds = TData(train_eeg_tensor, train_label_tensor)
#
# Set number of folds.
k_folds = 5
kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
fold_accuracies = {}

# Use the device already defined (e.g., cuda if available).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Determine the input dimension and number of classes from one sample.
sample_feature, sample_label = train_ds[0]
input_dim = sample_feature.numel()  # Flattened dimension (channels * sequence_length)
num_classes = sample_label.shape[0]

for fold, (train_idx, val_idx) in enumerate(kfold.split(train_ds)):
    print(f"\n--- Fold {fold+1} ---")
    # Create train and validation subsets.
    train_subset = Subset(train_ds, train_idx)
    val_subset = Subset(train_ds, val_idx)
    train_loader = DataLoader(train_subset, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_subset, batch_size=64, shuffle=False)

    # Initialize a new instance of the RBF network for this fold.
    model_rbf = RBFNetwork(input_dim=input_dim,
                           num_rbf_units=256,
                           num_classes=num_classes,
                           gamma=0.1,
                           hidden_dim=512,
                           dropout_prob=0.5).to(device)

    # Optionally initialize the RBF centers using KMeans on the training fold.
    initialize_rbf_centers(model_rbf, train_loader, num_samples=500)

    # Set up optimizer and loss function.
    optimizer = optim.Adam(model_rbf.parameters(), lr=0.001, weight_decay=1e-4)
    criterion = nn.CrossEntropyLoss()

    # Train the model for this fold.
    train_model(model_rbf, train_loader, val_loader, criterion, optimizer, num_epochs=20)

    # Evaluate on the validation fold.
    fold_accuracy = evaluate_model(model_rbf, val_loader)
    fold_accuracies[fold] = fold_accuracy

# Display k-fold results.
print("\nK-Fold Cross Validation results:")
for fold in fold_accuracies:
    print(f"Fold {fold+1}: {fold_accuracies[fold]:.2f}%")
avg_accuracy = np.mean(list(fold_accuracies.values()))
print(f"Average Accuracy over {k_folds} folds: {avg_accuracy:.2f}%")

# (Optional) After cross-validation, you can train on the entire train_ds
# and then evaluate on valid_ds or test_ds as desired.


Using device: cuda

--- Fold 1 ---
RBF centers initialized using KMeans on input features.
Starting training...
Epoch 1/20 | Time: 19.72s | Train Loss: 0.6932 | Val Loss: 0.6929
Epoch 2/20 | Time: 19.75s | Train Loss: 0.6930 | Val Loss: 0.6928
Epoch 3/20 | Time: 19.81s | Train Loss: 0.6929 | Val Loss: 0.6928
Epoch 4/20 | Time: 19.72s | Train Loss: 0.6929 | Val Loss: 0.6928
Epoch 5/20 | Time: 19.80s | Train Loss: 0.6929 | Val Loss: 0.6928
Epoch 6/20 | Time: 19.84s | Train Loss: 0.6929 | Val Loss: 0.6928
Epoch 7/20 | Time: 19.72s | Train Loss: 0.6929 | Val Loss: 0.6928
Epoch 8/20 | Time: 19.78s | Train Loss: 0.6929 | Val Loss: 0.6929
Epoch 9/20 | Time: 19.74s | Train Loss: 0.6929 | Val Loss: 0.6929
Epoch 10/20 | Time: 19.74s | Train Loss: 0.6929 | Val Loss: 0.6928
Epoch 11/20 | Time: 19.77s | Train Loss: 0.6929 | Val Loss: 0.6928
Epoch 12/20 | Time: 19.69s | Train Loss: 0.6929 | Val Loss: 0.6929
Epoch 13/20 | Time: 19.77s | Train Loss: 0.6929 | Val Loss: 0.6928
Epoch 14/20 | Time: 19.87s