In [3]:
import sys
import os
import time
import numpy as np
import pinocchio as pin

sys.path.append('./python/bsqp')
sys.path.append('./python')
sys.path.append('./examples')  # For force_estimator
from bsqp.mpc_controller import MPC_GATO
from bsqp.common import figure8
from force_estimator import ImprovedForceEstimator

np.set_printoptions(linewidth=99999999)
np.random.seed(42)

# MPC_GATO class is now imported from bsqp.mpc_controller
# It includes all the same functionality as before

In [None]:
# Load URDF model

urdf_path = "indy7_description/indy7.urdf"
model_dir = "indy7_description/"

model, visual_model, collision_model = pin.buildModelsFromUrdf(urdf_path, model_dir)

N = 32
dt = 0.01

f_ext = np.array([0.0, 0.0, -60.0, 0.0, 0.0, 0.0])

xstart = np.hstack((np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), np.zeros(6)))
# xstart = np.hstack((np.array([-1.096711, -0.09903229,  0.83125766, -0.10907673,  0.49704404,  0.01499449]), np.zeros(6)))

fig_8_traj = figure8(dt)

mpc = MPC_GATO(model, N=N, dt=dt, batch_size=1, constant_f_ext=f_ext)
q_trajectory, mpc_stats = mpc.run_mpc_fig8(xstart, fig_8_traj, sim_dt=0.001, sim_time=16.0)
mpc_4 = MPC_GATO(model, N=N, dt=dt, batch_size=4, constant_f_ext=f_ext)
q_trajectory_4, mpc_stats_4 = mpc_4.run_mpc_fig8(xstart, fig_8_traj, sim_dt=0.001, sim_time=16.0)
mpc_8 = MPC_GATO(model, N=N, dt=dt, batch_size=8, constant_f_ext=f_ext)
q_trajectory_8, mpc_stats_8 = mpc_8.run_mpc_fig8(xstart, fig_8_traj, sim_dt=0.001, sim_time=16.0)
mpc_16 = MPC_GATO(model, N=N, dt=dt, batch_size=16, constant_f_ext=f_ext)
q_trajectory_16, mpc_stats_16 = mpc_16.run_mpc_fig8(xstart, fig_8_traj, sim_dt=0.001, sim_time=16.0)
mpc_32 = MPC_GATO(model, N=N, dt=dt, batch_size=32, constant_f_ext=f_ext)
q_trajectory_32, mpc_stats_32 = mpc_32.run_mpc_fig8(xstart, fig_8_traj, sim_dt=0.001, sim_time=16.0)
mpc_64 = MPC_GATO(model, N=N, dt=dt, batch_size=64, constant_f_ext=f_ext)
q_trajectory_64, mpc_stats_64 = mpc_64.run_mpc_fig8(xstart, fig_8_traj, sim_dt=0.001, sim_time=16.0)
mpc_128 = MPC_GATO(model, N=N, dt=dt, batch_size=128, constant_f_ext=f_ext)
q_trajectory_128, mpc_stats_128 = mpc_128.run_mpc_fig8(xstart, fig_8_traj, sim_dt=0.001, sim_time=16.0)

T : f

External force at EE (world frame): [  0.   0. -60.]
avg err: 0.082
avg t_sqp: 0.114ms


External force at EE (world frame): [  0.   0. -60.]


KeyboardInterrupt: 

In [None]:
import meshcat.geometry as g
import meshcat.transformations as tf
from pinocchio.visualize import MeshcatVisualizer
import numpy as np

# load two robot models
model1, visual_model1, collision_model1 = pin.buildModelsFromUrdf(urdf_path, model_dir)
model2, visual_model2, collision_model2 = pin.buildModelsFromUrdf(urdf_path, model_dir)

# single visualizer
viz = MeshcatVisualizer(model1, collision_model1, visual_model1)
viz.initViewer(open=True)
viz.loadViewerModel(rootNodeName="robot1", color=[1.0, 0.0, 0.0, 0.4])
viz_2 = MeshcatVisualizer(model2, collision_model2, visual_model2)
viz_2.initViewer(viz.viewer)
viz_2.loadViewerModel(rootNodeName="robot2", color=[0.0, 1.0, 0.0, 0.7])

# Draw figure 8 trajectory points
for i in range(int(len(fig_8_traj)/30)):  # number of points in one cycle
    viz.viewer[f'point{i}'].set_object(
        g.Sphere(0.015), 
        g.MeshLambertMaterial(color=0xff0000))
    T = tf.translation_matrix(np.array(fig_8_traj[6*i:6*(i+1)][:3]))
    viz.viewer[f'point{i}'].set_transform(T)    

# Create arrow to visualize force
f_ext = np.array([0.0, 0.0, -60.0])
force_magnitude = np.linalg.norm(f_ext[:3])
force_direction = f_ext[:3] / force_magnitude
arrow_length = force_magnitude / 100.0  # Scale down the force for visualization

# Create arrow geometry
arrow = g.Cylinder(height=arrow_length, radius=0.005)
arrow_2 = g.Cylinder(height=arrow_length, radius=0.005)
viz.viewer['force_arrow'].set_object(arrow, g.MeshLambertMaterial(color=0xffff00))
viz_2.viewer['force_arrow_2'].set_object(arrow_2, g.MeshLambertMaterial(color=0xffff00))
    
q_trajectory1, q_trajectory2 = q_trajectory, q_trajectory_32
num_points = int(len(fig_8_traj)/30) 

t = 0
if not q_trajectory1 or not q_trajectory2:
    raise ValueError("Trajectories not loaded correctly")

ratio = len(q_trajectory1) / len(q_trajectory2)
data1 = model1.createData()
data2 = model2.createData()
while True:
    # Calculate indices safely
    idx1 = int(t * ratio) % len(q_trajectory1)
    idx2 = t % len(q_trajectory2)

    # Display robot configurations
    viz.display(q_trajectory1[idx1])
    viz_2.display(q_trajectory2[idx2])

    # --- Update force arrow position and orientation ---
    pin.forwardKinematics(model1, data1, q_trajectory1[idx1])
    pin.updateFramePlacements(model1, data1)
    ee_position = data1.oMi[6].translation
    
    pin.forwardKinematics(model2, data2, q_trajectory2[idx2])
    pin.updateFramePlacements(model2, data2)
    ee_position_2 = data2.oMi[6].translation

    # --- Calculate Rotation ---
    default_axis = np.array([0., 1., 0.])  # Meshcat cylinder's axis
    dot_product = np.dot(default_axis, force_direction)
    dot_product = np.clip(dot_product, -1.0, 1.0)

    rot_angle = np.arccos(dot_product)
    if np.abs(rot_angle) > 0.001:  # Only rotate if needed
        rot_axis = np.cross(default_axis, force_direction)
        if np.linalg.norm(rot_axis) > 0.001:  # Check for non-zero axis
            rot_axis /= np.linalg.norm(rot_axis)  # Normalize rotation axis
            R = tf.rotation_matrix(rot_angle, rot_axis)
        else:
            R = np.eye(4)  # No rotation needed
    else:
        R = np.eye(4)  # No rotation needed

    T = tf.translation_matrix(ee_position - (arrow_length/2) * force_direction)
    T_2 = tf.translation_matrix(ee_position_2 - (arrow_length/2) * force_direction)
    viz.viewer['force_arrow'].set_transform(T @ R)
    viz_2.viewer['force_arrow_2'].set_transform(T_2 @ R)
    
    t += 1
    time.sleep(0.001)  # display rate

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import matplotlib as mpl

# Set publication quality defaults
mpl.rcParams['font.family'] = 'serif'
mpl.rcParams['font.size'] = 10
mpl.rcParams['axes.labelsize'] = 12
mpl.rcParams['axes.titlesize'] = 14
mpl.rcParams['xtick.labelsize'] = 10
mpl.rcParams['ytick.labelsize'] = 10
mpl.rcParams['legend.fontsize'] = 12
mpl.rcParams['figure.dpi'] = 150
mpl.rcParams['savefig.dpi'] = 300

# Extract reference trajectory points
ref_traj = fig_8_traj.reshape(-1, 6)[:, :3]

# Create figure for 3D trajectory
fig = plt.figure(figsize=(14, 6))

# Single solver 3D plot
ax1 = fig.add_subplot(121, projection='3d')
if 'ee_actual' in mpc_stats and len(mpc_stats['ee_actual']) > 0:
    ee_actual = mpc_stats['ee_actual']
    ax1.plot(ee_actual[:, 0], ee_actual[:, 1], ee_actual[:, 2], 
             'b-', linewidth=1.5, label='Actual', alpha=0.8)
ax1.plot(ref_traj[:, 0], ref_traj[:, 1], ref_traj[:, 2], 
         'r--', linewidth=1.0, label='Reference', alpha=0.6)
ax1.set_xlabel('X [m]', labelpad=10)
ax1.set_ylabel('Y [m]', labelpad=10)
ax1.set_zlabel('Z [m]', labelpad=10)
ax1.set_title('Single Solver: End-Effector Trajectory')
ax1.legend(loc='best')
ax1.grid(True, alpha=0.3)
ax1.view_init(elev=20, azim=90)

# Batch solver 3D plot
ax2 = fig.add_subplot(122, projection='3d')
if 'ee_actual' in mpc_stats_32 and len(mpc_stats_32['ee_actual']) > 0:
    ee_actual_batch = mpc_stats_32['ee_actual']
    ax2.plot(ee_actual_batch[:, 0], ee_actual_batch[:, 1], ee_actual_batch[:, 2], 
             'g-', linewidth=1.5, label='Actual', alpha=0.8)
ax2.plot(ref_traj[:, 0], ref_traj[:, 1], ref_traj[:, 2], 
         'r--', linewidth=1.0, label='Reference', alpha=0.6)
ax2.set_xlabel('X [m]', labelpad=10)
ax2.set_ylabel('Y [m]', labelpad=10)
ax2.set_zlabel('Z [m]', labelpad=10)
ax2.set_title('Batch Solver with Force Estimation: End-Effector Trajectory')
ax2.legend(loc='best')
ax2.grid(True, alpha=0.3)
ax2.view_init(elev=20, azim=90)

plt.tight_layout()
plt.savefig('3d_trajectory_comparison.pdf', bbox_inches='tight')
plt.show()

In [None]:
# Tracking Error and Solver Performance Analysis
fig, axes = plt.subplots(1, 1, figsize=(10, 5))
# 1. Total Tracking Error Bar Plot
ax = axes
solvers = []
total_errors = []

solvers.append('1')
total_errors.append(np.sum(mpc_stats['goal_distances'])/len(mpc_stats['goal_distances']))

solvers.append('4')
total_errors.append(np.sum(mpc_stats_4['goal_distances'])/len(mpc_stats_4['goal_distances']))

solvers.append('8')
total_errors.append(np.sum(mpc_stats_8['goal_distances'])/len(mpc_stats_8['goal_distances']))

solvers.append('16')
total_errors.append(np.sum(mpc_stats_16['goal_distances'])/len(mpc_stats_16['goal_distances']))

solvers.append('32')
total_errors.append(np.sum(mpc_stats_32['goal_distances'])/len(mpc_stats_32['goal_distances']))

solvers.append('64')
total_errors.append(np.sum(mpc_stats_64['goal_distances'])/len(mpc_stats_64['goal_distances']))

solvers.append('128')
total_errors.append(np.sum(mpc_stats_128['goal_distances'])/len(mpc_stats_128['goal_distances']))

if solvers and total_errors:
    bars = ax.bar(solvers, total_errors)
    ax.set_xlabel('Batch Size')
    ax.set_ylabel('[m]')
    ax.set_title('Average Tracking Error, F_ext = (0, 0, -100) N')
    ax.grid(True, alpha=0.3, axis='y')
    
    # Add value labels on bars
    for bar, error in zip(bars, total_errors):
        height = bar.get_height()
        ax.text(bar.get_x() + bar.get_width()/2., height,
                f'{error:.3f}', ha='center', va='bottom')

import pandas as pd
import matplotlib.pyplot as plt
control_data = {}

sum_torque = np.sum(np.abs(mpc_stats['joint_velocities']), axis=1)
control_data['Single Solver'] = np.mean(sum_torque)

sum_torque_4 = np.sum(np.abs(mpc_stats_4['joint_velocities']), axis=1)
control_data['4'] = np.mean(sum_torque_4)

sum_torque_8 = np.sum(np.abs(mpc_stats_8['joint_velocities']), axis=1)
control_data['8'] = np.mean(sum_torque_8)

sum_torque_16 = np.sum(np.abs(mpc_stats_16['joint_velocities']), axis=1)
control_data['16'] = np.mean(sum_torque_16)

sum_torque_32 = np.sum(np.abs(mpc_stats_32['joint_velocities']), axis=1)
control_data['32'] = np.mean(sum_torque_32)

sum_torque_64 = np.sum(np.abs(mpc_stats_64['joint_velocities']), axis=1)
control_data['64'] = np.mean(sum_torque_64)

sum_torque_128 = np.sum(np.abs(mpc_stats_128['joint_velocities']), axis=1)
control_data['128'] = np.mean(sum_torque_128)

# Create bar chart
fig, ax = plt.subplots(figsize=(10, 6))
solvers = list(control_data.keys())
values = list(control_data.values())
bars = ax.bar(solvers, values, color=['blue', 'orange', 'magenta', 'green'])
ax.set_ylabel('[Nm]')
ax.set_title('Average Sum of Joint Velocities')
ax.grid(True, alpha=0.3, axis='y')

# Add value labels on bars
for bar, value in zip(bars, values):
    height = bar.get_height()
    ax.text(bar.get_x() + bar.get_width()/2., height,
            f'{value:.2f}', ha='center', va='bottom')

plt.tight_layout()
plt.show()

plt.tight_layout()
plt.show()

In [None]:
fig, axes = plt.subplots(7, 3, figsize=(18, 18))

# Reference trajectory
ref_traj = fig_8_traj.reshape(-1, 6)[:, :3]

# Define all batch sizes and stats to plot
batch_configs = [
    ('Single Solver', mpc_stats, 'b-'),
    ('Batch 4', mpc_stats_4, 'orange'),
    ('Batch 8', mpc_stats_8, 'y-'),
    ('Batch 16', mpc_stats_16, 'm-'),
    ('Batch 32', mpc_stats_32, 'c-'),
    ('Batch 64', mpc_stats_64, 'g-'),
    ('Batch 128', mpc_stats_128, 'purple')
]

# Plot projections for each configuration
for row, (label, stats, color) in enumerate(batch_configs):
    if 'ee_actual' in stats and len(stats['ee_actual']) > 0:
        ee_actual = stats['ee_actual']
        
        # XY Projection
        ax = axes[row, 0]
        ax.plot(ref_traj[:, 0], ref_traj[:, 1], 'r--', linewidth=1.0, label='Reference', alpha=0.6)
        ax.plot(ee_actual[:, 0], ee_actual[:, 1], color, linewidth=1.5, label='Actual', alpha=0.8)
        ax.set_xlabel('X [m]')
        ax.set_ylabel('Y [m]')
        ax.set_title(f'{label}: XY Projection')
        ax.legend()
        ax.grid(True, alpha=0.3)
        ax.axis('equal')
        
        # XZ Projection
        ax = axes[row, 1]
        ax.plot(ref_traj[:, 0], ref_traj[:, 2], 'r--', linewidth=1.0, label='Reference', alpha=0.6)
        ax.plot(ee_actual[:, 0], ee_actual[:, 2], color, linewidth=1.5, label='Actual', alpha=0.8)
        ax.set_xlabel('X [m]')
        ax.set_ylabel('Z [m]')
        ax.set_title(f'{label}: XZ Projection')
        ax.legend()
        ax.grid(True, alpha=0.3)
        ax.axis('equal')
        
        # YZ Projection
        ax = axes[row, 2]
        ax.plot(ref_traj[:, 1], ref_traj[:, 2], 'r--', linewidth=1.0, label='Reference', alpha=0.6)
        ax.plot(ee_actual[:, 1], ee_actual[:, 2], color, linewidth=1.5, label='Actual', alpha=0.8)
        ax.set_xlabel('Y [m]')
        ax.set_ylabel('Z [m]')
        ax.set_title(f'{label}: YZ Projection')
        ax.legend()
        ax.grid(True, alpha=0.3)
        ax.axis('equal')

plt.tight_layout()
plt.savefig('trajectory_projections.pdf', bbox_inches='tight')
plt.show()

In [None]:
base_palette = { 
    '1':   '#003192',  # Barnard Blue
    '4':   '#747474',  # gray
    '8':  '#7030A0',   # purple
    '16':   '#F19759', # orange
    '32':  '#00693E',  # Dartmouth Green
    '64':  '#56B4E9',  # sky blue (distinct from dark blue)
    '128': '#C90016',  # Harvard Crimson
}

# Single vs Batch=32 vs Batch=128 XZ Projection Comparison (side by side)
fig, axes = plt.subplots(1, 3, figsize=(10, 4))

# Left plot: Single Solver
ax = axes[0]
ax.plot(ref_traj[:, 0], ref_traj[:, 2], ':', linewidth=1.0, label='Reference', alpha=0.5)
if 'ee_actual' in mpc_stats and len(mpc_stats['ee_actual']) > 0:
    ee_single = mpc_stats['ee_actual']
    ax.plot(ee_single[:, 0], ee_single[:, 2], color=base_palette['1'], linewidth=1.5, label='Batch Size = 1', alpha=0.8)
# ax.set_xlabel('X [m]')
ax.set_ylabel('Z [m]', fontsize=14)
ax.grid(True, alpha=0.3)
ax.axis('equal')
ax.set_xlim(-0.7, 0.0)
ax.set_ylim(0.5, 1.1)

# Middle plot: Batch=32
ax = axes[1]
ax.plot(ref_traj[:, 0], ref_traj[:, 2], ':', linewidth=1.0, alpha=0.5)
if 'ee_actual' in mpc_stats_32 and len(mpc_stats_32['ee_actual']) > 0:
    ee_batch32 = mpc_stats_32['ee_actual']
    ax.plot(ee_batch32[:, 0], ee_batch32[:, 2], color=base_palette['32'], linewidth=1.5, label='Batch Size = 32', alpha=0.8)
ax.set_xlabel('X [m]', fontsize=14)
# ax.set_ylabel('Z [m]')
ax.grid(True, alpha=0.3)
# Remove y-axis tick marks but keep gridlines
ax.tick_params(axis='y', which='both', left=False, labelleft=False)
ax.axis('equal')
ax.set_xlim(-0.7, 0.0)
ax.set_ylim(0.5, 1.1)

# Right plot: Batch=128
ax = axes[2]
ax.plot(ref_traj[:, 0], ref_traj[:, 2], ':', linewidth=1.0, alpha=0.5)
if 'ee_actual' in mpc_stats_128 and len(mpc_stats_128['ee_actual']) > 0:
    ee_batch128 = mpc_stats_128['ee_actual']
    ax.plot(ee_batch128[:, 0], ee_batch128[:, 2], color=base_palette['128'], linewidth=1.5, label='Batch Size = 128', alpha=0.8)
# ax.set_xlabel('X [m]')
# ax.set_ylabel('Z [m]')
ax.grid(True, alpha=0.3)
# Remove y-axis tick marks but keep gridlines
ax.tick_params(axis='y', which='both', left=False, labelleft=False)
ax.axis('equal')
ax.set_xlim(-0.7, 0.0)
ax.set_ylim(0.5, 1.1)

# Add single legend for all subplots
handles, labels = [], []
for ax in axes:
    h, l = ax.get_legend_handles_labels()
    handles.extend(h)
    labels.extend(l)

# Remove duplicates while preserving order
unique_labels = []
unique_handles = []
for handle, label in zip(handles, labels):
    if label not in unique_labels:
        unique_labels.append(label)
        unique_handles.append(handle)

fig.legend(unique_handles, unique_labels, loc='upper center', bbox_to_anchor=(0.5, 1.08), ncol=4)

plt.tight_layout()
plt.subplots_adjust(top=0.97)
plt.savefig('fig8-disturbance.png', bbox_inches='tight')
plt.show()

In [None]:
# Joint-Level Analysis
fig, axes = plt.subplots(6, 2, figsize=(12, 15))

joints_to_plot = [0, 1, 2, 3, 4, 5] 

for idx, joint_idx in enumerate(joints_to_plot):
    # Joint positions
    ax = axes[idx, 0]
    if 'joint_positions' in mpc_stats and len(mpc_stats['joint_positions']) > 0:
        ax.plot(mpc_stats['timestamps'], mpc_stats['joint_positions'][:, joint_idx], 
                'b-', linewidth=1.5, label='Single', alpha=0.8)
    if 'joint_positions' in mpc_stats_4 and len(mpc_stats_4['joint_positions']) > 0:
        ax.plot(mpc_stats_4['timestamps'], mpc_stats_4['joint_positions'][:, joint_idx], 
                'orange', linewidth=1.5, label='4', alpha=0.8) 
#     if 'joint_positions' in mpc_stats_8 and len(mpc_stats_8['joint_positions']) > 0:
#         ax.plot(mpc_stats_8['timestamps'], mpc_stats_8['joint_positions'][:, joint_idx], 
#                 'y-', linewidth=1.5, label='8', alpha=0.8) 
    if 'joint_positions' in mpc_stats_16 and len(mpc_stats_16['joint_positions']) > 0:
        ax.plot(mpc_stats_16['timestamps'], mpc_stats_16['joint_positions'][:, joint_idx], 
                'm-', linewidth=1.5, label='16', alpha=0.8) 
#     if 'joint_positions' in mpc_stats_32 and len(mpc_stats_32['joint_positions']) > 0:
#         ax.plot(mpc_stats_32['timestamps'], mpc_stats_32['joint_positions'][:, joint_idx], 
#                 'm-', linewidth=1.5, label='32', alpha=0.8) 
    if 'joint_positions' in mpc_stats_64 and len(mpc_stats_64['joint_positions']) > 0:
        ax.plot(mpc_stats_64['timestamps'], mpc_stats_64['joint_positions'][:, joint_idx], 
                'g-', linewidth=1.5, label='64', alpha=0.8)
    ax.set_xlabel('Time [s]')
    ax.set_ylabel(f'Joint {joint_idx+1} Position [rad]')
    ax.set_title(f'Joint {joint_idx+1} Position')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    # Joint velocities
    ax = axes[idx, 1]
    if 'joint_velocities' in mpc_stats and len(mpc_stats['joint_velocities']) > 0:
        ax.plot(mpc_stats['timestamps'], mpc_stats['joint_velocities'][:, joint_idx], 
                'b-', linewidth=1.5, label='Single', alpha=0.3)
    if 'joint_velocities' in mpc_stats_4 and len(mpc_stats_4['joint_velocities']) > 0:
        ax.plot(mpc_stats_4['timestamps'], mpc_stats_4['joint_velocities'][:, joint_idx], 
                'orange', linewidth=1.5, label='4', alpha=0.3) 
#     if 'joint_velocities' in mpc_stats_8 and len(mpc_stats_8['joint_velocities']) > 0:
#         ax.plot(mpc_stats_8['timestamps'], mpc_stats_8['joint_velocities'][:, joint_idx], 
#                 'y-', linewidth=1.5, label='8', alpha=0.8) 
    if 'joint_velocities' in mpc_stats_16 and len(mpc_stats_16['joint_velocities']) > 0:
        ax.plot(mpc_stats_16['timestamps'], mpc_stats_16['joint_velocities'][:, joint_idx], 
                'm-', linewidth=1.5, label='16', alpha=0.3) 
#     if 'joint_velocities' in mpc_stats_32 and len(mpc_stats_32['joint_velocities']) > 0:
#         ax.plot(mpc_stats_32['timestamps'], mpc_stats_32['joint_velocities'][:, joint_idx], 
#                 'm-', linewidth=1.5, label='32', alpha=0.8) 
    if 'joint_velocities' in mpc_stats_64 and len(mpc_stats_64['joint_velocities']) > 0:
        ax.plot(mpc_stats_64['timestamps'], mpc_stats_64['joint_velocities'][:, joint_idx], 
                'g-', linewidth=1.5, label='64', alpha=0.3)
    ax.set_xlabel('Time [s]')
    ax.set_ylabel(f'Joint {joint_idx+1} Velocity [rad/s]')
    ax.set_title(f'Joint {joint_idx+1} Velocity')
    ax.legend()
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('joint_analysis.pdf', bbox_inches='tight')
plt.show()