# Adiabatic SSM model

## Quantifying prediction accuracy

This notebook quantifies the prediction of adiabatic SSM models by doing finite-horizon predictions across the robot's workspace and evaluating the corresponding prediction errors.

In [44]:
from os import listdir
from os.path import join, split, isdir
import pickle
import yaml
import numpy as np
from numpy.random import randint
from tqdm.auto import tqdm
import time
import yaml
np.set_printoptions(linewidth=100)

In [45]:
%load_ext autoreload
%autoreload 2
import utils as utils
import plot_utils as plot
from interpolators import InterpolatorFactory

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [46]:
%matplotlib qt
import matplotlib.pyplot as plt

In [47]:
# Adiabatic SSM settings
ROMOrder = 3
# N_samples = 100
DT = 0.01
INTERPOLATE_3D = True

Trunk settings

In [48]:
observables = "delay-embedding" # "delay-embedding" # "pos-vel" # 
N_DELAY = 4 # only relevant if observables is "delay-embedding"
TIP_NODE = 51
N_NODES = 709
INPUT_DIM = 8
DT = 0.01

rDOF = 3
oDOF = 3
SSMDim = 6

robot_dir = "../../../soft-robot-control/examples/trunk"
rest_file = join(robot_dir, 'rest_qv.pkl')

# load rest position
with open(rest_file, 'rb') as f:
    rest_data = pickle.load(f)
    rest_q = rest_data['q'][0]

In [49]:
data_dir = "/media/jonas/Backup Plus/jonas_soft_robot_data/trunk_adiabatic_10ms_N=100_sparsity=0.95" # 33_handcrafted" # 9" # 
model_names = [name for name in sorted(listdir(data_dir)) if isdir(join(data_dir, name))]
print(model_names)
N_models = len(model_names)

['000', '001', '002', '003', '004', '005', '006', '007', '008', '009', '010', '011', '012', '013', '014', '015', '016', '017', '018', '019', '020', '021', '022', '023', '024', '025', '026', '027', '028', '029', '030', '031', '032', '033', '034', '035', '036', '037', '038', '039', '040', '041', '042', '043', '044', '045', '046', '047', '048', '049', '050', '051', '052', '053', '054', '055', '056', '057', '058', '059', '060', '061', '062', '063', '064', '065', '066', '067', '068', '069', '070', '071', '072', '073', '074', '075', '076', '077', '078', '079', '080', '081', '082', '083', '084', '085', '086', '087', '088', '089', '090', '091', '092', '093', '094', '095', '096', '097', '098', '099']


Load local SSM models

In [50]:
models = []
test_results = []
for model_name in model_names:
    model_dir = join(data_dir, model_name, f"SSMmodel_{observables}_ROMOrder={ROMOrder}_globalV")
    with open(join(model_dir, "SSM_model.pkl"), "rb") as f:
        model = pickle.load(f)
        models.append(model)
    with open(join(model_dir, "test_results.yaml"), "rb") as f:
        test_results_dict = yaml.safe_load(f)
        test_results.append([test_results_dict['like training data']['RMSE'], test_results_dict['open-loop_circle']['RMSE']])
V = [model['model']['V'] for model in models]
r_coeff = [model['model']['r_coeff'] for model in models]
w_coeff = [model['model']['w_coeff'] for model in models]
v_coeff = [model['model']['v_coeff'] for model in models]
B_r = [model['model']['B'] for model in models]
q_bar = [(model['model']['q_eq'] - rest_q)[TIP_NODE*3:TIP_NODE*3+3] for model in models]
u_bar = [model['model']['u_eq'] for model in models]
ROMOrder = models[0]['params']['ROM_order']
SSMOrder = models[0]['params']['SSM_order']
for model in models:
    assert model['params']['ROM_order'] == ROMOrder
    assert model['params']['SSM_order'] == SSMOrder
test_results = np.array(test_results)

In [51]:
if INTERPOLATE_3D:
    xyz_rest = [q_bar[i][:3] for i in range(N_models)]
else:
    xyz_rest = [q_bar[i][:2] for i in range(N_models)]
# xyz_rest
print(u_bar)
print(np.sum([u == 0 for u in u_bar], axis=1))
print(np.mean(np.sum([u == 0 for u in u_bar], axis=1)))

[array([0., 0., 0., 0., 0., 0., 0., 0.]), array([150.,   0.,   0.,   0.,   0.,   0.,   0.,   0.]), array([  0.,   0.,   0.,   0.,   0.,   0., 150.,   0.]), array([  0.,   0.,   0., 300., 150.,   0.,   0.,   0.]), array([300., 150.,   0.,   0.,   0.,   0.,   0.,   0.]), array([  0., 150.,   0.,   0.,   0.,   0.,   0.,   0.]), array([  0.,   0., 150.,   0., 300.,   0.,   0., 150.]), array([  0., 300.,   0.,   0.,   0.,   0.,   0.,   0.]), array([  0., 150.,   0.,   0.,   0.,   0.,   0., 300.]), array([  0.,   0., 150.,   0.,   0.,   0.,   0.,   0.]), array([  0.,   0.,   0., 300.,   0.,   0.,   0.,   0.]), array([  0.,   0.,   0.,   0.,   0.,   0., 300.,   0.]), array([  0.,   0.,   0.,   0.,   0., 150.,   0.,   0.]), array([  0.,   0.,   0.,   0., 300.,   0., 300.,   0.]), array([  0.,   0.,   0.,   0., 300.,   0.,   0.,   0.]), array([  0., 150., 150.,   0.,   0.,   0.,   0.,   0.]), array([  0.,   0.,   0.,   0.,   0.,   0.,   0., 300.]), array([  0.,   0., 300.,   0.,   0.,   0.,   0

In [52]:
plt.close("all")
fig = plt.figure()

R_linear_part = [r[:, :6] for r in r_coeff]
eigenvalues_real_part = [np.real(np.linalg.eigvals(r)) for r in R_linear_part]
metric = [np.linalg.norm(eigenvalues, ord=-np.inf) for eigenvalues in eigenvalues_real_part]
# metric = [-np.max(eigenvalues) for eigenvalues in eigenvalues_real_part]
# metric = [np.linalg.norm(B, ord='nuc') for B in B_r]
# metric = test_results[:, 1]
# print(metric)
# print(eigenvalues_real_part[24])
# print(eigenvalues_real_part[55])

# metric = test_results[:, 0]
import matplotlib as mpl
cmap = mpl.colors.LinearSegmentedColormap.from_list('rg',["forestgreen", "gold", "firebrick"], N=256)

rest_q = np.array(xyz_rest)

if INTERPOLATE_3D:
    ax = plt.axes(projection="3d")
    sc = ax.scatter(rest_q[:, 0], rest_q[:, 1], rest_q[:, 2], marker="o", c=metric, vmax=None, cmap='viridis') # color="tab:blue")
    for i in range(N_models):
        ax.text(rest_q[i, 0], rest_q[i, 1], rest_q[i, 2], str(i), size=10, zorder=1, color='k')
    ax.set_zlabel(r"$z$ [mm]")
else:
    ax = plt.axes()
    sc = ax.scatter(rest_q[:, 0], rest_q[:, 1], marker="o", c=metric, vmax=None, cmap=cmap) # color="tab:blue")
ax.set_xlabel(r"$x$ [mm]")
ax.set_ylabel(r"$y$ [mm]")
ax.set_aspect("equal")
plt.colorbar(sc, ax=ax, label=r"$||Re(\lambda)||_{\infty}$")
fig.suptitle("Slowest decay mode of the ROM's linear part")
fig.show()

Filter out _"bad"_ models:
- opposite side actuation
- very fast decay modes


In [53]:
bad_models = np.isnan(test_results[:, 1])
# bad_models = np.isnan(test_results[:, 1]) | (test_results[:, 1] > 10)

print(f'Number of "good" models: {np.sum(~bad_models)}/{len(model_names)}')
print(f'Indices of "bad" models: {np.nonzero(bad_models)}')

Number of "good" models: 53/100
Indices of "bad" models: (array([ 3,  4,  7, 10, 17, 18, 20, 24, 28, 29, 30, 32, 34, 35, 37, 38, 39, 40, 41, 44, 46, 51, 52,
       54, 55, 56, 57, 58, 64, 72, 73, 74, 75, 78, 79, 82, 83, 84, 85, 86, 88, 90, 92, 94, 95, 97,
       99]),)


In [54]:
# use_models = [0, 81, 62, 73, 32, 65, 99, 56, 51, 27, 83, 31]
use_models = [int(model_name) for model_name in model_names if not bad_models[int(model_name)]]
print(f"use_models = {use_models}")

use_models = [0, 1, 2, 5, 6, 8, 9, 11, 12, 13, 14, 15, 16, 19, 21, 22, 23, 25, 26, 27, 31, 33, 36, 42, 43, 45, 47, 48, 49, 50, 53, 59, 60, 61, 62, 63, 65, 66, 67, 68, 69, 70, 71, 76, 77, 80, 81, 87, 89, 91, 93, 96, 98]


In [55]:
# only keep models
for i in range(N_models):
    if i not in use_models:
        V[i] = None
        r_coeff[i] = None
        w_coeff[i] = None
        v_coeff[i] = None
        B_r[i] = None
        q_bar[i] = None
        u_bar[i] = None
        xyz_rest[i] = None
V = [V[i] for i in range(len(V)) if V[i] is not None]
r_coeff = [r_coeff[i] for i in range(len(r_coeff)) if r_coeff[i] is not None]
w_coeff = [w_coeff[i] for i in range(len(w_coeff)) if w_coeff[i] is not None]
v_coeff = [v_coeff[i] for i in range(len(v_coeff)) if v_coeff[i] is not None]
B_r = [B_r[i] for i in range(len(B_r)) if B_r[i] is not None]
q_bar = [q_bar[i] for i in range(len(q_bar)) if q_bar[i] is not None]
u_bar = [u_bar[i] for i in range(len(u_bar)) if u_bar[i] is not None]
xyz_rest = [xyz_rest[i] for i in range(len(xyz_rest)) if xyz_rest[i] is not None]
N_models = len(V)

Compute observables (delay embedding)

In [56]:
if observables == "delay-embedding":
    N_DELAY = 4
    # observables are position of tip + n_delay delay embeddings of the tip position
    assemble_observables = lambda oData: utils.delayEmbedding(oData, up_to_delay=N_DELAY)
elif observables == "pos-vel":
    # observables is position and velocity of tip node
    def assemble_observables(oData):
        if oData.shape[0] > 6:
            tip_node_slice = np.s_[3*TIP_NODE:3*TIP_NODE+3]
        else:
            tip_node_slice = np.s_[:3]
        return np.vstack((oData[tip_node_slice, :], np.gradient(oData[tip_node_slice, :], DT, axis=1)))

Load test trajectory (for now: sum of all trajectories used to regress B matrices)

In [57]:
# test_trajectory_dir = "open-loop"
test_trajectories = []
# for name in tqdm(model_names): # ['origin']: # 
traj_dir = "/home/jonas/Projects/stanford/soft-robot-control/examples/trunk/dataCollection/open-loop_500" # join(model_dir, name, test_trajectory_dir)
(t, z), u = utils.import_pos_data(data_dir=traj_dir,
                                  rest_file=rest_file,
                                  output_node=TIP_NODE, return_inputs=True, traj_index=0)
y = assemble_observables(z)
print(y.shape)
test_trajectories.append({
        'name': split(traj_dir)[-1],
        't': t,
        'z': z,
        'u': u,
        'y': y
    })

(15, 20001)


Combine into one long trajectory

In [58]:
z_tot = np.hstack([traj['z'] for traj in test_trajectories])
y_tot = np.hstack([traj['y'] for traj in test_trajectories])
u_tot = np.hstack([traj['u'] for traj in test_trajectories])
t_tot = np.arange(z_tot.shape[1]) * DT

Interpolate local models to obtain adiabatic SSM model

In [75]:
interpolation_methods = ["idw", "origin_only", "linear"] # , "natural_neighbor"] # ["origin_only", "linear", "ct", "idw", "qp"] # , "ridge"] # [ "rmts", "idw", "qp"]
coeff_dict = {
            'w_coeff': w_coeff,
            'V': V,
            'r_coeff': r_coeff,
            'B_r': B_r,
            'u_bar': u_bar,
            'q_bar': q_bar
        }

print(len(xyz_rest))
# if v_coeff:
#     coeff_dict['v_coeff'] = v_coeff
interpolators = {}
for interpolation_method in interpolation_methods:
    interpolators[interpolation_method] = InterpolatorFactory(interpolation_method, xyz_rest, coeff_dict).get_interpolator()

53


Visualize interpolations

In [76]:
display_names = {
    "origin_only": "Origin model",
    "linear": "Barycentric linear",
    "ct": "Clough-Tocher",
    "idw": "Inverse distance weighting",
    "qp": "Quadratic regression",
    "natural_neighbor": "Natural neighbor",
}

In [77]:
if not INTERPOLATE_3D:
    nx, ny = (100, 100)
    x = np.linspace(-50, 50, nx)
    y = np.linspace(-50, 50, ny)
    xv, yv = np.meshgrid(x, y)
    # xy = np.vstack([xv.ravel(), yv.ravel()]).T
    coeff = "r_coeff"
    entry = np.s_[0, 0]
    plt.close("all")
    fig, axs = plt.subplots(1, len(interpolation_methods), figsize=(20, 4), subplot_kw={"projection": "3d"})
    for i, interpolation_method in enumerate(interpolation_methods):
        ax = axs[i]
        z = np.zeros((nx, ny))
        for i in range(nx):
            for j in range(ny):
                z[i, j] = interpolators[interpolation_method].transform([xv[i, j], yv[i, j]], coeff)[entry]
        ax.plot_surface(xv, yv, z, cmap="viridis", vmin=-3.6, vmax=-3.2)
        ax.set_xlabel(r"$x$ [mm]")
        ax.set_ylabel(r"$y$ [mm]")
        ax.set_zlabel(r"$R[0, 0]$")
        ax.set_zlim(-3.6, -3.2)
        # ax.view_init(90, -90)
        ax.set_title(display_names[interpolation_method])
    fig.savefig(join(traj_dir, f"interpolation_landscapes_2d.png"), bbox_inches='tight', dpi=200)
    fig.show()

Sample finite-horizon predictions to evaluate interpolated models

In [82]:
N_horizon = 5
N_samples = 500

In [83]:
print(z_tot.shape)
np.random.seed(seed=0)
sample_indices = randint(0, z_tot.shape[1], N_samples)

(3, 20001)


In [93]:
import warnings
warnings.filterwarnings('ignore')

for interpolation_method in tqdm(interpolation_methods, position=0):
    print(f"==================== {interpolation_method} ====================")
    q_samples = []
    rmse_samples = []
    advect_times = []
    for i in tqdm(range(N_samples), position=1, leave=True):
        start_idx = sample_indices[i]
        end_idx = start_idx + N_horizon
        q_samples.append(z_tot[:3, start_idx])
        # advect ASSM to obtain finite-horizon prediction
        t0 = time.time()
        t, _, y_pred, _, _, _, _ = utils.advect_adiabaticRD_with_inputs(t_tot[start_idx:end_idx], y_tot[:, start_idx], u_tot[:, start_idx:end_idx],
                                                                        y_target=y_tot[:, start_idx:end_idx], interpolator=interpolators[interpolation_method],
                                                                        ROMOrder=ROMOrder)
        t1 = time.time()
        # compute RMSE
        rmse = np.sum(np.sqrt(np.mean((y_tot[:3, start_idx:end_idx] - y_pred[:3, :])**2, axis=0))) / N_horizon
        rmse_samples.append(rmse)
        advect_times.append(t1 - t0)
    # max_rmse_index = np.argmax(rmse_samples)
    print("max RMSE:", np.nanmax(rmse_samples))
    # print("max RMSE sample idx:", sample_indices[max_rmse_index])
    with open(join(traj_dir, f"{interpolation_method}_rmse_samples.pkl"), "wb") as f:
        pickle.dump(rmse_samples, f)
    with open(join(traj_dir, f"{interpolation_method}_q_samples.pkl"), "wb") as f:
        pickle.dump(q_samples, f)
    with open(join(traj_dir, f"{interpolation_method}_advect_times.pkl"), "wb") as f:
        pickle.dump(advect_times, f)

  0%|          | 0/3 [00:00<?, ?it/s]



  0%|          | 0/500 [00:00<?, ?it/s]

max RMSE: 0.8517890791383443


  0%|          | 0/500 [00:00<?, ?it/s]

max RMSE: 2.6155866979927898


  0%|          | 0/500 [00:00<?, ?it/s]

ValueError: Input X contains NaN.
PolynomialFeatures does not accept missing values encoded as NaN natively. For supervised learning, you might want to consider sklearn.ensemble.HistGradientBoostingClassifier and Regressor which accept missing values encoded as NaNs natively. Alternatively, it is possible to preprocess the data, for instance by using an imputer transformer in a pipeline or drop samples with missing values. See https://scikit-learn.org/stable/modules/impute.html You can find a list of all estimators that handle NaN values at the following page: https://scikit-learn.org/stable/modules/impute.html#estimators-that-handle-nan-values

Plot prediction accuracy maps for all the different interpolation methods

In [74]:
interpolation_methods = ["origin_only", "linear", "ct", "idw", "natural_neighbor"] # ["origin_only", "linear", "nn", "qp"] # , "tps", "ls", "qp"]
fig, axs = plt.subplots(3, len(interpolation_methods),
                        figsize=(4*len(interpolation_methods), 7),
                        height_ratios=[5, 2, 2],
                        sharey='row', sharex='row', constrained_layout=True)
for i, interpolation_method in enumerate(interpolation_methods):
    with open(join(traj_dir, f"{interpolation_method}_rmse_samples.pkl"), "rb") as f:
        rmse_samples = np.array(pickle.load(f))
    with open(join(traj_dir, f"{interpolation_method}_q_samples.pkl"), "rb") as f:
        q_samples = np.stack(pickle.load(f))
    with open(join(traj_dir, f"{interpolation_method}_advect_times.pkl"), "rb") as f:
        advect_times = np.array(pickle.load(f))
    colorbar = (i == len(interpolation_methods) - 1)
    plot.prediction_accuracy_map(q_samples[:, [0, 1]], rmse_samples, vmax=.6, ax=axs[0, i], colorbar=colorbar, cax=axs[:, :], show=False)
    plot.boxplot(rmse_samples, ax=axs[1, i], show=False, xlabel="RMSE [mm]")
    plot.boxplot(advect_times, ax=axs[2, i], show=False, xlabel="Advect time [s]")
    axs[0, i].set_title(display_names[interpolation_method])
    # if i > 0:
    #     axs[0, i].set_ylabel("")
# fig.tight_layout()
fig.savefig(join(traj_dir, f"prediction_accuracy.png"), bbox_inches='tight', dpi=200)
fig.show()

In [None]:
plt.close('all')