# Importing the Data


In [1]:
import numpy as np
import pandas as pd
import torch
import scipy
from torch.utils.data import TensorDataset as TData
from torch.utils.data import DataLoader as DL
import matplotlib.pyplot as plt
import pickle
import os

import torch
import torch.nn as nn
import torch.optim as optim

#cuda allows developers to use GPU for computing
#
if torch.cuda.is_available():
  device = torch.device("cuda")
else:
  device = torch.device("cpu")

root = '/content/drive/MyDrive/LHNT_pickles/'

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Data Preprocessing


In [3]:
# Base folder containing the files ( took out end if we need to go back and add it)
# right_session_folder = os.path.join(root, "alan_f_right/session_1")

# Initialize a dictionary to store the loaded signals
right_eeg_data = {}

# Iterate through files labeled in increments of two
for j in range(1, 5):
  right_session_folder = os.path.join(root, f"alan_f_right/session_{j}")
  for i in range(2, 21, 2):  # Adjust the range as needed
      file_path = os.path.join(right_session_folder, f"right_{i}.pkl")

      # Check if the file exists
      if os.path.exists(file_path):
          with open(file_path, "rb") as file:
              data = pickle.load(file)
              right_eeg_data[f"right_{j}_{i}"] = data  # Store the data with the corresponding label
      else:
          print(f"File not found: {file_path}")
          break  # Stop if the file sequence ends


In [4]:
# Base folder containing the files
# left_session_folder = os.path.join(root, "alan_f_left/session_1")

# Initialize a dictionary to store the loaded signals
left_eeg_data = {}

for j in range(1, 5):
  left_session_folder = os.path.join(root, f"alan_f_left/session_{j}")
  # Iterate through files labeled in increments of two
  for i in range(1, 20, 2):  # Adjust the range as needed
      file_path = os.path.join(left_session_folder, f"left_{i}.pkl")

      # Check if the file exists
      if os.path.exists(file_path):
          with open(file_path, "rb") as file:
              data = pickle.load(file)
              left_eeg_data[f"left_{j}_{i}"] = data  # Store the data with the corresponding label
      else:
          print(f"File not found: {file_path}")
          break  # Stop if the file sequence ends




In [7]:
right_signal_data = {label: signal[0] for label, signal in right_eeg_data.items()}
right_metadata = {label: signal[1] for label, signal in right_eeg_data.items()}

In [8]:
left_signal_data = {label: signal[0] for label, signal in left_eeg_data.items()}
metadata = {label: signal[1] for label, signal in left_eeg_data.items()}


In [9]:
from scipy.signal import butter, filtfilt
from sklearn.preprocessing import StandardScaler

def baseline_correction(signal):
    """
    Removes baseline offset by subtracting the mean of each channel.

    Parameters:
        signal (np.ndarray): EEG signal with shape (channels, samples).

    Returns:
        np.ndarray: Baseline-corrected signal.
    """
    return signal - np.mean(signal, axis=1, keepdims=True)


def bandpass_filter(signal, lowcut=13, highcut=30, fs=125, order=4):
    """
    Band-pass filters the signal for the specified frequency range.

    Parameters:
        signal (np.ndarray): EEG signal with shape (channels, samples).
        lowcut (float): Lower cutoff frequency (Hz).
        highcut (float): Upper cutoff frequency (Hz).
        fs (float): Sampling rate (Hz).
        order (int): Order of the filter.

    Returns:
        np.ndarray: Band-pass filtered signal.
    """
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype="band")
    return filtfilt(b, a, signal, axis=1)

def normalize_signal(signal):
    """
    Normalizes the signal for each channel (z-score normalization).

    Parameters:
        signal (np.ndarray): EEG signal with shape (channels, samples).

    Returns:
        np.ndarray: Normalized signal.
    """
    scaler = StandardScaler()
    return scaler.fit_transform(signal.T).T  # Transpose to normalize each channel

def preprocess_eeg(signal, fs=125):
    """
    Preprocess EEG signal with baseline correction, band-pass filtering, and normalization.

    Parameters:
        signal (np.ndarray): EEG signal with shape (channels, samples).
        fs (float): Sampling rate (Hz).

    Returns:
        np.ndarray: Preprocessed EEG signal.
    """
    # Step 1: Baseline Correction
    signal_corrected = baseline_correction(signal)

    # Step 2: Band-Pass Filter (Beta Frequencies)
    signal_filtered = bandpass_filter(signal_corrected, lowcut=4, highcut=40, fs=fs)

    # Step 3: Normalization
    signal_normalized = normalize_signal(signal_filtered)

    return signal_normalized


In [10]:
# Preprocess each signal in the dictionary
right_preprocessed_signals = {label: preprocess_eeg(signal, fs=125) for label, signal in right_signal_data.items()}

# Inspect the shapes of the processed signals
for label, signal in right_preprocessed_signals.items():
    print(f"{label}: RIGHT Processed Signal Shape = {signal.shape}")


right_1_2: RIGHT Processed Signal Shape = (16, 875)
right_1_4: RIGHT Processed Signal Shape = (16, 875)
right_1_6: RIGHT Processed Signal Shape = (16, 875)
right_1_8: RIGHT Processed Signal Shape = (16, 875)
right_1_10: RIGHT Processed Signal Shape = (16, 875)
right_1_12: RIGHT Processed Signal Shape = (16, 875)
right_1_14: RIGHT Processed Signal Shape = (16, 875)
right_1_16: RIGHT Processed Signal Shape = (16, 875)
right_1_18: RIGHT Processed Signal Shape = (16, 875)
right_1_20: RIGHT Processed Signal Shape = (16, 875)
right_2_2: RIGHT Processed Signal Shape = (16, 875)
right_2_4: RIGHT Processed Signal Shape = (16, 875)
right_2_6: RIGHT Processed Signal Shape = (16, 875)
right_2_8: RIGHT Processed Signal Shape = (16, 875)
right_2_10: RIGHT Processed Signal Shape = (16, 875)
right_2_12: RIGHT Processed Signal Shape = (16, 875)
right_2_14: RIGHT Processed Signal Shape = (16, 875)
right_2_16: RIGHT Processed Signal Shape = (16, 875)
right_2_18: RIGHT Processed Signal Shape = (16, 875)
r

In [11]:
# Preprocess each signal in the dictionary
left_preprocessed_signals = {label: preprocess_eeg(signal, fs=125) for label, signal in left_signal_data.items()}

# Inspect the shapes of the processed signals
for label, signal in left_preprocessed_signals.items():
    print(f"{label}: LEFT Processed Signal Shape = {signal.shape}")


left_1_1: LEFT Processed Signal Shape = (16, 875)
left_1_3: LEFT Processed Signal Shape = (16, 875)
left_1_5: LEFT Processed Signal Shape = (16, 875)
left_1_7: LEFT Processed Signal Shape = (16, 875)
left_1_9: LEFT Processed Signal Shape = (16, 875)
left_1_11: LEFT Processed Signal Shape = (16, 875)
left_1_13: LEFT Processed Signal Shape = (16, 875)
left_1_15: LEFT Processed Signal Shape = (16, 875)
left_1_17: LEFT Processed Signal Shape = (16, 875)
left_1_19: LEFT Processed Signal Shape = (16, 875)
left_2_1: LEFT Processed Signal Shape = (16, 875)
left_2_3: LEFT Processed Signal Shape = (16, 875)
left_2_5: LEFT Processed Signal Shape = (16, 875)
left_2_7: LEFT Processed Signal Shape = (16, 875)
left_2_9: LEFT Processed Signal Shape = (16, 875)
left_2_11: LEFT Processed Signal Shape = (16, 875)
left_2_13: LEFT Processed Signal Shape = (16, 875)
left_2_15: LEFT Processed Signal Shape = (16, 875)
left_2_17: LEFT Processed Signal Shape = (16, 875)
left_2_19: LEFT Processed Signal Shape = 

## Data Loading

In [14]:
# Combine left and right signals into a single dataset with labels
X = []  # Feature data
y = []  # Labels

# Add left signals with label 0
for signal in left_preprocessed_signals.values():
    X.append(signal)  # Add the signal (channels x samples)
    y.append(0)  # Label for left signals

# Add right signals with label 1
for signal in right_preprocessed_signals.values():
    X.append(signal)
    y.append(1)  # Label for right signals

# Convert to NumPy arrays
X = np.array(X)  # Shape: (num_samples, num_channels, num_timepoints)
y = np.array(y)  # Shape: (num_samples,)
print(f"Combined dataset shape: X={X.shape}, y={y.shape}")

Combined dataset shape: X=(80, 16, 875), y=(80,)


In [15]:
from sklearn.model_selection import train_test_split

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

print(f"Training set: X_train={X_train.shape}, y_train={y_train.shape}")
print(f"Testing set: X_test={X_test.shape}, y_test={y_test.shape}")


Training set: X_train=(64, 16, 875), y_train=(64,)
Testing set: X_test=(16, 16, 875), y_test=(16,)


In [16]:
X_train_eegnet = X_train[:, np.newaxis, :, :]  # Shape: (num_samples, 1, num_channels, num_timepoints)
X_test_eegnet = X_test[:, np.newaxis, :, :]

In [17]:
import torch

# Convert data to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)  # Shape: (num_samples, num_channels, num_timepoints)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)    # Shape: (num_samples,)

X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

print(f"Training tensors: X_train_tensor={X_train_tensor.shape}, y_train_tensor={y_train_tensor.shape}")
print(f"Testing tensors: X_test_tensor={X_test_tensor.shape}, y_test_tensor={y_test_tensor.shape}")

Training tensors: X_train_tensor=torch.Size([64, 16, 875]), y_train_tensor=torch.Size([64])
Testing tensors: X_test_tensor=torch.Size([16, 16, 875]), y_test_tensor=torch.Size([16])


In [18]:
from torch.utils.data import DataLoader, TensorDataset

# Combine tensors into a TensorDataset
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)  # Adjust batch_size as needed
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


## CNN Model 🥟


In [22]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import pandas as pd
from tqdm.auto import tqdm

# Hyperparameters
input_size = 80  # Assuming input is a sequence of 80 elements
output_size = 2  # Number of classes for classification
num_channels = [16, 32, 64]  # Number of channels per layer (adjustable)
kernel_size = 3  # Kernel size for convolution
dropout = 0.2  # Dropout rate for regularization
learning_rate = 0.0001  # Learning rate
batch_size = 32  # Batch size
num_epochs = 10  # Number of training epochs
num_channels_in_data = 16  # Input channels (feature map depth)
signal_length = 875  # Length of the input signal sequence

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# CNN Model
class CNN(nn.Module):
    def __init__(self, in_channels, n_classes):
        super(CNN, self).__init__()

        # Convolutional layers with BatchNorm and ReLU activations
        self.conv1 = nn.Conv1d(in_channels, num_channels[0], kernel_size=kernel_size, padding=1)
        self.conv2 = nn.Conv1d(num_channels[0], num_channels[1], kernel_size=kernel_size, padding=1)
        self.conv3 = nn.Conv1d(num_channels[1], num_channels[2], kernel_size=kernel_size, padding=1)

        self.bn1 = nn.BatchNorm1d(num_channels[0])
        self.bn2 = nn.BatchNorm1d(num_channels[1])
        self.bn3 = nn.BatchNorm1d(num_channels[2])

        self.pool = nn.MaxPool1d(2)  # Max pooling layer

        # Fully connected layer to map to output classes
        self.fc1 = nn.Linear(num_channels[2] * (signal_length // 8), 128)  # Assuming a downsampling factor of 8
        self.fc2 = nn.Linear(128, n_classes)  # Output layer

        self.dropout = nn.Dropout(dropout)  # Dropout layer for regularization

    def forward(self, x):
        # Pass through the first convolutional block
        x = self.pool(torch.relu(self.bn1(self.conv1(x))))

        # Pass through the second convolutional block
        x = self.pool(torch.relu(self.bn2(self.conv2(x))))

        # Pass through the third convolutional block
        x = self.pool(torch.relu(self.bn3(self.conv3(x))))

        # Flatten the output for the fully connected layer
        x = x.view(x.size(0), -1)

        # Apply dropout for regularization
        x = self.dropout(x)

        # Fully connected layers
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)

        return x

# Initialize model
model = CNN(num_channels_in_data, output_size).to(device)

# Training setup
criterion = nn.CrossEntropyLoss()  # Cross entropy loss for classification
optimizer = optim.Adam(model.parameters(), lr=learning_rate)  # Adam optimizer
epochs = num_epochs

train_losses = []
val_losses = []
accs = []
max_acc = 0

# Assume train_loader and test_loader are already defined and contain the data

# Training loop
for epoch in range(epochs):
    total_train_loss = 0.0
    model.train()
    pbar = tqdm(total=len(train_loader), desc=f"Epoch {epoch+1}")

    # Iterate over batches in the training data
    for sig, labels in train_loader:
        sig = sig.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        # Forward pass
        pred = model(sig)

        # Compute loss
        loss = criterion(pred, labels)

        # Backpropagation
        loss.backward()
        optimizer.step()

        total_train_loss += loss.item()
        pbar.set_description(f"Epoch {epoch+1} loss={total_train_loss / (pbar.n + 1):.4f}")
        pbar.update(1)
    pbar.close()

    train_losses.append(total_train_loss / len(train_loader))

    # Validation loop
    total_val_loss = 0.0
    total_accuracy = 0.0
    model.eval()

    with torch.no_grad():
        pbar = tqdm(total=len(test_loader), desc="Validation")

        for sig, labels in test_loader:
            sig = sig.to(device)
            labels = labels.to(device)

            # Forward pass
            pred = model(sig)

            # Compute loss
            loss = criterion(pred, labels)

            # Calculate accuracy
            prob_pred = torch.nn.functional.softmax(pred, dim=-1)
            acc = (prob_pred.argmax(-1) == labels.argmax(-1)).float().mean()

            total_val_loss += loss.item()
            total_accuracy += acc.item()

            pbar.set_description(f"val loss={total_val_loss / (pbar.n + 1):.4f} val acc={total_accuracy / (pbar.n + 1):.4f}")
            pbar.update(1)
        pbar.close()

    val_losses.append(total_val_loss / len(test_loader))
    accs.append(total_accuracy / len(test_loader))

    # Print accuracy and loss for the epoch
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_losses[-1]:.4f}, Val Accuracy: {accs[-1]:.4f}")


# After training, print results for all epochs:
print("\nTraining and Validation Results for All Epochs:")  # Add a newline for better readability
for epoch in range(num_epochs):  # Use num_epochs for the loop
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_losses[epoch]:.4f}, Val Loss: {val_losses[epoch]:.4f}, Val Accuracy: {accs[epoch]:.4f}")



# After training, you can inspect train_losses, val_losses, and accs to analyze the training process

Epoch 1:   0%|          | 0/2 [00:00<?, ?it/s]

Validation:   0%|          | 0/1 [00:00<?, ?it/s]

Epoch 1/10, Train Loss: 0.7393, Val Loss: 0.6947, Val Accuracy: 1.0000


Epoch 2:   0%|          | 0/2 [00:00<?, ?it/s]

Validation:   0%|          | 0/1 [00:00<?, ?it/s]

Epoch 2/10, Train Loss: 0.6711, Val Loss: 0.6932, Val Accuracy: 1.0000


Epoch 3:   0%|          | 0/2 [00:00<?, ?it/s]

Validation:   0%|          | 0/1 [00:00<?, ?it/s]

Epoch 3/10, Train Loss: 0.5866, Val Loss: 0.6901, Val Accuracy: 0.9375


Epoch 4:   0%|          | 0/2 [00:00<?, ?it/s]

Validation:   0%|          | 0/1 [00:00<?, ?it/s]

Epoch 4/10, Train Loss: 0.5506, Val Loss: 0.6891, Val Accuracy: 1.0000


Epoch 5:   0%|          | 0/2 [00:00<?, ?it/s]

Validation:   0%|          | 0/1 [00:00<?, ?it/s]

Epoch 5/10, Train Loss: 0.4853, Val Loss: 0.6911, Val Accuracy: 1.0000


Epoch 6:   0%|          | 0/2 [00:00<?, ?it/s]

Validation:   0%|          | 0/1 [00:00<?, ?it/s]

Epoch 6/10, Train Loss: 0.4129, Val Loss: 0.6945, Val Accuracy: 1.0000


Epoch 7:   0%|          | 0/2 [00:00<?, ?it/s]

Validation:   0%|          | 0/1 [00:00<?, ?it/s]

Epoch 7/10, Train Loss: 0.3614, Val Loss: 0.6923, Val Accuracy: 1.0000


Epoch 8:   0%|          | 0/2 [00:00<?, ?it/s]

Validation:   0%|          | 0/1 [00:00<?, ?it/s]

Epoch 8/10, Train Loss: 0.3125, Val Loss: 0.6911, Val Accuracy: 0.6250


Epoch 9:   0%|          | 0/2 [00:00<?, ?it/s]

Validation:   0%|          | 0/1 [00:00<?, ?it/s]

Epoch 9/10, Train Loss: 0.2672, Val Loss: 0.6905, Val Accuracy: 0.6875


Epoch 10:   0%|          | 0/2 [00:00<?, ?it/s]

Validation:   0%|          | 0/1 [00:00<?, ?it/s]

Epoch 10/10, Train Loss: 0.2256, Val Loss: 0.6955, Val Accuracy: 0.8750

Training and Validation Results for All Epochs:
Epoch 1/10, Train Loss: 0.7393, Val Loss: 0.6947, Val Accuracy: 1.0000
Epoch 2/10, Train Loss: 0.6711, Val Loss: 0.6932, Val Accuracy: 1.0000
Epoch 3/10, Train Loss: 0.5866, Val Loss: 0.6901, Val Accuracy: 0.9375
Epoch 4/10, Train Loss: 0.5506, Val Loss: 0.6891, Val Accuracy: 1.0000
Epoch 5/10, Train Loss: 0.4853, Val Loss: 0.6911, Val Accuracy: 1.0000
Epoch 6/10, Train Loss: 0.4129, Val Loss: 0.6945, Val Accuracy: 1.0000
Epoch 7/10, Train Loss: 0.3614, Val Loss: 0.6923, Val Accuracy: 1.0000
Epoch 8/10, Train Loss: 0.3125, Val Loss: 0.6911, Val Accuracy: 0.6250
Epoch 9/10, Train Loss: 0.2672, Val Loss: 0.6905, Val Accuracy: 0.6875
Epoch 10/10, Train Loss: 0.2256, Val Loss: 0.6955, Val Accuracy: 0.8750


In [23]:
overall_accuracy = accs[-1]
print(f"Overall Accuracy: {overall_accuracy:.4f}")

Overall Accuracy: 0.8750


In [24]:
overall_loss = val_losses[-1]
print(f"Overall Loss: {overall_loss:.4f}")

Overall Loss: 0.6955
