<a href="https://colab.research.google.com/github/bermanlabemory/gait_signatures/blob/main/Gait_Signatures_Script_2_Extract_Internal_Parameters.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## This script accesses the trained models in their respective folders from 'Gait Signatures Script 1: Train Model Architectures.ipynb', extracts externally-driven and self-driven Hs and Cs and plots respective predicted kinematics.

**notes:** 
1. Each model's parameters (Hs and Cs) are extracted and used to develop the signatures

**Created by**: Taniel Winner

**Date**: 07/*18*/22

**Step 0**: Mount (connect to) your google drive folder where you want to save the simulation results and model parameters.

In [None]:
from google.colab import drive
drive.mount('/content/drive')
#drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


In [None]:
# check python version 
from platform import python_version

print(python_version())

3.8.16


In [None]:
# check tensorflow version
import tensorflow as tf
print(tf.__version__)

2.9.2


In [None]:
from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

if ram_gb < 20:
  print('Not using a high-RAM runtime')
else:
  print('You are using a high-RAM runtime!')

Your runtime has 54.8 gigabytes of available RAM

You are using a high-RAM runtime!


**Step 1**: Import necessary packages and functions to develop model

In [None]:
from keras.models import Sequential
from keras.layers import Dense
from tensorflow.python.keras.layers.recurrent import LSTM
from sklearn.model_selection import train_test_split
import sklearn.model_selection as model_selection
import matplotlib.pyplot as plt
import math
import keras as k
import pandas as pd
import numpy as np
from copy import copy
import scipy.io
from sklearn.decomposition import PCA

from scipy.signal import find_peaks
from scipy import interpolate
from numpy import sin,cos,pi,array,linspace,cumsum,asarray,dot,ones
from pylab import plot, legend, axis, show, randint, randn, std,lstsq

from sklearn.manifold import MDS
import csv
import os
from tqdm import tqdm 

In [None]:
%cd /content/drive/My\ Drive/GaitSignature_Manuscript/

# Ensure fourierseries.py is in the pathway
!ls -l fourierseries.py

/content/drive/My Drive/GaitSignature_Manuscript
-rw------- 1 root root 7259 Apr  4  2022 fourierseries.py


Phaser code developed by Shai Revzen (2018)

This function takes multivariate periodic time series and finds estimates of phase over time.

In [None]:
import fourierseries
import util
import phaser
import dataloader
# Preprocess data for a single subject - to be send to modeling frameworks
def find_phase(k):
    """
    Detrend and compute the phase estimate using Phaser
    INPUT:
      k -- dataframe
    OUTPUT:
      k -- dataframe
    """
    #l = ['hip_flexion_l','hip_flexion_r'] # Phase variables = hip flexion angles
    y = np.array(k)
    print(y.shape)
    y = util.detrend(y.T).T
    print(y.shape)
    phsr = phaser.Phaser(y=y)
    k[:] = phsr.phaserEval(y)[0,:]
    return k

Running phaser


Von Mises interpolation function 
This function is used to:...

In [None]:
def vonMies(t,t_0, b):
    out = np.exp(b*np.cos(t-t_0))/(2*pi*iv(0, b))
    return out

**Step 2**: Load module in Google Drive

In [None]:
# The path to save the models and read data from

### !!! specify folder name the same as in script 1
folder = 'Gait_signatures/'

path = '/content/drive/My Drive/' + folder

# Insert the directory
import sys
sys.path.insert(0,path)

**Step 3**: Load in data and specify variables/parameters

In [None]:
# Non-changing variables 

# number of trials in dataset 
trialnum = 72 # 72 total trials

# number of samples in each trial
trialsamp = 1500

# number of features collected per trial
feats = 6

#Batch size - same as the number of traintrials
batch_size = trialnum

# Number of Layers
numlayers = 1

# Choose the number of iterations to train the model- if this script has been run previously enter a value greater than was 
# inputted before and rerun the script. 
finalepoch = 10000

# load the input data/kinematics
datafilepath = path + 'PareticvsNonP_RNNData.csv' #input data
all_csvnp = np.loadtxt(datafilepath,delimiter=',').T

# reshape all the input data into a tensor
all_inputdata_s = all_csvnp.reshape(trialnum,trialsamp,feats) 

print('original input data shape is: '+ str(all_csvnp.shape ))
print('input data reshaped is: '+ str(all_inputdata_s.shape))

original input data shape is: (108000, 6)
input data reshaped is: (72, 1500, 6)


**Step 4**: List of model architectures and corresponding variables from script 1

In [None]:
# generate a list of models and corresponding parameters to test 
test_model_nodes = [512] 
seqs = [249,499] #lookback parameter

# run multiple model architechtures many times to test stability of cost function outputs
runs = 1 # stability analysis - repeat each model architecture this many times
test_model_seq = np.repeat(seqs, runs)

All_nodes = np.empty([0,1], dtype='int')
All_seq = np.empty([0,1],dtype='int')
All_valseg = np.empty([0,1],dtype='int')
All_trainseg = np.empty([0,1],dtype='int')
All_modelname = []
All_mod_name = []

count = 0; #initialize model run -- this serves as the model run ID number
for a in test_model_nodes:
  for b in test_model_seq:
    if count < runs:
      count = count + 1 
    else: 
      count = 1 # reset counter when all runs of certain model attained
    #if statement for valseg, trainseg based on sequence length
    if int(b) == 249:
      trainseg = 4
      valseg = 2
    elif int(b) == 499: 
      trainseg = 2
      valseg = 1
    elif int(b) == 749:
      trainseg = 1
      valseg = 1

    All_nodes = np.append(All_nodes, a) 
    All_seq = np.append(All_seq, int(b))
    All_valseg = np.append(All_valseg, valseg)
    All_trainseg = np.append(All_trainseg, trainseg)
    All_modelname = np.append(All_modelname, 'run_' + str(count) + '_UNIT_' + str(a) + '_LB_' + str(b) + '/' )
    All_mod_name = np.append(All_mod_name, 'run_' + str(count) + '_UNIT_' + str(a) + '_LB_' + str(b) )

**Step 5**: Access and load trained model architectures in loop, extract the external and self-driven Hs and Cs and develop signatures. 

In [None]:
for j in range(len(All_mod_name)):
    
    newfoldpath = path + All_mod_name[j]

    # extract path to store each model and generated data
    savepath = path + All_modelname[j]

    mod_name = All_mod_name[j]
    newfigpath = os.path.join(savepath, 'Trial_Prediction_Figures/')
    #newfigpath = savepath + 'Model_Prediction_Figures'  # specify folder path to store predicted kinematics figures
    try: # test if model folder already exists 
      if not os.path.exists(os.path.dirname(newfigpath)): # if fig folder does not exist create
          os.makedirs(os.path.dirname(newfigpath)) # create a new fig folder
    except OSError as err:
          print(err)
    
    modnum = j+1 #model number counter for print statement
    print('Working on: ' + mod_name + ' model ' + str(modnum) + ' / ' + str(len(All_mod_name)))
    
    # Specify variables for model run instance

    # Number of Units
    numunits = All_nodes[j]

    # Lookback parameter
    # A number is chosen called the look back parameter where this many samples are used to predict the outputs, calculate error, attain a gradient which is
    # back propagatted to update the weights, then finally the weights are reset to zero. Our lookback parameter is typically 1 less than a divisor of the trial length. 
    # Thus, lookback + 1 should be divisible by the trialsamp. This set up is specifically for training input and output sequences that are one step time shifted (lag = 1)
    # versions of eachother. E.g. trial length = 500 lookback = 99, input = samples 0:99, output = samples 1:100. Since the lookback + 1 (99 + 1) = 100, trial length/(lookback + 1) = 5;
    # thus there are 5 mini-batches of input/output sequences per trial where we can evaluate error (resetting paramters each time).

    lookback = All_seq[j]

    # Training and Validation Set-up
    # Based on the length of the trials and the lookback parameter you can set how many mini-batches would be used for training vs validation. 
    # For example, if the trial length is 1500 and num = lookback+1 = 250, since num can be divided into the trial length 6 times, there would be 6 minibatches.   
    # One can specify that 4 of the mini-batches be used for training: trainseg = 4 corresponding to the 1st 4 mini-batches of the trial and 2 of the mini-batches used
    # for validation: valseg = 2, corresponding to last 2 mini-batches of trial.
    
    # Select the 1st X segments to be training
    trainseg = All_trainseg[j]
    
    # Select the last Y segments to be validation
    valseg = All_valseg[j]

    # load trained model from script 1 to save time re-training model
    model = k.models.load_model(savepath + mod_name + '_bestwhole.h5') 

    # Need to be on GPU to run the following as well:

    # Set the trained model weights to a new model to generate external and self-driven predictions
    model2=k.models.Sequential()
    model2.add(tf.compat.v1.keras.layers.CuDNNLSTM(units = numunits, stateful=True, return_sequences=True, batch_input_shape =(1,None,feats)))
    model2.add(tf.compat.v1.keras.layers.Dense(units=6))
    model2.compile(loss='mse', optimizer='adam',metrics=['accuracy'])

    # Set model 2 weights from model 1
    for l1, l2 in zip(model.layers, model2.layers):
      assert l1.get_config().keys()==l2.get_config().keys()
    for key, v in l1.get_config().items():
        if key=='name':
            continue
        if not v==l2.get_config()[key]:
            print(l1.name, key, 'not matching.', 'Old,new : ', v, l2.get_config()[key], '\n')

    model2.set_weights(model.get_weights())
    assert np.all([np.array_equal(w1, w2) for (w1,w2) in zip(model.get_weights(), model2.get_weights())]) 

    ####  Generate external and self driving signatures for all data
    print('Running self-driven gait signatures')

    in_data = all_inputdata_s
    ext_drive= trialsamp # externally drive for twice the lookback
    total_drive = trialsamp*2  # in self-driving mode the lower node models won't self-drive more than the look-back- they zero out pretty quickly 

    HC_self_concat = np.empty(shape=[0, numunits*2]) # initialize HC storage for self-driving
    HC_ext_concat = np.empty(shape=[0, numunits*2])  # initialize HC storage for external-driving

    Predicted_KIN = np.empty(shape = [len(in_data),total_drive, feats])
    
    test_ind = np.arange(0, len(all_inputdata_s), 1)  # generate kinematic predictions for all (or specified) indices of the input data
    for g in test_ind:
        trialnum = g+1
        print('external and self-driven HCs parsed: ' + str(trialnum) + ' of ' + str(len(test_ind)))
        model2.layers[0].reset_states([np.zeros((1,numunits)), np.zeros((1,numunits))])
        preds = np.zeros((1, total_drive, 6))
        hcs = np.zeros((2, total_drive, numunits))
        for i in tqdm(range(total_drive)):
            if i<ext_drive:
                preds[:,i,:] = model2.predict(in_data[g,i,:][None,None,:])# index the test trial
            else:
                preds[:,i,:] = model2.predict(preds[0,i-1,:][None,None,:])

            hcs[:,i,:] = np.array(model2.layers[0].states)[:,0]

            Hvalues = hcs[0]
            Cvalues = np.tanh(hcs[1])

            Hvalues_self = Hvalues[ext_drive:total_drive]# only extract the selfdriving Hs
            Hvalues_ext = Hvalues[0:ext_drive] # extract external Hs

            Cvalues_self = Cvalues[ext_drive:total_drive]
            Cvalues_ext = Cvalues[0:ext_drive]

            HC_self = np.concatenate((Hvalues_self, Cvalues_self), axis=1) # concatenate Hs and Cs of single trial
            HC_ext =  np.concatenate((Hvalues_ext, Cvalues_ext), axis=1)

        HC_self_concat = np.append(HC_self_concat, HC_self, axis = 0) # concatenate all trials to eachother
        HC_ext_concat = np.append(HC_ext_concat, HC_ext, axis = 0)
        
        prediction = preds[0,:,:] # external and self-driven prediction (external drive samples run from 0:ext_drive, self-driven samples run from ext_drive:ext_drive*2 )
        in_data_trial = in_data[g] # trial reference kinematics

        Predicted_KIN[g] = prediction # store predicted kinematics from each trial in loop
        #np.save(savepath + mod_name + 'trial_' + str(g) + '_predictedkinematics.npy',  prediction) # save each trial of predicted kinematics
        #np.save(savepath + mod_name + 'trial_' + str(g) + '_referencekinematics.npy', in_data_trial) #save each trial of original/reference kinematics

        # Generate figure with predictive kinematics
        fig = plt.figure(figsize=(20,15))
        plt.subplot(711)
        plt.title("Trial: " + str(g), fontsize=14)
        plt.plot(in_data_trial[:ext_drive+(trialsamp-ext_drive),0], "b-", markersize=8, label="Left Hip_input") #original data
        plt.plot(prediction[:,0], "g-", markersize=20, label="Left Hip_predict") #predicted
        plt.axvline(x=ext_drive, color='r', alpha=0.4)
        plt.legend(loc="upper right")
        plt.xlabel("Sample Number")
        plt.ylabel("Angle")

        plt.subplot(712)
        #plt.title("Evaluating Predictions", fontsize=14)
        plt.plot(in_data_trial[:ext_drive+(trialsamp-ext_drive),1], "b-", markersize=8, label="Left Knee_input") #original data
        plt.plot(prediction[:,1], "g-", markersize=20, label="Left Knee_predict") #predicted
        plt.legend(loc="upper right")
        plt.xlabel("Sample Number")
        plt.ylabel("Angle")
        plt.axvline(x=ext_drive, color='r', alpha=0.4)

        plt.subplot(713)
        #plt.title("Evaluating Predictions", fontsize=14)
        plt.plot(in_data_trial[:ext_drive+(trialsamp-ext_drive),2], "b-", markersize=8, label="Left Ank_input") #original data
        plt.plot(prediction[:,2], "g-", markersize=20, label="Left Ank_predict") #predicted
        plt.legend(loc="upper right")
        plt.xlabel("Sample Number")
        plt.ylabel("Angle")
        plt.axvline(x=ext_drive, color='r', alpha=0.4)

        plt.subplot(714)
        #plt.title("Evaluating Predictions", fontsize=14)
        plt.plot(in_data_trial[:ext_drive+(trialsamp-ext_drive),3], "b-", markersize=8, label="Right Hip_input") #original data
        plt.plot(prediction[:,3], "g-", markersize=20, label="Right Hip_predict") #predicted
        plt.legend(loc="upper right")
        plt.xlabel("Sample Number")
        plt.ylabel("Angle")
        plt.axvline(x=ext_drive, color='r', alpha=0.4)

        plt.subplot(715)
        #plt.title("Evaluating Predictions", fontsize=14)
        plt.plot(in_data_trial[:ext_drive+(trialsamp-ext_drive),4], "b-", markersize=8, label="Right Knee_input") #original data
        plt.plot(prediction[:,4], "g-", markersize=20, label="Right Knee_predict") #predicted
        plt.legend(loc="upper right")
        plt.xlabel("Sample Number")
        plt.ylabel("Angle")
        plt.axvline(x=ext_drive, color='r', alpha=0.4)

        plt.subplot(716)
        #plt.title("Evaluating Predictions", fontsize=14)
        plt.plot(in_data_trial[:ext_drive+(trialsamp-ext_drive),5], "b-", markersize=8, label="Right Ank_input") #original data
        plt.plot(prediction[:,5], "g-", markersize=20, label="Right Ank_predict") #predicted
        plt.legend(loc="upper right")
        plt.xlabel("Sample Number")
        plt.ylabel("Angle")
        plt.axvline(x=ext_drive, color='r', alpha=0.4)

        plt.savefig(newfigpath + 'trial_' + str(trialnum) +  '_PredictiveKinematics.png', dpi = 300)
        plt.close(fig)# close figure in loop

    np.save(savepath + mod_name + '_PredictedKinematics_ext_self.npy', Predicted_KIN) # save predicted kinematics (external and self-dricen) from all trials

    SelfDriveHCs = HC_self_concat # extracted self-driven HCs 
    ExternalDriveHCs = HC_ext_concat # extracted externally driven HCs

    #CombinedExternal_Self_HCs = np.concatenate((ExternalDriveHCs,SelfDriveHCs), axis = 0) # all Hs and Cs

    # save these held out HCs 
    np.save(savepath + mod_name + '_selfdriveHCs.npy', SelfDriveHCs)
    scipy.io.savemat(savepath + mod_name + '_selfdriveHCs.mat', {'self_drive_sigs': SelfDriveHCs})

    np.save(savepath + mod_name + '_extdriveHCs.npy', ExternalDriveHCs)
    scipy.io.savemat(savepath + mod_name + '_extdriveHCs.mat', {'ext_drive_sigs': ExternalDriveHCs})
