## MonoForce Inference with RobInGas Data

We are going to load the RobInGas data and perform inference with the pretrained MonoForce model.

In [None]:
# add the path to the source code of the MonoForce package
import sys
sys.path.append('../src')

### Load the Terrain Encoder model

In [None]:
import torch
from monoforce.models.terrain_encoder.lss import compile_model

def load_model(model_path, robot='tradr'):
    """
    Load the MonoForce model from a given path.
    :param model_path: str, path to the model
    :param robot: str, robot type
    :return: MonoForce model
    """
    lss_cfg_path = f'../config/lss_cfg_{robot}.yaml'
    lss_cfg = read_yaml(lss_cfg_path)
    
    model = compile_model(lss_cfg['grid_conf'], lss_cfg['data_aug_conf'], inpC=3, outC=1)
    
    # https://discuss.pytorch.org/t/how-to-load-part-of-pre-trained-model/1113/3
    model_dict = model.state_dict()
    pretrained_model = torch.load(model_path)
    print(f"Loading pretrained LSS model from {model_path}")
    model_dict.update(pretrained_model)
    model.load_state_dict(model_dict)
    model.eval()
    
    return model

In [None]:
from monoforce.utils import read_yaml

robot = 'tradr'
model_path = f'../config/weights/lss/lss_robingas_{robot}.pt'
model = load_model(model_path, robot=robot)
# print(model)

In [None]:
import os
import numpy as np
from monoforce.datasets import RobinGasVis, robingas_seq_paths
from monoforce.dphys_config import DPhysConfig

In [None]:
def compile_data(seq_i=None, robot='tradr', small=False, is_train=False):
    """
    Compile the RobInGas dataset for a given sequence index and robot type.
    :param seq_i: int, sequence index
    :param robot: str, robot type
    :param small: bool, if True, return a small subset of the dataset
    :param is_train: bool, if True, return the training set, otherwise the test set
    :return: RobinGas dataset
    """
    dphys_cfg = DPhysConfig()

    lss_cfg_path = f'../config/lss_cfg_{robot}.yaml'
    assert os.path.isfile(lss_cfg_path)
    lss_cfg = read_yaml(lss_cfg_path)

    if seq_i is not None:
        path = robingas_seq_paths[robot][seq_i]
    else:
        path = np.random.choice(robingas_seq_paths[robot])

    ds = RobinGasVis(path=path, dphys_cfg=dphys_cfg, lss_cfg=lss_cfg, is_train=is_train)
    if small:
        ds = ds[np.random.choice(len(ds), 4, replace=False)]

    return ds

In [None]:
ds = compile_data(seq_i=0, robot=robot, small=False, is_train=False)
print(f"Number of samples: {len(ds)}")

### Visualizing a sample from the dataset.

In [None]:
sample = ds[0]
(imgs, rots, trans, intrins, post_rots, post_trans,
 hm_geom, hm_terrain,
 control_ts, controls,
 traj_ts, Xs, Xds, Rs, Omegas,
 points) = sample
print(f"Images shape: {imgs.shape}")
print(f"Extrinsic Rotations shape: {rots.shape}")
print(f"Extrinsic Translations shape: {trans.shape}")
print(f"Intrinsic camera matrix shape: {intrins.shape}")
print(f"Images augmentation Post-rotation shape: {post_rots.shape}")
print(f"Images augmentation Post-translation shape: {post_trans.shape}")
print(f"Heightmap geometric shape: {hm_geom.shape}")
print(f"Heightmap terrain shape: {hm_terrain.shape}")
print(f"Control timestamps shape: {control_ts.shape}")
print(f"Controls shape: {controls.shape}")
print(f"Trajectory timestamps shape: {traj_ts.shape}")
print(f"Robot's positions shape: {Xs.shape}")
print(f"Robot's linear velocities shape: {Xds.shape}")
print(f"Robot's orientations shape: {Rs.shape}")
print(f"Robot's angular velocities shape: {Omegas.shape}")
print(f"Lidar points shape: {points.shape}")

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib as mpl
from monoforce.models.terrain_encoder.utils import ego_to_cam, get_only_in_img_mask, denormalize_img


def explore_data(sample: list, raw_img_size: tuple, model: torch.nn.Module):
    """
    Explore the RobInGas data sample.
    :param sample: list, sample from the dataset
    :param raw_img_size: tuple, raw image size
    :param model: torch.nn.Module, MonoForce model
    """
    H, W = raw_img_size
    (imgs, rots, trans, intrins, post_rots, post_trans,
     hm_geom, hm_terrain,
     control_ts, controls,
     traj_ts, Xs, Xds, Rs, Omegas,
     points) = sample
    height_geom, mask_geom = hm_geom[0], hm_geom[1]
    height_terrain, mask_terrain = hm_terrain[0], hm_terrain[1]
    n_cams = len(imgs)

    frustum_pts = model.get_geometry(rots[None], trans[None], intrins[None], post_rots[None], post_trans[None])
    frustum_pts = frustum_pts.squeeze(0)

    n_rows, n_cols = 2, int(np.ceil(n_cams / 2) + 3)
    plt.figure(figsize=(n_cols * 4, n_rows * 4))
    gs = mpl.gridspec.GridSpec(n_rows, n_cols)
    gs.update(wspace=0.0, hspace=0.0, left=0.0, right=1.0, top=1.0, bottom=0.0)

    final_ax = plt.subplot(gs[:, -1:])
    for imgi, img in enumerate(imgs):
        cam_pts = ego_to_cam(points, rots[imgi], trans[imgi], intrins[imgi])
        mask = get_only_in_img_mask(cam_pts, H, W)
        plot_pts = post_rots[imgi].matmul(cam_pts) + post_trans[imgi].unsqueeze(1)

        ax = plt.subplot(gs[imgi // int(np.ceil(n_cams / 2)), imgi % int(np.ceil(n_cams / 2))])
        plt.imshow(denormalize_img(img))
        plt.scatter(plot_pts[0, mask], plot_pts[1, mask], c=points[2, mask],
                    s=1, alpha=0.4, cmap='jet', vmin=-1., vmax=1.)
        plt.axis('off')
        
        plt.sca(final_ax)
        plt.plot(frustum_pts[imgi, :, :, :, 0].view(-1), frustum_pts[imgi, :, :, :, 1].view(-1), '.')
    
    # plot height maps
    ax = plt.subplot(gs[:, -3:-2])
    plt.imshow(height_geom.T, origin='lower', cmap='jet', vmin=-1., vmax=1.)
    plt.title('Heightmap Geometric')
    plt.axis('off')

    ax = plt.subplot(gs[:, -2:-1])
    plt.imshow(height_terrain.T, origin='lower', cmap='jet', vmin=-1., vmax=1.)
    plt.title('Heightmap Terrain')
    plt.axis('off')
    
    final_ax.set_aspect('equal')

    plt.show()

In [None]:
raw_img_shape = ds.lss_cfg['data_aug_conf']['H'], ds.lss_cfg['data_aug_conf']['W']
print(f"Raw image shape: {raw_img_shape}")

explore_data(sample, raw_img_size=raw_img_shape, model=model)

### Inference with the MonoForce model

In [None]:
from torch.utils.data import DataLoader
from timeit import default_timer as timer

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

data_loader = DataLoader(ds, batch_size=1, shuffle=False)
batch = next(iter(data_loader))
(imgs, rots, trans, intrins, post_rots, post_trans,
 hm_geom, hm_terrain,
 control_ts, controls,
 traj_ts, Xs, Xds, Rs, Omegas,
 points) = batch

start = timer()
with torch.inference_mode():
    inputs = [imgs, rots, trans, intrins, post_rots, post_trans]
    inputs = [i.to(device) for i in inputs]
    heightmap_pred = model(*inputs)
end = timer()
print(f"Heightmap prediction shape: {heightmap_pred.shape}")
print(f"Time taken for inference: {end - start} seconds")
print(f"Device: {list(model.parameters())[0].device}")

### Visualizing the predicted heightmap

In [None]:
heightmap_pred_np = heightmap_pred.squeeze(1).cpu().numpy()
for b in range(heightmap_pred_np.shape[0]):
    plt.figure(figsize=(5, 5))
    plt.imshow(heightmap_pred_np[b].T, origin='lower', cmap='jet', vmin=-1., vmax=1.)
    plt.title('Predicted Heightmap')
    plt.axis('off')
    plt.colorbar()
    plt.show()

## Predicting robot's trajectory with $\nabla$Physics

Defining the control inputs
- Left and right velocity commands: $u_1 = u_2 = 1$
- Simulation time: $T = 5$ s
- Simulation step: $\Delta t = 0.01$ s

In [None]:
# constant linear and angular velocities as control inputs
dphys_cfg = DPhysConfig()

T, dt = dphys_cfg.traj_sim_time, dphys_cfg.dt  # dt = T / N = 0.01 s, simulation step
N = int(T / dt)
controls = torch.tensor([[[1.0, 1.0]] * int(T / dt)]).to(device)  # vel_left, vel_right

In [None]:
from monoforce.models.dphysics import DPhysics

dphysics = DPhysics(dphys_cfg=dphys_cfg, device=device)

with torch.inference_mode():
    states, forces = dphysics(z_grid=heightmap_pred.squeeze(1), controls=controls)

In [None]:
Xs, Rs, Xds, Omegas, X_points = states
print(f"Robot's positions shape: {Xs.shape}")
print(f"Robot's orientation shape: {Rs.shape}")
print(f"Linear velocity shape: {Xds.shape}")
print(f"Angular velocity shape: {Omegas.shape}")
print(f"Robot's body points shape: {X_points.shape}")

### Visualizing the robot's trajectory and heightmap

In [None]:
xs_grid = Xs.cpu().numpy()
xs_grid = xs_grid.reshape(-1, 3)
xs_grid = xs_grid[::10]  # downsample for visualization
xs_grid = (xs_grid[:, :2] + dphys_cfg.d_max) / dphys_cfg.grid_res  # normalize to grid resolution

for b in range(Xs.shape[0]):
    plt.figure(figsize=(5, 5))
    plt.imshow(heightmap_pred_np.T, origin='lower', cmap='jet', vmin=-1., vmax=1.)
    plt.title('Predicted Heightmap')
    plt.axis('off')
    plt.colorbar()
    plt.scatter(xs_grid[0, 0], xs_grid[0, 1], c='red', s=100, label='Start')
    plt.scatter(xs_grid[:, 0], xs_grid[:, 1], c='blue', s=1, label='Trajectory')
    plt.scatter(xs_grid[-1, 0], xs_grid[-1, 1], c='green', s=100, label='End')
    plt.legend()
    plt.show()