In [1]:
import scipy.io
import os
import numpy as np
import pandas as pd
import time
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score
from sklearn.metrics import cohen_kappa_score
import Butterworth_filtering

from BW_metric import BW_dist
from AI_metric import AI_dist
from manifold_project_toolbox import F_dist
from manifold_project_toolbox import bw_projection_mean
from manifold_project_toolbox import ai_projection_mean
from manifold_project_toolbox import x2corr

from load_BCI_IV_IIa import import_data

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all" # enable multiple output in one cell

In [2]:
def check_weights(weights, n_weights, *, check_positivity=False):
    
    if weights is None:
        weights = np.ones(n_weights)

    else:
        weights = np.asarray(weights)
        if weights.shape != (n_weights,):
            raise ValueError(
                "Weights do not have the good shape. Should be (%d,) but got "
                "%s." % (n_weights, weights.shape,)
            )
        if check_positivity and any(weights <= 0):
            raise ValueError("Weights must be strictly positive.")

    weights /= np.sum(weights)
    return weights
    
def mean_euclid(X, sample_weight=None):
    return np.average(X, axis=0, weights=sample_weight)

def _matrix_operator(C, operator):
    """Matrix function."""
    if not isinstance(C, np.ndarray) or C.ndim < 2:
        raise ValueError("Input must be at least a 2D ndarray")
    if C.dtype.char in np.typecodes['AllFloat'] and (
            np.isinf(C).any() or np.isnan(C).any()):
        raise ValueError(
            "Matrices must be positive definite. Add "
            "regularization to avoid this error.")
    eigvals, eigvecs = np.linalg.eigh(C)
    eigvals = operator(eigvals)
    if C.ndim >= 3:
        eigvals = np.expand_dims(eigvals, -2)
    D = (eigvecs * eigvals) @ np.swapaxes(eigvecs.conj(), -2, -1)
    return D

def sqrtm(C):
    return _matrix_operator(C, np.sqrt)

def invsqrtm(C):
    def isqrt(x): return 1. / np.sqrt(x)
    return _matrix_operator(C, isqrt)

def logm(C):
    return _matrix_operator(C, np.log)

def expm(C):
    return _matrix_operator(C, np.exp)

import warnings
def mean_riemann(X, *, tol=10e-9, maxiter=50, init=None, sample_weight=None):
    
    dists = []
    ssds = []
    
    n_matrices, n, _ = X.shape
    sample_weight = check_weights(sample_weight, n_matrices)
    if init is None:
        M = mean_euclid(X, sample_weight=sample_weight)
    else:
        M = check_init(init, n)
    
    nu = 1.0
    tau = np.finfo(np.float64).max
    crit = np.finfo(np.float64).max
    for i in range(maxiter):
        print(i)
        M12, Mm12 = sqrtm(M), invsqrtm(M)
        J = np.einsum("a,abc->bc", sample_weight, logm(Mm12 @ X @ Mm12))
        M_new = M12 @ expm(nu * J) @ M12

        # dist_mean = AI_dist(M_new, M)
        # dists.append(dist_mean)
        # ssd_new = np.sum(np.array([AI_dist(M_new,X[i])**2 for i in range(X.shape[0])]))
        # ssds.append(ssd_new)

        M = M_new
        
        crit = np.linalg.norm(J, ord="fro")
        h = nu * crit
        if h < tau:
            nu = 0.95 * nu
            tau = h
        else:
            nu = 0.5 * nu

        # print("crit <= tol:", crit <= tol, "nu <= tol:", nu <= tol)
        
        if crit <= tol or nu <= tol:
            break
        else:
            warnings.warn("Convergence not reached")

    # return M, np.array(dists), np.array(ssds), i+1
    return M

In [3]:
def MDM_dist(A, B, method):
    if method == "BW":
        output = BW_dist(A,B)
    elif method == "AI":
        output = AI_dist(A,B)
    elif method == "Euc":
        output = F_dist(A,B)
    return output

def MDM_mean(x, eps, method, verbose=False):
    # input: psd matrices [N,n,n]
    # output: mean matrix [n,n]
    if method == "BW":
        output = bw_projection_mean(x, eps, verbose=verbose)
    elif method == "AI":
        output = mean_riemann(x, tol=eps)
        # output = ai_projection_mean(x, eps, verbose=verbose)
    elif method == "Euc":
        output = np.mean(x, axis=0) # arithmetic mean
    return output

In [12]:
# BCI IV IIa
subject = 9
data, label = import_data("A0"+str(subject)+"T")
data_filtered = Butterworth_filtering.filter_all(data, order=5, lowcut=8, highcut=30, fs=250)

# left hand (class 1) vs. right hand (class 2)
idx = np.where(np.isin(label, [1, 2]))[0]
sample = x2corr(data_filtered)
sample = sample[idx]
label = label[idx]

sample.shape
label.shape

# from sklearn.covariance import LedoitWolf

# shrunk_matrices = []
# shrinkage_values = []
# for mat in sample:
#     lw = LedoitWolf()
#     lw.fit(mat)
#     shrunk_matrices.append(lw.covariance_)
#     shrinkage_values.append(lw.shrinkage_)

# sample = np.array(shrunk_matrices)
# sample.shape

(144, 22, 22)

(144,)

In [5]:
accuracy_all = {'AI': {}, 'BW': {}, 'Euc': {}}
execution_time_all = {'AI': {}, 'BW': {}, 'Euc': {}}

for method in ['AI','BW','Euc']:
    
    # parameter setting
    repetition = 10 # number of reps for cv
    k = 6 # k-fold cv
    
    accuracy = np.zeros((repetition, k), dtype = np.float32)
    execution_time = np.zeros((repetition, k), dtype = np.float32)
    
    for rep_idx in range(repetition):
    
        ##### k-fold split
        kfold = StratifiedKFold(n_splits=k, shuffle=True, random_state=rep_idx+22)
        x_train_kfold = []
        y_train_kfold = []
        x_test_kfold = []
        y_test_kfold = []
        for train_idx, test_idx in kfold.split(sample, label):
            x_train_kfold.append(sample[train_idx])
            y_train_kfold.append(label[train_idx])
            x_test_kfold.append(sample[test_idx])
            y_test_kfold.append(label[test_idx])
    
        ##### MDM
        for fold_idx in range(k):
            # choose fold
            x_train = x_train_kfold[fold_idx]
            y_train = y_train_kfold[fold_idx]
            x_test = x_test_kfold[fold_idx]
            y_test = y_test_kfold[fold_idx]
            # compute mean of each class
            start_time = time.time() # timer starts
            mean1 = MDM_mean(x_train[np.where(y_train==1)], 0.01, method, verbose=True)
            mean2 = MDM_mean(x_train[np.where(y_train==2)], 0.01, method, verbose=True)
            # assign each testing sample to the nearest class mean
            y_pred = np.zeros(x_test.shape[0], dtype = np.float32)
            for test_idx in range(x_test.shape[0]):
                dist1 = MDM_dist(mean1, x_test[test_idx], method)
                dist2 = MDM_dist(mean2, x_test[test_idx], method)
                y_pred[test_idx] = 1 if dist1<dist2 else 2
            end_time = time.time() # timer stops
            # record accuracy and execution time
            accuracy[rep_idx,fold_idx] = sum(y_pred==y_test)/len(y_test)
            execution_time[rep_idx,fold_idx] = end_time - start_time
            
            print("Fold " + str(fold_idx+1) + " completed.")
        print("Repetition " + str(rep_idx+1) + " completed.")

    accuracy_all[method] = accuracy
    execution_time_all[method] = execution_time

0
1
0
1
2
Fold 1 completed.
0
1
0
1
2
Fold 2 completed.
0
1
0
1
2
Fold 3 completed.
0
1
0
1
2
Fold 4 completed.
0
1
0
1
2
Fold 5 completed.
0
1
0
1
2
Fold 6 completed.
Repetition 1 completed.
0
1
0
1
2
Fold 1 completed.
0
1
0
1
2
Fold 2 completed.
0
1
0
1
2




Fold 3 completed.
0
1
0
1
2
Fold 4 completed.
0
1
0
1
2
Fold 5 completed.
0
1
0
1
2
Fold 6 completed.
Repetition 2 completed.
0
1
0
1
2
Fold 1 completed.
0
1
0
1
2
Fold 2 completed.
0
1
0
1
2
Fold 3 completed.
0
1
0
1
2
Fold 4 completed.
0




1
0
1
2
Fold 5 completed.
0
1
0
1
2
Fold 6 completed.
Repetition 3 completed.
0
1
0
1
2
Fold 1 completed.
0
1
0
1
2
Fold 2 completed.
0
1
0
1
2
Fold 3 completed.
0
1
0
1
2
Fold 4 completed.
0
1
0
1
2
Fold 5 completed.
0
1
0
1
2
Fold 6 completed.
Repetition 4 completed.
0
1
0
1
2
Fold 1 completed.
0
1
0
1
2
Fold 2 completed.
0
1
0
1
2
Fold 3 completed.
0
1
0
1
2
Fold 4 completed.
0
1




0
1
2
Fold 5 completed.
0
1
0
1
2
Fold 6 completed.
Repetition 5 completed.
0
1
0
1
2
Fold 1 completed.
0
1
0
1
2
Fold 2 completed.
0
1
0
1
2
Fold 3 completed.
0
1
0
1
2




Fold 4 completed.
0
1
0
1
2
Fold 5 completed.
0
1
0
1
2
Fold 6 completed.
Repetition 6 completed.
0
1
0
1
2
Fold 1 completed.
0
1
0
1
2
Fold 2 completed.
0
1
0
1
2
Fold 3 completed.
0
1
0




1
2
Fold 4 completed.
0
1
0
1
2
Fold 5 completed.
0
1
0
1
2
Fold 6 completed.
Repetition 7 completed.
0
1
0
1
2
Fold 1 completed.
0
1
0
1
2
Fold 2 completed.
0
1




0
1
2
Fold 3 completed.
0
1
0
1
2
Fold 4 completed.
0
1
0
1
2
Fold 5 completed.
0
1
0
1
2
Fold 6 completed.
Repetition 8 completed.
0
1
0
1
2
Fold 1 completed.
0
1
0
1




2
Fold 2 completed.
0
1
0
1
2
Fold 3 completed.
0
1
0
1
2
Fold 4 completed.
0
1
0
1
2
Fold 5 completed.
0
1
0
1
2
Fold 6 completed.
Repetition 9 completed.
0
1
0
1
2
Fold 1 completed.
0
1
0
1
2
Fold 2 completed.
0
1
0
1
2
Fold 3 completed.
0
1
0
1
2
Fold 4 completed.
0
1
0
1
2
Fold 5 completed.
0
1
0
1
2
Fold 6 completed.
Repetition 10 completed.
Iter1, dist_mean=0.0144534
Iter2, dist_mean=4.98e-05
Iter1, dist_mean=0.0533245
Iter2, dist_mean=0.0005055
Fold 1 completed.
Iter1, dist_mean=0.0144811
Iter2, dist_mean=4e-05
Iter1, dist_mean=0.0570116
Iter2, dist_mean=0.0005407
Fold 2 completed.
Iter1, dist_mean=0.0142854
Iter2, dist_mean=4.41e-05
Iter1, dist_mean=0.0556182
Iter2, dist_mean=0.0003583
Fold 3 completed.
Iter1, dist_mean=0.0141747
Iter2, dist_mean=4.46e-05
Iter1, dist_mean=0.0561523
Iter2, dist_mean=0.0004242
Fold 4 completed.
Iter1, dist_mean=0.0153104
Iter2, dist_mean=4.99e-05
Iter1, dist_mean=0.0563364
Iter2, dist_mean=0.000443
Fold 5 completed.
Iter1, dist_mean=0.0145322
Ite

In [6]:
np.mean(accuracy_all['AI'])
np.mean(accuracy_all['BW'])
np.mean(accuracy_all['Euc'])

np.float32(0.8854167)

np.float32(0.8645834)

np.float32(0.8423612)

In [7]:
np.mean(execution_time_all['AI'])
np.mean(execution_time_all['BW'])
np.mean(execution_time_all['Euc'])

np.float32(0.033605594)

np.float32(0.1362689)

np.float32(0.000880599)

In [8]:
save_dir = "D:/Projects/Scientific Report/MDRM with LedoitWolf/BCI IV IIa/"
rows = ['Rep{}'.format(i+1) for i in list(range(repetition))]
cols = ['Fold{}'.format(i+1) for i in list(range(k))]

for method in ['AI','BW','Euc']:
    
    accuracy_df = pd.DataFrame(accuracy_all[method], index=rows, columns=cols)
    execution_time_df = pd.DataFrame(execution_time_all[method], index=rows, columns=cols)
    
    accuracy_df.to_csv(save_dir+"A0"+str(subject)+"T"+"_accuracy_"+method+".csv")
    execution_time_df.to_csv(save_dir+"Execution Time/"+"A0"+str(subject)+"T"+"_execution_time_"+method+".csv")