# COGS 189 Final Project 

**Team Name**  
Argus

**Team Members**  
Xiaolong Huang	(xih002@ucsd.edu),  
Bohan Lei		(blei@ucsd.edu),  
Mingze Xu		(m6xu@ucsd.edu),  
Weijia Zeng		(wezeng@ucsd.edu)     

**Project Goal**
- To fully understand and implement the vanilla Common Spatial Pattern (CSP) algorithm for EEG pattern recognition.
- To experiment with different types of Common Spatial Pattern (CSP) and Linear Discriminant Analysis (LDA) algorithms in motor imagery signal pattern classification.
- To improve motor imagery signal pattern classification accuracy by different CSP and LDA hyperparameter configurations.

**Sources**
- Data source: https://github.com/bregydoc/bcidatasetIV2a
- Data explanation: https://www.bbci.de/competition/iv/desc_2a.pdf
- CSP paper: https://ieeexplore.ieee.org/document/4408441
- SACSP paper: https://cogsci.ucsd.edu/~desa/Winter_conference_on_bci_2022.pdf

## Section 1: Setup

In [65]:
import numpy as np
from tqdm import tqdm
from scipy.linalg import eigh
from scipy.signal import butter, sosfiltfilt, sosfreqz
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.linear_model import LinearRegression as LR
import scipy.linalg

## Section 2: Data Import

In [3]:
n_subject = 9

data_dir = './data/'
data_prefix = 'A'
data_suffix = '.npz'
data_type = {'train':'T', 'test':'E'}
data_path = data_dir + data_prefix + '{subject:02d}{type_:s}' + data_suffix

In [4]:
raw_train_subject = []
raw_test_subject = []

for subject_num in range(1, n_subject+1):
    train_path = data_path.format(subject=subject_num, type_=data_type['train'])
    test_path  = data_path.format(subject=subject_num, type_=data_type['test'])
    raw_train_subject.append(np.load(train_path))
    raw_test_subject.append(np.load(test_path))

In [5]:
class_code = {'left':769, 'right':770, 'foot':771, 'tongue':772}

In [6]:
# globally
fs = 250                                    # sampling frequency

# within in each epoch
cue_start = 2.                              # cue starts at
cue_end = 3.25                              # cue ends at
smr_start = 3.                              # smr starts after
smr_end = 6.                                # smr ends after
cue_start_offset = round(cue_start * fs)    # the first cue sample is at 
cue_end_offset   = round(cue_end * fs)      # the last cue sample is at
smr_start_offset = round(smr_start * fs)    # the first smr sample is after
smr_end_offset   = round(smr_end * fs)      # the last smr sample is after

In [7]:
train_subject = []
for raw_data in raw_train_subject: 
    signal = raw_data['s'].T
    cue_class = raw_data['etyp']
    cue_position = raw_data['epos']
    
    X_train = {}
    
    for name, code in class_code.items():
        epoch_start = cue_position[cue_class==code] - cue_start_offset
        
        epochs = []
        for i in range(len(epoch_start)):
            epochs.append(signal[:, epoch_start[i]+smr_start_offset:epoch_start[i]+smr_end_offset])
        epochs = np.stack(epochs)
        
        X_train[name] = epochs
    
    train_subject.append(X_train)

In [9]:
test_subject = []
for raw_data in raw_test_subject: 
    signal = raw_data['s'].T
    cue_class = raw_data['etyp']
    cue_position = raw_data['epos']
    
    X_test = {}
    
    name = 'unknown'
    code = 783
    
    epoch_start = cue_position[cue_class==code] - cue_start_offset

    epochs = []
    for i in range(len(epoch_start)):
        epochs.append(signal[:, epoch_start[i]+smr_start_offset:epoch_start[i]+smr_end_offset])
    epochs = np.stack(epochs)

    X_test[name] = epochs
    
    test_subject.append(X_test)

In [16]:
train_subject[0]['left']

array([[[ -0.24414062,   3.17382812,   0.        , ...,  -9.32617188,
          -7.12890625, -14.50195312],
        [  4.58984375,   2.9296875 ,  -0.87890625, ..., -12.15820312,
         -11.62109375, -16.69921875],
        [ -0.73242188,   1.61132812,  -1.85546875, ..., -10.7421875 ,
          -9.32617188, -16.2109375 ],
        ...,
        [  4.8828125 ,   8.30078125,   6.34765625, ...,   2.9296875 ,
           4.8828125 ,   2.44140625],
        [  0.9765625 ,   6.34765625,   3.90625   , ...,   8.30078125,
           3.90625   ,  -2.9296875 ],
        [  7.32421875,   6.34765625,   7.8125    , ...,  -0.48828125,
           3.41796875,  -4.8828125 ]],

       [[-11.1328125 ,  -6.93359375,  -1.61132812, ...,   4.54101562,
           1.51367188,   1.02539062],
        [-12.5       ,  -7.76367188,  -2.97851562, ...,  -1.31835938,
          -2.97851562,  -1.7578125 ],
        [-11.37695312,  -3.80859375,   2.734375  , ...,   3.27148438,
           0.43945312,  -0.5859375 ],
        ...,


## Section 3: Data Processing

In [24]:
fs = 250
lowcut = 9
highcut = 14

In [25]:
def butter_bandpass(lowcut, highcut, fs, order = 2):
        nyq = 0.5 * fs
        low = lowcut / nyq
        high = highcut / nyq
        sos = butter(order, [low, high], analog = False, btype = 'band', output = 'sos')
        return sos

def butter_bandpass_filter(data, lowcut, highcut, fs, order = 2):
        sos = butter_bandpass(lowcut, highcut, fs, order = order)
        y = sosfiltfilt(sos, data)
        return y

In [33]:
for i in tqdm(range(len(train_subject))):
    for key in train_subject[i]:
        X_bandpass = np.apply_along_axis(butter_bandpass_filter, 2, train_subject[i][key], lowcut, highcut, fs, 4)
        T = X_bandpass.shape[2]
        #X = (X_bandpass / np.sqrt(T)) @ (np.identity(T) - np.ones((T,T)))
        train_subject[i][key] = X_bandpass - np.tile( X_bandpass.mean(axis=2).reshape(X_bandpass.shape[0],X_bandpass.shape[1], 1), X_bandpass.shape[2]) 

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 9/9 [01:03<00:00,  7.07s/it]


In [35]:
train_subject[0]['left'][0][0]

array([ 0.01302651,  0.01293258,  0.0120975 ,  0.01020444,  0.00699871,
        0.00233626, -0.00377486, -0.01114585, -0.01939584, -0.02796113,
       -0.03612626, -0.04307584, -0.04796413, -0.04999676, -0.04851768,
       -0.04309299, -0.03358284, -0.0201932 , -0.00350026,  0.01555751,
        0.03572206,  0.05549248,  0.07323173,  0.08729609,  0.09617764,
        0.09864762,  0.09388809,  0.08159888,  0.06206846,  0.03619965,
        0.00548454, -0.02807289, -0.06208343, -0.09394103, -0.1210211 ,
       -0.14089388, -0.15153549, -0.15151861, -0.14016564, -0.11764911,
       -0.08502808, -0.04421402,  0.00213469,  0.05078457,  0.09815422,
        0.14058079,  0.17460573,  0.19725733,  0.20630676,  0.20047586,
        0.17957758,  0.14457531,  0.09755301,  0.04159541, -0.0194155 ,
       -0.08107408, -0.13877531, -0.18806541, -0.22499162, -0.24642337,
       -0.25031832, -0.23591061, -0.20380441, -0.15596262, -0.09558917,
       -0.02691159,  0.04512141,  0.11520515,  0.17807968,  0.22

## Section 4: Data Selection

In [130]:
subject_num = 0
class_pos = 'left'
class_neg = 'right'

train_test_ratio = 0.6

In [131]:
X_pos = train_subject[subject_num][class_pos]
X_neg = train_subject[subject_num][class_neg]

np.random.shuffle(X_pos)
np.random.shuffle(X_neg)

X_test_pos = X_pos[int(n_trials * train_test_ratio):, :, :]
X_test_neg = X_neg[int(n_trials * train_test_ratio):, :, :]

X_train_pos = X_pos[:int(n_trials * train_test_ratio), :, :]
X_train_neg = X_neg[:int(n_trials * train_test_ratio), :, :]

X_train = np.concatenate((X_train_pos, X_train_neg), axis=0)
y_train = np.concatenate((np.ones(X_train_pos.shape[0]), -np.ones(X_train_neg.shape[0])), axis=0)

X_test = np.concatenate((X_test_pos, X_test_neg), axis=0)
y_test = np.concatenate((np.ones(X_test_pos.shape[0]), -np.ones(X_test_neg.shape[0])), axis=0)

X_train, y_train = shuffle(X_train, y_train)
X_test, y_test = shuffle(X_test, y_test)

In [132]:
y_test.shape

(58,)

## Section 5: Common Spatial Pattern

In [133]:
def compute_covariance_matrix(X):
    return np.sum(X @ X.transpose((0,2,1)), axis=0) / X.shape[0]

def compute_spatial_filters(S1, S2):
    return scipy.linalg.eigh(S1, S1+S2)

In [134]:
S1 = compute_covariance_matrix(X_train_pos)
S2 = compute_covariance_matrix(X_train_neg)
L, W = compute_spatial_filters(S1, S2)

In [135]:
W.shape

(25, 25)

## Section 6: Linear Discriminant Analysis

In [136]:
J = 6

In [137]:
def Process_w(w, j):
    processed_w = np.delete(w, list(range(int(j/2), int(25-j/2))) , axis = 1)
                            
    return processed_w

In [138]:
def filter_input(data, w, j):
    processed_data = []
    for k in range(data.shape[0]):
        processed_data_temp = [1]
        for i in range(0,j):
            temp_data = np.log(w[:,i].T @ data[k,:,:] @ data[k,:,:].T @ w[:,i])
            processed_data_temp.append(temp_data)
        processed_data.append(np.array(processed_data_temp))
    return np.vstack(processed_data)

In [139]:
def CSPFilter(data, w, label, j):
    w = np.array(w)
    data = np.array(data)
    model = LDA()
    processed_data = filter_input(data, w, j)
    model.fit(processed_data,label)
    return model

In [140]:
W_ = Process_w(W, J)
model = CSPFilter(X_train, W_, y_train, J)
model.score(filter_input(X_test, W_, J), y_test)

0.8103448275862069