# import dpeeg and set global seed

In [None]:
import dpeeg

dpeeg.set_seed()
print(dpeeg.__version__)

## FBCSPSVM

In [None]:
import scipy.linalg
import numpy as np


class CSP:
    def __init__(self,m_filters):
        self.m_filters = m_filters

    def fit(self,x_train,y_train):
        x_data = np.copy(x_train)
        y_labels = np.copy(y_train)
        n_trials, n_channels, n_samples = x_data.shape
        cov_x = np.zeros((2, n_channels, n_channels), dtype=np.float32)
        for i in range(n_trials):
            x_trial = x_data[i, :, :]
            y_trial = y_labels[i]
            cov_x_trial = np.matmul(x_trial, np.transpose(x_trial))
            cov_x_trial /= np.trace(cov_x_trial)
            cov_x[y_trial, :, :] += cov_x_trial

        cov_x = np.asarray([cov_x[cls]/np.sum(y_labels==cls) for cls in range(2)])
        cov_combined = cov_x[0]+cov_x[1]
        eig_values, u_mat = scipy.linalg.eig(cov_combined,cov_x[0])
        sort_indices = np.argsort(abs(eig_values))[::-1]
        eig_values = eig_values[sort_indices]
        u_mat = u_mat[:,sort_indices]
        u_mat = np.transpose(u_mat)

        return eig_values, u_mat

    def transform(self,x_trial,eig_vectors):
        z_trial = np.matmul(eig_vectors, x_trial)
        z_trial_selected = z_trial[:self.m_filters,:]
        z_trial_selected = np.append(z_trial_selected,z_trial[-self.m_filters:,:],axis=0)
        sum_z2 = np.sum(z_trial_selected**2, axis=1)
        sum_z = np.sum(z_trial_selected, axis=1)
        var_z = (sum_z2 - (sum_z ** 2)/z_trial_selected.shape[1]) / (z_trial_selected.shape[1] - 1)
        sum_var_z = sum(var_z)
        return np.log(var_z/sum_var_z)


class FBCSP:
    def __init__(self,m_filters):
        self.m_filters = m_filters
        self.fbcsp_filters_multi=[]

    def fit(self,x_train_fb,y_train):
        y_classes_unique = np.unique(y_train)
        n_classes = len(y_classes_unique)
        self.csp = CSP(self.m_filters)

        def get_csp(x_train_fb, y_train_cls):
            fbcsp_filters = {}
            for j in range(x_train_fb.shape[0]):
                x_train = x_train_fb[j, :, :, :]
                eig_values, u_mat = self.csp.fit(x_train, y_train_cls)
                fbcsp_filters.update({j: {'eig_val': eig_values, 'u_mat': u_mat}})
            return fbcsp_filters

        for i in range(n_classes):
            cls_of_interest = y_classes_unique[i]
            select_class_labels = lambda cls, y_labels: [0 if y == cls else 1 for y in y_labels]
            y_train_cls = np.asarray(select_class_labels(cls_of_interest, y_train))
            fbcsp_filters=get_csp(x_train_fb,y_train_cls)
            self.fbcsp_filters_multi.append(fbcsp_filters)

    def transform(self,x_data,class_idx=0):
        n_fbanks, n_trials, n_channels, n_samples = x_data.shape
        x_features = np.zeros((n_trials,self.m_filters*2*len(x_data)))
        for i in range(n_fbanks):
            eig_vectors = self.fbcsp_filters_multi[class_idx].get(i).get('u_mat')
            eig_values = self.fbcsp_filters_multi[class_idx].get(i).get('eig_val')
            for k in range(n_trials):
                x_trial = np.copy(x_data[i,k,:,:])
                csp_feat = self.csp.transform(x_trial,eig_vectors)
                for j in range(self.m_filters):
                    x_features[k, i * self.m_filters * 2 + (j+1) * 2 - 2]  = csp_feat[j]
                    x_features[k, i * self.m_filters * 2 + (j+1) * 2 - 1]= csp_feat[-j-1]

        return x_features


class FeatureSelect:
    def __init__(self, n_features_select=4, n_csp_pairs=2):
        self.n_features_select = n_features_select
        self.n_csp_pairs = n_csp_pairs
        self.features_selected_indices=[]

    def fit(self,x_train_features,y_train):
        MI_features = self.MIBIF(x_train_features, y_train)
        MI_sorted_idx = np.argsort(MI_features)[::-1]
        features_selected = MI_sorted_idx[:self.n_features_select]

        paired_features_idx = self.select_CSP_pairs(features_selected, self.n_csp_pairs)
        x_train_features_selected = x_train_features[:, paired_features_idx]
        self.features_selected_indices = paired_features_idx

        return x_train_features_selected

    def transform(self,x_test_features):
        return x_test_features[:,self.features_selected_indices]

    def MIBIF(self, x_features, y_labels):
        def get_prob_pw(x,d,i,h):
            n_data = d.shape[0]
            t=d[:,i]
            kernel = lambda u: np.exp(-0.5*(u**2))/np.sqrt(2*np.pi)
            prob_x = 1 / (n_data * h) * sum(kernel((np.ones((len(t)))*x- t)/h))
            return prob_x

        def get_pd_pw(d, i, x_trials):
            n_data, n_dimensions = d.shape
            if n_dimensions==1:
                i=1
            t = d[:,i]
            min_x = np.min(t)
            max_x = np.max(t)
            n_trials = x_trials.shape[0]
            std_t = np.std(t)
            if std_t==0:
                h=0.005
            else:
                h=(4./(3*n_data))**(0.2)*std_t
            prob_x = np.zeros((n_trials))
            for j in range(n_trials):
                prob_x[j] = get_prob_pw(x_trials[j],d,i,h)
            return prob_x, x_trials, h

        y_classes = np.unique(y_labels)
        n_classes = len(y_classes)
        n_trials = len(y_labels)
        prob_w = []
        x_cls = {}
        for i in range(n_classes):
            cls = y_classes[i]
            cls_indx = np.where(y_labels == cls)[0]
            prob_w.append(len(cls_indx) / n_trials)
            x_cls.update({i: x_features[cls_indx, :]})

        prob_x_w = np.zeros((n_classes, n_trials, x_features.shape[1]))
        prob_w_x = np.zeros((n_classes, n_trials, x_features.shape[1]))
        h_w_x = np.zeros((x_features.shape[1]))
        mutual_info = np.zeros((x_features.shape[1]))
        parz_win_width = 1.0 / np.log2(n_trials)
        h_w = -np.sum(prob_w * np.log2(prob_w))

        for i in range(x_features.shape[1]):
            h_w_x[i] = 0
            for j in range(n_classes):
                prob_x_w[j, :, i] = get_pd_pw(x_cls.get(j), i, x_features[:, i])[0]

        t_s = prob_x_w.shape
        n_prob_w_x = np.zeros((n_classes, t_s[1], t_s[2]))
        for i in range(n_classes):
            n_prob_w_x[i, :, :] = prob_x_w[i] * prob_w[i]
        prob_x = np.sum(n_prob_w_x, axis=0)
        # prob_w_x = np.zeros((n_classes, prob_x.shape[0], prob_w.shape[1]))
        for i in range(n_classes):
            prob_w_x[i, :, :] = n_prob_w_x[i, :, :]/prob_x

        for i in range(x_features.shape[1]):
            for j in range(n_trials):
                t_sum = 0.0
                for k in range(n_classes):
                    if prob_w_x[k, j, i] > 0:
                        t_sum += (prob_w_x[k, j, i] * np.log2(prob_w_x[k, j, i]))

                h_w_x[i] -= (t_sum / n_trials)

            mutual_info[i] = h_w - h_w_x[i]

        mifsg = np.asarray(mutual_info)
        return mifsg


    def select_CSP_pairs(self,features_selected,n_pairs):
        features_selected+=1
        sel_groups = np.unique(np.ceil(features_selected/n_pairs))
        paired_features = []
        for i in range(len(sel_groups)):
            for j in range(n_pairs-1,-1,-1):
                paired_features.append(sel_groups[i]*n_pairs-j)

        paired_features = np.asarray(paired_features,dtype=np.int64)-1

        return paired_features


class Classifier:
    def __init__(self,model):
        self.model = model
        self.feature_selection = False

    def predict(self,x_features):
        if self.feature_selection:
            x_features_selected = self.feature_selection.transform(x_features)
        else:
            x_features_selected = x_features
        y_predicted = self.model.predict(x_features_selected)
        return y_predicted

    def fit(self,x_features,y_train):
        feature_selection = True
        if feature_selection:
            feature_selection = FeatureSelect()
            self.feature_selection = feature_selection
            x_train_features_selected = self.feature_selection.fit(x_features,y_train)
        else:
            x_train_features_selected = x_features
        self.model.fit(x_train_features_selected,y_train)
        y_predicted = self.model.predict(x_train_features_selected)
        return y_predicted

# Subject-Dependent & Session-Dependent

In [None]:
# # -----------------------------------
# # define subject-dependent classifier
# # ----------------------------------- 
# import os
# import numpy as np
# from dpeeg.tools import Filer
# from sklearn.svm import SVR


# class Trainer:
#     def __init__(self, out_folder) -> None:
#         self.out_folder = os.path.join(os.path.abspath(out_folder), 'FBCSPSVM')
#         os.makedirs(self.out_folder, exist_ok=True)

#     def _get_multi_class_regressed(self, y_predicted):
#         y_predict_multi = np.asarray([np.argmin(y_predicted[i,:]) for i in range(y_predicted.shape[0])])
#         return y_predict_multi

#     def _run_sub(self, train_data, train_label, test_data, test_label):
#         y_classes_unique = np.unique(train_label)
#         n_classes = len(np.unique(train_label))

#         fbcsp = FBCSP(4)
#         fbcsp.fit(train_data, train_label)
#         y_train_predicted = np.zeros((train_label.shape[0], n_classes), dtype=np.float32)
#         y_test_predicted = np.zeros((test_label.shape[0], n_classes), dtype=np.float32)

#         select_class_labels = lambda cls, y_labels: [0 if y == cls else 1 for y in y_labels]

#         for j in range(n_classes):
#             cls_of_interest = y_classes_unique[j]
#             y_train_cls = np.asarray(select_class_labels(cls_of_interest, train_label))

#             x_features_train = fbcsp.transform(train_data, class_idx=cls_of_interest)
#             x_features_test = fbcsp.transform(test_data, class_idx=cls_of_interest)

#             classifier_type = SVR(gamma='auto')
#             classifier = Classifier(classifier_type)
#             y_train_predicted[:, j] = classifier.fit(x_features_train, np.asarray(y_train_cls, dtype=np.float32))
#             y_test_predicted[:, j] = classifier.predict(x_features_test)

#         y_test_predicted_multi = self._get_multi_class_regressed(y_test_predicted)
#         acc = np.sum(y_test_predicted_multi == test_label) / len(test_label)

#         return y_test_predicted_multi, test_label, acc

#     def run(self, dataset, dataset_name):
#         data_folder = os.path.join(self.out_folder, dataset_name)
#         filer = Filer(os.path.join(data_folder, 'summary.txt'))

#         results = {}
#         acc_list = []
#         for sub in dataset.keys():
#             print(f'Subject-{sub} Training')
            
#             train_data, train_label = dataset[sub]['train']
#             test_data, test_label = dataset[sub]['test']
#             train_data = train_data.transpose(1, 0, 2, 3)
#             test_data = test_data.transpose(1, 0, 2, 3)

#             preds, target, acc = self._run_sub(train_data, train_label, test_data, test_label)
#             acc_list.append(acc)
#             print(f'\tSubject-{sub} Acc = {acc}')

#             filer.write(f'sub-{sub}: {acc}\n')
#             results[f'sub_{sub}'] = {'preds': preds, 'target': target, 'acc': acc}

#         acc = np.mean(acc_list)
#         print(f'AvgAcc = {acc}')
#         filer.write(f'AvgAcc = {acc}')
#         np.save(os.path.join(data_folder, 'results'), results)

#         return results

## bciciv2a

In [None]:
# from dpeeg.data import datasets, transforms

# trans = transforms.ComposeTransforms(
#     transforms.Normalization(),
#     transforms.FilterBank(freq=250, filter_bank=[[4,8],[8,12],[12,16],[16,20],[20,24],[24,28],[28,32],[32,36],[36,40]]),
#     transforms.Augmentation('segmentation_and_reconstruction', multiply=1),
# )

# dataset = datasets.BCICIV2A(transforms=trans, mode='single_ses')

# trainer = Trainer('out')
# results = trainer.run(dataset.dataset, 'BCICIV2A_SD')

## bciciv2b

In [None]:
# from dpeeg.data import datasets, transforms

# trans = transforms.ComposeTransforms(
#     transforms.Normalization(),
#     transforms.FilterBank(freq=250, filter_bank=[[4,8],[8,12],[12,16],[16,20],[20,24],[24,28],[28,32],[32,36],[36,40]]),
#     transforms.Augmentation('segmentation_and_reconstruction', multiply=1)
# )

# dataset = datasets.BCICIV2B(transforms=trans, mode='single_ses', test_sessions=[1])
# trainer = Trainer('out')
# results = trainer.run(dataset.dataset, 'BCICIV2B_SD')

## openbmi

In [None]:
# from dpeeg.data import transforms, load
# from scipy import signal

# lowcut = 4
# highcut = 30
# fs = 250
# order = 5
# def butter_bandpass_filter(data):
#     nyq = 0.5 * fs
#     low = lowcut / nyq
#     high = highcut / nyq
#     b, a = signal.butter(order, [low, high], btype='bandpass')
#     y = signal.filtfilt(b, a, data)
#     return y

# trans = transforms.ComposeTransforms(
#     transforms.ApplyFunc(butter_bandpass_filter),
#     transforms.FilterBank(freq=250, filter_bank=[[4,8],[8,12],[12,16],[16,20],[20,24],[24,28],[28,32],[32,36],[36,40]]),
#     transforms.Augmentation('segmentation_and_reconstruction', multiply=1)
# )

# dataset = load('openbmi_session-dependent')
# dataset = trans(dataset)
# trainer = Trainer('out')
# results = trainer.run(dataset, 'OpenBMI_SD')

# Subject-Dependent & Session-Independent

## bciciv2a

In [None]:
# from dpeeg.data import datasets, transforms

# trans = transforms.ComposeTransforms(
#     transforms.Normalization(),
#     transforms.FilterBank(freq=250, filter_bank=[[4,8],[8,12],[12,16],[16,20],[20,24],[24,28],[28,32],[32,36],[36,40]]),
#     transforms.Augmentation('segmentation_and_reconstruction', multiply=1)
# )

# dataset = datasets.BCICIV2A(transforms=trans)
# trainer = Trainer('out')
# results = trainer.run(dataset, 'BCICIV2A_SI')

## bciciv2b

In [None]:
# from dpeeg.data import transforms, datasets

# trans = transforms.ComposeTransforms(
#     transforms.Normalization(),
#     transforms.FilterBank(freq=250, filter_bank=[[4,8],[8,12],[12,16],[16,20],[20,24],[24,28],[28,32],[32,36],[36,40]]),
#     transforms.Augmentation('segmentation_and_reconstruction', multiply=1),
# )

# dataset = datasets.BCICIV2B(transforms=trans, test_sessions=[3, 4])
# trainer = Trainer('out')
# results = trainer.run(dataset, 'BCICIV2B_SI')

## openbmi

In [None]:
# from dpeeg.data import transforms, load
# from scipy import signal

# lowcut = 4
# highcut = 30
# fs = 250
# order = 5
# def butter_bandpass_filter(data):
#     nyq = 0.5 * fs
#     low = lowcut / nyq
#     high = highcut / nyq
#     b, a = signal.butter(order, [low, high], btype='bandpass')
#     y = signal.filtfilt(b, a, data)
#     return y

# trans = transforms.ComposeTransforms(
#     transforms.ApplyFunc(butter_bandpass_filter),
#     transforms.FilterBank(freq=250, filter_bank=[[4,8],[8,12],[12,16],[16,20],[20,24],[24,28],[28,32],[32,36],[36,40]]),
#     transforms.Augmentation('segmentation_and_reconstruction', multiply=1)
# )

# dataset = load('openbmi_session-independent')
# dataset = trans(dataset)
# trainer = Trainer('out')
# results = trainer.run(dataset, 'OpenBMI_SI')

# Subject-Independent

In [None]:
# -------------------------------------
# define subject-independent classifier
# -------------------------------------
import os
import numpy as np
from dpeeg.tools import Filer
from dpeeg.data.functions import merge_train_test
from sklearn.svm import SVR


class Trainer:
    def __init__(self, out_folder) -> None:
        self.out_folder = os.path.join(os.path.abspath(out_folder), 'FBCSPSVM')
        os.makedirs(self.out_folder, exist_ok=True)

    def _get_multi_class_regressed(self, y_predicted):
        y_predict_multi = np.asarray([np.argmin(y_predicted[i,:]) for i in range(y_predicted.shape[0])])
        return y_predict_multi

    def _run_sub(self, train_data, train_label, test_data, test_label):
        y_classes_unique = np.unique(train_label)
        n_classes = len(np.unique(train_label))

        fbcsp = FBCSP(4)
        fbcsp.fit(train_data, train_label)
        y_train_predicted = np.zeros((train_label.shape[0], n_classes), dtype=np.float32)
        y_test_predicted = np.zeros((test_label.shape[0], n_classes), dtype=np.float32)

        select_class_labels = lambda cls, y_labels: [0 if y == cls else 1 for y in y_labels]

        for j in range(n_classes):
            cls_of_interest = y_classes_unique[j]
            y_train_cls = np.asarray(select_class_labels(cls_of_interest, train_label))

            x_features_train = fbcsp.transform(train_data, class_idx=cls_of_interest)
            x_features_test = fbcsp.transform(test_data, class_idx=cls_of_interest)

            classifier_type = SVR(gamma='auto')
            classifier = Classifier(classifier_type)
            y_train_predicted[:, j] = classifier.fit(x_features_train, np.asarray(y_train_cls, dtype=np.float32))
            y_test_predicted[:, j] = classifier.predict(x_features_test)

        y_test_predicted_multi = self._get_multi_class_regressed(y_test_predicted)
        acc = np.sum(y_test_predicted_multi == test_label) / len(test_label)

        return y_test_predicted_multi, test_label, acc

    def _trans_dataset(self, trainset, testset):
        if self.transforms is not None:
            sub_dataset = {-1 : {'train':list(trainset), 'test':list(testset)}}
            sub_dataset = self.transforms(sub_dataset)
            trainset = sub_dataset[-1]['train']
            testset = sub_dataset[-1]['test']

        return trainset, testset

    def _merge_sub_dataset(self, exc_sub):
        merge_dataset = {}
        for sub, data in self.dataset.items():
            if sub != exc_sub:
                merge_dataset[sub] = merge_train_test(*data.values())
        merge_dataset = merge_train_test(*merge_dataset.values())
        return merge_dataset
    
    def _process_sub_dataset(self, sub):
        print(f'Leave Subject {sub} Out')
        testset = merge_train_test(*self.dataset[sub].values())
        trainset = self._merge_sub_dataset(sub)
        trainset, testset = self._trans_dataset(trainset, testset)
        return trainset, testset

    def run(self, dataset, dataset_name, transforms = None):
        self.dataset = dataset
        self.transforms = transforms
        data_folder = os.path.join(self.out_folder, dataset_name)
        filer = Filer(os.path.join(data_folder, 'summary.txt'))

        results = {}
        acc_list = []
        for sub in dataset.keys():
            if sub in range(1, 8):
                continue
            trainset, testset = self._process_sub_dataset(sub)
            train_data, train_label = trainset[0], trainset[1]
            test_data, test_label = testset[0], testset[1]
            train_data = train_data.transpose(1, 0, 2, 3)
            test_data = test_data.transpose(1, 0, 2, 3)

            print(f'Subject-{sub} Training')
            preds, target, acc = self._run_sub(train_data, train_label, test_data, test_label)
            acc_list.append(acc)
            print(f'\tSubject-{sub} Acc = {acc}')

            filer.write(f'sub-{sub}: {acc}\n')
            result = {'preds': preds, 'target': target, 'acc': acc}
            np.save(os.path.join(data_folder, f'sub{sub}_res'), result)
            results[f'sub_{sub}'] = result

        acc = np.mean(acc_list)
        print(f'AvgAcc = {acc}')
        filer.write(f'AvgAcc = {acc}')
        np.save(os.path.join(data_folder, 'results'), results)

        return results

## bciciv2a

In [None]:
# from dpeeg.data import datasets, transforms

# trans = transforms.ComposeTransforms(
#     transforms.Normalization(),
#     transforms.FilterBank(freq=250, filter_bank=[[4,8],[8,12],[12,16],[16,20],[20,24],[24,28],[28,32],[32,36],[36,40]]),
# )

# dataset = datasets.BCICIV2A(transforms=trans)
# trainer = Trainer('out')
# results = trainer.run(dataset, 'BCICIV2A_LOSO')

## bciciv2b

In [None]:
# from dpeeg.data import transforms, datasets

# trans = transforms.ComposeTransforms(
#     transforms.Normalization(),
#     transforms.FilterBank(freq=250, filter_bank=[[4,8],[8,12],[12,16],[16,20],[20,24],[24,28],[28,32],[32,36],[36,40]]),
# )

# dataset = datasets.BCICIV2B(transforms=trans, test_sessions=[3, 4])
# trainer = Trainer('out')
# results = trainer.run(dataset, 'BCICIV2B_LOSO')

## openbmi

In [None]:
# from dpeeg.data import transforms, load
# from scipy import signal

# lowcut = 4
# highcut = 30
# fs = 250
# order = 5
# def butter_bandpass_filter(data):
#     nyq = 0.5 * fs
#     low = lowcut / nyq
#     high = highcut / nyq
#     b, a = signal.butter(order, [low, high], btype='bandpass')
#     y = signal.filtfilt(b, a, data)
#     return y

# trans = transforms.ComposeTransforms(
#     transforms.ApplyFunc(butter_bandpass_filter),
#     transforms.FilterBank(freq=250, filter_bank=[[4,8],[8,12],[12,16],[16,20],[20,24],[24,28],[28,32],[32,36],[36,40]]),
# )

# dataset = load('openbmi_session-independent')
# trainer = Trainer('out')
# results = trainer.run(dataset, 'OpenBMI_LOSO', trans)

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from torchmetrics.functional.classification.confusion_matrix import multiclass_confusion_matrix

results = np.load('./out/FBCSPSVM/OpenBMI_SI/results.npy', allow_pickle=True).item()

cls_name = ['R', 'L']
preds_all, target_all = [], []
for sub in range(1, 55):
    preds, target = results[f'sub_{sub}']['preds'], results[f'sub_{sub}']['target']
    preds_all.append(preds)
    target_all.append(target)
preds = np.concatenate(preds_all)
target = np.concatenate(target_all)

plt.figure(figsize=(2.5, 2.5))
cm = multiclass_confusion_matrix(torch.from_numpy(preds), torch.from_numpy(target), len(cls_name))
ax = sns.heatmap(cm / cm.sum(1, keepdim=True), annot=True, cmap='Blues', square=True, annot_kws={'size': 'xx-large'}, cbar=False,
                 xticklabels=cls_name, yticklabels=cls_name, fmt='.3f')
plt.xticks(fontsize='x-large')
plt.yticks(fontsize='x-large')
plt.axhline(0, c='k'), plt.axhline(len(cls_name), c='k')
plt.axvline(0, c='k'), plt.axvline(len(cls_name), c='k')
plt.savefig('CM.png', dpi=600, bbox_inches='tight')