In [None]:
## module imports ##

import tensorflow as tf
import numpy as np
import os
import matplotlib.pyplot as plt
import numpy as np
import scipy
import scipy.io
import scipy.signal
import math
import copy
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn import svm, metrics, preprocessing
from functools import partial
import copy
import tensorflow as tf
import statistics
import random
import seaborn as sn
from scikeras.wrappers import KerasClassifier

In [None]:
def preproc_signals(moves, win_sz, overlap):  ## Main processing function - load raw data and organise steps ##
    
    feat_mat = np.array([])
    
    for subject in range(1,23):
        
        ## retrieve signals
        
        ex1_mat_file = 'S' + str(subject) + '_E1_A1.mat'
        ex1_mat = scipy.io.loadmat(ex1_mat_file)
        
        ex2_mat_file = 'S' + str(subject) + '_E2_A1.mat'
        ex2_mat = scipy.io.loadmat(ex2_mat_file)
        

        emg = np.concatenate((np.array(ex1_mat['emg']), np.array(ex2_mat['emg'])), axis=0)
        
        acc = np.concatenate((np.array(ex1_mat['acc']), np.array(ex2_mat['acc'])), axis=0)
        
        gyro = np.concatenate((np.array(ex1_mat['gyro']), np.array(ex2_mat['gyro'])), axis=0)
        
        mag = np.concatenate((np.array(ex1_mat['mag']), np.array(ex2_mat['mag'])), axis=0)
        
        stim = np.concatenate((np.array(ex1_mat['restimulus']), np.array(ex2_mat['restimulus'])), axis=0)
        
        print('S' + str(subject) + ' concatenation complete')
        
        ## find samples where movement is present

        stim[np.where(stim!=0)] = 1
        
        ## remove dc components of signals
        
        remove_DC_component(emg, stim)
        
        remove_DC_component(acc, stim)
        
        remove_DC_component(gyro, stim)
        
        remove_DC_component(mag, stim)
        
        print('S' + str(subject) + ' dc removal complete')
        
        ## filter signals using a butterworth filter
        
        freq_filt(emg, 'bp')
        
        freq_filt(acc, 'lp')
        
        freq_filt(gyro, 'lp')
        
        freq_filt(mag, 'lp')
        
        print('S' + str(subject) + ' filtering complete')
        
        ## retain samples of signals where movement is present
        
        emg *= stim
        
        acc *= stim
        
        gyro *= stim
        
        mag *= stim
        
        print('S' + str(subject) + ' cleaning complete')
        
        ## split stream into individual movements
        
        feat_mat = split(emg, acc, gyro, mag, stim, subject, moves, feat_mat, win_sz, overlap)
        
        print('\nS' + str(subject) + ' splitting and feature extraction complete\n')
    
    print('\nPreprocessing task complete')
        
    return feat_mat

In [None]:
def split(emg, acc, gyro, mag, stim, subnum, moves, feat_mat, win_sz, overlap):  ## Split raw data stream into individual movements and repetitions ##
    
    i = -1
    
    m = 1
    r = 1
    
    while i < len(stim)-1:
        
        i += 1
        
        if stim[i] != 0:
            
            temp = i
            
            while stim[i] != 0:
                
                i += 1
                
                if i >= len(stim): break
            
            if m in moves:
                
                mov_rep_mat = np.concatenate((
                    create_vector_matrix_with_overlap(emg[temp:i], m, r, subnum, win_sz, overlap, 'emg'),
                    create_vector_matrix_with_overlap(acc[temp:i], m, r, subnum, win_sz, overlap, 'acc'),
                    create_vector_matrix_with_overlap(gyro[temp:i], m, r, subnum, win_sz, overlap, 'gyro'),
                    create_vector_matrix_with_overlap(mag[temp:i], m, r, subnum, win_sz, overlap, 'mag')
                    ),
                    axis = 1
                    )
                
                
                if feat_mat.size == 0:
                    
                    feat_mat = mov_rep_mat
                    
                else:
                    
                    feat_mat = np.append(feat_mat, mov_rep_mat, axis=0)
                    
            
                print('['+str(m)+'-'+str(r)+']', end = '')
            
            
            r += 1
            
            if r == 7:
                
                m += 1
                
                r = 1
                   
    return feat_mat

In [None]:
def create_vector_matrix_with_overlap(signal, m, r, subnum, win_sz, overlap, tp):  ## Build feature matrix for given movement and repetition, connect fragment to main feature matrix ##
    
    mat = []
    
    if tp == 'emg':
        
        features = [mean_absolute_value,
                    zero_crossings,
                    slope_sign_changes,
                    waveform_length,
                    root_mean_square,
                    variance]
        
    else:
        
        features = [mean_absolute_value,
                    zero_crossings,
                    slope_sign_changes,
                    waveform_length,
                    root_mean_square,
                    variance]
    
    for w in range(0, len(signal), int((1-overlap)*win_sz)):
        
        vec = []
        
        for f in features:
            
            for s in range(0, signal.shape[1]):
                
                vec.append(f(signal[:,s][w : w + win_sz]))
        
        if tp == 'mag':
            
            vec.append(m)
            vec.append(r)
            vec.append(subnum)
        
        mat.append(vec)
    
    return mat

In [None]:
def remove_DC_component(signal, stim):  ## Remove DC component of signal ##
    
    for col in range(0, signal.shape[1]):
        
        s = signal[:, col]
        
        y = stim == 0
        
        noise = s[y[:,0]==0]
        
        dcv = np.mean(noise)
        
        signal[:, col] -= dcv
        
    return

In [None]:
def freq_filt(signal, tp):  ## Filter signal using either lp or bp Butterworth filter ##
    
    if tp == 'lp':
    
        b, a = btw_lpf(4, 2000, 45)
            
    elif tp == 'bp':
        
        b, a = btw_bpf(4, 2000, 20, 450)
        
    for col in range(0, signal.shape[1]):
            
        signal[:,col] = scipy.signal.filtfilt(b, a, signal[:, col])
    
    return
    

def btw_lpf(order, fs, cutoff):  ## Build Butterworth Bandpass Filter ##
    
    nyqf = 0.5 * fs
    
    cut = cutoff / nyqf
    
    b, a = scipy.signal.butter(order, cut, btype='low')
    
    return b, a


def btw_bpf(order, fs, lowcut, highcut):  ## Build Butterworth Bandpass Filter ##
    
    nyqf = 0.5 * fs
    
    low = lowcut / nyqf
    
    high = highcut / nyqf
    
    b, a = scipy.signal.butter(order, [low, high], btype='band')
    
    return b, a

In [None]:
def mean_absolute_value(sig):  ## Calculate mean absolute value of signal ##
    
    return np.mean(np.abs(sig))

def mean_absolute_value_slope(mav_vec):  ## Calculate mean absolute value slope ##
    
    return np.append(mav_vec[1:], mav_vec[0]) - mav_vec

def zero_crossings(sig):  ## Calculate zero crossings number of signal ##
    
    return len(np.where(np.diff(np.signbit(sig)))[0])

def slope_sign_changes(sig):  ## Calculate slope sign changes number of signal ##

    sgn = np.sign(np.diff(sig))
    
    ssc = sum((np.roll(sgn, 1) - sgn) != 0)
            
    return ssc

def waveform_length(sig):  ## Calculate waveform length of signal ##
    
    wl = 0
    
    wl = np.sum(abs(sig[1:] - sig[:-1]))
        
    return wl

def root_mean_square(sig):  ## Calculate root mean square of signal ##
    
    return np.sqrt(np.mean(sig)**2)

def variance(sig):  ## Calculate variance of signal ##
    
    return np.mean(sig**2)

In [None]:
def standardize_features(feat_mat):  ## Standardize feature matrix ##
    
    for f in range(0, feat_mat.shape[1]-3):
        
        print(f)
        
        feat_mat[:,f] = feat_mat[:,f]/np.amax(abs(feat_mat[:,f]))
        
        mean_val = np.mean(feat_mat[:,f])
        std_val = statistics.stdev(feat_mat[:,f])
        
        feat_mat[:,f] = (feat_mat[:,f] - mean_val) / std_val
        
        #feat_mat[:,f] = feat_mat[:,f]/np.amax(abs(feat_mat[:,f]))
        
    print('\nFeature standardization complete.')
                            
    return feat_mat

In [None]:
def SVM_classifier(feat_mat, tp, fname):  ## Model and train SVM classifier ##

    print('SVM' + fname)
    
    x_train, y_train, x_test, y_test, input_size, output_size = extract_sets_random(feat_mat, tp, False)
                
    clf = svm.SVC()
    
    clf.fit(x_train, y_train)

    y_pred = clf.predict(x_test)
    
    print("Testing accuracy:",metrics.accuracy_score(y_test, y_pred))
    
    indiv_acc = cfmat_extraction(y_test, y_pred, fname)
    
    return clf

In [None]:
def ANN_classifier(feat_mat, tp, lr, fname):  ## Model and train ANN classifier ##

    print('ANN ' + fname)
    
    x_train, y_train, x_test, y_test, input_size, output_size = extract_sets_random(feat_mat, tp, True)
    
    #x_train, y_train, x_test, y_test, input_size, output_size = filter_amputees(feat_mat, tp, True)
    #input_size = len(x_train[0])
    
    model = tf.keras.Sequential(name='ANN-FF_model')
    model.add(tf.keras.layers.Dense(units=512,
                                    input_shape=(input_size,),
                                    activation='relu',
                                    name='input_layer')
              )
    
    
    model.add(tf.keras.layers.Dense(units=512,
                                    activation='relu',
                                    name='hidden_layer')
              )
    
    
    model.add(tf.keras.layers.Dense(output_size,
                                    activation='softmax',
                                    name='output_layer')
              )
    
    
    
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
                  loss='categorical_crossentropy',
                  metrics=['acc'])
    
    
    print(model.summary())
    
    callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=30)
    
    history = model.fit(x_train,
                        y_train,
                        batch_size=2048,
                        epochs=1000,
                        verbose=1,
                        validation_split=0.1,
                        shuffle=True,
                        validation_freq=1, callbacks=[callback]
                        )
    
    print('Training finalized at epoch', len(history.history['loss']))
    
    plot_acc_loss(history, fname)
    
    model.evaluate(x_test, y_test)
    
    y_pred = model.predict(x_test)
   
    indiv_acc = cfmat_extraction(y_test, y_pred, fname)
    
    return model

In [None]:
def filter_amputees(feat_mat, tp, ohe): ## take only amputee movements ##
    
    data = feat_mat
    target = feat_mat[:, -3]
    
    
    data, target = shuffle_data_and_target(data, target)
    
    if ohe == True:
        
        target_enc, input_size, output_size = one_hot_encoding(data, target)
        
        x_train, x_test, y_train, y_test = train_test_split(data, target_enc, test_size = 0.1, stratify=target_enc)
        
        if tp == 'emg':
            
            x_train = x_train[:,:72]
            mask = (x_test[:,-1]==21) | (x_test[:,-1]==22)
            x_test = x_test[mask,:72]
            y_test = y_test[mask]
            
        else:
            
            x_train = x_train[:,:-3]
            mask = (x_test[:,-1]==21) | (x_test[:,-1]==22)
            x_test = x_test[mask,:-3]
            y_test = y_test[mask]
        
    
    else:
        
        input_size = len(data[0])
        output_size = 1
        
        x_train, x_test, y_train, y_test = train_test_split(data, target, test_size = 0.1, stratify=target)
    

    return x_train, y_train, x_test, y_test, input_size, output_size  
        
        
def extract_sets_random(feat_mat, tp, ohe): ## Extract train and test sets, apply one hot encoding if necessary ##
    
    if tp == 'emg':
        
        data = feat_mat[:, 0:72]
        
    else:
        
        data = feat_mat[:, :-3]
        
    target = feat_mat[:, -3]
    
    
    data, target = shuffle_data_and_target(data, target)
    
    if ohe == True:
        
        target_enc, input_size, output_size = one_hot_encoding(data, target)
        
        x_train, x_test, y_train, y_test = train_test_split(data, target_enc, test_size = 0.1, stratify=target_enc)
    
    else:
        
        input_size = len(data[0])
        output_size = 1
        
        x_train, x_test, y_train, y_test = train_test_split(data, target, test_size = 0.1, stratify=target)
    

    return x_train, y_train, x_test, y_test, input_size, output_size


def one_hot_encoding(data, target):  ## Apply one hot encoding to target values ##
    
    le = preprocessing.LabelEncoder()
    le.fit(target)
    
    input_size = len(data[0])
    output_size = len(list(le.classes_))
    
    target_enc = tf.keras.utils.to_categorical(le.transform(target), output_size)
    
    return target_enc, input_size, output_size
    

def shuffle_data_and_target(data, target):  ## Shuffle data and target arrays ##
    
    indices = np.arange(data.shape[0])
    np.random.shuffle(indices)

    data = data[indices]
    target = target[indices]
    
    return data, target


def cfmat_extraction(y_test, y_pred, fname): ## Extract confusion matrix and individual movement accuracy, save both to files #
    
    img_dir = '*insert image directory location*'
    
    if y_pred.ndim == 2:
        
        y_pred = np.argmax(y_pred, axis = 1)
        y_test = np.argmax(y_test, axis = 1)
    
    cf_mat = metrics.confusion_matrix(y_test, y_pred)
    
    cf_mat = np.divide(cf_mat,cf_mat.sum(axis=1))
    
    plt.figure()
    
    plt.matshow(cf_mat)
    
    plt.savefig(img_dir + str(fname)+'_cfm1.png', dpi=100)
    
    plt.figure()
    
    sn.heatmap(cf_mat, annot=False)
    
    plt.savefig(img_dir + str(fname)+'_cfm2.png', dpi=100)
    
    indiv_acc = np.ones(cf_mat.shape[0])
    
    for i in range(0,len(cf_mat)):
        
        indiv_acc[i] = cf_mat[i, i] / np.sum(cf_mat[i,:])
    
    np.savetxt(img_dir + str(fname)+'_ia', indiv_acc)
    
    return indiv_acc
        
        
def plot_acc_loss(history, fname):  ## Plot accuracy graph ##

    img_dir = '*insert image directory location*'
    
    plt.figure()
    plt.plot(history.history['acc'])
    plt.plot(history.history['val_acc'])
    plt.title('model accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.savefig(img_dir + str(fname)+'_accplot.png', dpi=100)
    
    
    plt.figure()
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.savefig(img_dir + str(fname)+'_lossplot.png', dpi=100)
    
    
    return

In [None]:
def create_model(activation, init, neurons): ## create model for hyperparameter tuning ##

    model = tf.keras.Sequential(name='ANN-FF_model')
    model.add(tf.keras.layers.Dense(neurons, input_shape=(720,), activation=activation, name='input_layer')) ## change according to tp
    model.add(tf.keras.layers.Dense(neurons, activation=activation, name='hidden_layer_1'))
    model.add(tf.keras.layers.Dense(neurons, activation=activation, name='hidden_layer_2'))
    model.add(tf.keras.layers.Dense(40, activation='softmax', name='output_layer'))
    
    model.compile(loss='categorical_crossentropy', metrics=['accuracy'])
    
    return model

def get_best_hyperparams_ANN(feat_mat, tp): ## tune ANN model ##
    
    x_train, y_train, x_test, y_test, input_size, output_size = extract_sets_random(feat_mat, tp, True)
    
    model = KerasClassifier(model=create_model, verbose=10)

    batch_size = [512, 1024, 2048]
    epochs = [250]
    optimizer = ['adam']
    learning_rate = [0.1, 0.01, 0.001]
    activation = ['relu', 'tanh', 'sigmoid']
    init = ['uniform','normal','zero']
    neurons = [128,256,512]
    
    param_grid=dict(batch_size=batch_size, 
                    epochs=epochs, 
                    optimizer=optimizer,
                    optimizer__learning_rate=learning_rate,
                    model__activation=activation,
                    model__init = init,
                    model__neurons = neurons
                    )
    
    grid = GridSearchCV(estimator=model, 
                        param_grid=param_grid, 
                        n_jobs=-1, 
                        cv=3,
                        verbose=10)
    
    grid_result = grid.fit(x_train, 
                           y_train,
                           batch_size=2048,
                           verbose=10,
                           shuffle=True,
                           validation_freq=1
                            )

    return grid_result.best_params_


def get_best_hyperparams_SVM(feat_mat, tp): ## tune SVM model ##
    
    
    x_train, y_train, x_test, y_test, input_size, output_size = extract_sets_random(feat_mat, tp, False)
     
    kernel = ['linear', 'poly', 'rbf', 'sigmoid']
    C = [0.1, 1, 5, 10, 25, 50, 100]
    gamma = [1, 0.1, 0.01, 0.005, 0.001, 0.0005]
    
    param_grid = dict(kernel=kernel,
                      C=C,
                      gamma=gamma) 
           
    clf = GridSearchCV(estimator=svm.SVC, param_grid=param_grid)
    
    clf.fit(x_train, y_train)
    
    return clf.best_estimator_