In [1]:
# For Google Colab
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')
import sys
import os
# Put the project folder on the import path (presumably where the `models`
# and `scripts` packages imported in the next cell live -- confirm).
sys.path.append('/content/drive/My Drive/c147-project')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

from models.cnn import *
from scripts.preprocessing import *

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
# This module-level `device` is read by the training and testing code below.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
# Load Data
# Change the working directory to the dataset folder (IPython magic) so the
# relative .npy file names below resolve.
%cd /content/drive/My Drive/c147-project/project_data/project
X_test = np.load("X_test.npy")
y_test = np.load("y_test.npy")
person_train_valid = np.load("person_train_valid.npy")
X_train_valid = np.load("X_train_valid.npy")
y_train_valid = np.load("y_train_valid.npy")
person_test = np.load("person_test.npy")

## Shift the labels down by 769 so that the classes become zero-based
## (presumably the raw labels are 769..772 -- confirm against the dataset):

# Cue onset left - 0
# Cue onset right - 1
# Cue onset foot - 2
# Cue onset tongue - 3

# In-place subtraction: the label arrays loaded above are modified directly.
y_train_valid -= 769
y_test -= 769

/content/drive/My Drive/c147-project/project_data/project


In [4]:
## Data Augmentation
def aug(X_train, y_train, X_test, y_test, Trim, sub_sample, average, noise, noid):
    """Build augmented training and test sets from 3-D arrays.

    Both inputs are indexed as ``[trial, channel, sample]``; all trimming and
    pooling happens along axis 2. For each split the result stacks, along
    axis 0 and in this order:

    1. a max-pooled copy (non-overlapping windows of ``sub_sample`` samples),
    2. a mean-pooled copy (windows of ``average`` samples) plus Gaussian noise,
    3. ``sub_sample`` strided copies ``X[:, :, i::sub_sample]``, with Gaussian
       noise added when ``noid`` is truthy.

    The labels are repeated once per stacked copy, so each output has
    ``(2 + sub_sample)`` times as many trials as its input.

    Args:
        X_train, X_test: 3-D numeric arrays.
        y_train, y_test: label arrays matching the first axis of the data.
        Trim: multiplier applied to ``X_train.shape[2]`` to get the number of
            leading samples kept (``H = int(Trim * X_train.shape[2])``). It
            acts as a fraction: any value >= 1 makes ``H`` at least the axis
            length, so nothing is trimmed.
        sub_sample: window size for max pooling and stride for subsampling.
        average: window size for mean pooling.
        noise: standard deviation of the zero-mean Gaussian noise.
        noid: truthy to add noise to the strided copies as well.

    Returns:
        ``(total_X_train, total_y_train, total_X_test, total_y_test)``.

    Raises:
        ValueError: raised by NumPy when the trimmed length is not divisible
            by ``sub_sample`` / ``average``, or when the pooled and strided
            copies end up with different lengths (e.g. ``sub_sample !=
            average``) and cannot be stacked.

    Note:
        Noise comes from the global ``np.random`` state; the draw order
        (train before test at every step) matches the original code, so
        results under a fixed seed are unchanged.
    """

    def _pool(X, width, reducer):
        # Split axis 2 into consecutive windows of `width` samples and reduce each one.
        return reducer(X.reshape(X.shape[0], X.shape[1], -1, width), axis=3)

    def _jitter(X):
        # Add zero-mean Gaussian noise with standard deviation `noise`.
        return X + np.random.normal(0.0, noise, X.shape)

    # Trimming: the cut-off is derived from the training array and applied to both splits.
    H = int(Trim * X_train.shape[2])
    X_train = X_train[:, :, 0:H]
    X_test = X_test[:, :, 0:H]

    # Max pooling
    train_parts = [_pool(X_train, sub_sample, np.max)]
    test_parts = [_pool(X_test, sub_sample, np.max)]

    # Averaging + noise ("jittering"). Both means are computed before any noise
    # is drawn, and the training noise is drawn first.
    X_average_train = _pool(X_train, average, np.mean)
    X_average_test = _pool(X_test, average, np.mean)
    train_parts.append(_jitter(X_average_train))
    test_parts.append(_jitter(X_average_test))

    # Subsampling: one strided copy per phase offset, train before test.
    for offset in range(sub_sample):
        for parts, X in ((train_parts, X_train), (test_parts, X_test)):
            strided = X[:, :, offset::sub_sample]
            # `+ 0.0` keeps the original float promotion when no noise is added.
            parts.append(_jitter(strided) if noid else strided + 0.0)

    # Stack everything once instead of re-copying the accumulated array per step.
    total_X_train = np.concatenate(train_parts, axis=0)
    total_y_train = np.hstack([y_train] * len(train_parts))
    total_X_test = np.concatenate(test_parts, axis=0)
    total_y_test = np.hstack([y_test] * len(test_parts))

    return total_X_train, total_y_train, total_X_test, total_y_test


def aug_valid(X_train, y_train, Trim, sub_sample, average, noise, noid):
    """Prepare a validation split: trim axis 2, then max-pool it.

    No noisy or strided copies are produced, so the number of trials is
    unchanged and the labels are returned as passed in.

    Args:
        X_train: 3-D numeric array indexed as ``[trial, channel, sample]``.
        y_train: labels for ``X_train``; returned unchanged (same object).
        Trim: multiplier applied to ``X_train.shape[2]`` to get the number of
            leading samples kept; values >= 1 leave the data untrimmed.
        sub_sample: size of the non-overlapping max-pooling windows.
        average, noise, noid: accepted but not used by this function.

    Returns:
        ``(X_pooled, y_train)``.
    """
    # Trimming
    keep = int(Trim * X_train.shape[2])
    trimmed = X_train[:, :, :keep]

    # Max pooling over consecutive windows of `sub_sample` samples
    n_trials, n_channels = trimmed.shape[0], trimmed.shape[1]
    windows = trimmed.reshape(n_trials, n_channels, -1, sub_sample)
    pooled = windows.max(axis=3)

    return pooled, y_train

In [5]:
# Hold out 20% of the labelled trials for validation (shuffled, unseeded split).
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_valid, y_train_valid, test_size=0.2, shuffle=True
)

# NOTE(review): Trim is multiplied by the length of axis 2 inside aug/aug_valid,
# so Trim=700 puts the slice end past the end of the axis and nothing is trimmed.
# If the first 700 samples were intended, a fraction would be needed instead --
# confirm before changing, since that alters the input length the model sees.
X_train_prep, y_train_prep, X_test_prep, y_test_prep = aug(
    X_train, y_train, X_test, y_test,
    Trim=700, sub_sample=2, average=2, noise=0.5, noid=1,
)
X_valid_prep, y_valid_prep = aug_valid(
    X_valid, y_valid,
    Trim=700, sub_sample=2, average=2, noise=0.5, noid=1,
)

# Report the prepared array shapes, one per line: train, validation, test.
print(X_train_prep.shape, X_valid_prep.shape, X_test_prep.shape, sep="\n")

class EEGDataset(Dataset):
    """Dataset wrapper pairing float feature tensors with integer labels.

    Args:
        X: array-like of features; converted with ``torch.FloatTensor`` and
            kept in its given layout (no extra axis is added, as used for
            conv1d input).
        Y: array-like of integer class labels; converted with
            ``torch.LongTensor``.
    """

    def __init__(self, X, Y):
        # The former `torch.FloatTensor(X).unsqueeze(1)` assignment was overwritten
        # on the very next line; it only cost an extra full copy of the data.
        self.X = torch.FloatTensor(X)  # for conv1d
        self.Y = torch.LongTensor(Y)

    def __len__(self):
        """Return the number of samples (length of the first axis of X)."""
        return len(self.X)

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair at ``index``."""
        return self.X[index], self.Y[index]

    def shape(self):
        """Return the shape of the full feature tensor."""
        return self.X.shape

# Wrap each prepared split in an EEGDataset.
train_set = EEGDataset(X_train_prep, y_train_prep)
val_set = EEGDataset(X_valid_prep, y_valid_prep)
test_set = EEGDataset(X_test_prep, y_test_prep)

# Report the tensor shapes, one per line: train, validation, test.
print(train_set.shape(), val_set.shape(), test_set.shape(), sep="\n")

# Shuffled mini-batch loaders: batches of 100 for training and validation,
# batches of 50 for testing.
train_loader = DataLoader(dataset=train_set, batch_size=100, shuffle=True)
val_loader = DataLoader(dataset=val_set, batch_size=100, shuffle=True)
test_loader = DataLoader(dataset=test_set, batch_size=50, shuffle=True)

(6768, 22, 500)
(423, 22, 500)
(1772, 22, 500)
torch.Size([6768, 22, 500])
torch.Size([423, 22, 500])
torch.Size([1772, 22, 500])


In [6]:
def train_evaluate(model, train_loader, val_loader, criterion, optimizer, save_path=None, num_epochs=100):
    """Train ``model`` and checkpoint the weights with the best validation accuracy.

    Each epoch runs one optimisation pass over ``train_loader`` and one
    evaluation pass over ``val_loader``, then prints a progress line. When the
    validation accuracy strictly improves on the best seen so far, the model's
    ``state_dict`` is written to ``save_path``.

    Batches are moved to the module-level ``device``; the caller is expected
    to have placed ``model`` on that device already.

    Args:
        model: the ``torch.nn.Module`` to train.
        train_loader: iterable of ``(inputs, labels)`` training batches.
        val_loader: iterable of ``(inputs, labels)`` validation batches.
        criterion: loss callable taking ``(outputs, labels)``.
        optimizer: optimizer built over ``model``'s parameters.
        save_path: where to save the best ``state_dict``; ``None`` (default)
            disables checkpointing.
        num_epochs: number of epochs to run.

    Returns:
        The best validation accuracy observed, as a percentage.
    """
    best_val_accuracy = 0.0  # Initialize best validation accuracy
    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        running_loss = 0.0
        train_correct = 0
        train_total = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()

        # Guard against an empty loader instead of dividing by zero.
        train_accuracy = 100 * train_correct / train_total if train_total else 0.0

        # ---- validation pass ----
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                # Compute the loss on this validation batch; previously the stale
                # `loss` from the last training batch was accumulated here.
                val_loss += criterion(outputs, labels).item()
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        val_accuracy = 100 * correct / total if total else 0.0
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            if save_path is not None:
                torch.save(model.state_dict(), save_path)  # Save the best model

        # running_loss / val_loss hold the summed batch losses for the epoch;
        # they are not part of the progress line.
        print(f"Epoch {epoch+1}/{num_epochs}, Training Accuracy: {train_accuracy}%, Validation Accuracy: {val_accuracy}%, Best Validation Accuracy: {best_val_accuracy}%")

    return best_val_accuracy

In [7]:
# grid search for cnn hyperparameters
# best_params = {'learning_rate': 0.001, 'batch_size': 64, 'dropout_rate': 0.6, 'filter_size': 5}
#
# The search below is disabled: it is wrapped in a string literal, so the cell
# only evaluates (and echoes) the string.
# NOTE(review) before re-enabling:
#   - the DataLoaders are built with batch_size=100, so the `batch_size` loop
#     variable never reaches them;
#   - the train_evaluate call passes no save_path;
#   - train_evaluate returns the best validation *accuracy* (higher is better),
#     but the result is named val_loss and compared with `<`, which would keep
#     the worst configuration.
'''
learning_rates = [0.0001, 0.001, 0.01, 0.1]
batch_sizes = [16, 32, 64, 100]
dropout_rates = [0.2, 0.3, 0.4, 0.5, 0.6]
filter_sizes = [3, 5, 7]

# Placeholder for best hyperparameters and their corresponding validation loss
best_params = {}
best_val_loss = float('inf')

for lr in learning_rates:
    for batch_size in batch_sizes:
      for dropout_rate in dropout_rates:
        for filter_size in filter_sizes:
          # Create data loaders with the current batch size
          train_loader = DataLoader(train_set, batch_size=100, shuffle=True)
          val_loader = DataLoader(val_set, batch_size=100, shuffle=True)

          # Initialize model, loss function, and optimizer
          model = CNN(dropout_rate, filter_size)
          criterion = nn.CrossEntropyLoss()
          optimizer = optim.Adam(model.parameters(), lr=lr)
          model = model.to(device)

          # Train and evaluate the model
          val_loss = train_evaluate(model, train_loader, val_loader, criterion, optimizer)
          print(f"LR: {lr}, Batch Size: {batch_size}, Dropout Rate: {dropout_rate}, Filter Size: {filter_size}, Validation Loss: {val_loss}")

          # Update best parameters if current model is better
          if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_params = {'learning_rate': lr, 'batch_size': batch_size, 'dropout_rate': dropout_rate, 'filter_size': filter_size}

print(f"Best Parameters: {best_params}, Best Validation Loss: {best_val_loss}")
'''

'\nlearning_rates = [0.0001, 0.001, 0.01, 0.1]\nbatch_sizes = [16, 32, 64, 100]\ndropout_rates = [0.2, 0.3, 0.4, 0.5, 0.6]\nfilter_sizes = [3, 5, 7]\n\n# Placeholder for best hyperparameters and their corresponding validation loss\nbest_params = {}\nbest_val_loss = float(\'inf\')\n\nfor lr in learning_rates:\n    for batch_size in batch_sizes:\n      for dropout_rate in dropout_rates:\n        for filter_size in filter_sizes:\n          # Create data loaders with the current batch size\n          train_loader = DataLoader(train_set, batch_size=100, shuffle=True)\n          val_loader = DataLoader(val_set, batch_size=100, shuffle=True)\n\n          # Initialize model, loss function, and optimizer\n          model = CNN(dropout_rate, filter_size)\n          criterion = nn.CrossEntropyLoss()\n          optimizer = optim.Adam(model.parameters(), lr=lr)\n          model = model.to(device)\n\n          # Train and evaluate the model\n          val_loss = train_evaluate(model, train_loader, v

In [8]:
# Final training run with the selected hyperparameters. Disabled: the code is
# wrapped in a string literal, so the cell only evaluates (and echoes) the string.
# NOTE(review): this run would save to models/best_cnn.pth, whereas the testing
# cell loads models/cnn_7302.pth -- confirm which checkpoint is intended.
'''
best_params = {'learning_rate': 0.001, 'batch_size': 64, 'dropout_rate': 0.6, 'filter_size': 5}
best_model = CNN(dropout_rate=best_params['dropout_rate'], filter_size=best_params['filter_size']).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(best_model.parameters(), lr=best_params['learning_rate'])
train_loader = DataLoader(train_set, batch_size=best_params['batch_size'], shuffle=True)
val_loader = DataLoader(val_set, batch_size=best_params['batch_size'], shuffle=True)
train_evaluate(best_model, train_loader, val_loader, criterion, optimizer, num_epochs=200, save_path='/content/drive/My Drive/c147-project/models/best_cnn.pth')
'''

"\nbest_params = {'learning_rate': 0.001, 'batch_size': 64, 'dropout_rate': 0.6, 'filter_size': 5}\nbest_model = CNN(dropout_rate=best_params['dropout_rate'], filter_size=best_params['filter_size']).to(device)\ncriterion = nn.CrossEntropyLoss()\noptimizer = optim.Adam(best_model.parameters(), lr=best_params['learning_rate'])\ntrain_loader = DataLoader(train_set, batch_size=best_params['batch_size'], shuffle=True)\nval_loader = DataLoader(val_set, batch_size=best_params['batch_size'], shuffle=True)\ntrain_evaluate(best_model, train_loader, val_loader, criterion, optimizer, num_epochs=200, save_path='/content/drive/My Drive/c147-project/models/best_cnn.pth')\n"

In [9]:
# Testing loop
# Evaluate the saved CNN checkpoint on the prepared test set.
# NOTE(review): test_set is built from the output of aug(), so it contains the
# pooled, averaged+noise and strided copies of every test trial; the accuracy
# below is measured on that augmented set.
test_loader = DataLoader(test_set, batch_size=50, shuffle=True)

# Rebuild the architecture, then restore the trained weights onto `device`.
test_best_model = CNN(dropout_rate=0.6, filter_size=5).to(device)
test_best_model.load_state_dict(
    torch.load(
        '/content/drive/My Drive/c147-project/models/cnn_7302.pth',
        map_location=device,
    )
)
test_best_model.eval()

correct, total = 0, 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = test_best_model(inputs)
        # Predicted class = index of the largest output along dim 1.
        _, predicted = torch.max(outputs, dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f'Accuracy of the model on the test set: {accuracy}%')

Accuracy of the model on the test set: 73.53273137697516%
