In [1]:
# use the raw data :')
import mne
import numpy as np
import pandas as pd
import scipy
import re
import random

# some hyperparameter type things :)
sr = 200
t = 200
ch = 19
affix = '_raw'

In [None]:
def load_rawdata(subj):
    sr = 1000 if subj[1] == '_HFREQ' else 200
    mat = scipy.io.loadmat(f'preprocessing/raw_data/{subj[0]}{subj[1]}.mat')
    data = mat['o'][0][0][5].T
    data = np.delete(data, [10, 11, 21], axis = 0)

    if sr == 1000: data = scipy.signal.resample(data, int(data.shape[1]/5), axis = 1)

    events = pd.read_csv(f"preprocessing/{subj[0]}_events.txt", sep="\t").drop('number', axis=1)
    events = events[['latency', 'urevent', 'type']].to_numpy()
    events = mne.pick_events(events, include=[1, 2, 3, 4, 5]) # should replace this with a numpy function lol

    features = []
    labels = []

    for event in events:
        time = int(event[0])
        if sr == 1000: time = int(time/5)
        type = int(event[2])
        features.append([data[i][time:time+200] for i in range(ch)])
        labels.append(type - 1)

    # mne.filter.filter_data(features, 200, 5, None)

    return (np.array(features), labels)

In [None]:
raw_features = {}
raw_labels = {}

for subj in [('A_405', ''), ('A_408', '_HFREQ'), ('B_110', ''), ('B_309', '_HFREQ'), ('B_311', '_HFREQ'), ('B_316', ''), ('C_204', ''), ('C_429', '_HFREQ'), ('E_321', '_HFREQ'), ('E_415', '_HFREQ'), ('E_429', '_HFREQ'), ('F_027', ''), ('F_209', ''), ('F_210', '_HFREQ'), ('G_413', '_HFREQ'), ('G_428', '_HFREQ'), ('H_804', '_HFREQ'), ('I_719', '_HFREQ'), ('I_723', '_HFREQ')]:
    print(subj[0])
    raw_features[subj[0]], raw_labels[subj[0]] = load_rawdata(subj)

subjList = ['A_405', 'A_408', 'B_110', 'B_309', 'B_311', 'B_316', 'C_204', 'C_429', 'E_321', 'E_415', 'E_429', 'F_027', 'F_209', 'F_210', 'G_413', 'G_428', 'H_804', 'I_719', 'I_723']
fileList = []

for letter in ['A', 'B', 'C', 'E', 'F', 'G', 'H', 'I']:
    r = re.compile(f'{letter}_d*')
    subjects = list(filter(r.match, subjList))
    print(subjects)
    subjects = [(raw_features[x], raw_labels[x]) for x in subjects]
    fileName = f'subj_{letter}'
    _features = np.concatenate([features for features, labels in subjects])
    _labels = np.concatenate([labels for features, labels in subjects])

    _features = np.array(_features).reshape(-1, 19, 200)
    _labels = np.array(_labels).reshape(-1)
    fileList.append((fileName, _features, _labels))

for subj, file, labels in fileList:
    np.save(f'pickles/{subj}_features{affix}', file)
    np.save(f'pickles/{subj}_labels{affix}', labels)

In [2]:
# load from .pkl files ("raw")
raw_features = {}
raw_labels = {}
fileList = []

for letter in ['A', 'B', 'C', 'E', 'F', 'G', 'H', 'I']: 
    raw_features[f'subj_{letter}'] = np.load(f'pickles/subj_{letter}_features{affix}.npy')
    raw_labels[f'subj_{letter}'] = np.load(f'pickles/subj_{letter}_labels{affix}.npy').reshape(-1)
    fileList.append((f'subj_{letter}', raw_features[f'subj_{letter}'], raw_labels[f'subj_{letter}']))


features extraction ?

In [16]:
from scipy import fftpack
import matplotlib.pyplot as plt

def get_fft(sample): # pull one piece of that data set, trial 1 of thumb data
    #fig1 = plt.plot(np.linspace(0, 1.5, num=300), sample)
    # fft on that one piece...
    sig_fft = fftpack.fft(sample)
    power = np.abs(sig_fft)**2
    sample_freq = fftpack.fftfreq(sample.size)
    return power[0:int(power.size/2)]
    # fig2 = plt.plot(sample_freq[0:200], power[0:200])

In [29]:
# do fft for everyone :)

sample_freqs = fftpack.fftfreq(t) # since all samples are the same, sample freqs are the same

fft_features = {}
fft_labels = {}

for subj, features, labels in fileList:
    # file.event_id = {'thumb':1, 'index':2, 'middle':3, 'ring':4, 'pinkie':5}
    # conditions = ['thumb']
    fft_features[subj] = np.zeros((labels.shape[0], ch, int(t/2)))
    for trial_i in range(labels.shape[0]): #for each event
        fft = np.zeros((ch, int(t/2)))
        for channel_i in range(ch):
            power = get_fft(features[trial_i][channel_i])
            fft[channel_i] = power
        fft_features[subj][trial_i] = fft
    fft_labels[subj] = labels
    features = fft_features[subj]
#   print(f'{subj}: {file.events.shape[0]}')

(1826, 19, 100)
(3811, 19, 100)
(1899, 19, 100)
(2848, 19, 100)
(2874, 19, 100)
(1901, 19, 100)
(922, 19, 100)
(1917, 19, 100)


In [None]:
# get CSP features :D
from mne.decoding import CSP

file = mne.io.read_raw_eeglab('preprocessing/A_405_clean.set', verbose=False)

csp_features = {}
csp_labels = {}

for subj, features, labels in fileList:
    csp = CSP(n_components=5, reg=None, log=True)
    labels = raw_labels[subj]

    csp_features[subj] = csp.fit_transform(features, labels)
    csp_labels[subj] = labels

    csp.plot_filters(file.info, title='CSP patterns for %s' % subj)    



classifiers

In [31]:
#2D CNN w/ Adham's recommendations

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split

class ConvNet_2D(nn.Module):
    def __init__(self):
        super(ConvNet_2D, self).__init__()
        ### FILL IN ### [10 POINTS]

        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 8, (7, 3)),
            nn.ReLU(), 
            #nn.MaxPool2d(2)
        )
        
        self.conv2 = nn.Sequential(
            nn.Conv2d(8, 16, (5, 3)),
            nn.Conv2d(16, 32, (3, 3)),
            nn.ReLU(),
            #nn.MaxPool2d(2)
        )

        self.conv3 = nn.Sequential(
            nn.ReLU(),
            #nn.MaxPool2d(2)
        )

        self.hidden = nn.Sequential(
            nn.Flatten(),
            # nn.Linear(121728, 256),
            nn.Linear(21056, 256),
            nn.ReLU(),
            nn.Dropout(0.4), # all features norefer
            # nn.Linear(1408, 256), #pseudosampled featrues .
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(128, 5),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        ### FILL IN ### [5 POINTS]
        x = self.conv1(x)
        x = self.conv2(x)
        # x = self.conv3(x)
        x = self.hidden(x)
        return x


In [32]:
# run CNN
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
# train neural net
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
accuracies = []

for subj, features, labels in fileList:
    features = fft_features[subj]
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size= 0.3) # 70% training 30% test
    conv_net = ConvNet_2D().to(device)
    criterion = nn.CrossEntropyLoss()
    conv_net_optimizer = torch.optim.Adam(conv_net.parameters(), lr=0.001)
    # Train the neural network
    for epoch in range(50):
        # print("epoch = ",epoch)
        conv_net.train()
        data = X_train
        targets = y_train
        data = torch.tensor(data.reshape(int(data.shape[0]), 1, data.shape[1], data.shape[2]), dtype=torch.float32) # reshape # of trial, 1 channel, # of samples
        data = data.to(device)
        targets = torch.tensor(targets, dtype=torch.int64)
        targets = targets.to(device)

        # Forward pass
        conv_net_predictions = conv_net(data)
        conv_net_loss = criterion(conv_net_predictions, targets)

        # Backward pass
        conv_net_optimizer.zero_grad()
        conv_net_loss.backward()
        conv_net_optimizer.step()

    # Evaluate the neural network
    conv_net.eval()
    correct, total = 0, 0
    with torch.no_grad():
        data = X_test
        targets = y_test
        data = torch.tensor(data.reshape(int(data.shape[0]), 1, data.shape[1], data.shape[2]), dtype=torch.float32) # reshape # of trial, 1 channel, # of samples
        data = data.to(device)
        targets = torch.tensor(targets, dtype=torch.int64)
        targets = targets.to(device)
        conv_net_predictions = conv_net(data)
        _, predicted = torch.max(conv_net_predictions.data, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()
    print(f"test accuracy {subj}= ",correct/total)
    accuracies.append(correct/total)

print(f'avg accuracy = {np.mean(accuracies)}')


test accuracy subj_A=  0.19708029197080293
test accuracy subj_B=  0.24737762237762237
test accuracy subj_C=  0.21228070175438596
test accuracy subj_E=  0.2
test accuracy subj_F=  0.20278099652375434
test accuracy subj_G=  0.19264448336252188
test accuracy subj_H=  0.22743682310469315
test accuracy subj_I=  0.21354166666666666
avg accuracy = 0.2116428232200559


In [None]:
sum([0.395985401459854, 0.47465034965034963, 0.5421052631578948, 0.5941520467836258,
  0.42526071842410196, 0.4868651488616462, 0.3285198555956679, 0.5017361111111112])/8

In [None]:
# CNN transfer learning version
# train neural net
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
conv_net = ConvNet_2D().to(device)
criterion = nn.CrossEntropyLoss()
conv_net_optimizer = torch.optim.Adam(conv_net.parameters(), lr=0.001)

random.shuffle(fileList)

for subj, features, labels in fileList[0:-1]:
    # Train the neural network
    for epoch in range(30):
        # print("epoch = ",epoch)
        conv_net.train()
        data = features
        targets = labels
        data = torch.tensor(data.reshape(int(data.shape[0]), 1, data.shape[1], data.shape[2]), dtype=torch.float32) # reshape # of trial, 1 channel, # of samples
        data = data.to(device)
        targets = torch.tensor(targets, dtype=torch.int64)
        targets = targets.to(device)

        # Forward pass
        conv_net_predictions = conv_net(data)
        conv_net_loss = criterion(conv_net_predictions, targets)

        # Backward pass
        conv_net_optimizer.zero_grad()
        conv_net_loss.backward()
        conv_net_optimizer.step()

    # Evaluate the neural network
    conv_net.eval()
    correct, total = 0, 0
    with torch.no_grad():
        data = fileList[-1][1]
        targets = fileList[-1][2]
        data = torch.tensor(data.reshape(int(data.shape[0]), 1, data.shape[1], data.shape[2]), dtype=torch.float32) # reshape # of trial, 1 channel, # of samples
        data = data.to(device)
        targets = torch.tensor(targets, dtype=torch.int64)
        targets = targets.to(device)
        conv_net_predictions = conv_net(data)
        _, predicted = torch.max(conv_net_predictions.data, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()
    print(f"test accuracy {subj} = ",correct/total)

with torch.no_grad():
    data = fileList[-1][1]
    targets = fileList[-1][2]
    data = torch.tensor(data.reshape(int(data.shape[0]), 1, data.shape[1], data.shape[2]), dtype=torch.float32) # reshape # of trial, 1 channel, # of samples
    data = data.to(device)
    targets = torch.tensor(targets, dtype=torch.int64)
    targets = targets.to(device)
    conv_net_predictions = conv_net(data)
    _, predicted = torch.max(conv_net_predictions.data, 1)
    total += targets.size(0)
    correct += (predicted == targets).sum().item()
print("test accuracy = ",correct/total)


In [None]:
# SVM TIME WOOO
from sklearn.ensemble import BaggingClassifier
from sklearn.model_selection import train_test_split
from sklearn import svm
from sklearn import metrics

avg_acc = 0

for subj, features, labels in fileList:
    #features = csp_features[subj]
    X_train, X_test, y_train, y_test = train_test_split(np.reshape(features, (features.shape[0], -1)), labels, test_size= 0.3) # 70% training 30% test
    
    classifier = BaggingClassifier(svm.SVC(kernel='rbf'), n_estimators=20) # radial basis kernel; er i think they used gamma=0.2 ngl
    classifier.fit(X_train, y_train)
    y_pred = classifier.predict(X_test)
    avg_acc += metrics.accuracy_score(y_test, y_pred)

    print(f"Accuracy {subj}:",metrics.accuracy_score(y_test, y_pred))

print("Average accuracy: ", avg_acc/len(fileList))

In [None]:
# Random forest ?
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn import metrics

for subj, features, labels in fileList:
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size= 0.3) # 70% training 30% test

    rf = RandomForestClassifier(n_estimators = 100)
    rf.fit(X_train, y_train)
    y_pred = rf.predict(X_test)

    print(f"Accuracy {subj}:",metrics.accuracy_score(y_test, y_pred))

In [9]:
# CNN + LSTM :D
#2D CNN w/ Adham's recommendations

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split

class Conv_LSTM(nn.Module):
    def __init__(self):
        super(Conv_LSTM, self).__init__()
        ### FILL IN ### [10 POINTS]

        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 8, (5, 5)),
            nn.Conv2d(8, 16, (3, 3)),
            nn.ReLU(), 
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, (6, 1)),
        )

        self.lstm = nn.Sequential(
            nn.LSTM(32, 512, batch_first=True, num_layers=2, dropout=0.2)
        )

        self.hidden = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(128, 5),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        ### FILL IN ### [5 POINTS]
        x = self.conv1(x)
        # x.shape = (batch_size//# trials, channels, height, width)
        x = x.reshape(x.shape[0], x.shape[1], x.shape[3])
        x = x.transpose(1, 2)
        x = self.lstm(x)
        x = x[0][:, -1, :]
        x = self.hidden(x)
        return x

In [10]:
# run CNN+LSTM
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
# train neural net
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
accuracies = []

for subj, features, labels in fileList:
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size= 0.3) # 70% training 30% test
    conv_lstm = Conv_LSTM().to(device)
    criterion = nn.CrossEntropyLoss()
    conv_lstm_optimizer = torch.optim.Adam(conv_lstm.parameters(), lr=0.001)
    # Train the neural network
    for epoch in range(50):
        # print("epoch = ",epoch)
        conv_lstm.train()
        data = X_train
        targets = y_train
        data = torch.tensor(data.reshape(int(data.shape[0]), 1, data.shape[1], data.shape[2]), dtype=torch.float32) # reshape # of trial, 1 channel, # of samples
        data = data.to(device)
        targets = torch.tensor(targets, dtype=torch.int64) # nn.functional.one_hot().type(torch.float32)
        targets = targets.to(device)

        # Forward pass
        conv_lstm_predictions = conv_lstm(data)
        conv_lstm_loss = criterion(conv_lstm_predictions, targets)

        # Backward pass
        conv_lstm_optimizer.zero_grad()
        conv_lstm_loss.backward()
        conv_lstm_optimizer.step()

    # Evaluate the neural network
    conv_lstm.eval()
    correct, total = 0, 0
    with torch.no_grad():
        data = X_test
        targets = y_test
        data = torch.tensor(data.reshape(int(data.shape[0]), 1, data.shape[1], data.shape[2]), dtype=torch.float32) # reshape # of trial, 1 channel, # of samples
        data = data.to(device)
        targets = torch.tensor(targets, dtype=torch.int64) # nn.functional.one_hot().type(torch.float32)
        targets = targets.to(device)
        conv_lstm_predictions = conv_lstm(data)
        _, predicted = torch.max(conv_lstm_predictions.data, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()
    print(f"test accuracy {subj}= ",correct/total)
    accuracies.append(correct/total)

print(f'avg accuracy = {np.mean(accuracies)}')


test accuracy subj_A=  0.3010948905109489
test accuracy subj_B=  0.3129370629370629
test accuracy subj_C=  0.2754385964912281
test accuracy subj_E=  0.3929824561403509
test accuracy subj_F=  0.2688296639629201
test accuracy subj_G=  0.38003502626970226
test accuracy subj_H=  0.3140794223826715
test accuracy subj_I=  0.3802083333333333
avg accuracy = 0.3282006815035272


In [None]:
test accuracy subj_A=  0.36313868613138683
test accuracy subj_F=  0.593279258400927
test accuracy subj_B=  0.15297202797202797
test accuracy subj_G=  0.12084063047285463
test accuracy subj_C=  0.03859649122807018
test accuracy subj_I=  0.3263888888888889
test accuracy subj_H=  0.10469314079422383
test accuracy subj_E=  0.21403508771929824
avg accuracy = 0.23924302645095968

waht the heck so mixed


another run:

test accuracy subj_A=  0.09124087591240876
test accuracy subj_F=  0.1332560834298957
test accuracy subj_B=  0.0026223776223776225
test accuracy subj_G=  0.09281961471103327
test accuracy subj_C=  0.10175438596491228
test accuracy subj_I=  0.15625
test accuracy subj_H=  0.06859205776173286
test accuracy subj_E=  0.09005847953216374
avg accuracy = 0.09207423436681553


ok finally consistent results :

test accuracy subj_A=  0.3284671532846715
test accuracy subj_B=  0.38286713286713286
test accuracy subj_C=  0.38070175438596493
test accuracy subj_E=  0.43391812865497076
test accuracy subj_F=  0.26187717265353416
test accuracy subj_G=  0.37478108581436076
test accuracy subj_H=  0.3140794223826715
test accuracy subj_I=  0.4010416666666667
avg accuracy = 0.3597166895887467

next steps!

- try a minimal preprocessing approach and see if it improves anything (only bandpass filter and ica ???) the AAR might have messed things up last time, but not sure
- figure out CNN LSTM ? transformers ? GRU
- should i attempt any feature extraction ?
- 
- maybe read raw ECoG data and apply this classifier on it to see?
- see what happens if i combine ECoG and EEG data .. hm



SUBJECT TRANSFER:
- try randomizing samples
- try a retuning 20% data

In [None]:
# GRU

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split

class gru_rnn(nn.Module):
    def __init__(self):
        super(gru_rnn, self).__init__()
        self.gru = nn.GRU(3800, 32, 3, dropout=0.2)
        self.fc = nn.Linear(32, 5)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.gru(x)[0]
        # x = self.fc(self.relu(x[0]))
        # x = self.hidden(x)
        return x

In [None]:
# run gRU !
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
# train neural net
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
accuracies = []

for subj, features, labels in fileList:
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size= 0.3) # 70% training 30% test
    neural_net = gru_rnn().to(device)
    criterion = nn.CrossEntropyLoss()
    neural_net_optimizer = torch.optim.Adam(neural_net.parameters(), lr=0.001)
    # Train the neural network
    for epoch in range(50):
        # print("epoch = ",epoch)
        neural_net.train()
        data = X_train
        targets = y_train
        data = torch.tensor(data.reshape(int(data.shape[0]), data.shape[1]* data.shape[2]), dtype=torch.float32) # reshape # of trial, 1 channel, # of samples
        data = data.to(device)
        targets = torch.tensor(targets, dtype=torch.int64)
        targets = targets.to(device)

        # Forward pass
        neural_net_predictions = neural_net(data)
        neural_net_loss = criterion(neural_net_predictions, targets)

        # Backward pass
        neural_net_optimizer.zero_grad()
        neural_net_loss.backward()
        neural_net_optimizer.step()

    # Evaluate the neural network
    neural_net.eval()
    correct, total = 0, 0
    with torch.no_grad():
        data = X_test
        targets = y_test
        data = torch.tensor(data.reshape(int(data.shape[0]), data.shape[1] * data.shape[2]), dtype=torch.float32) # reshape # of trial, 1 channel, # of samples
        data = data.to(device)
        targets = torch.tensor(targets, dtype=torch.int64)
        targets = targets.to(device)
        neural_net_predictions = neural_net(data)
        _, predicted = torch.max(neural_net_predictions.data, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()
    print(f"test accuracy {subj}= ",correct/total)
    accuracies.append(correct/total)

print(f'avg accuracy = {np.mean(accuracies)}')
