In [1]:
import numpy as np
import matplotlib.pyplot as plt

In [2]:
def extractInfo(data,nSamples):
    """Extract relevant information from the matlab data object"""
    
    channelNames = [channel[0] for channel in data["nfo"][0][0][2][0]]

    #Extracting Sampling Rate
    sRate = data["nfo"][0][0][0][0][0]

    # Extracting class labels
    classLabels = list(map(lambda x : x[0], data["nfo"][0][0][1][0]))

    #Extracting event onset data
    eventOnsets = data["mrk"][0][0][0]

    #Extracting event code data
    eventCodes = data["mrk"][0][0][1]

    #labels for each eeg data sample
    labels  = np.zeros((1,nSamples),dtype=int)

    # Set labels positions to event codes using eventOnsets as indexes
    # This ensure that data samples that arent associated with any event have a target value of zero

    labels[0,eventOnsets] = eventCodes

    return channelNames,sRate,classLabels,eventOnsets,eventCodes,labels


In [3]:
def getTrials(labels, uniqueEventCodes, trialWindow,eeg,eventCodes_train_test,eventOnsets_train_test,nChannels,selectedChannels=None):

    """Extract trials from continuous EEG data 
    
    Paramters
    ---------
    labels - An Array containing the class labels (left and right)

    uniqueEventCodes - Event codes corresponding to a particular label (-1 and 1)

    trialWindow - An array containing values representing the trial window. This array is added 
                  to a given event onset time sample point to extract the effective trial window
    
    eeg - 2D array of shape channel x samples

    eventOnsets_train_test - An array containing the event onset from which to extract trials 
    
    eventCodes_train_test - An array containing the event codes corresponding to the given event onsets

    nChannels - The number of channels in the data

    selectedChannels - An array containing specific channels from which to extract data


    Returns
    -------
    trials - a dictionary object containing extracted trials for the given motor imagery classes
    
    """

    #In order to obtain the appropriate trials for this dataset, we need to define a time window for epoching the data.

    #The NFFT param value for the PSD function must be exactly half of the length of the
    # sample window + 1
    #It seems that increasing the length of the time window increases the classification accuracy but only up on until 
    # time samples
    #Create visual representaion for this.
    #It may be useful to show also the effect of shifting the start time of the window 0.0, 0.1, 0.2 etc

    trials = {}

    for cls, code in zip(labels,uniqueEventCodes):

        #Get all event onsets for the particular class
        # Create a filter array i.e return an array with True
        # values at all indeces where the eventCode = code
        # and False otherwise

        filter_arr = eventCodes_train_test == code

        # use arr to filter event onsest specific to this class
        clsOnsets = eventOnsets_train_test[filter_arr]
        
        #Allocate memmory for trial
        trials[cls] = np.zeros(
            (nChannels if selectedChannels is None else len(selectedChannels), len(clsOnsets), len(trialWindow)))
        
        #Extract trials for the class
        
        for i, onset in enumerate(clsOnsets):
            #For all 59 channels extract class trials of size onset + win from
            # Each onsent represents the start time of an external cue
            #Each row is a list of the values for each channel of the 59 channels recorded
            #within the time window of interest
                
            lastI = onset + len(trialWindow)
                
            selectedWindow = eeg[:,onset+trialWindow]

                
            trials.get(cls)[:,i,:] = selectedWindow if selectedChannels is None else selectedWindow[selectedChannels,:]
        
    
    return trials


In [4]:
from scipy import signal

def getFilteredTrials(trials, trialWin,nChannels,sRate,b=None,a=None,selectedChannels=None):
    """Extract filtered trials 
    
    Parameters
    ----------
    trials - a dictionary object containing extracted trials the given motor imagery classes

    trialWin - An array containing values representing the trial window. This array is added 
                  to a given event onset time sample point to extract the effective trial window
    
    nChannels - The number of channels in the data

    sRate - The sampling rate

    b - numerator (b)  coefficients of an iirfilter

    a - denominator (a) coefficients of an iirfilter

    selectedChannels - An array containing specific channels from which to extract data.

    Returns
    -------
    trials_filt: A dictionary object contatining filtered trials for the given motor imagery classes
    
    """
    
    trials_filt = {}

    def bandPass(trial_lr,lowcut,highcut,fs):
        nonlocal b, a
        """Bandpass filter 

        Parameters
        ----------
        trial_lr: A 3D ndarray of shape (channels x trials x time) which contains the trials per channels

        lowcut: Lower frequency bound in Hz

        highcut: Higher frequency bound in Hz
        
        fs: Sampling frequency

        Returns
        -------
        trials_filt: A 3D ndarray of shape (channels x trials x time) which contains the bandpass filtered trials per channels
        """
        nqfreq = 0.5*fs
        
        if b is None and a is None:
            b , a = signal.iirfilter(4,[lowcut/nqfreq,highcut/nqfreq])

        nTrials = trial_lr.shape[1]
        filt_trials = np.zeros((nChannels if selectedChannels is None else len(selectedChannels),nTrials,len(trialWin)))


        for t in range(nTrials):
            filt_trials[:,t,:] = signal.filtfilt(b,a,trial_lr[:,t,:],axis=1)

        return filt_trials

    trials_filt["left"] = bandPass(trials["left"],8,12,sRate)
    trials_filt["right"] = bandPass(trials["right"],8,12,sRate)

    return trials_filt


In [5]:
from matplotlib import mlab

def psd(trials, trialWindow, nChannels, sRate):
    """
    Parameters
    ----------
    trials: A 3D ndarray of shape (channels x trials x time)

    Returns
    -------
    trials_PSD: A 3D ndarray of shape (channels x trials x PSD) the PSD for each trial
    """

    nTrials = trials.shape[1]
    trials_PSD = np.zeros((nChannels,nTrials, int(len(trialWindow)/2)+1)) #Why?

    for trial in range(nTrials):
        for ch in range(nChannels):
            #Calculate the PSD
            
            (PSD, freqs) = mlab.psd(trials[ch, trial , :], NFFT=len(trialWindow), Fs=sRate)
            trials_PSD[ch, trial, :] = PSD

    return trials_PSD, freqs


In [6]:
def plot_psd(trial_PSDs,freqs,channel_IDXs,ymax):

    """ Plot the mean Power Spectral Density of left and right hand signals from 
        all trials at the given electrode channels """

    plt.figure(figsize=(12,5))
    nChans = len(channel_IDXs)
    nRows = int(np.ceil(nChans/3))
    nCols = min(3,nChans)

    #for channels in channel indexes
    for i,ch in enumerate(channel_IDXs):
        plt.subplot(nRows,nCols,i+1)

        for cls in trial_PSDs.keys():
            plt.plot(freqs,np.mean(trial_PSDs[cls][ch,:,:],axis=0),label=cls)

        #plt.fill_betweenx(np.mean(trial_PSDs["left"][ch,:,:],axis=0), 8, 12, color="green", alpha=0.2)

        plt.xlabel("Frequency in Hz")
        
        plt.xlim(1,30)
        plt.grid()
        plt.ylim(0,ymax)

        plt.legend()

    plt.tight_layout()


In [7]:
#Following the standard bci classification paradigm outlined in "The non-invasive Berlin Brain–Computer Interface: Fast acquisition of effective performance in untrained subjects"
#Benjamin Blankertz,a,⁎ Guido Dornhege,a Matthias Krauledat,a,b Klaus-Robert Müller, and Gabriel Curio

def logvar(trials):
    #trials has a shape of 59 x 100 x 200
    return np.log(np.var(trials,axis=2))
    #calculate variance along the sample (time sample) axis
    #then calculate the log of the result

    # Since VARIANCE of band-pass filtered signals
    # is equal to band-power, CSP analysis is applied
    # to approximately band-pass filtered signals in order
    # to obtain an effective discrimination of mental states
    # that are characterized by ERD/ERS effects ref:Optimizing Spatial filters for Robust EEG Single-Trial Analysis

    # The log of the variance can be useful for data that exhibits exponential or power-law relationships,
    # as it can help to compress the range of the data and make it easier to visualize and analyze.

    # For example, consider a set of spatial filters with variances[1, 10, 100]. The log of the variances
    # would be[0, 1, 2], which has a smaller range than the original data. This can be useful for data that
    # has a wide range of values, as it can make it easier to visualize and analyze the data.


In [8]:
def plot_logvar(trials, nChannels, classLabels):
    """ Plot the mean log-var (logarithm of the variance) """
    plt.figure(figsize=(12,5))

    x0 = np.arange(nChannels)
    x1 = np.arange(nChannels) + 0.4

    y0 = np.mean(trials["left"],axis=1)
    y1 = np.mean(trials["right"],axis=1)

    #axis 1 refers to the axis at position 1 in the dimension tuple
    #in this case, the dimension tuple of trials["left"] or trials["right"]
    # is (59,100)
    #Hence the mean is calculated along the axis with 100 values
    #Leaving a vector of shape (59,)


    plt.bar(x0,y0, width=0.5, color="b")
    plt.bar(x1,y1, width=0.4, color="r")

    plt.xlim(-0.5,nChannels+0.5)

    plt.gca().yaxis.grid(True)
    plt.title("log-var of each channel")
    plt.xlabel("channels")
    plt.ylabel("log-var")
    plt.legend(classLabels)

    #A plot of the log of the variance of the CSP transformed data can be useful for identifying patterns or
    # trends in the data that may be indicative of differences between the two classes of signals.
    # For example, if the log of the variance of the CSP transformed data is higher for one class than the
    # other, it could indicate that the filters for that class are more discriminative.

    # To choose the subset of filters that are most discriminative between the two classes of signals,
    # it may be useful to plot the log of the variance of the CSP transformed data and examine the patterns
    # or trends that emerge. The filters with the highest variance could be selected as the most discriminative,
    # as they may contain the most information about the differences between the two classes.

    #In the case below, it seems that the log of the variance of spatial filter 0 (column 0) is at its highest for
    #left hand signals and its lowest for right hand singals, whereas, spatial filter 59, has its log-var at its highest
    #right hand signals and its lowest for left hand signals. Hence, for each signal window (shape 59 x 1 x 200)
    # We can extract the two most relevant spatial filters to determine whether it represents left or right.

    


In [9]:
from numpy import linalg

def cov(trials):
    """Calculate the covariance for each trial and return their average
    
    Parameters
    ----------
    trials - Array (channels x trials x samples) 

    Returns
    -------
    A covariance matrix containing the mean values of all covariance matrices generated from trials
    """
    ntrials = trials.shape[1]

    #Select all 59 channels (59 rows), then select the ith trial
    #for each of those 59 rows along with all the columns (values) for that
    # trial (resulting in a 59 x 200 matrix) and calculate the covariance matrix

    covs = [np.cov(trials[:,i,:]) for i in range(ntrials)]

    # covs consists of nTrials x 59 x 59 matrices
    #Since covariance is calculated by the dot product of X and X.T
    #Where X.T is the transpose of X [(59 x 200).dot((200 x 59))]

    #print(np.array(covs).shape)

    return np.mean(covs, axis=0)


In [10]:
def whitening(sigma):
    """Calculate a whitening matrix for covariance matrix sigma.

    Paramters
    ---------
    sigma - A covariance matrix of shape N x N, where N is the number of channels for a given
            trial. 

    Returns
    --------
    The whitened covariance matrix

    """
    #In singular value decomposition(SVD), a matrix is decomposed into the product of
    # three matrices: a left singular matrix, a diagonal matrix, and a right singular matrix.
    # The diagonal matrix, called the singular value matrix, contains the singular values of the
    # original matrix. The left and right singular matrices contain the left and right singular
    # vectors, respectively.

    # A whitening matrix is a diagonal matrix that is used to transform the singular value matrix
    # so that it has the identity matrix as its diagonal. This is done by dividing each element on the
    # diagonal of the singular value matrix by the square root of the corresponding singular value.

    # The whitening matrix is useful in SVD because it can be used to decorrelate the singular vectors,
    # which can simplify certain computations and make it easier to interpret the results of
    # the decomposition. For example, in dimensionality reduction, whitening the singular value matrix
    # can help to remove some of the redundancy in the data and make it easier to identify the underlying
    # structure of the data.

    U, l, _ = linalg.svd(sigma) # l is a vector of singular values U is the left singular matrix
    return U.dot(np.diag(l ** -0.5))

In [11]:
def csp(trials_r, trials_l):
    """Calculate the CSP transformation matrix W

    Paramters
    ---------
    trials_r - Array(channels x trials x samples) containing right hand movement trials
    trials_l - Array(channels x trials x samples) containing left hand movement trials
    
    Returns
    -------
    Mixing matrix W
    """
    cov_r = cov(trials_r)
    cov_l = cov(trials_l)
    P = whitening(cov_r + cov_l)

    B, _, _ = linalg.svd(P.T.dot(cov_r).dot(P))

    W = P.dot(B)

    return W


In [12]:
def apply_mix(W, trials, trialWin, nChannels, selectedChannels=None):
    """Apply a mixing matrix to each trial (basically multiply W with the EEG signal matrix)"""
    ntrials = trials.shape[1]


    trials_csp = np.zeros((nChannels if selectedChannels is None else len(selectedChannels) , ntrials, len(trialWin)))


    for i in range(ntrials):
        trials_csp[:,i,:] = W.T.dot(trials[:,i,:])

    return trials_csp


In [13]:
def scatter(left,right,classLabels):
    """ Display scatter plot of the distribution left and right motor imagery trials"""
    plt.figure()
    plt.scatter(left[0,:],left[-1,:],color="b")
    plt.scatter(right[0,:],right[-1,:],color="r")
    plt.xlabel("First Component")
    plt.ylabel("Last Component")
    plt.legend(classLabels)


In [14]:
import pandas as pd

def label_data(data,label,foldNum):
    """
    Parameters
    ----------
    data - An array of dimensions observation x features
    label the desired label

    Returns
    -------
    newData - An dataframe of with columns F1...Fn and label
    """

    nTrainSamples = data.shape[0]
    nFeatures = data.shape[1]

    featureColumns = [f"F{x}" for x in range(1,nFeatures+1)]

    newData = pd.DataFrame(data,columns=featureColumns)

    newData["Labels"] = np.array([label for x in range(nTrainSamples)])

    # Apply fold num
    newData["Fold"] = np.random.randint(1,foldNum+1,nTrainSamples)

    return newData


In [15]:
from sklearn.model_selection import train_test_split
def featureExtraction(data, split_percentage, b=None, a=None, selectedChannels=None, trialWinStart=0.5, trialWinEnd=2.5):

    """Extract csp features from training data"""
    
    eeg = data["cnt"].T 

    nChannels, nSamples = eeg.shape

    channelNames,sRate,classLabels,eventOnsets,eventCodes,labels = extractInfo(data,nSamples)


    trialWindow = np.arange(int(trialWinStart*sRate),int(trialWinEnd*sRate))

    trials = getTrials(classLabels,
                        np.unique(eventCodes),
                        trialWindow,
                        eeg,
                        eventCodes,
                        eventOnsets,
                        nChannels,
                        selectedChannels = selectedChannels)

    

    filteredTrials = getFilteredTrials(trials,
                                       trialWindow,
                                       nChannels,
                                       sRate,
                                       selectedChannels=selectedChannels,
                                       b=b,
                                       a=a)

    train = {"left": filteredTrials["left"],
            "right": filteredTrials["right"]}

    W = csp(train["right"],train["left"])

    train = {
    "left" : apply_mix(W,train["left"],trialWindow,nChannels,selectedChannels=selectedChannels),
    "right" : apply_mix(W,train["right"],trialWindow,nChannels,selectedChannels=selectedChannels)
    }

 
    train["left"] = logvar(train["left"])
    train["right"] = logvar(train["right"])

    comp = [-1, 0]

    train["left"] = train["left"][comp,:]
    train["right"] = train["right"][comp,:]

    train_l_df = label_data(train["left"].T,"l",5)
    train_r_df = label_data(train["right"].T,"r",5)

    train_comb = pd.concat([train_l_df,train_r_df])

    if split_percentage < 1:

        train_comb, test_comb = train_test_split(train_comb, train_size=split_percentage,random_state=42)

        return train_comb, test_comb, W, comp

    return train_comb, W, comp


In [16]:
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
import warnings
warnings.filterwarnings('ignore')

def testClassifier(rawData,trialWinStart,trialWinEnd,percentSplit,model,b=None,a=None,selectedChannels=None):

    """ Extract the best classifier performace from training data """
    
    #It may not yield the best results testing this classifier on data with a different trial 
    # width or start time since the data being tested is extracted using a different decomposition matrix (W) from the 
    # CSP algorithm. As the classifier being tested was initially fit to data extracted using a different decompostion matrix, 
    # poor performance is a likely result. It may be that in order to test the effect of increasing or decreasing the trial window 
    # length or the trial window start point, entirely new classifier tuning is required for each new window.
    
    
    trainData, _, _, _ = featureExtraction(rawData,percentSplit,
                                                        selectedChannels=selectedChannels,
                                                        trialWinStart=trialWinStart,
                                                        trialWinEnd=trialWinEnd,
                                                        b = b,
                                                        a = a)

    nCols = len(trainData.columns)
    ########################################## CLASSIFIER TUNING ####################################################

    ########################################## LINEAR DISCRIMINANT ANALYSIS #########################################
    
    if model == "LDA":
        param_grid_LDA = {'solver': ['svd', 'lsqr', 'eigen'],'shrinkage':np.arange(0,1,0.1)}
        gridLDA = GridSearchCV(estimator=LDA(), param_grid=param_grid_LDA,scoring="accuracy", cv=5)
        gridLDA.fit(trainData.iloc[:,:nCols-2], trainData.iloc[:,nCols-2])

        return gridLDA.best_score_, gridLDA.best_estimator_


    ######################################### LOGISTIC REGRESSION #####################################################

    if model == "Logistic Regression":
        param_grid_LR = {'penalty': ['l1','l2'], 'C':np.logspace(-3,3), 'solver': ['newton-cg', 'lbfgs', 'liblinear']}
        gridLR = GridSearchCV(estimator=LogisticRegression(), param_grid=param_grid_LR,scoring="accuracy", cv=5)
        gridLR.fit(trainData.iloc[:,:nCols-2], trainData.iloc[:,nCols-2])

        
        return gridLR.best_score_, gridLR.best_estimator_


    ######################################### SUPPORT VECTOR CLASSIFIER #################################################

    if model == "SVC":
        param_grid_SVC = {'C': [0.0001,0.001,0.01,1,10,100,1000],
                          'gamma': [1, 0.1, 0.01, 0.001, 0.0001],
                          'kernel': ["linear",'rbf']}
        gridSVC = GridSearchCV(estimator=SVC(), param_grid=param_grid_SVC,scoring="accuracy", cv=5)
        gridSVC.fit(trainData.iloc[:,:nCols-2], trainData.iloc[:,nCols-2])

    
        return gridSVC.best_score_ , gridSVC.best_estimator_


In [17]:
def startRunningClassifier(testData,calibratedModel,comp,W_train, trialWinStart,trialWinEnd,b=None, a=None):
    """ Extract class probabilities from evaluation test data for each time sample"""
    
    #Extracting Sampling Rate
    sRate = testData["nfo"][0][0][0][0][0]

    testEEG = testData["cnt"].T 
   
    nSamples = testEEG.shape[1] 

    trialWindow = np.arange(int(trialWinStart*sRate),int(trialWinEnd*sRate))

    preds = []
  
    try:
        for i in range(nSamples):
            
            #capture signal in sliding window
            captured_signal = testEEG[:,i+trialWindow]
            
            #filter captured signal
            nqfreq = 0.5*sRate

            if b is None and a is None:
                b , a = signal.iirfilter(6,[8/nqfreq,12/nqfreq])

            filtered_test_trial = signal.filtfilt(b,a,captured_signal,axis=1)

            #Extract features from transformed data
            spatialFilters = W_train.T.dot(filtered_test_trial)

            spatialFilters = spatialFilters[comp,:]

            features = np.log(np.var(spatialFilters,axis=1)).T

            features = pd.DataFrame([features],columns=["F1","F2"])

            classProbabilities = calibratedModel.predict_proba(features)

            preds.extend(classProbabilities)
    
    except IndexError:
        pass
       
    
    return preds


In [18]:
def createFilterBank(fs):
    """
        Parameters
        ----------
        fs: Sampling frequency

        Returns
        -------
        filterBank: A dictionary with keys represented by a frequency band; (lower bound, upper bound)
                    and values as numerator (b) and denominator (a) coefficients of the iirfilter
    """
    
    filterBank = {}

    freq_bands = [(8,12),(8,15),(13,30)]

    for lowcut, highcut in freq_bands:
        nqfreq = 0.5*fs
        b , a = signal.iirfilter(4,[lowcut/nqfreq,highcut/nqfreq])
        filterBank[(lowcut,highcut)] = (b,a)

    return filterBank


In [19]:
def testWindowLength(data,modelNames,scoring,split):

    """ Extract best performance from trial window length tuning """
    
    bestTime = None
    bestPerformance = 0
    bestEstimator = None
    
    for model in modelNames:
        
        accVals = {}
        bestModelPerformance = 0
        bestModelEstimator = None

        for i in np.linspace(1,5,9):
            accVals[i-0.5], estimator = testClassifier(data,0.5,i,split,model)
            
            if accVals[i-0.5] > bestModelPerformance:
                bestModelPerformance = accVals[i-0.5]
                bestModelEstimator = estimator
                bestTime = i-0.5

        if bestModelPerformance > bestPerformance:
            bestPerformance = bestModelPerformance
            bestEstimator = bestModelEstimator

        #plot acc Val
        x, y = zip(*accVals.items())
        plt.plot(x,y,label=f"{model}({bestModelPerformance})")
    
    plt.axvline(x=bestTime, color='purple', ls='--', lw=1.5, label=f"Best window length ({bestTime})")
    plt.legend()
    

    return bestEstimator
    


In [20]:
def testWindowStartTime(data,modelNames,scoring,split,bestLen,times=None):

    """ Extract best performance from trial window start time tuning """

    bestTime = None
    bestPerformance = 0
    bestEstimator = None

    for model in modelNames:

        accVals = {}
        bestModelPerformance = 0
        bestModelEstimator = None
        
        times = np.arange(0,2.1,0.1) if times is None else times
        for i in times:
            accVals[i], estimator = testClassifier(data,i,i+bestLen,split,model)

            if accVals[i] > bestModelPerformance:
                bestModelPerformance = accVals[i]
                bestModelEstimator = estimator
                bestTime = i

        if bestModelPerformance > bestPerformance:
            bestPerformance = bestModelPerformance
            bestEstimator = bestModelEstimator


        x, y = zip(*accVals.items())
        plt.plot(x,y,label=f"{model}({bestModelPerformance})")
   
    plt.axvline(x=bestTime, color='purple', ls='--', lw=1.5, label=f"Best start time({bestTime})")
    plt.legend()
    
    return bestEstimator
        


In [21]:
def testFrequencyBand(rawData,modelNames,filterBank,split,selectedChannels=None, trialWinStart=None, trialWinEnd=None):

    """ Extract best performance from frequency band tuning """


    totalBestBandAcc = 0
    bandFreqRes = {}
    bestEstimator = None
    
    bestBand = None
    
    for lower,upper in filterBank.keys():

        b, a = filterBank[lower,upper]
        
        bestBandAccVal = 0
        bestBandEstimator = None

        for model in modelNames:
            accVal, estimator = testClassifier(rawData,trialWinStart,trialWinEnd,split,model,b=b,a=a)

            if accVal > bestBandAccVal:
                bestBandAccVal = accVal
                bestBandEstimator = estimator
        
        if bestBandAccVal > totalBestBandAcc:
            bestBand = lower,upper
            totalBestBandAcc = bestBandAccVal
            bestEstimator = bestBandEstimator

        bandFreqRes[lower,upper] = (b,a)
        print(f"{lower}-{upper} : {bestBandAccVal}")

    

    return bestBand, bestEstimator, bandFreqRes[bestBand]
        
