# IMPORTS AND LOADS

In [1]:
import scipy
import numpy
from scipy.signal import periodogram
import scipy.io as io
import numpy as np
from matplotlib import pyplot as plt
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression as LR
from sklearn import preprocessing
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier as  RFC
from sklearn.metrics import accuracy_score as acc
import sklearn
from sklearn.neural_network import MLPClassifier
from sklearn.base import clone
from tqdm import tqdm
import pickle

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Following arrays are used to determine the letter after the prediction of the model:

In [3]:
DICT_1D = []
for i in range(26):
    DICT_1D.append(chr(i + 65))
for i in range(10):
    DICT_1D.append(i)
DICT_1D = np.array(DICT_1D)
DICT_2D = DICT_1D.reshape(6, 6)

In [5]:
train_path = '/content/drive/MyDrive/Data/Train/TrainData'
test_path = '/content/drive/MyDrive/Data/Test/TestData'
train_data = []
test_data = []

for i in range(9):
    temp = io.loadmat(train_path + str(i+1) + '.mat')
    train_data.append(temp['TrainData'+ str(i+1)])

    temp = io.loadmat(test_path + str(i+1) + '.mat')
    test_data.append(temp['TestData' + str(i+1)])

    del temp

In [6]:
Ts = train_data[0][0][1] - train_data[0][0][0]
Fs = 1 / Ts

# 1 Primary Classes

## 1.1 Feature Operations Class

In [133]:
class Feature():
    
    def __init__(self, person):
        self.person = person

        # Features related to only one channel

        self.lens_ch = np.array([1, 1, 1, person.window, 1, 1, 1, person.window // 2, 1, 1, 7, 1])
        self.funcs_ch = [Feature.time_mean,
                         Feature.time_var,
                         Feature.time_skew,
                         Feature.stack_time,
                         Feature.fft_mean,
                         Feature.fft_var,
                         Feature.fft_skew,
                         Feature.stack_fft,
                         Feature.ft_median,                      
                         Feature.ft_mean_freq,
                         Feature.bandpower,
                         Feature.f_max]

        # Features that are related to the relation of more than one channel

        self.lens_cs = np.array([1, 1])
        self.funcs_cs = [Feature.time_cor,
                         Feature.fft_cor]

        self.funcs = self.funcs_ch + self.funcs_cs


    ####################################
    # These are different models with different hyperparameters
    # Last function calls each one and gets the best accuracy
    # Using accuracy as the metric, best classifier is chosen

    def logistic(self):
        
        clf = LR(class_weight= {0: 1, 1:person.weight}, max_iter= 5000)
        top_features = np.zeros(3) - 1
        for k in range(3):
            max_acc = 0
            for i in range(14 - k):

                features = np.zeros(14).astype(np.int64)
                if np.where(top_features > -1)[0].shape[0] > 0:
                    features[top_features[np.where(top_features > -1)[0]].astype(np.int64)] = 1
                zeros_indices = np.where(features == 0)[0]
                features[zeros_indices[i]] = 1
                data = self.feature_maker(person.X_train, features)
                acc = person.cross_validation(data, clf)

                if acc > max_acc:
                    max_acc = acc
                    temp = int(zeros_indices[i])

            top_features[k] = temp

        return (max_acc, top_features, clf)


    def svm(self):

        max_acc_hyperparams = 0
        top_clf = None
        top_top_features = None

        Cs = [0.3]
        for C in Cs:
            clf = SVC(class_weight= {0: 1, 1:person.weight}, max_iter= 5000, C= C)
            top_features = np.zeros(3) - 1
            for k in range(3):
                max_acc = 0
                for i in range(14 - k):

                    features = np.zeros(14).astype(np.int64)
                    if np.where(top_features > -1)[0].shape[0] > 0:
                        features[top_features[np.where(top_features > -1)[0]].astype(np.int64)] = 1
                    zeros_indices = np.where(features == 0)[0]
                    features[zeros_indices[i]] = 1
                    data = self.feature_maker(person.X_train, features)
                    acc = person.cross_validation(data, clf)

                    if acc > max_acc:
                        max_acc = acc
                        temp = int(zeros_indices[i])

                top_features[k] = temp
            
            if max_acc > max_acc_hyperparams:

                max_acc_hyperparams = max_acc
                top_clf = clf
                top_top_features = top_features

        return (max_acc_hyperparams, top_top_features, clf)

    def Random_Forest(self):

        max_acc_hyperparams = 0
        top_clf = None
        top_top_features = None

        n_estimators = [2, 5, 8]
        max_depths = [2, 5, 8]

        for n_estimator in n_estimators:
            for max_depth in max_depths:
                clf = RFC(class_weight= {0: 1, 1:person.weight}, n_estimators=n_estimator, max_depth=max_depth)
                top_features = np.zeros(3) - 1
                for k in range(3):
                    max_acc = 0
                    for i in range(14 - k):

                        features = np.zeros(14).astype(np.int64)
                        if np.where(top_features > -1)[0].shape[0] > 0:
                            features[top_features[np.where(top_features > -1)[0]].astype(np.int64)] = 1
                        zeros_indices = np.where(features == 0)[0]
                        features[zeros_indices[i]] = 1
                        data = self.feature_maker(person.X_train, features)
                        acc = person.cross_validation(data, clf)

                        if acc > max_acc:
                            max_acc = acc
                            temp = int(zeros_indices[i])

                    top_features[k] = temp
                
                if max_acc > max_acc_hyperparams:

                    max_acc_hyperparams = max_acc
                    top_clf = clf
                    top_top_features = top_features

        return (max_acc_hyperparams, top_top_features, clf)

    def tune(self):

        person = self.person
        MAX_ACC = 0
        CLF = None
        TOP_FEATURES = np.zeros(3)


        acc_lr, top_features_lr, clf_lr = self.logistic()
        acc_svm, top_features_svm, clf_svm = self.svm()
        acc_rf, top_features_rf, clf_rf = self.Random_Forest()

        # Comparison between classifier
        if acc_lr > MAX_ACC:
            MAX_ACC = acc_lr
            CLF = clf_lr
            TOP_FEATURES = top_features_lr

        if acc_svm > MAX_ACC:
            MAX_ACC = acc_svm
            CLF = clf_svm
            TOP_FEATURES = top_features_svm

        if acc_rf > MAX_ACC:
            MAX_ACC = acc_rf
            CLF = clf_rf
            TOP_FEATURES = top_features_rf
        

        person.TOP_FEATURES = TOP_FEATURES
        person.CLF = CLF
        names = [self.funcs[int(TOP_FEATURES[0])].__name__ , self.funcs[int(TOP_FEATURES[1])].__name__, self.funcs[int(TOP_FEATURES[2])].__name__]
        person.TOP_FEATURES_NAMES = ' '.join(names)
        person.MAX_ACC = MAX_ACC

    #########################################################

    #########################################################
    # Theses functions are used to create features from the data
    # Some are only related to one channel
    # and some are between channels

    def feature_maker(self, X, selection):

        selection = np.array(selection)
        select_channels_wise = selection[:len(self.funcs_ch)]
        # print(len(self.funcs_ch))
        select_cross_channel = selection[len(self.funcs_ch):]

        # print(select_channels_wise)
        # print(select_cross_channel)

        data1 = self.channel_wise(X, select_channels_wise)
        data2 = self.cross_channel(X, select_cross_channel)

        data = np.hstack((data1, data2))
        return data

    def cross_channel(self, X, select):

        data = np.zeros((int(self.person.trials), int(28 * np.sum(np.multiply(select, self.lens_cs)))))
        for i in range(self.person.trials):
            index = 0
            for j in range(len(self.funcs_cs)):
                if select[j] == 1:
                    for k in range(8):
                        for s in range(k+1, 8):                           
                            data[i][index + self.lens_cs[j] * k : index + self.lens_cs[j] * (k+1)] = self.funcs_cs[j](self.person, X[i][k], X[i][s])
                    index += 28 * self.lens_cs[j]

        return data

    def channel_wise(self, X, select):

        # print(select)
        data = np.zeros((int(self.person.trials), int(8 * np.sum(np.multiply(select, self.lens_ch)))))
        for i in range(self.person.trials):
            index = 0
            for j in range(len(self.funcs_ch)):
                if select[j] == 1:
                    
                    for k in range(8):
                        x = X[i][k]
                        if not self.funcs_ch[j].__name__ == 'time_mean':
                            x = x - np.mean(x)
                        data[i][index + self.lens_ch[j] * k : index + self.lens_ch[j] * (k+1)] = self.funcs_ch[j](self.person, x)
                    index += 8 * self.lens_ch[j]
        return data

    #############################################################
    #############################################################
    # Features: 

    # Channel Wise:
    def time_mean(person, x):
        return np.mean(x)

    def time_var(person, x):
        return np.var(x)

    def time_skew(person, x):
        return scipy.stats.skew(x)

    def fft_mean(person, x):
        fft = np.real(scipy.fft.fft(x) / person.fs)
        return np.mean(fft)

    def fft_var(person, x):
        fft = np.real(scipy.fft.fft(x) / person.fs)
        return np.var(fft)

    def fft_skew(person, x):
        fft = np.real(scipy.fft.fft(x) / person.fs)
        return np.var(fft)

    def ft_median(person, x):
        f, Pxx = periodogram(x, fs= person.fs)
        sum = np.trapz(Pxx, f)
        for i in range(f.shape[0]):
            temp = np.trapz(Pxx[:i], f[:i])
            if temp > sum / 2:
                break
        return f[i]
    def ft_mean_freq(person, x):
        f, Pxx = periodogram(x, fs= person.fs)
        a = np.trapz(np.multiply(f, Pxx), f)
        b = np.trapz(Pxx, f)
        return a/b

    def bandpower(person, x):
        f, Pxx = periodogram(x, fs= person.fs)
        bands = np.zeros(7)
        for i in range(7):
            fmin = 2 + 7*i
            fmax = 2 + 7*(i+1)
            ind_min = np.argmax(f > fmin) - 1
            ind_max = np.argmax(f > fmax) - 1
            bands[i] = np.trapz(Pxx[ind_min: ind_max], f[ind_min: ind_max])

        return bands

    def f_max(person, x):
        f, Pxx = periodogram(x, fs= person.fs)
        index = np.argmax(Pxx)
        return(f[index])


    def stack_fft(person, x):
        return np.real(scipy.fft.fft(x) / person.fs)[person.window // 2:]

    def stack_time(person, x):
        return x 


    # Cross Channel featurs

    def time_cor(person, x, y):
        return np.correlate(x,y) / np.sqrt(np.correlate(x,x) * np.correlate(y,y))
        return np.correlate(x, y)

    def fft_cor(person, x, y):
      
        return np.correlate(np.real(scipy.fft.fft(x) / person.fs), np.real(scipy.fft.fft(y) / person.fs))

## 1.2 Person Class

In [167]:
class Person:

    def __init__(self, train_data, test_data, window= 128, fs= 256):

        self.fs = fs 
        self.train_data = train_data
        self.test_data = test_data
        self.EEG_train = train_data[1:9]
        self.EEG_test = test_data[1:9]
        self.light_train = train_data[9]
        self.light_test = test_data[9]
        self.label_train = train_data[10]
        if np.sum(train_data[10]) == 300: # Single Char
            self.trials = 2700
            self.single_char = True
            self.weight = 35
        else:
            self.trials = 900 # Row-Column
            self.single_char = False
            self.weight = 5

        self.channels = 8
        self.window = window 
        
        self.extract_data()

    def extract_data(self):
        
        self.X_train = np.zeros((self.trials, self.channels, self.window))
        self.X_test = np.zeros((self.trials, self.channels, self.window))
        self.y_train = np.zeros(self.trials)
        self.letters_train = np.zeros(self.trials)
        self.letters_test = np.zeros(self.trials)

        i = 0
        for j in range(1, self.light_train.shape[0]):
            if self.light_train[j-1] == 0 and not self.light_train[j] == 0:
                index0 = (j-(self.window//8))
                index1 = (j+(7*self.window//8))
                for k in range(self.channels):
                    self.X_train[i][k] =  self.EEG_train[k][index0:index1]
                self.y_train[i] = self.label_train[j] 
                self.letters_train[i] = self.light_train[j]
                i += 1

        i = 0
        for j in range(1, self.light_test.shape[0]):
            if self.light_test[j-1] == 0 and not self.light_test[j] == 0:
                index0 = j-(self.window//8)
                index1 = j+(7*self.window//8)
                for k in range(self.channels):
                    self.X_test[i][k] =  self.EEG_test[k][index0:index1]         
                self.letters_test[i] = self.light_test[j]
                i += 1

    def plot_sample(self):

        plt.figure(figsize= (20, 15))
        for k in range(1, 9):
            plt.subplot(8, 1, k)
            plt.xlabel('t')
            plt.ylabel('Channel ' + str(k))
            plt.grid(True)
            plt.plot(self.X_train[k][0])

    def unison_shuffled_copies(a, b):
        assert len(a) == len(b)
        p = np.random.permutation(len(a))
        return a[p], b[p]

    # This method is used in training session: 

    def data_generator(self, Train= True):

        if Train:
            X = self.X_train

        else:
            X = self.X_test

        feature = Feature(self) # Feature Object
        features = np.zeros(14) # 14 Features are avaiable
        features[self.TOP_FEATURES.astype(np.int64)] = 1
        data = feature.feature_maker(X, features)

        return data

    # This method is used in Hyperparameter tuning session: 
    def tune(self):

        feature = Feature(self)
        feature.tune()

    def cross_validation(self, data, clf):

        data = StandardScaler().fit(data).transform(data)
        accuracies = []

        # CROSS VALIDATION
        for i in range(5): 

            indexes = np.arange(self.trials)
            test_index = np.arange(self.trials//5 * i, self.trials//5 * (i+1))
            train_index = np.setdiff1d(indexes, test_index)

            X_train = data[train_index]
            y_train = self.y_train[train_index]
            letters_train = self.letters_train[train_index]

            X_test = data[test_index]
            y_test = self.y_train[test_index]
            letters_test = self.letters_train[test_index]

            d, y = Person.unison_shuffled_copies(X_train, y_train)        
            clf.fit(d, y)
            pred = clf.predict(X_test)

            # Weighted Accuracy
            sw = np.multiply(y_test, (self.weight - 1)*np.ones(y_test.shape[0])) + np.ones(y_test.shape[0])
            accuracies.append(np.round(acc(y_test, pred, sample_weight= sw), 2))

        return np.mean(np.array(accuracies))

    def predict_on_train(self):

        data = self.data_generator()
        data = StandardScaler().fit(data).transform(data)
        word = []
        accuracies = []

        # CROSS VALIDATION
        for i in range(5): 

            indexes = np.arange(self.trials)
            test_index = np.arange(self.trials//5 * i, self.trials//5 * (i+1))
            train_index = np.setdiff1d(indexes, test_index)

            X_train = data[train_index]
            y_train = self.y_train[train_index]
            letters_train = self.letters_train[train_index]

            X_test = data[test_index]
            y_test = self.y_train[test_index]
            letters_test = self.letters_train[test_index]
            clf = clone(self.CLF)
            d, y = Person.unison_shuffled_copies(X_train, y_train)        
            clf.fit(d, y)
            pred = clf.predict(X_test)

            # Weighted Accuracy
            sw = np.multiply(y_test, (self.weight - 1)*np.ones(y_test.shape[0])) + np.ones(y_test.shape[0])
            accuracies.append(np.round(acc(y_test, pred, sample_weight= sw), 2))

            # Finding the letter:
            simu = np.multiply(pred, letters_test).astype(np.int64)
            simu[np.where(simu == 0)[0][0]] = 36
            count = np.bincount(simu)
            count[0] = 0
            count[-1] = count[-1] - 1

            if self.single_char:
                confidence = count[np.argmax(count)] / np.sum(count)
                letter = np.argmax(count) - 1
                word.append(DICT_1D[letter])

            else:
                if not np.sum(count[13:]) == 0:
                    raise ValueError

                count_rows = count[7:12]
                count_columns = count[1:6]

                row = np.argmax(count_rows)
                col = np.argmax(count_columns)
                word.append(DICT_2D[row][col])

        print('Predicted Word using cross validation is (' + ''.join(word) + ') With accuracy of ' + str(round(np.mean(np.array(accuracies)), 2)))

    def train_completely(self):

        data = self.data_generator()
        data = StandardScaler().fit(data).transform(data)
        d, y = Person.unison_shuffled_copies(data, self.y_train)
        clf = clone(self.CLF) # Copy of ideal Classifier
        clf.fit(d, y)
        return clf
    
    def predict_on_test(self):

        clf = self.train_completely()

        word = []
        
        data = self.data_generator(Train= False)
        data = StandardScaler().fit(data).transform(data)
        for i in range(5): 

            indexes = np.arange(self.trials)
            test_index = np.arange(self.trials//5 * i, self.trials//5 * (i+1))
            X_test = data[test_index]
            letters_test = self.letters_test[test_index]
            pred = clf.predict(X_test)

            # Finding the letter:
            simu = np.multiply(pred, letters_test).astype(np.int64)
            simu[simu == 0][0] = 36
            count = np.bincount(simu)
            count[0] = 0
            count[-1] = count[-1] - 1

            if self.single_char:
                confidence = count[np.argmax(count)] / np.sum(count)
                letter = np.argmax(count) - 1
                word.append(DICT_1D[letter])

            else:
                if not np.sum(count[13:]) == 0:
                    raise ValueError
                count_rows = count[7:12]
                count_columns = count[1:6]

                row = np.argmax(count_rows)
                col = np.argmax(count_columns)

                conf_row = count[np.argmax(count_rows)] / np.sum(count_rows)
                conf_columns = count[np.argmax(count_columns)] / np.sum(count_columns)
                word.append(DICT_2D[row][col])


        print('Predicted Word is ' + ''.join(word))

    def summary(self):

        names = self.TOP_FEATURES_NAMES.split()
        print('Using weighted accuracy as the metric, top 3 features for this person has been extracted')
        print('These Features are: ')
        print('1) ' + names[0])
        print('2) ' + names[1])
        print('3) ' + names[2])
        print('3 different Classifiers were tested and the best is: ', end= '')
        print(self.CLF)
        print('The max CV accuracy is ' + str(round(self.MAX_ACC, 2)))

# 2 Hyperparameter Tuning

## 2.1 Extracing Best Features with different WindowSizes

In [None]:
persons256 = []
for i in range(9):
    person = Person(train_data=train_data[i], test_data= test_data[i], window= 256)
    print('------')
    print(i)
    person.tune()
    persons256.append(person)

In [None]:
persons128 = []
for i in range(9):
    person = Person(train_data=train_data[i], test_data= test_data[i], window= 128)
    print('------')
    print(i)
    person.tune()
    persons128.append(person)

## 2.2 Validation Results

In [160]:
for i in range(9):

    print('Person numeber ' + str(i+1))
    print('Size128: ', end= '')
    persons128[i].predict_on_train()
    print('Size256: ', end= '')
    persons256[i].predict_on_train()    

Person numeber 1
Size128: Predicted Word using cross validation is (L4KAY) With accuracy of 0.59
Size256: Predicted Word using cross validation is (E6FAN) With accuracy of 0.6
Person numeber 2
Size128: Predicted Word using cross validation is (LUKAS) With accuracy of 0.69
Size256: Predicted Word using cross validation is (LUKAS) With accuracy of 0.68
Person numeber 3
Size128: Predicted Word using cross validation is (PHGAS) With accuracy of 0.58
Size256: Predicted Word using cross validation is (HKKAS) With accuracy of 0.61
Person numeber 4
Size128: Predicted Word using cross validation is (HUKAS) With accuracy of 0.76
Size256: Predicted Word using cross validation is (HUKAS) With accuracy of 0.7
Person numeber 5
Size128: Predicted Word using cross validation is (IUKAM) With accuracy of 0.65
Size256: Predicted Word using cross validation is (IOKAS) With accuracy of 0.66
Person numeber 6
Size128: Predicted Word using cross validation is (KSKCS) With accuracy of 0.59
Size256: Predicted W

## 2.3 Deciding Window Size

In [159]:
final_persons = []

final_persons.append(persons128[0])
final_persons.append(persons256[1])
final_persons.append(persons256[2])

final_persons.append(persons128[3])
final_persons.append(persons256[4])
final_persons.append(persons256[5])

final_persons.append(persons256[6])
final_persons.append(persons128[7])
final_persons.append(persons128[8])

# 3 Results

## 3.1 Summary of selected featuers

In [180]:
for i in range(9):
    person = final_persons[i]
    print('Person Number ' + str(i+1) + ': ', end= '')
    summary(person)
    print('----------')


Person Number 1: Using weighted accuracy as the metric, top 3 features for this person has been extracted
These Features are: 
1) fft_cor
2) time_mean
3) fft_var
3 different Classifiers were tested and the best is: LogisticRegression(class_weight={0: 1, 1: 35}, max_iter=5000)
The max CV accuracy is 0.59
----------
Person Number 2: Using weighted accuracy as the metric, top 3 features for this person has been extracted
These Features are: 
1) stack_time
2) bandpower
3) time_mean
3 different Classifiers were tested and the best is: LogisticRegression(class_weight={0: 1, 1: 35}, max_iter=5000)
The max CV accuracy is 0.68
----------
Person Number 3: Using weighted accuracy as the metric, top 3 features for this person has been extracted
These Features are: 
1) stack_time
2) ft_median
3) f_max
3 different Classifiers were tested and the best is: SVC(C=0.3, class_weight={0: 1, 1: 5}, max_iter=5000)
The max CV accuracy is 0.61
----------
Person Number 4: Using weighted accuracy as the metric,

## 3.2 Validation Results

In [164]:
for i in range(9):
    person = final_persons[i]
    print('Person Number ' + str(i+1) + ': ', end= '')
    person.predict_on_train()


Person Number 1: Predicted Word using cross validation is (L4KAY) With accuracy of 0.59
Person Number 2: Predicted Word using cross validation is (LUKAS) With accuracy of 0.68
Person Number 3: Predicted Word using cross validation is (HKKAS) With accuracy of 0.61
Person Number 4: Predicted Word using cross validation is (HUKAS) With accuracy of 0.76
Person Number 5: Predicted Word using cross validation is (IOKAS) With accuracy of 0.66
Person Number 6: Predicted Word using cross validation is (DCKAA) With accuracy of 0.57
Person Number 7: Predicted Word using cross validation is (JUKAS) With accuracy of 0.64
Person Number 8: Predicted Word using cross validation is (HIKAS) With accuracy of 0.73
Person Number 9: Predicted Word using cross validation is (KUKAS) With accuracy of 0.68


## 3.3 Results On Test Data

In [181]:
for i in range(9):
    person = final_persons[i]
    print('Person Number ' + str(i+1) + ': ', end= '')
    person.predict_on_test()


Person Number 1: Predicted Word is LDNHA
Person Number 2: Predicted Word is LUKAS
Person Number 3: Predicted Word is KUKAS
Person Number 4: Predicted Word is KUKAS
Person Number 5: Predicted Word is WATEQ
Person Number 6: Predicted Word is WMUAA
Person Number 7: Predicted Word is 2AZEM
Person Number 8: Predicted Word is WATEM
Person Number 9: Predicted Word is WATEP
