##

## Workshop Day 2

In [None]:

# Dependencies (pay attention to tf version)
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os

from util import *

import tensorflow as tf
from tensorflow.keras import datasets, layers, models

print(tf.__version__)
np.random.seed(42)
tf.random.set_seed(42)

from sklearn.metrics import ConfusionMatrixDisplay
from sklearn import metrics

# Part 1: Extract data

Extract data of person walking with exoskeleton.
For the purpose of this second experiment we will only observe Walking (both in transparent mode and in State Machine mode).

We create two different df, one for SM one for transparent

In [None]:
df_list = []

# Iterate over all files in the directory
for filename in os.listdir("../data/"):
    if filename.endswith('_ml_dataset_extracted.csv'):
        # Construct full file path
        file_path = os.path.join("../data/", filename)
        # Read CSV file into DataFrame
        df = pd.read_csv(file_path, index_col=0)
        # Append DataFrame to list
        df_list.append(df)

# Concatenate all DataFrames in the list
df = pd.concat(df_list, ignore_index=True)

df_Transparent = df[(df['condition'] == 'transparent_WALKING')& df.step_complete_r]
df_StateMachine = df[(df['condition'] == 'SM_WALKING') & df.step_complete_r]


This time we don't interpolate the input and we also return as output the gait phase [0, 100]. 

In [None]:
def segment_stride(stance_interpolate_factor, joint, segment_length=100):
    """
    Segments gait using stance interpolation factor

    """
    segments_alpha = []
    segments_theta = []
    start_index = 0
    completed_gait = False

    for i in range(1, len(stance_interpolate_factor)):
        if stance_interpolate_factor[i] == 1:
            completed_gait = True
        elif completed_gait and stance_interpolate_factor[i] == 0:
            segments_alpha.append(stance_interpolate_factor[start_index:i])
            segments_theta.append(joint[start_index:i])
            start_index = i
            completed_gait = False


    gait_phases =[]
    segments_alpha_2 = []
    segments_theta_2 = []
    for segment_al, segment_th in zip(segments_alpha, segments_theta):
        x = np.linspace(0, 100, len(segment_al))
        gait_phases.append(x)
        segments_alpha_2.append(segment_al)
        segments_theta_2.append(segment_th)

    return segments_alpha_2, segments_theta_2, gait_phases


# transparent
segmented_joints_transparent ={0:[], 1: [], 2:[], 3:[]}
segmented_velocity_transparent ={0:[], 1: [], 2:[], 3:[]}
segmented_joints_SM= {0:[], 1: [], 2:[], 3:[]}
segmented_velocity_SM= {0:[], 1: [], 2:[], 3:[]}
for i in range(4):
    segmented_alpha_transparent, segmented_joints_transparent[i], gait_phase_transparent = segment_stride( df_Transparent[' StanceInterpolationFactor'].to_numpy(), df_Transparent[' JointPositions_'+str(i+1)].to_numpy())
    _, segmented_velocity_transparent[i], _ = segment_stride( df_Transparent[' StanceInterpolationFactor'].to_numpy(), df_Transparent[' JointVelocities_'+str(i+1)].to_numpy())
    
    segmented_alpha_SM, segmented_joints_SM[i], gait_phase_SM = segment_stride(df_StateMachine[' StanceInterpolationFactor'].to_numpy(), df_StateMachine[' JointPositions_'+str(i+1)].to_numpy())
    _, segmented_velocity_SM[i], _ = segment_stride(df_StateMachine[' StanceInterpolationFactor'].to_numpy(), df_StateMachine[' JointVelocities_'+str(i+1)].to_numpy())

N_STEPS_TRANSPARENT = len(segmented_alpha_transparent)
N_STEPS_SM = len(segmented_alpha_SM)
print("Number steps in transparent:",N_STEPS_TRANSPARENT, "Number steps in SM: ", N_STEPS_SM)

Concatenate all steps

In [None]:
segmented_alpha_transparent = np.concatenate(segmented_alpha_transparent)
gait_phase_transparent = np.concatenate(gait_phase_transparent)
segmented_alpha_SM = np.concatenate(segmented_alpha_SM)
gait_phase_SM = np.concatenate(gait_phase_SM)

for i in range(4):
      segmented_joints_transparent[i] = np.concatenate(segmented_joints_transparent[i])
      segmented_velocity_transparent[i] = np.concatenate(segmented_velocity_transparent[i])
      segmented_joints_SM[i] = np.concatenate(segmented_joints_SM[i])
      segmented_velocity_SM[i] = np.concatenate(segmented_velocity_SM[i])

print(segmented_alpha_transparent.shape, segmented_joints_transparent[0].shape, segmented_joints_transparent[1].shape, segmented_joints_transparent[2].shape,
      segmented_joints_transparent[3].shape, gait_phase_transparent.shape)
print(segmented_alpha_SM.shape, segmented_joints_SM[0].shape, segmented_joints_SM[1].shape,
      segmented_joints_SM[2].shape, segmented_joints_SM[3].shape, gait_phase_SM.shape)

Plot steps (with mean and variance)

In [None]:
fig, axs = plt.subplots(4, figsize=(10, 8))
axs[0].set_title('Left Hip')
axs[1].set_title('Left Knee')
axs[2].set_title('Right hip')
axs[3].set_title('Right knee')

#transparent
for i in range(4):
    axs[i].plot(np.linspace(0, 100, 10000), (segmented_joints_transparent[i][:10000])*180/np.pi, 'g-', label= "Transparent")
    axs[i].plot(np.linspace(0, 100, 10000),  segmented_alpha_transparent[:10000]*20, 'r-')
    axs[i].plot(np.linspace(0, 100, 10000), gait_phase_transparent[:10000]/4, 'm-', label = "Gait Phase Transparent")

    # STate machine
    axs[i].plot(np.linspace(0, 100, 10000), segmented_joints_SM[i][:10000]*180/np.pi, 'b-', label= "State Machine")
    axs[i].plot(np.linspace(0, 100, 10000), segmented_alpha_transparent[:10000]*20, 'r-')
    axs[i].plot(np.linspace(0, 100, 10000), gait_phase_SM[:10000]/4, 'y-', label = "Gait Phase SM")

plt.legend()
plt.show()

# Part 2: Regression of the gait phase


**Exercice 2** Build Neural Network that receives as input the kinematics and the stance interpolation factor and returns as output the gait phase (x coordidinate previous plot)

We will build two models: one trained using transparent data, one using SM data and we will compare their performance on two validation sets (one SM one transparent). 



### Prepare Input / output data

<div>
<img src="imgs/data_CNN.jpg" width="300"/>
</div>

**Inputs:** The model had a data input size of 10 channel values (joint position/velocity) for every time step of 5 ms with a window size of 80 data samples. 

**Exercice** All input and output data is normalized (subtract mean and divide by std). 


In [None]:
# concatenate joint angles and steps
input_transparent= np.array([segmented_joints_transparent[0],
                    segmented_joints_transparent[1],
                    segmented_joints_transparent[2],
                    segmented_joints_transparent[3]])
# concatenate to input the joint velocity
joint_velocity = np.array([segmented_velocity_transparent[0],
                    segmented_velocity_transparent[1],
                    segmented_velocity_transparent[2],
                    segmented_velocity_transparent[3]])
input_transparent = np.concatenate((input_transparent, joint_velocity, [segmented_alpha_transparent]), 0)

# normalize input data
mean= np.mean(input_transparent, 1)[:, np.newaxis]
std = np.std(input_transparent, 1)[:, np.newaxis]
input_transparent= (input_transparent ## TODO##
print("Transparent")
print("Input Shape ", input_transparent.shape)
print("input mean: ", mean.transpose())
print("input std: ", std.transpose())

# concatenate joint angles and steps
input_SM= np.array([segmented_joints_SM[0],
                    segmented_joints_SM[1],
                    segmented_joints_SM[2],
                    segmented_joints_SM[3]])
# concatenate to input the joint velocity
joint_velocity = np.array([segmented_velocity_SM[0],
                    segmented_velocity_SM[1],
                    segmented_velocity_SM[2],
                    segmented_velocity_SM[3]])
input_SM = np.concatenate((input_SM, joint_velocity, [segmented_alpha_SM]), 0)

# normalize input data
mean= np.mean(input_SM, 1)[:, np.newaxis]
std = np.std(input_SM, 1)[:, np.newaxis]
input_SM= (input_SM ## TODO ##
print("State Machine")
print("Input Shape ", input_SM.shape)
print("input mean: ", mean.transpose())
print("input std: ", std.transpose())


Prepare Output

In [None]:
output_transparent = np.array(gait_phase_transparent)/100.0 # created normalized
print("Output transparent shape ", output_transparent.shape)

output_SM = gait_phase_SM/100.0 # created normalized
print("Output SM shape ", output_SM.shape)

Divide data in time windows: save one value every 5ms, since data is recorded with a frequency of 333Hz the data must be reshaped to have a frequency of 200Hz

**EX:** Change the size of data to adapt it to the size of the new frequency

In [None]:
import scipy.signal as signal

def divide_time_windows(input, output, window_length, step):
    divided_input = []
    divided_output = []

    # downsampling
    ORIGINAL_FREQUENCY = 333 #Hz
    DESIRED_FREQUENCY = 200
    num_samples = int(input.shape[1] ## TODO###

    input = signal.resample(input, num_samples, axis=1)
    output = signal.resample(output, num_samples)
    output[output > 1] = 1
    output[output < 0] = 0

    # divide in window
    for i in range(window_length, input.shape[1], step):
        divided_input.append(input[:, i-window_length: i])
        divided_output.append(output[i])

    return np.array(divided_input), np.array(divided_output)

In [None]:
WINDOW_LENGTH = 80 #check frequency
STEP = 3
input_transparent, output_transparent = divide_time_windows(input_transparent, output_transparent, WINDOW_LENGTH, STEP)
print("Data transparent Shape: ", input_transparent.shape, output_transparent.shape)

input_SM, output_SM = divide_time_windows(input_SM, output_SM, WINDOW_LENGTH, STEP)
print("Data SM Shape: ", input_SM.shape, output_SM.shape)

Divide training set and output set in training and validation set (70%, 30%).

In [None]:
train_input_transparent = np.swapaxes(input_transparent[:int(input_transparent.shape[0]*0.7), :, :], 1, 2)
train_output_transparent = output_transparent[:int(input_transparent.shape[0]*0.7)]
val_input_transparent =  np.swapaxes(input_transparent[int(input_transparent.shape[0]*0.7):, :, :], 1, 2)
val_output_transparent = output_transparent[int(input_transparent.shape[0]*0.7):]

print("Transparent ")
print("Training set shape: ", train_input_transparent.shape, train_output_transparent.shape)
print("Validation set shape: ", val_input_transparent.shape, val_output_transparent.shape)

train_input_SM = np.swapaxes(input_SM[:int(input_SM.shape[0]*0.7), :, :], 1, 2)
train_output_SM = output_SM[:int(input_SM.shape[0]*0.7)]
val_input_SM =  np.swapaxes(input_SM[int(input_SM.shape[0]*0.7):, :, :], 1, 2)
val_output_SM = output_SM[int(input_SM.shape[0]*0.7):]

print("State Machine")
print("Training set shape: ", train_input_SM.shape, train_output_SM.shape)
print("Validation set shape: ", val_input_SM.shape, val_output_SM.shape)

### Create Model:

<div>
<img src="imgs/models.jpg" width="700"/>
</div>

The model had a data input size of 9 channel values (left/right hip, knee position/velocity and alpha) for every time step of 5 ms with a window size of 80 data samples (80*9). The first layer consisted of a 1D convolution of 10 filters striding temporally, where each filter had a kernel size of 20. The third layer was another 1D convolution of 10 filters, where each filter had a kernel size of the previous layer’s window size and compressed the data into a single row vector (representing the extracted feature information). A sigmoid activation function was applied to this vector and passed the data to two fully connected dense layers with relu activation function (20 nodes the first 10 the second). The final dense layer of the network output two dimensions (to describe the periodic nature of the gait phase we represent the output as the sin and cos of the gait phase) and uses a tanh transfer function

In [None]:
model_transparent = models.Sequential()
model_transparent.add(layers.GaussianNoise(0.01, input_shape=(80,9)))
model_transparent.add(layers.Conv1D(## TODO##
model_transparent.add(layers.Conv1D(10, ## TODO, activation = 'sigmoid'))
model_transparent.add(layers.Flatten())
model_transparent.add(layers.Dense(## TODO ##
model_transparent.add(## TODO ##
model_transparent.add(layers.Dense(2, activation='tanh'))

model_transparent.summary()

model_SM = models.Sequential()
model_SM.add(layers.GaussianNoise(0.01, input_shape=(80,9)))
model_SM.add(layers.Conv1D(## TODO##
model_SM.add(layers.Conv1D(10, ## TODO##, activation = 'sigmoid'))
model_SM.add(layers.Flatten())
model_SM.add(layers.Dense(## TODO##
model_SM.add(## TODO##, activation = 'relu'))
model_SM.add(layers.Dense(2, activation=## TODO##))

**Loss function:** To ensure that the gait phase error was evaluated accurately during the transition from one gait cycle to the next, where 100 and 0 represent the same value, we used an angular similarity metric by computing the cosine and sin of the sine of the gait phase and we compared it the predicted two outputs. The root mean square error (RMSE) of was computed between them.

In [None]:
def loss_function(y_true, y_pred):
    gait_phase_cos_error =## TODO##(tf.reduce_mean(tf.square(y_pred[:, 0] - tf.## TODO ##(y_true*2*np.pi))))
    gait_phase_sin_error = tf.sqrt(tf.reduce_mean(tf.square(## TODO## - ## TODO ##)))
    return gait_phase_cos_error + gait_phase_sin_error


### Train the two models

**Training:** The CNN was trained for 15 epochs using a Adam optimizer with a learning rate of 0.001 

In [None]:
optimizer_transparent = tf.keras.optimizers.Adam(learning_rate=## TODO##)

model_transparent.compile(loss=loss_function,
          optimizer=optimizer_transparent,
          metrics=['accuracy'])

history = model_transparent.fit(train_input_transparent, train_output_transparent, epochs=## TODO##,
                    validation_data=(val_input_transparent, val_output_transparent))


In [None]:
optimizer_SM = tf.keras.optimizers.Adam(learning_rate=## TODO##)

model_SM.compile(loss=loss_function,
          optimizer=optimizer_SM,
          metrics=['accuracy'])

history = model_SM.fit(train_input_SM, train_output_SM, epochs=## TODO##,
                    validation_data=(val_input_SM, val_output_SM))

### Validation models

Validate the two models with the two validation sets to evaluate the generalization ability, 4 combinations:
- Model trained with transparent data <--> tested on transparent data
- Model trained with transparent data <--> tested on SM data
- Model trained with SM data <--> tested on transparent data
- Model trained with SM data <--> tested on SM data

In [None]:
y_pred_trans_trans = model_transparent.predict(val_input_transparent)
y_pred_trans_SM = model_transparent.predict(val_input_SM)
y_pred_SM_SM = model_SM.predict(val_input_SM)
y_pred_SM_trans = model_SM.predict(val_input_transparent)

**Exercice** Create a function that given the output in (cos, sin) transform it in the gait phase: $$arctan(2\pi cos / 2\pi sin)mod(2\pi)/2\pi$$. 

Function np.arctan2 takes as input (sin, cos)

In [None]:
def transform_output(y_pred):
    cos = y_pred[:, 0]
    sin = y_pred[:, 1]
    return (np.arctan2(## TODO ##, ## TODO ##) % (## TODO ##)) /(## TODO ##)

Visualize the output of the two models trying to predicting the validation set

In [None]:

fig, axs = plt.subplots(2, figsize=(10, 8))
axs[0].plot(transform_output(y_pred_trans_trans[:2000]), color="b",  label = "Trans model prediction")
axs[0].plot(transform_output(y_pred_SM_trans[:2000]), color="g", label = "SM model prediction")
axs[0].plot(val_output_transparent[:2000], color='r', label="Transparent gt data")
axs[0].legend()

axs[1].plot(transform_output(y_pred_trans_SM[:2000]), color= "b", label = "Trans model prediction")
axs[1].plot(transform_output(y_pred_SM_SM[:2000]), color ="g", label = "SM model prediction")
axs[1].plot(val_output_SM[:2000], color='r',  label="SM gt data")

axs[1].legend()
plt.show()

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay

trans_trans_error = np.sqrt(np.mean(np.square(transform_output(y_pred_trans_trans) - val_output_transparent)))
trans_SM_error = np.sqrt(np.mean(np.square(transform_output(y_pred_trans_SM) - val_output_SM)))
SM_trans_error = np.sqrt(np.mean(np.square(transform_output(y_pred_SM_trans) - val_output_transparent)))
SM_SM_error = np.sqrt(np.mean(np.square(transform_output(y_pred_SM_SM) - val_output_SM)))

confusion_matrix = np.array([[trans_trans_error, trans_SM_error],
                             [SM_trans_error, SM_SM_error]])

fig, ax = plt.subplots()
disp = ConfusionMatrixDisplay(confusion_matrix=confusion_matrix, display_labels=["transp", "SM"])
disp.plot(ax=ax, cmap='viridis')
plt.xlabel('Validation data')
plt.ylabel('Training data')
plt.title("Regression loss depending by model and data")
plt.show()

Discuss with the rest of the group what this performances means. In particular, the values at the top-right and botton-left.

Save model

In [None]:
import keras

model_transparent.export("../models/transparent/")
model_SM.export("../models/SM/")


Send the two created models as zip file with the following format: *transparent(SM)_groupNumber* at the following email adress: lvianello@sralab.org

# Part 3: Locomotion mode classification
- transparent_WALKING, SM_WALKING --> 1
- SM_RAMPS_DOWN, transparent_RAMPS_DOWN --> 3
- SM_RAMPS_UP, transparent_RAMPS_UP --> 2
- SM_STAIRS_UP, transparent_STAIRS_UP --> 4
- SM_STAIRS_DOWN, transparent_STAIRS_DOWN --> 5


In [None]:
df_locomotion_modes = df[df.step_complete_r]
print(np.unique(df_locomotion_modes['condition']))
print(np.unique(df_locomotion_modes['condition_encoded']))
CLASSES = ["Walking", "Ramp\nAsc", "Ramp\nDesc", "Stairs\nAsc", "Stair\nDesc"]

Plot conditions over time 

In [None]:
plt.plot(df_locomotion_modes['condition_encoded'], label = "condition encoded")
plt.legend()
plt.show()

Prepare dataset (input same as before, joint position and velocity, stance interpol), output the 5 classes

In [None]:
# concatenate joint angles and steps
joint_position_lm= np.array([df_locomotion_modes[' JointPositions_1'],
                    df_locomotion_modes[' JointPositions_2'],
                    df_locomotion_modes[' JointPositions_3'],
                    df_locomotion_modes[' JointPositions_4']])
# concatenate to input the joint velocity
joint_velocity_lm = np.array([df_locomotion_modes[' JointVelocities_1'],
                    df_locomotion_modes[' JointVelocities_2'],
                    df_locomotion_modes[' JointVelocities_3'],
                    df_locomotion_modes[' JointVelocities_4']])
input_lm = np.concatenate((joint_position_lm, joint_velocity_lm, [df_locomotion_modes[' StanceInterpolationFactor']]), 0)

# normalize input data
mean= np.mean(input_lm, 1)[:, np.newaxis]
std = np.std(input_lm, 1)[:, np.newaxis]
input_lm= (input_lm - mean) / std
print("Input Shape ", input_lm.shape)
print("input mean: ", mean.transpose())
print("input std: ", std.transpose())

output_lm = np.array(np.eye(5)[df_locomotion_modes['condition_encoded']- 1]) # created normalized and convert to categorical output
print("Output locomotion mode classification shape ", output_lm.shape)


WINDOW_LENGTH = 80 #check frequency
STEP = 3
input_lm, output_lm = divide_time_windows(input_lm, output_lm, WINDOW_LENGTH, STEP)
print("Data Shape: ", input_lm.shape, output_lm.shape)


train_input_lm = np.swapaxes(input_lm[:int(input_lm.shape[0]*0.7), :, :], 1, 2)
train_output_lm = output_lm[:int(input_lm.shape[0]*0.7)]
val_input_lm =  np.swapaxes(input_lm[int(input_lm.shape[0]*0.7):, :, :], 1, 2)
val_output_lm = output_lm[int(input_lm.shape[0]*0.7):]

print("Training set shape: ", train_input_lm.shape, train_output_lm.shape)
print("Validation set shape: ", val_input_lm.shape, val_output_lm.shape)

**Exercice** Create the model (similar to the previous one but this time remember that the output must be categorical (value 0 or 1) and should have 5 dimensions), some suggestions:
- suggested activation function :softmax
- as loss function a common one is CategoricalCrossentropy (no need to design one by your self 'tf.keras.losses.CategoricalCrossentropy()')


In [None]:
model_lm = models.Sequential()
model_lm.add(##TODO##)
##TODO##
model_lm.add(layers.Dense(5, activation=##TODO##))

#model_lm.summary()

optimizer_lm = tf.keras.optimizers.Adam(learning_rate=0.001)

model_lm.compile(loss=## TODO##,
          optimizer=optimizer_lm,
          metrics=['accuracy'])

history = model_lm.fit(train_input_lm, train_output_lm, epochs=15,
                    validation_data=(val_input_lm, val_output_lm))


Predict validation set for evaluation of the model

In [None]:
predicted_values = model_lm.predict(val_input_lm)

plt.plot(np.argmax(predicted_values, 1), label= "Prediction")
plt.plot(np.argmax(val_output_lm, 1), label= "GT")
plt.legend()

Plot prediction probability (between 0 and 1) and max values

In [None]:
fig, axs = plt.subplots(2, figsize=(10, 8))

axs[1].plot(predicted_values[:, 0], label= "Walking prediction probability")
axs[1].plot(predicted_values[:, 1], label= "Ramps Asc prediction probability")
axs[1].plot(predicted_values[:, 2], label= "Ramps Desc prediction probability")
axs[1].plot(predicted_values[:, 3], label= "Stair Asc prediction probability")
axs[1].plot(predicted_values[:, 4], label= "Stair Desc prediction probability")

axs[0].plot(np.argmax(predicted_values, 1), label= "Max value Prediction")
axs[0].plot(np.argmax(val_output_lm, 1), label= "GT")

axs[0].legend()
axs[1].legend()
plt.legend()

Display confusion matrix

In [None]:
confusion_matrix = metrics.confusion_matrix(np.argmax(val_output_lm, 1), np.argmax(predicted_values, 1))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels = CLASSES)
cm_display.plot()
plt.show()

# Part 4: Real time prediction using tensorflow lite