In [None]:
import numpy as np
import scipy.linalg as la
import mne.filter as bandpass
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
import geomstats as gs
import geomstats.geometry.spd_matrices as spd
from geomstats.learning.frechet_mean import FrechetMean
from scipy.io import loadmat  

In [16]:
class RCSP:
    def __init__(self, channels):
        self.channels = channels
        self.V=None
      
    def bandpass(self, X, fs, fl, fh):
        X = np.swapaxes(X, 1, 2)
        X = bandpass.filter_data(X, fs, fl, fh, verbose=False)
        X = np.swapaxes(X, 1, 2)
        return X
        
    def calcucateCovariances(self, X):
        covlist = np.empty((X.shape[0], X.shape[2], X.shape[2]))
        for i in range(len(X)):
            epoch = X[i]
            covlist[i] = np.dot(epoch.T, epoch)
        return covlist
    
    def estimateMeans(self, classSpecificCOV, metric):
        if metric=="AIRM":
            estimator = FrechetMean(spd.SPDMetricAffine(n=self.channels), max_iter=64)
        elif metric=="LEM":
            estimator = FrechetMean(spd.SPDMetricLogEuclidean(n=self.channels), max_iter=64)
        else:
            raise Exception("Not implemented metric")
            
        means = []
        
        for COV in classSpecificCOV:
            estimator.fit(COV)
            mean = estimator.estimate_
            means.append(mean)
        return means
    
    
    def separate_classes(self, X, Y):
        classSpecificCOV = []
        for i in range(2): 
            ind = np.where(Y==i)[0]
            classCOV = X[ind]
            classSpecificCOV.append(classCOV)
        return classSpecificCOV
    
    def CSP(self, X, Y, metric, n):
        classSpecificCOV = self.separate_classes(X, Y)
        if metric == "classic":
            class0_avg = sum(classSpecificCOV[0])/len(classSpecificCOV[0])
            class1_avg = sum(classSpecificCOV[1])/len(classSpecificCOV[1])
        else:
            class0_avg, class1_avg = self.estimateMeans(classSpecificCOV, metric)

        _,V = la.eigh(class0_avg, class0_avg+class1_avg)
        V = np.concatenate((V[:, :n], V[:, -n:]), axis=1)
        
        return V
    
    def applyCSP(self, dk, V):
        a = np.dot(np.dot(V.T, dk), V) 
        f = np.log(np.diagonal(a)/np.trace(a)) 
        return f
    
    def get_mi_features(self, data, metric="classic", mode="train", fs=1000, fl=8, fh=30, n=3):

        channels = [7, 8, 9, 10, 12, 13, 14, 17, 18, 19, 20, 32, 33, 34, 35, 36, 37, 38, 39, 40]

        X = data['smt'][0][0][:, :, channels]
        Y = data['y_dec'][0][0][0]

        X = np.swapaxes(X, 0, 1)
        Y = Y-1

        filteredX = self.bandpass(X, fs, fl, fh)

        covX = self.calcucateCovariances(filteredX)
        
        if mode=="train":
            V = self.CSP(covX, Y, metric, n)
            self.V = V
        else:
            if self.V is None:
                raise Exception("train CSP first please")
            V=self.V
        
        features = np.empty((len(X), 2*n))
  
        for i in range(len(X)):
            epoch = covX[i]
            features[i] = self.applyCSP(epoch, V) 
            
        return features, Y

In [17]:
csp = RCSP(20)

In [18]:
metric = "classic"

In [13]:
print("Metric is " + metric)
for i in range(1,6):
    filename = 'datasets/sess01_subj0{}_EEG_MI.mat'.format(i)
    data = loadmat(filename)
    train = data['EEG_MI_train']
    test = data['EEG_MI_test']
    
    trainX, trainY = csp.get_mi_features(train, metric=metric, mode="train")
    testX, testY = csp.get_mi_features(test, metric=metric, mode="test")
    clf = LDA()
    clf.fit(trainX, trainY)
    res = clf.score(testX, testY)
    print("Subject " + str(i) + ", accuracy = " + str(res))

Metric is classic
Subject 1, accuracy = 0.78
Subject 2, accuracy = 0.92
Subject 3, accuracy = 0.92
Subject 4, accuracy = 0.49
Subject 5, accuracy = 0.86


In [19]:
metric = "AIRM"

In [20]:
print("Metric is " + metric)
for i in range(1,6):
    filename = 'datasets/sess01_subj0{}_EEG_MI.mat'.format(i)
    data = loadmat(filename)
    train = data['EEG_MI_train']
    test = data['EEG_MI_test']
    
    trainX, trainY = csp.get_mi_features(train, metric=metric, mode="train")
    testX, testY = csp.get_mi_features(test, metric=metric, mode="test")
    clf = LDA()
    clf.fit(trainX, trainY)
    res = clf.score(testX, testY)
    print("Subject " + str(i) + ", accuracy = " + str(res))

Metric is AIRM
Subject 1, accuracy = 0.74
Subject 2, accuracy = 0.88
Subject 3, accuracy = 0.93
Subject 4, accuracy = 0.54
Subject 5, accuracy = 0.88


In [21]:
metric = "LEM"

In [22]:
print("Metric is " + metric)
for i in range(1,6):
    filename = 'datasets/sess01_subj0{}_EEG_MI.mat'.format(i)
    data = loadmat(filename)
    train = data['EEG_MI_train']
    test = data['EEG_MI_test']
    
    trainX, trainY = csp.get_mi_features(train, metric=metric, mode="train")
    testX, testY = csp.get_mi_features(test, metric=metric, mode="test")
    clf = LDA()
    clf.fit(trainX, trainY)
    res = clf.score(testX, testY)
    print("Subject " + str(i) + ", accuracy = " + str(res))

Metric is LEM
Subject 1, accuracy = 0.7
Subject 2, accuracy = 0.88
Subject 3, accuracy = 0.91
Subject 4, accuracy = 0.59
Subject 5, accuracy = 0.86
