In [1]:
import numpy as np
import scipy
import os
from sklearn import svm
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, confusion_matrix
np.random.seed(0)

In [2]:
# Read Files
def load_data(dir_path):
    '''
        Function to Read Data from mat Files
    '''
    
    mat_files = []
    for m in os.listdir(dir_path):
        if m.endswith('.mat'):
            mat_files.append(m)
    
    data = {}
    for sbj, file in enumerate(mat_files):
        file_path = os.path.join(dir_path, file)
        mat_data = scipy.io.loadmat(file_path)
        data[sbj] = (mat_data)

    return data

In [3]:
def FilterBandPass(CNT, Wn, Fs, order = 3):    
    Wn = np.array([Wn[0] / (Fs/2), Wn[1] / (Fs/2)])
    
    # Filter Design
    type = 'bandpass'
    b, a = scipy.signal.butter(order, Wn, btype = 'bandpass')
    
    # Apply Filter
    CNT = scipy.signal.filtfilt(b, a, CNT, axis = 0)
    
    return CNT

In [4]:
# CAR Filter for Spatial Filtering
def CARFilter(Data):
    M = np.mean(Data, axis = 1)
    
    for i in range(Data.shape[1]):
        Data[:, i] = Data[:, i] - M
    
    return Data

In [5]:
def CSP(Train1, Train2, m):
    Rh = 0
    for i in range(Train1.shape[2]):
        Data = Train1[:, :, i]

        # Normalize Data
        M = np.mean(Data, axis = 0)

        for i in range(Data.shape[0]):
            Data[i, :] = Data[i, :] - M

        # Cov Matrix Calculation
        Rh += Data.T @ Data / np.trace(Data.T @ Data)

    Rf = 0
    for i in range(Train2.shape[2]):
        Data = Train2[:, :, i]

        # Normalize Data
        M = np.mean(Data, axis = 0)

        for i in range(Data.shape[0]):
            Data[i, :] = Data[i, :] - M

        # Cov Matrix Calculation
        Rf += Data.T @ Data / np.trace(Data.T @ Data)


    # Normalize Cov Matrix
    Rf /= Train2.shape[2]
    Rh /= Train1.shape[2]

    # Eigen Value Decomposition
    u, v = scipy.linalg.eig(Rh, Rf)
    # u = np.real(u)

    # Sort eigenvalues and eigenvectors by eigenvalues
    sorted_indices = np.argsort(u)[::-1]  # Sort in descending order
    u = u[sorted_indices]
    v = v[:, sorted_indices]

    Wm = list(range(m)) + list(range(v.shape[1] - m, v.shape[1]))
    W = v[:, Wm]

    return W

In [6]:
dir_path = './Dataset'
data = load_data(dir_path)
np.random.seed(0)

AvailbaleDataKeys = data.keys()
# AvailbaleDataKeys = [6]

AccAll = list()

for key in list(AvailbaleDataKeys):
    
    # Read Data
    CNT = data[key]['cnt']
    nfo = data[key]['nfo']
    mrk = data[key]['mrk']
    fs = nfo['fs'][0][0][0][0]
    Steps = 4
    Bands = np.array([np.arange(4, 37, Steps), np.arange(8, 41, Steps)])
#     CNT = FilterBandPass(CNT, [8, 30], fs)

    for iter in range(Bands.shape[1]):

        ## Filtering Data
        cnt = FilterBandPass(CNT, Bands[:, iter], fs)

        # Spatial Filtering
        cnt = CARFilter(cnt)

        # Trial Seperation
        Ltr = 4 * fs
        Pos = mrk['pos'][0][0][0]
        Group = mrk['y'][0][0][0]

        data1 = np.zeros((Ltr, cnt.shape[1], len(Group)//2))
        data2 = np.zeros((Ltr, cnt.shape[1], len(Group)//2))
        c1 = 0
        c2 = 0

        for i in range(len(Pos)):
            Idx = range(Pos[i], Pos[i] + Ltr)
            Trial = cnt[Idx, :]

            if Group[i] == 1:
                data1[:, :, c1] = Trial
                c1 += 1
            elif Group[i] == -1:
                data2[:, :, c2] = Trial
                c2 += 1

        ## Train and Test Data Separation
        # Index Creation
        Div = 0.8
        TrainIdx = np.arange(int(data1.shape[2] * Div))
        TestIdx = np.arange(int(data1.shape[2] * Div), data1.shape[2])
        
        #print(TrainIdx)
        #print(TestIdx)

        # Data Seperation
        Train1 = data1[:, :, TrainIdx]
        Train2 = data2[:, :, TrainIdx]

        Test1 = data1[:, :, TestIdx]
        Test2 = data2[:, :, TestIdx]

        # CSP Weight Matrix Creation
        M = 2
        W = CSP(Train1, Train2, M)


        # Train Feature Extraction
        Train = np.concatenate((Train1, Train2), axis = 2)    
        FeatureTrain = np.zeros((2 * M, Train.shape[2]))

        for i in range(Train.shape[2]):
            tmp = Train[:, :, i].T
            tmp = W.T @ tmp
            FeatureTrain[:, i] = np.var(tmp, axis = 1)

        # Test Feature Extraction
        Test = np.concatenate((Test1, Test2), axis = 2) 
        FeatureTest = np.zeros((2 * M, Test.shape[2]))

        for i in range(Test.shape[2]):
            tmp = Test[:, :, i].T
            tmp = W.T @ tmp
            FeatureTest[:, i] = np.var(tmp, axis = 1)
        
        if iter == 0:
            FTrain = FeatureTrain
            FTest = FeatureTest 
        else:
            FTrain = np.append(FTrain, FeatureTrain, axis = 0)
            FTest = np.append(FTest, FeatureTest, axis = 0) 


    # Label Creation
    LabelTrain = np.concatenate((np.ones(Train1.shape[2]), 2* np.ones(Train2.shape[2])))
    LabelTest = np.concatenate((np.ones(Test1.shape[2]), 2* np.ones(Test2.shape[2])))
    
    # Feature Selection Using Fishers Discriminant Ratio
    FDR = np.zeros(FTrain.shape[0])

    for k in range(FDR.size):
        M1 = np.mean(FTrain[k, :80])
        M2 = np.mean(FTrain[k, 80:])
        V1 = np.var(FTrain[k, :80])
        V2 = np.var(FTrain[k, 80:])

        FDR[k] = (M1 - M2)**2 / (V1 + V2)

    # Number of Features
    NumF = 6
    Idx = (-FDR).argsort()[:NumF]
    
    # Filter Data
    FTrain = FTrain[Idx, :]
    FTest = FTest[Idx, :]
    
    # Train and Test Classifier
    mdl = KNeighborsClassifier(n_neighbors=5, n_jobs=4)
    # mdl = svm.SVC(kernel='linear', C = 1.0)

    mdl.fit(FTrain.T, LabelTrain)
    LabelPredict = mdl.predict(FTest.T)

    Acc = accuracy_score(LabelTest.T, LabelPredict)
    AccAll.append(Acc)

    ConfMat = confusion_matrix(LabelTest, LabelPredict)
    
    # Print Metrics
    print(f'Subject {key + 1}')
    print("Accuracy : ", accuracy_score(LabelTest, LabelPredict))
    print(f"Class 1 Accuracy : {(np.diag(ConfMat) / ConfMat.sum(axis=1))[0] * 100}")
    print(f"Class 2 Accuracy : {(np.diag(ConfMat) / ConfMat.sum(axis=1))[1] * 100}")
    print("Confusion Matrix : \n", ConfMat)
    print('-----------------------------------------------------------------')
    
print(f"\n >>> Mean Accuracy ove All Subjects: {np.mean(AccAll)}")

Subject 1
Accuracy :  0.6
Class 1 Accuracy : 80.0
Class 2 Accuracy : 40.0
Confusion Matrix : 
 [[16  4]
 [12  8]]
-----------------------------------------------------------------
Subject 2
Accuracy :  0.5
Class 1 Accuracy : 15.0
Class 2 Accuracy : 85.0
Confusion Matrix : 
 [[ 3 17]
 [ 3 17]]
-----------------------------------------------------------------
Subject 3
Accuracy :  0.825
Class 1 Accuracy : 90.0
Class 2 Accuracy : 75.0
Confusion Matrix : 
 [[18  2]
 [ 5 15]]
-----------------------------------------------------------------
Subject 4
Accuracy :  0.75
Class 1 Accuracy : 70.0
Class 2 Accuracy : 80.0
Confusion Matrix : 
 [[14  6]
 [ 4 16]]
-----------------------------------------------------------------
Subject 5
Accuracy :  0.675
Class 1 Accuracy : 60.0
Class 2 Accuracy : 75.0
Confusion Matrix : 
 [[12  8]
 [ 5 15]]
-----------------------------------------------------------------
Subject 6
Accuracy :  0.875
Class 1 Accuracy : 90.0
Class 2 Accuracy : 85.0
Confusion Matrix : 