In [7]:
# `display` and `HTML` belong to the public IPython.display API.  Importing
# them from IPython.core.display has been deprecated since IPython 7.14 and
# may be unavailable in newer releases, so use the public path.
from IPython.display import display, HTML

# widen elements with the `.container` class to 90% of the page width
display(HTML("<style>.container { width:90% !important; }</style>"))

import scipy.io as sio
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt

from sklearn import svm
from sklearn.metrics import accuracy_score
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

import hdc
import time

# re-execute the already imported `hdc` module so that edits made to its
# source are picked up without restarting the kernel
import importlib
importlib.reload(hdc)

%matplotlib notebook

In [46]:
# location of all offline data
dataDir = './emg_mat/offline/'

# choose experiments for base and new context
baseExperiment = 1
newExperiment = 3

# segment only the hold portions of data (inclusive sample indices)
holdStart = 70
holdEnd = 149
numEx = holdEnd - holdStart + 1

# hypervector and feature dimensions
D = 10000
numFeat = 320

# The train/test split below is drawn with np.random.randint and
# np.random.permutation; seed the global generator so that re-running the
# notebook reproduces the same splits and therefore the same numbers.
randomSeed = 0
np.random.seed(randomSeed)

# number of random splits averaged per setting
numIter = 2

# sweep grids: fraction of one trial used for training, and the threshold
# passed to the HDC training call.  The commented-out lines are denser grids
# kept for reference.
# testPercentage = np.linspace(0.05,1,20)
# adaptThreshold = np.linspace(0.05,0.8,16)
testPercentage = np.linspace(0.1,0.8,2)
adaptThreshold = np.linspace(0.2,0.5,2)

# SVM results, indexed [training fraction, iteration]
numSVM = np.zeros((len(testPercentage),numIter))
accSVM = np.zeros((len(testPercentage),numIter))

# HDC results, indexed [training fraction, threshold, iteration]
numHDC = np.zeros((len(testPercentage),len(adaptThreshold),numIter))
accHDC = np.zeros((len(testPercentage),len(adaptThreshold),numIter))

# subject numbering in the file names starts at 1
subject = 3
print('Gathering data for Subject ' + str(subject))

# Read the 'emgHD' variable from the base-context file and then from the
# new-context file.  Files are named S<subject>E<experiment>.mat.
filePrefix = dataDir + 'S' + str(subject) + 'E'
filename = filePrefix + str(baseExperiment) + '.mat'
base = sio.loadmat(filename)['emgHD']
filename = filePrefix + str(newExperiment) + '.mat'
new = sio.loadmat(filename)['emgHD']

# get metadata: the cell grid is gestures x trials, and the channel count is
# the second dimension of field 2 of the first cell
numGest, numTrial = base.shape
firstTrial = base[0,0]
numCh = firstTrial[2].shape[1]

# number of hold windows, each shifted by one more sample, that are stacked
# on top of each other to form one feature vector
numShift = 5

def _iterTrialBlocks(data, contextId):
    """Yield the per-trial pieces of one context, gesture by gesture.

    Parameters
    ----------
    data : 2-D object array indexed [gesture, trial]; field 2 of each cell is
        sliced along its first axis and field 3 along its second axis with
        the hold indices (presumably time samples -- confirm against the
        .mat layout).
    contextId : number stored in the context vector (0 = base, 1 = new).

    Yields
    ------
    (feat, ngram, labelVec, trialVec, contextVec) where ``feat`` has
    numShift*numCh rows and numEx columns, ``ngram`` has numEx columns, and
    the three vectors have length numEx.
    """
    for g in range(numGest):
        for t in range(numTrial):
            trial = data[g,t]
            # the empty (0, numEx) seed keeps the float dtype and makes the
            # concatenation fail loudly if a window is not numEx samples long
            shifted = [np.empty((0,numEx))]
            for i in range(numShift):
                shifted.append(trial[2][(holdStart+i):(holdEnd+i+1),:].T)
            yield (np.concatenate(shifted,axis=0),
                   trial[3][:,holdStart:holdEnd+1],
                   g*np.ones(numEx),
                   t*np.ones(numEx),
                   contextId*np.ones(numEx))

# collect baseline data first, then new-context data, so that rows are ordered
# context -> gesture -> trial -> example
blocks = list(_iterTrialBlocks(base,0)) + list(_iterTrialBlocks(new,1))

# Join everything with a single concatenate per array instead of regrowing
# the arrays on every trial.  The empty seeds fix the expected row counts and
# the float dtype, and keep the result well defined when there are no trials.
features = np.concatenate([np.empty((numCh*numShift,0))] + [b[0] for b in blocks],axis=1)
ngrams = np.concatenate([np.empty((D,0))] + [b[1] for b in blocks],axis=1)
labels = np.concatenate([np.empty(0)] + [b[2] for b in blocks])
trials = np.concatenate([np.empty(0)] + [b[3] for b in blocks])
context = np.concatenate([np.empty(0)] + [b[4] for b in blocks])

# One row per example.  The signal columns are named by position
# ('feature<i>' / 'hv<i>') and come first in each frame.
featCols = ['feature' + str(idx) for idx in range(len(features))]
ngramCols = ['hv' + str(idx) for idx in range(len(ngrams))]

featData = pd.DataFrame(features.T,columns=featCols)
ngramData = pd.DataFrame(ngrams.T,columns=ngramCols)

# append the same three metadata columns, in the same order, to both frames
for colName, colValues in (('gesture',labels),('trial',trials),('context',context)):
    featData[colName] = colValues
    ngramData[colName] = colValues

def _sampleTrainMask(nGest, nTrial, nEx, nTrain):
    """Draw the train/test marker vector for one context.

    For every gesture one trial is picked at random.  Inside that trial,
    ``nTrain`` randomly chosen examples are marked 1 (training) and the
    remaining ones -1 (selected by neither the training nor the testing
    filter below); every example of the other trials is marked 0 (testing).

    Returns a flat float array of length nGest*nTrial*nEx ordered by gesture,
    then trial, then example, matching the row order of featData/ngramData
    within one context.
    """
    chosenTrial = np.random.randint(0,nTrial,nGest)
    template = np.concatenate((np.ones(nTrain), -np.ones(nEx - nTrain)))
    marker = np.zeros((nGest,nTrial,nEx))
    for g in range(nGest):
        marker[g,chosenTrial[g],:] = np.random.permutation(template)
    return marker.ravel()

# the gesture and context columns never change, so read them once
allGest = ngramData['gesture'].unique()
gestureOf = np.asarray(ngramData['gesture'])
contextOf = np.asarray(ngramData['context'])

# loop through different testing percentages
for tpIdx,tp in enumerate(testPercentage):
    print('Running with %f of single trial for training' % (tp))
    # number of training examples taken from the chosen trial
    numTrain = int(round(tp*numEx))
    # iterate through to get averages (different cross-validation folds)
    elapsedTime = 0
    for n in range(numIter):
        startTime = time.time()

        # independent draws for the base context and the new context,
        # concatenated in the order the rows were collected (base, then new)
        baseMarker = _sampleTrainMask(numGest,numTrial,numEx,numTrain)
        newMarker = _sampleTrainMask(numGest,numTrial,numEx,numTrain)
        isTrain = np.concatenate((baseMarker,newMarker))

        featData['isTrain'] = isTrain
        ngramData['isTrain'] = isTrain
        trainRows = isTrain == 1
        testRows = isTrain == 0

        # the test set does not depend on the threshold, so extract it once
        # per split; the first D columns of ngramData are the hypervector
        testNgram = np.asarray(ngramData.loc[testRows].iloc[:,0:D])
        testLabel = np.asarray(ngramData.loc[testRows,'gesture'])

        # loop through all HDC adaptive thresholds
        for atIdx,at in enumerate(adaptThreshold):
            AM = hdc.Memory(D)
            # train on the base context (0) for every gesture, then on the
            # new context (1); prune is called after each training call
            for ctx in (0,1):
                for g in allGest:
                    rows = (gestureOf == g) & trainRows & (contextOf == ctx)
                    ng = np.asarray(ngramData.loc[rows].iloc[:,0:D])
                    AM.train_sub_cluster(ng,vClass=g,threshold=at)
                    AM.prune(min=5)

            # perform inference on the held-out trials of both contexts
            label,sim = AM.match(testNgram,bipolar=True)
            accHDC[tpIdx,atIdx,n] = (label == testLabel).sum()/len(label)
            numHDC[tpIdx,atIdx,n] = len(AM.classes)

        # train and test SVM model; features and labels are selected by
        # column name so the split cannot drift if the number of feature
        # columns differs from the hard-coded numFeat
        clf = svm.SVC(decision_function_shape='ovo',kernel='linear',C=1)
        clf.fit(featData.loc[trainRows,featCols],featData.loc[trainRows,'gesture'])
        yhat = clf.predict(featData.loc[testRows,featCols])
        accSVM[tpIdx,n] = accuracy_score(featData.loc[testRows,'gesture'],yhat)
        numSVM[tpIdx,n] = len(clf.support_)

        # running mean of the time per iteration
        endTime = time.time()
        elapsedTime = (elapsedTime*n/(n+1)) + ((endTime-startTime)/(n+1))
        print('Finished iteration %d, average time %f seconds\r' % (n+1, elapsedTime), end="")
    print('')
        

Gathering data for Subject 3
Running with 0.100000 of single trial for training
Finished iteration 2, average time 3.556813 seconds
Running with 0.800000 of single trial for training
Finished iteration 2, average time 5.493183 seconds


In [47]:
# SVM accuracy averaged over the iterations (last axis): one value per training fraction
accSVM.mean(axis=-1)

array([0.87253606, 0.90228365])

In [48]:
# number of SVM support vectors averaged over the iterations: one value per training fraction
numSVM.mean(axis=-1)

array([157.5, 559. ])

In [49]:
# HDC accuracy averaged over the iterations: rows = training fraction, columns = threshold
accHDC.mean(axis=-1)

array([[0.82608173, 0.83515625],
       [0.84471154, 0.83936298]])

In [50]:
# len(AM.classes) averaged over the iterations: rows = training fraction, columns = threshold
numHDC.mean(axis=-1)

array([[13.5, 39.5],
       [13.5, 37.5]])

In [None]:
# bundle the four result arrays under their variable names for export
matOut = {
    'accSVM': accSVM,
    'accHDC': accHDC,
    'numSVM': numSVM,
    'numHDC': numHDC,
}

In [None]:
# Name the output after the subject that was actually processed.  The name
# used to be hard-coded as 's2_adapt.mat', which did not match `subject` and
# would overwrite another subject's results file.
sio.savemat('s' + str(subject) + '_adapt.mat',matOut)

In [53]:
# NOTE: `sim` is the second value returned by the last AM.match call of the
# sweep cell (last training fraction, last iteration, last threshold), so this
# cell only works after that cell has been run
sim.shape

(8320, 35)

In [54]:
# NOTE: `AM` is the memory object left over from the last threshold of the
# last iteration of the sweep cell; this is the same quantity stored in numHDC
len(AM.classes)

35