<a href="https://colab.research.google.com/github/Flo909/GraspandLiftDetection/blob/main/Copy_of_Assignment_3_Xanthe.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import numpy as np
import torch
import torchvision
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import torch.optim as optim

import tensorflow

from matplotlib import pyplot as plt


# Training will be significantly faster if GPU is available. In Colab, go to Runtime -> Change runtime type -> Hardware accelerator -> GPU
if torch.cuda.is_available():
    print("GPU is available")
else:
    print("GPU is not available")
# set the device to GPU if available, else CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


GPU is available


Import data:
Download the train.zip file from https://www.kaggle.com/c/grasp-and-lift-eeg-detection/data and
explore the dataset. This file contains the first 8 series for each subject. (We will be only using
train.zip for the project.)
There are two files for each subject + series combination:
● the *_data.csv files contain the raw 32 channels EEG data (sampling rate 500Hz)
● the *_events.csv files contains the ground truth frame-wise labels for all events

NOTE: to import the data you have to log into kaggle and create an API token. From there you upload the kaggle.json file that will be downloaded when creating the API token and upload it into the connected google drive account for colab. Then give colab permission to access the files when running the below lines.

In [3]:
! pip install kaggle



In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
! mkdir ~/.kaggle

mkdir: cannot create directory ‘/root/.kaggle’: File exists


In [6]:
!cp /content/drive/MyDrive/kaggle.json ~/.kaggle/kaggle.json

In [7]:
! kaggle competitions download grasp-and-lift-eeg-detection --force

Downloading grasp-and-lift-eeg-detection.zip to /content
100% 1.02G/1.02G [00:05<00:00, 237MB/s]
100% 1.02G/1.02G [00:05<00:00, 214MB/s]


In [8]:
import zipfile
import pandas as pd
import os

# Open the zip file
with zipfile.ZipFile('grasp-and-lift-eeg-detection.zip', 'r') as zip_ref:
    # Extract all files
    zip_ref.extractall('grasp-and-lift-eeg-detection')

# List the extracted files
extracted_files = zip_ref.namelist()

display(extracted_files)

['sample_submission.csv.zip', 'test.zip', 'train.zip']

In [9]:
# Extracted folder name will be the same as the zip file name without the extension
extracted_folder = os.path.splitext('grasp-and-lift-eeg-detection/train.zip')[0]

with zipfile.ZipFile('grasp-and-lift-eeg-detection/train.zip', 'r') as zip_ref:
    # Extract all files
    zip_ref.extractall(extracted_folder)

extracted_files = zip_ref.namelist()

In [10]:
# Function to read data and labels
def read_data_and_labels(extracted_files, extracted_folder):
    train_data = []
    test_data = []
    test_labels = []
    train_labels = []

    for file_name in extracted_files:
        if file_name.endswith('_data.csv'):
            subject_id, series = file_name.split('_')[:2]
            df = pd.read_csv(os.path.join(extracted_folder, file_name))
            df.drop(columns = ['id'], inplace=True)
            if series == 'series7' or series == 'series8':
                test_data.append(df.T.astype(np.float32))
            else:
                train_data.append(df.T.astype(np.float32))

        elif file_name.endswith('_events.csv'):
            subject_id, series = file_name.split('_')[:2]
            labels_df = pd.read_csv(os.path.join(extracted_folder, file_name))
            labels_df.drop(columns =['id'], inplace=True)
            if series == 'series7' or series == 'series8':
              test_labels.append(labels_df.T.astype(np.float32))
            else:
              train_labels.append(labels_df.T.astype(np.float32))


    return train_data, test_data, test_labels, train_labels

# Read data and labels
train_data, test_data, test_labels, train_labels = read_data_and_labels(extracted_files, extracted_folder)



# Create dataset class
Note: example gets rid of long chunks of data with no events-> could be something to look into

In [14]:
import torch
from torch.utils.data import Dataset, DataLoader

# Custom Dataset Class
class EEGDataset(Dataset):
    def __init__(self, data, labels, in_len, train=True):
        self.data = data
        self.labels = labels
        self.in_len = in_len

        self.index = [(i, j) for i in range(len(data)) for j in range(data[i].shape[1])]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        i, j = self.index[idx]

        data, labels = self.data[i][:, max(0, j - self.in_len + 1):j + 1], self.labels[i][:, j]

        data, label = torch.from_numpy(data.astype(np.float32)), torch.from_numpy(labels.astype(np.float32))
        return data, label

# Create Dataset instances
# final argument must be an ingeger between 1 and 33 -> the larger the number the more computationally complex
in_len = 15
train_dataset = EEGDataset(train_data, train_labels, in_len)
test_dataset = EEGDataset(test_data, test_labels, in_len)

dataLoader = DataLoader(train_dataset, batch_size=32)



In [22]:
print(train_dataset.shape)

AttributeError: 'EEGDataset' object has no attribute 'shape'

# Create a Neural Network


In [15]:
# Define the CNN architecture
class EEGCNN(nn.Module):
    def __init__(self):
        super(EEGCNN, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(32 * 125, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = x.view(-1, 32 * 125)  # Flattening
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x



In [18]:
from tqdm import tqdm

# Define model
model = EEGCNN().to(device)

# Define your loss function
criterion = nn.CrossEntropyLoss()

# Define your optimizer
optimizer = optim.Adam(model.parameters())

# Set the number of epochs
num_epochs = 20

# Lists to store the training and validation losses and accuracies
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

# Training loop
for epoch in range(num_epochs):
    # Set the model to training mode
    model.train()

    # Initialize the training loss and accuracy for this epoch
    train_loss = 0
    train_correct = 0
    train_total = 0

    # Iterate over the training data
    for inputs, labels in dataLoader:

        # Move the inputs and labels to the GPU if available
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass
        loss.backward()

        # Update the weights
        optimizer.step()
        # Update the training loss and accuracy
        train_loss += loss.item() * inputs.size(0)
        _, labels_reshape = torch.max(labels, 1)
        _, predicted = torch.max(outputs.data, 1)
        train_total += labels_reshape.size(0)
        train_correct += (predicted == labels_reshape).sum().item()


    # Compute the average training loss and accuracy for this epoch
    train_loss /= len(dataLoader.dataset)
    train_accuracy = 100 * train_correct / train_total

    # Append the training loss and accuracy to the lists
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)

    # Set the model to evaluation mode
    model.eval()

    # Initialize the validation loss and accuracy for this epoch
    val_loss = 0
    val_correct = 0
    val_total = 0

    # EXERCISE - Calculate the validation loss and accuracy and append to the lists

    # Compute the average validation loss and accuracy for this epoch
    val_loss /= len(val_loader.dataset)
    val_accuracy = 100 * val_correct / val_total

    # Append the validation loss and accuracy to the lists
    val_losses.append(val_loss)
    val_accuracies.append(val_accuracy)

    # Print the training and validation metrics for this epoch
    print(f"Epoch {epoch+1}/{num_epochs}:")
    print(f"Train Loss: {train_loss:.4f} | Train Accuracy: {train_accuracy:.2f}%")
    print(f"Val Loss: {val_loss:.4f} | Val Accuracy: {val_accuracy:.2f}%")
    print()

KeyError: (slice(None, None, None), 0)

#Filtering
Filtering the Data
(maybe do this after we actually have a nn)


In [11]:
import matplotlib.pyplot as plt
from scipy import signal

# Making a copy of the dataset so as the maintain the original
filtered_train = train_dataset
filtered_test = test_dataset

def filter_eeg_dataset(EEGDataset, f_low, f_high, order, type, fs):
  b, a = signal.butter(order, [f_low, f_high], btype=type, fs=fs)

  subject_names = EEGDataset.subject_names

  # For each Subject in EEG Dataset
  for subject in subject_names:
    # For each Series in Subject
    for series in range(6):
      try:
        data_trial = EEGDataset.subjects[subject][series]
        # For each Column in series and subject
        # Not the first non-numerical columns
        for col_name in data_trial.columns:

          if col_name != "id":
            filtered_data = signal.filtfilt(b, a, data_trial[col_name])

            # Replace data
            EEGDataset.subjects[subject][series][col_name] = filtered_data


      # Breaking when we run out of series
      except IndexError as e:
        print(str(subject) + " caps at "+ str(series))
        break




# System Params
order = 4
sampling_frequency = 500

# High and Low Pass Filter
f_low = 1
f_high = 100
f_type = 'bandpass'

filter_eeg_dataset(filtered_train, f_low, f_high, order, f_type, sampling_frequency)
filter_eeg_dataset(filtered_test, f_low, f_high, order, f_type, sampling_frequency)

# Bandreject
f_low = 49
f_high = 51
f_type = 'bandstop'

filter_eeg_dataset(filtered_train, f_low, f_high, order, f_type, sampling_frequency)
filter_eeg_dataset(filtered_test, f_low, f_high, order, f_type, sampling_frequency)

train/subj10 caps at 2
train/subj11 caps at 2
train/subj12 caps at 2
train/subj1 caps at 2
train/subj2 caps at 2
train/subj3 caps at 2
train/subj4 caps at 2
train/subj5 caps at 2
train/subj6 caps at 2
train/subj7 caps at 2
train/subj8 caps at 2
train/subj9 caps at 2
train/subj10 caps at 2
train/subj11 caps at 2
train/subj12 caps at 2
train/subj1 caps at 2
train/subj2 caps at 2
train/subj3 caps at 2
train/subj4 caps at 2
train/subj5 caps at 2
train/subj6 caps at 2
train/subj7 caps at 2
train/subj8 caps at 2
train/subj9 caps at 2


Feature Extraction
