In [1]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn import svm
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
import random
import matplotlib.pyplot as plt
import os

In [2]:
def samples_num_in_window(frequency, window_size_ms):
    """Return how many samples fall into a window of `window_size_ms` milliseconds.

    The duration is multiplied by `frequency` and divided by 1000 (so
    `frequency` is read as samples per second); the result is truncated to an
    int.
    """
    samples = window_size_ms * frequency / 1000
    return int(samples)

In [3]:
def emg_data_windowing(data, window_size):
    """Group consecutive rows of `data` into fixed-size windows, one row per window.

    Parameters
    ----------
    data : 2-D array-like
        One sample per row. Every column except the last is a signal value;
        the last column is the class label of that row.
    window_size : int
        Number of consecutive rows merged into one window (must be >= 1).

    Returns
    -------
    numpy.ndarray
        Shape ``(windows, m * window_size + 1)``, where ``m`` is the number of
        signal columns. Each row holds the window's samples flattened in row
        order, followed by the window label. Trailing rows that do not fill a
        complete window are dropped, and so is every window whose rows do not
        all carry the same label. The input array is not modified.

    Raises
    ------
    ValueError
        If `window_size` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be a positive number of rows")

    data = np.asarray(data)
    data_x = data[:, :-1]
    data_y = data[:, -1]
    n, m = data_x.shape

    # Keep only the leading rows that form complete windows.
    usable_rows = n - n % window_size
    data_x = data_x[:usable_rows].reshape((-1, m * window_size))
    data_y = data_y[:usable_rows].reshape((-1, window_size))

    # A window is usable only when every row shares one label. Comparing with
    # the first label is exact; testing whether the mean label is an integer
    # is not, because e.g. labels 1 and 3 in equal parts average to 2.0 and
    # would be kept as a bogus class-2 window.
    single_class = (data_y == data_y[:, :1]).all(axis=1)

    # For a single-class window the mean equals its label; using the mean
    # keeps the floating-point label column callers received before.
    window_labels = data_y.mean(axis=1)

    return np.c_[data_x, window_labels][single_class]

In [4]:
def read_emg(data_path, channels_num=8):
    """Load every file under `data_path` as header-less CSV and return one sample per row.

    Parameters
    ----------
    data_path : str
        Directory that is walked recursively; every file found is parsed with
        ``pandas.read_csv(..., header=None)``.
    channels_num : int, default 8
        Number of channels. Each CSV row (without its last column) is split
        into consecutive groups of `channels_num` values, one group per
        output row.

    Returns
    -------
    numpy.ndarray
        Shape ``(rows, channels_num + 1)``: one column per channel plus a
        final column holding the last CSV column of the originating row,
        repeated for every sample taken from that row.

    Raises
    ------
    ValueError
        If no file is found, or if the number of value columns is not a
        multiple of `channels_num`.
    """
    # os.walk yields entries in arbitrary, filesystem-dependent order. Sort the
    # paths so the concatenated recording (and any window later cut from it)
    # is the same on every machine and every run.
    sessions_csv = sorted(
        os.path.join(path, name)
        for path, _, files in os.walk(data_path)
        for name in files
    )
    if not sessions_csv:
        raise ValueError('no data files found under {!r}'.format(data_path))

    data = pd.concat([pd.read_csv(file, header=None) for file in sessions_csv]).values
    print('input shape', data.shape)

    data_x = data[:, :-1]
    data_y = data[:, -1]

    values_per_row = data_x.shape[1]
    if values_per_row % channels_num != 0:
        raise ValueError(
            'expected a multiple of {} value columns, got {}'.format(channels_num, values_per_row)
        )
    # Each input row becomes this many output rows, so its label must be
    # repeated the same number of times (not `channels_num` times).
    samples_per_row = values_per_row // channels_num

    # reshape data
    # one column - one channel
    data_x = data_x.reshape((-1, channels_num))
    data_y = data_y.repeat(samples_per_row).reshape((-1, 1))
    data = np.concatenate((data_x, data_y), axis=1)
    print('result shape: ', data.shape)

    return data

In [5]:
from nitime.algorithms.autoregressive import AR_est_LD
from sklearn.preprocessing import StandardScaler

def autoregression_coefficients(emg, order):
    """Return the first element of ``AR_est_LD(emg, order=order)``.

    NOTE(review): presumably that element is the vector of autoregressive
    coefficients of the requested order -- confirm against the nitime docs.
    """
    estimate = AR_est_LD(emg, order=order)
    return estimate[0]

In [6]:
import math

def integrated_absolute_value(segment):
    """Return the sum of the absolute values of all samples in `segment`."""
    total = 0
    for sample in segment:
        total = total + abs(sample)
    return total

def mean_absolute_value(segment):
    """Return the average of the absolute values of the samples in `segment`.

    Raises ZeroDivisionError for an empty segment.
    """
    magnitudes = [abs(sample) for sample in segment]
    return sum(magnitudes) / len(segment)

def waveform_length(segment):
    """Return the sum of absolute differences between consecutive samples.

    Segments with fewer than two samples yield 0.
    """
    return sum(
        abs(segment[idx] - segment[idx - 1])
        for idx in range(1, len(segment))
    )

def zero_crossing(segment):
    """Count adjacent sample pairs whose product is negative.

    A pair that contains a zero is not counted, because its product is not
    below zero.
    """
    return sum(
        1
        for idx in range(len(segment) - 1)
        if segment[idx] * segment[idx + 1] < 0
    )

def slope_sign_changes(segment):
    """Count interior samples that are strict local maxima or strict local minima.

    The first and last samples are never counted because they lack a
    neighbour on one side.
    """
    changes = 0
    for idx in range(1, len(segment) - 1):
        before, here, after = segment[idx - 1], segment[idx], segment[idx + 1]
        is_peak = before < here and here > after
        is_valley = before > here and here < after
        if is_peak or is_valley:
            changes += 1
    return changes

def root_mean_square(segment):
    """Return the square root of the mean of the squared samples in `segment`.

    Raises ZeroDivisionError for an empty segment.
    """
    sum_of_squares = sum([sample * sample for sample in segment])
    mean_square = sum_of_squares / len(segment)
    return math.sqrt(mean_square)

In [7]:
def calculate_features(data_x, channels_num, ar_features=True, ar_order=6):
    """Compute per-channel features for every window in `data_x`.

    Parameters
    ----------
    data_x : numpy.ndarray, 2-D
        One window per row. Within a row, columns ``c``, ``c + channels_num``,
        ``c + 2 * channels_num``, ... are taken as the samples of channel ``c``.
    channels_num : int
        Number of interleaved channels in each row.
    ar_features : bool, default True
        When true, the autoregression coefficients of each channel are
        appended after its four basic features.
    ar_order : int, default 6
        Order passed to `autoregression_coefficients`.

    Returns
    -------
    numpy.ndarray
        One row per window. For each channel in turn the columns are MAV, WL,
        ZC and SSC, followed by that channel's AR coefficients when
        `ar_features` is true.
    """
    # Order matters: it fixes the column layout of the returned matrix.
    extractors = (mean_absolute_value, waveform_length, zero_crossing, slope_sign_changes)
    features = []

    for channel in range(channels_num):
        # Samples of this channel only, still one window per row. Sliced once
        # and shared by every feature below.
        channel_windows = data_x[:, channel::channels_num]

        channel_features = [
            [extractor(window) for window in channel_windows]
            for extractor in extractors
        ]

        if ar_features:
            ar_coef = np.array([
                autoregression_coefficients(window, ar_order)
                for window in channel_windows
            ])
            # Transpose so each coefficient becomes its own per-window list,
            # matching the layout of the basic features above.
            channel_features += ar_coef.transpose().tolist()

        features += channel_features

    # `features` holds one list per feature; flip it to one row per window.
    return np.array(features).transpose()

In [8]:
def gesture_classification(train, test, input_frequency, window_size_ms, classifier,
                           ar_features=True, channels_num=8):
    """Fit `classifier` on windowed features of `train` and score it on `test`.

    Parameters
    ----------
    train, test : 2-D arrays
        Recordings in the layout `emg_data_windowing` expects (signal columns
        followed by a label column).
    input_frequency, window_size_ms
        Passed to `samples_num_in_window` to obtain the window length in rows.
    classifier
        Object exposing ``fit(X, y)`` and ``predict(X)``; it is fitted in place.
    ar_features : bool, default True
        Forwarded to `calculate_features`.
    channels_num : int, default 8
        Number of channels, forwarded to `calculate_features`.

    Returns
    -------
    The value of ``metrics.accuracy_score`` for the test windows.
    """
    window_samples = samples_num_in_window(input_frequency, window_size_ms)

    def features_and_labels(recording):
        # Window the recording, then split each window row into its signal
        # part (turned into features) and its integer class label.
        windows = emg_data_windowing(recording, window_samples)
        labels = windows[:, -1].astype('int')
        return calculate_features(windows[:, :-1], channels_num, ar_features), labels

    train_features, train_y = features_and_labels(train)
    test_features, test_y = features_and_labels(test)

    classifier.fit(train_features, train_y)
    pred = classifier.predict(test_features)

    return metrics.accuracy_score(test_y, pred)

In [9]:
# prepare data

sessions_path = 'data/5sessions/'

# Take only the top-level session folders and sort them by name. Directory
# listings come back in filesystem-dependent order, while the experiments
# further down select sessions by position (sessions[:4], sessions[4], ...),
# so the order has to be deterministic.
session_names = sorted(
    name
    for name in os.listdir(sessions_path)
    if os.path.isdir(os.path.join(sessions_path, name))
)

sessions = []
for session_name in session_names:
    current_session = os.path.join(sessions_path, session_name)
    print(current_session)
    sessions.append(read_emg(current_session))
    print()

data/5sessions/session01
input shape (1205, 65)
result shape:  (9640, 9)

data/5sessions/session02
input shape (1201, 65)
result shape:  (9608, 9)

data/5sessions/session03
input shape (1185, 65)
result shape:  (9480, 9)

data/5sessions/session04
input shape (1197, 65)
result shape:  (9576, 9)

data/5sessions/session05
input shape (1214, 65)
result shape:  (9712, 9)



In [12]:
# Seed the stdlib RNG as before. scikit-learn does not draw from it, so the
# same seed is also handed to train_test_split below; otherwise the random
# split (and its accuracy) changes on every run.
seed = 101
random.seed(seed)

# Rows per window, shared by all three experiments in this cell.
window_samples = 40

# hold-out for comparison of random split and split by sessions
# train: 1-4 sessions
# test: 5 session
train = sessions[:4]
test = sessions[4]

emg_windows_train = emg_data_windowing(np.concatenate(train), window_samples)
emg_windows_test = emg_data_windowing(test, window_samples)

train_x = emg_windows_train[:,:-1]
train_y = emg_windows_train[:,-1]
test_x = emg_windows_test[:,:-1]
test_y = emg_windows_test[:,-1]
features_train = calculate_features(train_x, 8)
features_test = calculate_features(test_x, 8)

# Create the classifier before its first use so the cell also works on a
# freshly restarted kernel (previously `clf` had to be left over from an
# earlier execution).
# NOTE(review): a linear SVC is assumed here because it is the only classifier
# this cell ever builds -- confirm this is the model the hold-out score was
# originally produced with.
clf = svm.SVC(kernel='linear')
clf.fit(features_train, train_y)
pred = clf.predict(features_test)
print("Accuracy on hold-out:", metrics.accuracy_score(test_y, pred))

# random split
# Windows of sessions 1-4 are shuffled together, so train and test windows
# come from the same sessions.
random_train_x, random_test_x, random_train_y, random_test_y = train_test_split(
    features_train, train_y, test_size=0.25, random_state=seed)
clf = svm.SVC(kernel='linear')
clf.fit(random_train_x, random_train_y)
pred = clf.predict(random_test_x)
print("Accuracy with random split:", metrics.accuracy_score(random_test_y, pred))

# split by sessions
# train: 1-3 sessions
# test: 4 session
train = sessions[:3]
test = sessions[3]
emg_windows_train = emg_data_windowing(np.concatenate(train), window_samples)
emg_windows_test = emg_data_windowing(test, window_samples)

train_x = emg_windows_train[:,:-1]
train_y = emg_windows_train[:,-1]
test_x = emg_windows_test[:,:-1]
test_y = emg_windows_test[:,-1]
features_train = calculate_features(train_x, 8)
features_test = calculate_features(test_x, 8)

# Fresh estimator so this experiment does not depend on the previous fit.
clf = svm.SVC(kernel='linear')
clf.fit(features_train, train_y)
pred = clf.predict(features_test)
print("Accuracy with split by sessions:", metrics.accuracy_score(test_y, pred))

Accuracy on hold-out: 0.9079497907949791
Accuracy with random split: 0.9702127659574468
Accuracy with split by sessions: 0.9243697478991597
