In [None]:
import numpy as np
import mne
import scipy
import sklearn
import lightgbm as lgb
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.metrics import accuracy_score

In [None]:
class CSP:
    def __init__(self,m_filters):
        self.m_filters = m_filters

    def fit(self,x_train,y_train):
        x_data = np.copy(x_train)
        y_labels = np.copy(y_train)
        n_trials, n_channels, n_samples = x_data.shape
        cov_x = np.zeros((2, n_channels, n_channels), dtype=np.float64)
        for i in range(n_trials):
            x_trial = x_data[i, :, :]
            y_trial = y_labels[i]
            cov_x_trial = np.matmul(x_trial, np.transpose(x_trial))
            cov_x_trial /= np.trace(cov_x_trial)
            cov_x[y_trial, :, :] += cov_x_trial

        cov_x = np.asarray([cov_x[cls]/np.sum(y_labels==cls) for cls in range(2)])
        cov_combined = cov_x[0]+cov_x[1]
        eig_values, u_mat = scipy.linalg.eig(cov_combined,cov_x[0])
        sort_indices = np.argsort(abs(eig_values))[::-1]
        eig_values = eig_values[sort_indices]
        u_mat = u_mat[:,sort_indices]
        u_mat = np.transpose(u_mat)

        return eig_values, u_mat

    def transform(self,x_trial,eig_vectors):
        z_trial = np.matmul(eig_vectors, x_trial)
        z_trial_selected = z_trial[:self.m_filters,:]
        z_trial_selected = np.append(z_trial_selected,z_trial[-self.m_filters:,:],axis=0)
        sum_z2 = np.sum(z_trial_selected**2, axis=1)
        sum_z = np.sum(z_trial_selected, axis=1)
        var_z = (sum_z2 - (sum_z ** 2)/z_trial_selected.shape[1]) / (z_trial_selected.shape[1] - 1)
        sum_var_z = sum(var_z)
        return np.log(var_z/sum_var_z)

In [None]:
class FBCSP:
    def __init__(self,m_filters):
        self.m_filters = m_filters
        self.fbcsp_filters_multi=[]

    def fit(self,x_train_fb,y_train):
        y_classes_unique = np.unique(y_train)
        n_classes = len(y_classes_unique)
        self.csp = CSP(self.m_filters)

        def get_csp(x_train_fb, y_train_cls):
            fbcsp_filters = {}
            for j in range(x_train_fb.shape[0]):
                x_train = x_train_fb[j, :, :, :]
                eig_values, u_mat = self.csp.fit(x_train, y_train_cls)
                fbcsp_filters.update({j: {'eig_val': eig_values, 'u_mat': u_mat}})
            return fbcsp_filters

        for i in range(n_classes):
            cls_of_interest = y_classes_unique[i]
            select_class_labels = lambda cls, y_labels: [0 if y == cls else 1 for y in y_labels]
            y_train_cls = np.asarray(select_class_labels(cls_of_interest, y_train))
            fbcsp_filters=get_csp(x_train_fb,y_train_cls)
            self.fbcsp_filters_multi.append(fbcsp_filters)

    def transform(self,x_data,class_idx=0):
        n_fbanks, n_trials, n_channels, n_samples = x_data.shape
        x_features = np.zeros((n_trials,self.m_filters*2*len(x_data)),dtype=np.float64)
        for i in range(n_fbanks):
            eig_vectors = self.fbcsp_filters_multi[class_idx].get(i).get('u_mat')
            eig_values = self.fbcsp_filters_multi[class_idx].get(i).get('eig_val')
            for k in range(n_trials):
                x_trial = np.copy(x_data[i,k,:,:])
                csp_feat = self.csp.transform(x_trial,eig_vectors)
                for j in range(self.m_filters):
                    x_features[k, i * self.m_filters * 2 + (j+1) * 2 - 2]  = csp_feat[j]
                    x_features[k, i * self.m_filters * 2 + (j+1) * 2 - 1]= csp_feat[-j-1]

        return x_features

In [None]:
def fbcsp_score(signal, labels, m_filters, fs, cv_N = 5, classifier = []):
    
    # Create 4D matrix
    filter_range = [4,40]
    step = 4
    filters_start = range(filter_range[0], filter_range[1], step)
    filtered_data_4d = []
    
    for start_freq in filters_start:
        filtered = mne.filter.filter_data(np.float64(signal), fs, start_freq, start_freq+step, verbose=0)
        filtered_data_4d.append(filtered)
    filtered_data_4d = np.stack(filtered_data_4d, axis=0)
    
    # Set verbose to 0
    mne.set_log_level(verbose='WARNING', return_old_level=False, add_frames=None)

    
    if classifier:
        features = classifier[0].transform(filtered_data_4d)
        y_pred = classifier[1].predict(features)
        acc = sklearn.metrics.accuracy_score(labels, y_pred)
        return acc
    
    else:
        # implement CV
        n_trials = labels.shape[0]
        idx_perm = np.random.permutation(n_trials)
        scores = []
        
        # CV loop
        for i in range(cv_N):
            fractions = np.array_split(idx_perm, cv_N)

            val_idx = list(fractions.pop(i))
            train_idx = list(np.concatenate(fractions))
            
            X_train = filtered_data_4d[:,train_idx,:,:]
            y_train = labels[train_idx]
            X_val = filtered_data_4d[:,val_idx,:,:]
            y_val = labels[val_idx]
            
            # Assemble classifier
            fbcsp = FBCSP(m_filters=m_filters)
#             clf = LinearDiscriminantAnalysis()
            clf = lgb.LGBMClassifier()
            
            # Train classifier
            fbcsp.fit(X_train, y_train)
            features_train = fbcsp.transform(X_train)
            clf.fit(features_train, y_train)            
            
            # Evaluate classifier
            features_val = fbcsp.transform(X_val)
            y_hat = clf.predict(features_val)
            scores.append(sklearn.metrics.accuracy_score(y_val, y_hat)) 
           
        # Train on full data
        fbcsp = FBCSP(m_filters=m_filters)
#         clf = LinearDiscriminantAnalysis()
        clf = lgb.LGBMClassifier()
        
        fbcsp.fit(filtered_data_4d, labels)
        features = fbcsp.transform(filtered_data_4d)
        clf.fit(features, labels)            
        
    return np.mean(scores), [fbcsp, clf]