# ***Hyperparameter optimization for RNN parameters***

In [None]:
# ------------------------------------------------------------------------------
# Download Helper and Data Files
# ------------------------------------------------------------------------------
output = "helper_and_data_files.zip"
use_gdrive = False
if use_gdrive:
    # Download the zip file from Google Drive
    import gdown
    url = "https://drive.google.com/uc?id=1JJ9pFGGlpWIVFgDudmcquVuTdtGvASvq"
    gdown.download(url, output, quiet=False)
else:
    # Download via Git-LFS
    !curl -s https://packagecloud.io/install/repositories/github/git-lfs/script.deb.sh | sudo bash
    !sudo apt-get install git-lfs
    !git lfs install
    !git clone https://github.com/epfl-lasa/sliding-ds-control.git
    !ln -s /content/sliding-ds-control/compliant_bumper/rnn_training/helper_and_data_files.zip .

# Extract zip file
import zipfile
with zipfile.ZipFile(output, "r") as zip_ref:
    zip_ref.extractall(".")

    
# ------------------------------------------------------------------------------
# Imports
# ------------------------------------------------------------------------------
import pickle
from loadmat import loadmat
import numpy as np
from scipy.signal import decimate
import IPython

%tensorflow_version 2.x
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

!pip install -q keras-tuner
from keras_tuner.tuners import Hyperband, BayesianOptimization

%matplotlib notebook
import matplotlib.pyplot as plt

## **Load data files for training and testing**

**Training Dataset**

| File No.  | Description                                       |
|----------:|---------------------------------------------------|
| 1         | Force variations at all rows of the bumper        |
| 2         | Slow force variations at all rows of the bumper   |
| 8         | Impacts at top row of the bumper                  |
| 9         | Impacts at middle & bottom row of the bumper      |
| 10        | Force variations at all rows of the bumper        |
| 11        | Force variations at all rows of the bumper        |

**Testing Dataset**

| File No.  | Description                                       |
|----------:|---------------------------------------------------|
| 2         | Slow force variation                              |
| 4         | High force impacts                                |
| 5         | Low force impacts                                 |
| 6         | Random force inputs (No impacts)                  |
| 7         | Pull dataset                                      |

In [None]:
train_files = ["calibration_struct{:d}.mat".format(i) for i in [1, 8, 9, 2, 10, 11]]
test_files  = ["calibration_struct{:d}.mat".format(i) for i in [2, 4, 6, 7, 5]]
n_history = 5

tuner_type = "hyperband"
# tuner_type = "bayesian"

# ------------------------------------------------------------------------------
# Lambda functions for data extractions
# ------------------------------------------------------------------------------
get_X = lambda tmp: np.vstack((
    tmp["Fx_measured"],
    tmp["Fy_measured"],
    tmp["Fz_measured"],
    tmp["Mx_measured"],
    tmp["My_measured"],
    tmp["Mz_measured"],
)).T

get_y = lambda tmp: np.vstack((
    tmp["Fx_applied"],
    tmp["Fy_applied"],
    tmp["Mz_applied"],
)).T

# ------------------------------------------------------------------------------
# Load Data
# ------------------------------------------------------------------------------
X = np.empty((0,6))
y = np.empty((0,3))
for filename in train_files:
    print(f"Loading {filename} ...")
    data = loadmat(filename)
    data = data["training_data_struct"]
    for test in data.keys():
        X = np.vstack((X, get_X(data[test])))
        y = np.vstack((y, get_y(data[test])))

X_test = {}
y_test = {}
for filename in test_files:
    print(f"Loading {filename} ...")
    data = loadmat(filename)
    data = data["training_data_struct"]
    X_temp = np.empty((0,6))
    y_temp = np.empty((0,3))
    for test in data.keys():
        X_temp = np.vstack((X_temp, get_X(data[test])))
        y_temp = np.vstack((y_temp, get_y(data[test])))
    X_test[filename[:-4]] = X_temp
    y_test[filename[:-4]] = y_temp

# ------------------------------------------------------------------------------
# Downsample from 400Hz to 200 Hz (realworld control frequency)
# ------------------------------------------------------------------------------
X = decimate(X, 2, axis=0)
y = decimate(y, 2, axis=0)
for filename in test_files:
    X_test[filename[:-4]] = decimate(X_test[filename[:-4]], 2, axis=0)
    y_test[filename[:-4]] = decimate(y_test[filename[:-4]], 2, axis=0)
    
# ------------------------------------------------------------------------------
# Stack training data for RNN training
# ------------------------------------------------------------------------------
X_train = np.hstack([
    X[i:len(X)-n_history+i] for i in range(n_history+1)
])
y_train = y[n_history:]

print("Training Data with history: X =", X_train.shape, "; y =", y_train.shape)

In [None]:
# ------------------------------------------------------------------------------
# Model Defination
# ------------------------------------------------------------------------------
def build_model(hp):
    inputs = keras.Input(shape=(X_train.shape[1],))

    x = layers.Reshape((n_history+1, 6))(inputs)
    x = layers.LSTM(hp.Int('units_LSTM', min_value = 4, max_value = 64, step = 4),
                    activation="tanh",
                    recurrent_activation="sigmoid")(x)
    
    x_Fx = x
    for i in range(hp.Int('layers_Fx', min_value = 1, max_value = 4, step = 1)):
        x_Fx = layers.Dense(
            hp.Int('units_Fx_' + str(i), min_value = 4, max_value = 32, step = 2),
            activation=hp.Choice('act_Fx_' + str(i), ["relu", "sigmoid", "tanh"])
        )(x_Fx)
    output_Fx = layers.Dense(1, name="Fx")(x_Fx)

    x_Fy = x
    for i in range(hp.Int('layers_Fy', min_value = 1, max_value = 4, step = 1)):
        x_Fy = layers.Dense(
            hp.Int('units_Fy_' + str(i), min_value = 4, max_value = 32, step = 2),
            activation=hp.Choice('act_Fy_' + str(i), ["relu", "sigmoid", "tanh"])
        )(x_Fy)
    output_Fy = layers.Dense(1, name="Fy")(x_Fy)

    x_Mz = x
    for i in range(hp.Int('layers_Mz', min_value = 1, max_value = 4, step = 1)):
        x_Mz = layers.Dense(
            hp.Int('units_Mz_' + str(i), min_value = 4, max_value = 32, step = 2),
            activation=hp.Choice('act_Mz_' + str(i), ["relu", "sigmoid", "tanh"])
        )(x_Mz)
    output_Mz = layers.Dense(1, name="Mz")(x_Mz)

    model = keras.Model(
        inputs=inputs,
        outputs=[output_Fx, output_Fy, output_Mz],
        name="Bumper",
    )

    model.compile(
        loss=tf.keras.losses.Huber(),
        # loss=keras.losses.MeanSquaredError(),
        loss_weights=[1.0, 1.0, 1.0],
        optimizer=keras.optimizers.RMSprop(),
        metrics=[tf.keras.metrics.MeanAbsoluteError()],
    )
    return model

# Clear Output fix
class ClearTrainingOutput(tf.keras.callbacks.Callback):
    def on_train_end(*args, **kwargs):
        IPython.display.clear_output(wait = True)

# ------------------------------------------------------------------------------
# Setup Tuner
# ------------------------------------------------------------------------------
if tuner_type == "hyperband":
    tuner = Hyperband(
        build_model,
        objective="val_loss", 
        max_epochs=20,
        executions_per_trial=3,
        directory='nn_hyperparam',
        project_name='Bumper_hp_hyperband', 
        overwrite=True,
    )
elif tuner_type == "bayesian":
    tuner = BayesianOptimization(
        build_model,
        objective="val_loss", 
        max_trials=200,
        num_initial_points=10,
        executions_per_trial=5,
        directory='nn_hyperparam',
        project_name='Bumper_hp_bayesian',
        overwrite=True,
    )

# ------------------------------------------------------------------------------
# Run Tuner
# ------------------------------------------------------------------------------
tuner.search(X_train, [y_train[:,i] for i in range(3)],
             validation_split=0.2,
             batch_size=2048,
             epochs=50,
             verbose=2,
             callbacks=[ClearTrainingOutput()])

# ------------------------------------------------------------------------------
# Save tuner
# ------------------------------------------------------------------------------
with open(f"nn_hyperparam/{tuner_type}_tuner.pkl", "wb") as f:
    pickle.dump(tuner, f)

In [None]:
# ------------------------------------------------------------------------------
# Get the optimal hyperparameters
# ------------------------------------------------------------------------------
with open(f"nn_hyperparam/{tuner_type}_tuner.pkl", "rb") as f:
    tuner = pickle.load(f)
best_hps = tuner.get_best_hyperparameters(num_trials=10)

string = """
================================================================================
    units_LSTM = {units_LSTM}
    --------------------------------------------
    layers_Fx = {layers_Fx}
        0 => {units_Fx_0:2d} , {act_Fx_0}
        1 => {units_Fx_1:2d} , {act_Fx_1}
        2 => {units_Fx_2:2d} , {act_Fx_2}
        3 => {units_Fx_2:2d} , {act_Fx_3}
    --------------------------------------------
    layers_Fy = {layers_Fy}
        0 => {units_Fy_0:2d} , {act_Fy_0}
        1 => {units_Fy_1:2d} , {act_Fy_1}
        2 => {units_Fy_2:2d} , {act_Fy_2}
        3 => {units_Fy_3:2d} , {act_Fy_3}
    --------------------------------------------
    layers_Mz = {layers_Mz}
        0 => {units_Mz_0:2d} , {act_Mz_0}
        1 => {units_Mz_1:2d} , {act_Mz_1}
        2 => {units_Mz_2:2d} , {act_Mz_2}
        3 => {units_Mz_3:2d} , {act_Mz_3}
"""

for hps in best_hps:
    if hps.values['units_LSTM'] < 30:
        print(string.format(**hps.values))

In [None]:
# ------------------------------------------------------------------------------
# Show results from best hyperparameter model
# ------------------------------------------------------------------------------
with open(f"nn_hyperparam/{tuner_type}_tuner.pkl", "rb") as f:
    tuner = pickle.load(f)
best_model = tuner.get_best_models(num_models=10)[1]

stats = "| {:^10s} | {:^10s} | {:^10s} | {:^10s} | {:^10s} | {:^10s} | {:^10s} |\n".format(
    "Test Name", "Force Name", "mean", "std", "min", "max", "rms"
)
stats += "| {:^10s} | {:^10s} | {:^10s} | {:^10s} | {:^10s} | {:^10s} | {:^10s} |\n".format(*["---"]*7)
stats_format = "| {:^10s} | {:^10s} | {mean:>10.4f} | {std:>10.4f} | {min:>10.4f} | {max:>10.4f} | {rms:>10.4f} |\n"
stats_dict = {}
err_dict = {}

out_name = ["Fx", "Fy", "Mz"]

y_est = best_model.predict(X_train)
stats_dict["Train"] = {}
err_dict["Train"] = {}
for i in range(2):
    err = (y_train[:,i] - y_est[i].T)
    err_dict["Train"][out_name[i]] = err
    stats_dict["Train"][out_name[i]] = {
        'mean':np.mean(err),
        'std':np.std(err),
        'min':np.min(err),
        'max':np.max(err),
        'rms':np.sqrt(np.mean(err**2)),
    }
    stats += stats_format.format("Train", out_name[i], **stats_dict["Train"][out_name[i]])

for k in X_test:
    X_tester = np.hstack([
        X_test[k][i:len(X_test[k])-n_history+i] for i in range(n_history+1)
    ])
    y_tester = y_test[k][n_history:]
    y_est = best_model.predict(X_tester)
    
    stats_dict["Test " + k[-1]] = {}
    err_dict["Test " + k[-1]] = {}
    for i in range(2):
        err = (y_tester[:,i] - y_est[i].T)
        err_dict["Test " + k[-1]][out_name[i]] = err
        stats_dict["Test " + k[-1]][out_name[i]] = {
            'mean':np.mean(err),
            'std':np.std(err),
            'min':np.min(err),
            'max':np.max(err),
            'rms':np.sqrt(np.mean(err**2)),
        }
        stats += stats_format.format("Test " + k[-1], out_name[i], **stats_dict["Test " + k[-1]][out_name[i]])

In [None]:
testnames = stats_dict.keys()
forcenames = ["Fx", "Fy", "Mz"]
colors = ["#d95f02", "#1b9e77", "#7570b3"]
n = len(forcenames)
m = len(testnames)

plt.figure()

plots = []
for i, forcename in enumerate(forcenames):
    ymean = []
    yerr = []
    ymin = []
    ymax = []
    for testname in testnames:
        ymean.append(stats_dict[testname][forcename]["mean"])
        yerr.append(stats_dict[testname][forcename]["std"])
        ymin.append(stats_dict[testname][forcename]["min"])
        ymax.append(stats_dict[testname][forcename]["max"])

    plots.append(plt.errorbar(
        x=np.arange(m)*n + i*0.6,
        y=ymean,
        yerr=yerr,
        linestyle='None',
        marker='s',
        markersize=4,
        color=colors[i],
        capsize=5,
        label=forcename
    ))
    
    plt.plot(
        np.arange(m)*n + i*0.6,
        ymin,
        linestyle='None',
        marker="2",
        markersize=8,
        color=colors[i]
    )
    plt.plot(
        np.arange(m)*n + i*0.6,
        ymax,
        linestyle='None',
        marker="1",
        markersize=8,
        color=colors[i]
    )

plt.legend(plots, forcenames)
plt.xticks(np.arange(0, n*m, n) + 0.6, testnames)
plt.ylabel("Error in Forces/Moments")
plt.show()