Code for training smaller models on stationary signals described by Experiment 2.

CNN model trained on full non-stationary training set.

5 times for cross-validation.

In [1]:
import numpy as np
import matplotlib.pyplot as plt 
import random
import tensorflow as tf
import tensorflow.keras.layers as tfl
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.callbacks import Callback, EarlyStopping, ModelCheckpoint, LambdaCallback
from tensorflow.keras.models import Sequential, load_model, Model
from scipy.signal import find_peaks

In [2]:
#Function two shuffle 2 arrays in the same way
#Aquired from: https://stackoverflow.com/questions/4601373/better-way-to-shuffle-two-numpy-arrays-in-unison
def unison_shuffled_copies(a, b):
    assert len(a) == len(b)
    p = np.random.permutation(len(a))
    return a[p], b[p]

In [3]:
#Recieves data and applies z-score standardisation on all channels
def standardise(stored_data):
    scaler = StandardScaler()
    standard_stored_data = scaler.fit_transform(stored_data)
    return standard_stored_data

In [4]:
#Following code was made by adapting original code by Wen et. al (2021)
#Provided in their paper: "A convolutional neural network to identify motor units
#from high-density surface electromyography signals inreal time"
#Original code can be found here: https://github.com/ywen3/dcnn_mu_decomp/blob/main/hdEMG_DCNN.ipynb

#Used for calculating RoA during model training

def RoA_m(y_true, y_pred):
    threshold = 3*tf.math.reduce_std(y_pred)
    y_pred_binary = tf.where(y_pred>=threshold, 1., 0.)
    y_comp = y_pred_binary + y_true
    true_positives = tf.shape(tf.where(y_comp == 2))[0]
    unmatched = tf.shape(tf.where(y_comp == 1))[0]
    return true_positives/(true_positives + unmatched)


class AccuracyCallback(Callback):
    def __init__(self, metric_name = 'accuracy'):
        super().__init__()
        self.metric_name = metric_name
        self.val_metric = []
        self.metric = []
        self.val_metric_mean = 0
        self.metric_mean = 0
        self.best_metric = 0
        
    def on_epoch_end(self, epoch, logs=None):
#         print('Accuracycallback')
        # extract values from logs
        self.val_metric = []
        self.metric = []
        for log_name, log_value in logs.items():
            if log_name.find(self.metric_name) != -1:
                if log_name.find('val') != -1:
                    self.val_metric.append(log_value)
                else:
                    self.metric.append(log_value)

        self.val_metric_mean = np.mean(self.val_metric)
        self.metric_mean = np.mean(self.metric)
        logs['val_{}'.format(self.metric_name)] = np.mean(self.val_metric)   # replace it with your metrics
        logs['{}'.format(self.metric_name)] = np.mean(self.metric)   # replace it with your metrics

In [5]:
#Calculate the RoA given model predictions and true labels Y
def singleModelRoA(predictions, Y):
    y_pred = tf.squeeze(predictions)
    threshold = 3*np.std(y_pred,axis = 1)
    match = 0
    unmatch = 0
    for MU in range(len(Y)):
        pred_spikes, _ = find_peaks(y_pred[MU], height = threshold[MU], distance = 2)
        true_spikes = tf.squeeze(tf.where(np.array(Y)[MU] == 1))
        a = set(true_spikes.numpy())
        b = set(pred_spikes)
        matches = len(a.intersection(b))
        unmatched1 = a - b
        unmatched2 = b - a
        tolerance = len([x for x in unmatched1 if (x+1 in unmatched2 or x-1 in unmatched2)])
        match = match + matches + tolerance
        unmatch = unmatch + len(unmatched1) + len(unmatched2) - (2*tolerance)
    return match/(match + unmatch)

In [6]:
#Function for standardising and windowing training signal
def windowtrain(EMGtrain, spiketrain, window_size):
    
    EMGtrain = standardise(EMGtrain)
    x_train = []
    y_train = []
    for i in range(30,EMGtrain.shape[0]-120):
        
        if any(spiketrain[i, 0:5] == 1):
            x_train.append(EMGtrain[i-10:i+(window_size-10),:])
            y_train.append(spiketrain[i, 0:5])
        else:
            if random.uniform(0, 1) < .05:
                x_train.append(EMGtrain[i-10:i+(window_size-10),:])
                y_train.append(spiketrain[i, 0:5])
    y_train = np.array(y_train)
    y_train2 = []
    for i in range(5):
        y_train2.append(y_train[:,i])
    
    return np.array(x_train), y_train2

In [7]:
#Function for windowing test signal, predictions are recieved 
#from the model in batches to limit memory issues
def windowtest(EMGtrain, spiketrain, window_size, model):
    
    EMGtrain = standardise(EMGtrain)
    x_train = []
    y_train = []
    predictions = []
    count = 1
    for i in range(30,EMGtrain.shape[0]-120):
        x_train.append(EMGtrain[i-10:i+(window_size-10),:])
        y_train.append(spiketrain[i, 0:5])
        if count%8162 == 0:
            predictions.append(model(np.array(x_train)))
            x_train = []
        count = count + 1
            
    y_train = np.array(y_train)
    y_train2 = []
    for i in range(5):
        y_train2.append(y_train[:,i])
    
    return tf.concat(predictions, axis = 1), y_train2

In [8]:
#Outputs a CNN model that recieves a HD-sEMG signal window as input and outputs
#a 0 or 1 label based on wether the respective MU it has been trained to detect
#is present in the signal. The number of these models in parallel can be given by MUs
def convolutional_model(input_shape, filter_num, filter_size, dense_num, MUs):
    input_signal = tf.keras.Input(shape = input_shape)
    out = []
    for i in range(1, MUs+1):
        X = tfl.Conv1D(filter_num, filter_size, activation = 'relu')(input_signal)
        X = tfl.Dropout(0.2)(X)
        X = tfl.Flatten()(X)
        X = tfl.Dense(dense_num, activation='relu')(X)
        X = tfl.Dropout(0.5)(X)
        output = tfl.Dense(1, activation = 'sigmoid', name='output_{}'.format(i))(X)
        out.append(output)
    
    model = tf.keras.Model(inputs = input_signal, outputs = out)
    return model

In [9]:
#Builds and compiles CNN model based on given parameters
def build_model(window_size, filter_num, filter_size, dense_num, MUs):
    model= convolutional_model((window_size, 192), filter_num, filter_size, dense_num, MUs)
    model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate=0.001),
                  loss = tf.keras.losses.BinaryCrossentropy(from_logits=False),
                  metrics = ['accuracy',RoA_m])
    return model

In [10]:
#Get stationary training set for training seperate models to be used in ensemble
def getSmallTrainingSetExp2(training_noise, window_size, folder_name):  
    training_folds = [1,2,3,4,5]
    print(training_noise)
    iter = False
    for fold in training_folds:
        EMGtrain = np.load('{}/{}_fold{}_x.npy'.format(folder_name, training_noise, fold))
        spikes = np.load('{}/{}_fold{}_y.npy'.format(folder_name, training_noise, fold))
        fold_windows , fold_spikes = windowtrain(EMGtrain, spikes, window_size)
        if iter == False:
            X = fold_windows
            Y = np.array(fold_spikes)
        else:
            X = np.concatenate((X, fold_windows), axis = 0)
            Y = np.concatenate((Y, np.array(fold_spikes)), axis = 1)
        iter = True
    return X, Y

In [16]:
#train models for the 5 different variations of training sets during
#5-fold cross validation
def trainModels(var, noises):
    random.seed(60)
    for training_noise in noises:
        X, Y_raw = getSmallTrainingSetExp2(training_noise, 60, '{}_data'.format(var))
        X, Y_shuffled = unison_shuffled_copies(X, np.array(Y_raw).T)
        Y = []
        for i in range(len(Y_raw)):
            Y.append(Y_shuffled[:,i])
        conv_model = build_model(60, 16, 4, 16, 5)
        mc_vR= ModelCheckpoint('{}_models_exp2/bestvR_{}trained.h5'.format(var, training_noise), monitor='val_RoA_m', mode='max', verbose=0, save_best_only=True)
        RoA_callback = AccuracyCallback('RoA_m')
        history = conv_model.fit(X,
                                 Y,
                                 shuffle = True,
                                 epochs = 100,
                                 validation_split=0.2,
                                 batch_size=256,
                                 verbose = 0,
                                 callbacks = [RoA_callback, mc_vR])
        conv_model.save('{}_models_exp2/final_{}trained.h5'.format(var, training_noise))
        print('Small model noise {} trained'.format(training_noise))
    return None

In [None]:
var = 'noise'
noises = ['20dB','15dB','10dB','5dB','0dB']
trainModels(var,noises)

In [None]:
var = 'MUs'
noises = ['10MUs','20MUs','30MUs','40MUs','50MUs']
trainModels(var,noises)

In [None]:
var = 'lowpass'
noises = ['500Hz', '300Hz', '200Hz', '150Hz', '125Hz']
trainModels(var,noises)

In [None]:
var = 'shift'
noises = ['0mm','2mm','4mm','6mm','8mm']
trainModels(var,noises)