# <font color=#6495ED>Emg Classifier</font>

In [3]:
import pandas as pd
import scipy.io as scio
import numpy as np
import os
import time
import tensorflow as tf
import random
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA

## Data loader

### LoadCsvData : load data from csv file on local or remote server
LoadCsvData(dataPath,DataColumnsNames=[],isLaptop=False,chunkSize=None)
* dataPath : data path in local folder or url link to download wiht Keras.utils.get_files
* dataColumnsNames : names for columns 
* isLaptop : whether is laptop, to determine whether use chunkload to avoid latence and burdon on memory, default is False
* chunkSize : if isLaptop, set chunkSize to read the data in chunk mode and return a iterator, default is None

#### Method

In [4]:
def LoadCsvData(dataPath,DataColumnsNames=[],isLaptop=False,chunkSize=None):
    if 'http' in dataPath:
        dataPath = tf.keras.utils.get_file(dataPath.split('/')[-1],dataPath)
    try:
        rawData = pd.read_csv(dataPath,names=DataColumnsNames,header=0,chunksize=chunkSize if isLaptop else None)
        return rawData
    except:
        return None

#### An examle of LoadCsvData()

In [3]:
dataPath = r'C:\\Users\\mswxh\\Desktop\\S1.csv'
dataUrl = r'http://bbl.sjturover.com:8080/download/Ninapro/Data2/csvFile/S2.csv'
dataColumnsNames = ['emg0','emg1','emg2','emg3','emg4','emg5','emg6','emg7','emg8','emg9','emg10','emg11','label']
rawData = LoadCsvData(dataPath,dataColumnsNames)
print(type(rawData)) #<class 'pandas.core.frame.DataFrame'>
# rawDataIterator = LoadCsvData(dataPath,dataColumnsNames,isLaptop=True,chunkSize=10000)
# print(type(rawDataIterator)) #<class 'pandas.io.parsers.TextFileReader'>

<class 'NoneType'>


#### Delsys Test Data

In [5]:
# dataPath = r'E:\\Desktop\\EmgData_20180320080557_Rover.csv' 
# dataPath = r'C:\\Users\\eConRover\\Desktop\\EmgData_20180320080557_Rover.csv'
dataPath = r'C:\\Users\\mswxh\\Desktop\\EmgData_20180320080557_Rover.csv'
dataUrl = r'http://bbl.sjturover.com:8080/download/Ninapro/Data2/csvFile/S2.csv'
dataColumnsNames = ['emg0','emg1','emg2','emg3','label']
rawData = LoadCsvData(dataPath,dataColumnsNames)
print(type(rawData)) #<class 'pandas.core.frame.DataFrame'>
# rawDataIterator = LoadCsvData(dataPath,dataColumnsNames,isLaptop=True,chunkSize=10000)
# print(type(rawDataIterator)) #<class 'pandas.io.parsers.TextFileReader'>

<class 'pandas.core.frame.DataFrame'>


In [7]:
rawData.describe()

Unnamed: 0,emg0,emg1,emg2,emg3,label
count,42000.0,42000.0,42000.0,42000.0,42000.0
mean,-4e-06,1.1e-05,-1.3e-05,1.8e-05,3.0
std,0.000129,3e-05,5.4e-05,1e-05,2.000024
min,-0.001344,-0.000343,-0.000621,-3.3e-05,0.0
25%,-2e-05,4e-06,-2.1e-05,1.2e-05,1.0
50%,-2e-06,1.1e-05,-1.4e-05,1.8e-05,3.0
75%,1.8e-05,1.9e-05,-6e-06,2.3e-05,5.0
max,0.00104,0.000235,0.000502,9e-05,6.0


## Data preprocessor

#### feat_extra : extract features from emg data, default is time domain(TD) and autoregression(AR) features, featureNum = (4TD+8AR)*channelNum
* data : a window set of emg data, np.ndarray
* userAR : whether usr AR feature, default is True

In [8]:
def feat_extra(data,useAR = False):
    def feat_TD(data):
        data = np.array(data)
        data_result = []
        data_result.append(np.sum(np.fabs(data)))
        data_result.append(np.sum(np.fabs(data[1:]-data[:-1])))
        data_result.append(np.sum(data[1:]*data[:-1]>0))
        data_result.append(np.sum((data[:-2]-data[1:-1])*(data[1:-1]-data[2:])>0))
        return data_result
    def feat_ARBurg(data):
        ret = 8*[0.0]
        res = ret[:]
        ret[0] = 1.0
        res[0] = 1.0
        matall = np.vstack((np.array(data),np.array(data)))
        for i in range(1,8):
            sumn = 0.0
            sumd = 0.0
            mat = matall[:,i-1:]
            matold = mat.copy()
            
            sumn = mat[0,1:].dot(mat[1,:-1].T)
            sumd = mat[0,1:].dot(mat[0,1:].T)+mat[1,:-1].dot(mat[1,:-1].T)
            garma = -2*sumn/sumd
            
            for j in range(1,i+1):
                res[j] = ret[j]+garma*ret[i-j]
            ret = res[:]
            mat[1,1:] = mat[1,:-1] + garma*mat[0,1:]
            mat[0,1:] = mat[0,1:] + garma*matold[1,:-1]
            matall[:,i-1:] = mat
        return res[1:]
    allChannels = np.array(data.copy()).T
    fea = []
    for channel in allChannels:
        fea += feat_TD(channel)
        if useAR:
            fea += feat_ARBurg(channel)
    return fea

### PreProcessData : process data to extract features and prepare train and test dataset
PreProcessData (rawData,DataColumnsNames=[],isLaptop=False,chunkSize=None)
* rawData : rawData loaded from csvfile, DataFrame or TextFileReader
* featExtraFunc : method to extract features from raw data
* trianDataFactor : factor of rawData to prepare the trian dataset
* uniSampling: whether sampling the features and labels uniformally

In [9]:
def PreProcessData(rawData,featExtraFunc,winLength,winIncrement):
    start = time.clock()

    winCache = []
    labelInit = int(rawData.iloc[0,-1])
    fealabelDict = {i:[] for i in range(18)}
    if 'DataFrame' in str(type(rawData)):
        for line in rawData.values:
            if int(line[-1]) != labelInit:
                winCache = []
            winCache.append(line[:-1])
            labelInit = int(line[-1])
            if len(winCache) == winLength:
                oneFeature = feat_extra(winCache)
                fealabelDict[labelInit].append(oneFeature)
                winCache = winCache[winIncrement:]
    else:
        for chunk in rawData:
            for line in chunk:
                if int(line[-1]) != labelInit:
                    winCache = []
                winCache.append(line[:-1])
                labelInit = int(line[-1])
                if len(winCache) == winLength:
                    oneFeature = feat_extra(winCache)
                    fealabelDict[labelInit].append(oneFeature)
                    winCache = winCache[winIncrement:]
            
    elapsed = (time.clock() - start)
    print("Time used:",elapsed)
    
    return fealabelDict

In [10]:
def PreProcessDataMap(rawData,featExtraFunc,winLength,winIncrement):
    start = time.clock()
    features = []
    labels = []
    
    fealabelDict = {i:[] for i in range(18)}
    if 'DataFrame' in str(type(rawData)):
        index = 0
        while index+winLength < len(rawData):
            if rawData.iloc[index,-1] != rawData.iloc[index+winLength,-1]:
                index += winLength
            else:
                oneFeature = featExtraFunc(rawData.iloc[index:index+winLength,0:-1])
                features.append(oneFeature)
                labels.append(rawData.iloc[int(index+winLength/2),-1])
                index += winIncrement
    else:
        for chunk in rawData:
            for line in chunk:
                if int(line[-1]) != labelInit:
                    winCache = []
                winCache.append(line[:-1])
                labelInit = int(line[-1])
                if len(winCache) == winLength:
                    fealabelDict[labelInit].append(winCache)
                    winCache = winCache[winIncrement:]
    #map process the data
#     for label,data in fealabelDict.items():
#         fealabelDict[label] = list(map(featExtraFunc,data))
    
    elapsed = (time.clock() - start)
    print("Time used:",elapsed)
    return features,labels

In [11]:
def load_data(fealabelDict,trainDataFactor,uniSampling=True):
    train_x = []
    train_y = []
    test_x = []
    test_y = []
    if uniSampling:
        sampleNum = min([len(value) for value in fealabelDict.values()])
        trainNum = int(sampleNum*trainDataFactor)        
        for label,feature in fealabelDict.items():
            random.shuffle(feature)
            train_x.extend(feature[:trainNum])
            train_y.extend([label]*trainNum)
            test_x.extend(feature[trainNum:sampleNum])
            test_y.extend([label]*(sampleNum-trainNum))  
    else:
        for label,feature in fealabelDict.items():
            random.shuffle(feature)
            trainNum = int(len(feature)*trainDataFactor)
            train_x.extend(feature[:trainNum])
            train_y.extend([label]*trainNum)
            test_x.extend(feature[trainNum:])
            test_y.extend([label]*len(feature[trainNum:]))
    return (train_x,train_y),(test_x,test_y)

#### An examle of PreProcessData()

In [12]:
fealabelDict = PreProcessData(rawData,feat_extra,300,100)
for label,features in fealabelDict.items():
    print("label: {0}, fetureNum: {1}".format(label,len(features)))

Time used: 0.12255673166023863
label: 0, fetureNum: 58
label: 1, fetureNum: 58
label: 2, fetureNum: 58
label: 3, fetureNum: 58
label: 4, fetureNum: 58
label: 5, fetureNum: 58
label: 6, fetureNum: 58
label: 7, fetureNum: 0
label: 8, fetureNum: 0
label: 9, fetureNum: 0
label: 10, fetureNum: 0
label: 11, fetureNum: 0
label: 12, fetureNum: 0
label: 13, fetureNum: 0
label: 14, fetureNum: 0
label: 15, fetureNum: 0
label: 16, fetureNum: 0
label: 17, fetureNum: 0


#### test on delsys data

In [12]:
train_x,train_y = PreProcessDataMap(rawData,feat_extra,300,100)
print("train x length %s, type %s" % (len(train_x),type(train_x)))
print("train y length %s, type %s" % (len(train_y),type(train_y)))

Time used: 0.2609189228021847
train x length 399, type <class 'list'>
train y length 399, type <class 'list'>


In [13]:
features,labels = PreProcessDataMap(rawData,feat_extra,300,100)
fealabelMapDict = {i:[] for i in range(7)}
for i in range(len(features)):
    fealabelMapDict[labels[i]].append(features[i])
for label,feature in fealabelMapDict.items():
    print("label: {0}, fetureNum: {1}".format(label,len(feature)))

Time used: 0.25929868437222936
label: 0, fetureNum: 57
label: 1, fetureNum: 57
label: 2, fetureNum: 57
label: 3, fetureNum: 57
label: 4, fetureNum: 57
label: 5, fetureNum: 57
label: 6, fetureNum: 57


## LDA classification

### LDA model for delsys data

In [126]:
lda = LDA(solver='svd',shrinkage=None,store_covariance=True,priors=[1,1,1,1,1,1,1])
lda.fit(features,labels)



LinearDiscriminantAnalysis(n_components=None, priors=[1, 1, 1, 1, 1, 1, 1],
              shrinkage=None, solver='svd', store_covariance=True,
              tol=0.0001)

In [154]:
pd.DataFrame(lda.means_).to_csv("means_.csv",index=False,index_label=False,header=False)

In [155]:
pd.DataFrame(lda.covariance_).to_csv("covariance_.csv",index=False,index_label=False,header=False)

In [156]:
pd.DataFrame(lda.coef_).to_csv("coef_.csv",index=False,index_label=False,header=False)

In [157]:
pd.DataFrame(lda.intercept_).to_csv("intercept_.csv",index=False,index_label=False,header=False)

### LDA classifier testing

In [164]:
# dataPath = r'E:\\Desktop\\1_EmgData_TestClassifier_20180320080640_Rover.csv'
dataPath = r'C:\\Users\\mswxh\\Desktop\\test.csv'
dataColumnsNames = ['emg0','emg1','emg2','emg3']
rawTestData = LoadCsvData(dataPath,dataColumnsNames)
print(type(rawTestData)) #<class 'pandas.core.frame.DataFrame'>

<class 'pandas.core.frame.DataFrame'>


In [165]:
rawTestData.describe()

Unnamed: 0,emg0,emg1,emg2,emg3
count,300.0,300.0,300.0,300.0
mean,-1e-06,1.075314e-05,-1.3e-05,1.9e-05
std,0.000137,1.659292e-05,4.7e-05,4e-06
min,-0.000494,-5.22e-05,-0.000231,7e-06
25%,-7.2e-05,6.7125e-07,-3.5e-05,1.6e-05
50%,1.3e-05,1.21e-05,-1.7e-05,1.9e-05
75%,8.5e-05,2.085e-05,2e-06,2.1e-05
max,0.000344,4.88e-05,0.000198,3.1e-05


In [166]:
test_x,test_y = PreProcessDataMap(rawTestData,feat_extra,300,100)
print("test x length %s, type %s" % (len(test_x),type(test_x)))
print("test y length %s, type %s" % (len(test_y),type(test_y)))

Time used: 2.2609674488194287e-05
test x length 0, type <class 'list'>
test y length 0, type <class 'list'>


In [168]:
fea = feat_extra(rawTestData)

In [171]:
lda.predict(np.array(fea).reshape(1,-1))

array([2], dtype=int64)

In [167]:
TrueCount = 0
for i in range(len(test_x)):
    result = lda.predict(np.array(test_x[i]).reshape(1,-1))
    if result[0] == test_y[i]:
        TrueCount += 1
print("predictPrecision: {0}".format(TrueCount/len(test_x)))

ZeroDivisionError: division by zero