# Bimanual Planning Example Notebook

This notebook demonstrates the complete workflow for planning constrained bimanual motions using the minimal coordinates strategy, *when using a C++ implementation of the parameterization, costs, and constraints*. Recall from README.md that you must have built Drake from source to use this functionality, and then compiled the [C++ project attached to this repository](../cpp_parameterization).

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# Fix relative paths so contents of the src directory can be imported.
import sys
sys.path.append("..")
sys.path.append("../cpp_parameterization/python")

In [None]:
import numpy as np
import os
import time
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
import networkx as nx

In [None]:
from pydrake.all import (
    StartMeshcat,
    CollisionCheckerParams,
    RobotDiagramBuilder,
    MeshcatVisualizerParams,
    Role,
    MeshcatVisualizer,
    Parser,
    LoadModelDirectives,
    ProcessModelDirectives,
    SceneGraphCollisionChecker,
    AutoDiffXd,
    RigidTransform_,
    IrisNp2Options,
    IrisZoOptions,
    SnoptSolver,
    IpoptSolver,
    IrisParameterizationFunction,
    MathematicalProgram,
    HPolyhedron,
    IrisNp2,
    IrisZo,
    Hyperellipsoid,
    RandomGenerator,
    ComputePairwiseIntersections,
    GcsTrajectoryOptimization,
    Point,
    GraphOfConvexSetsOptions,
    FunctionHandleTrajectory,
    InitializeAutoDiff,
    ExtractGradient,
    Toppra,
    PathParameterizedTrajectory,
    PiecewisePolynomial,
    CompositeTrajectory,
    CalcGridPointsOptions,
    BsplineBasis,
    BsplineTrajectory,
    KinematicTrajectoryOptimization,
    MinimumDistanceLowerBoundConstraint,
    PyFunctionConstraint,
    SolverOptions,
    CommonSolverOption,
    Solve,
    sqrt,
    DiagramBuilder,
    TrajectorySource,
    InverseDynamicsController,
    Demultiplexer,
)

In [None]:
from iiwa_ik import (
    MakeParameterization,
    IiwaBimanualReachableConstraint,
    IiwaBimanualJointLimitConstraint,
    IiwaBimanualCollisionFreeConstraint,
    FullFeasibilityConstraint,
    IiwaBimanualPathCost,
)

In [None]:
from src.iiwa_analytic_ik import iiwa_limits_lower, iiwa_limits_upper
import src.common as common
import src.rrt as rrt
import src.shortcut as shortcut

# Parameters

- `directives_file` is a scene description, giving the location of all the models in the world.
- `grasp_distance` is the distance between the end-effectors of the two robot arms.
- `GC2`, `GC4`, and `GC6` are the "global configuration parameters", describing which branch of the IK function to use.
- `q_tilde_bottom`, `q_tilde_middle`, and `q_tilde_top` are three key configurations that we plan between, represented in the parameterized coordinates.
- `seeds` is a list of configurations to use as seed points for the region generation. They were chosen by hand, using a virtual teleoperation notebook, similar to [this one](https://deepnote.com/workspace/Manipulation-ac8201a1-470a-4c77-afd0-2cc45bc229ff/project/0762b167-402a-4362-9702-7d559f0e73bb/notebook/iris_builder-3c25c10bc29d4c9493e48eaced475d03). One can also generate regions to cover a graph in configuration space (shown later in the code for RRT) or use the [clique covers approach](https://ieeexplore.ieee.org/abstract/document/10610005/).

In [None]:
directives_file = os.path.join(common.RepoDir(), "models/old_shelves.dmd.yaml")
grasp_distance = 0.6
shoulder_up = True
elbow_up = True
wrist_up = False
q_tilde_bottom = np.array([-0.6430910102907225, 1.9156121024586796, -1.7968254667817805, 1.2945447141185198, -0.023834531305537934, -0.876966810663043, -1.7041643160834519, 1.45])
q_tilde_middle = np.array([-0.5997312520566763, 1.489780849654964, -1.4739679827359913, 1.2905366081785483, -0.04421061906813227, -0.8793712572715165, -1.1603461715511334, 1.45])
q_tilde_top = np.array([-0.1994994216078726, 0.9140739951190965, -2.236618320862171, 0.5238879195899456, 0.7998441913611017, -1.3575398006936048, -1.0153092816310436, 2.41])

In [None]:
seeds = [
    q_tilde_bottom,
    np.array([-0.7341522021700233, 1.9192492722970935, -1.849050540687353, 1.4690188979347225, -0.022913995470214974, -0.7839567180379224, -1.735834076048031, 1.45]),
    np.array([-0.816394667473979, 1.9228828117510568, -1.9042766014076622, 1.6254903325102958, -0.020884458583263387, -0.6994788210824544, -1.773950224396859, 1.45]),
    np.array([-0.9076736984240236, 1.7999568628541147, -1.8278258357789336, 1.8976493299850326, -0.032028511314404574, -0.5492230012012871, -1.624933169711267, 1.45]),
    np.array([-0.90780384835653, 1.5443282072400564, -1.480882097408486, 1.9741581801564516, -0.07059018895327443, -0.5065618808846135, -1.1610690777465094, 1.45]),
    np.array([-0.877792385473089, 1.283945692440691, -1.1673903163525974, 1.7986279782674526, -0.08798686286997325, -0.605914842625335, -0.7496023024205761, 1.45]),
    np.array([-0.7363360141869535, 1.0835790623705088, -1.102219288049605, 1.3727471630916555, -0.07210415656873102, -0.8362237374759414, -0.6008766712030682, 1.45]),
    np.array([-0.7093225760311644, 0.8650840325295542, -1.4794100092984808, 1.2099934253928784, 0.44173726212402287, -0.9673197772349095, -0.9450827150678346, 2.0]),
    np.array([-0.5237049267440886, 0.7086764066165658, -1.9872212610757156, 1.045742737284787, 0.8594286107005795, -1.171705603794283, -1.1435157398017397, 2.41]),
    np.array([-0.37540312953312194, 0.7958305227244739, -2.112215906760149, 0.8433434932970723, 0.8316630398644385, -1.2430896040746857, -1.1077155278001196, 2.41]),
    q_tilde_top,
    np.array([-0.7686406052800139, 1.504938625148829, -1.4584578152597332, 1.655937158932382, -0.055175677810583384, -0.6834840454669682, -1.1418310479792013, 1.45]),
    q_tilde_middle,
]

# Set Up Environment

This is general, boilerplate code that most Drake projects include. Note the usage of [`RobotDiagramBuilder`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1planning_1_1_robot_diagram_builder.html) to construct a [`RobotDiagram`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1planning_1_1_robot_diagram.html) and [`CollisionChecker`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1planning_1_1_collision_checker.html), as opposed to the less specific [`DiagramBuilder`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1systems_1_1_diagram_builder.html). We specifically use a [`SceneGraphCollisionChecker`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1planning_1_1_scene_graph_collision_checker.html).

The meshcat visualization can be viewed in your browser, by opening the link that appears after running the following cell.

In [None]:
# Only run this cell once.
meshcat = StartMeshcat()

In [None]:
params = CollisionCheckerParams()
builder = RobotDiagramBuilder(time_step=0.0)

meshcat_visual_params = MeshcatVisualizerParams()
meshcat_visual_params.delete_on_initialization_event = False
meshcat_visual_params.role = Role.kIllustration
meshcat_visual_params.prefix = "visual"
meshcat_visual = MeshcatVisualizer.AddToBuilder(
    builder.builder(), builder.scene_graph(), meshcat, meshcat_visual_params)

meshcat_collision_params = MeshcatVisualizerParams()
meshcat_collision_params.delete_on_initialization_event = False
meshcat_collision_params.role = Role.kProximity
meshcat_collision_params.prefix = "collision"
meshcat_collision_params.visible_by_default = False
meshcat_collision = MeshcatVisualizer.AddToBuilder(
    builder.builder(), builder.scene_graph(), meshcat, meshcat_collision_params)

plant = builder.plant()
parser = Parser(plant)
package_xml_path = os.path.join(common.RepoDir(), "package.xml")
parser.package_map().AddPackageXml(package_xml_path)
directives = LoadModelDirectives(directives_file)
ProcessModelDirectives(directives, parser)

params.robot_model_instances = [
    plant.GetModelInstanceByName("iiwa_left"),
    plant.GetModelInstanceByName("iiwa_right")
]

plant.Finalize()

# We export these inputs and outputs so we can wrap the RobotDiagram in a larger
# Diagram, which will include a controller, to simulate and visualize.
builder.builder().ExportInput(plant.get_actuation_input_port(), "actuation")
builder.builder().ExportOutput(plant.get_state_output_port(), "state")

diagram = builder.Build()

params.model = diagram
params.edge_step_size = 0.01
checker = SceneGraphCollisionChecker(params)

context = diagram.CreateDefaultContext()
diagram.ForcedPublish(context)

# Build Regions

## Set Up the Parameterization

Check out `cpp_parameterization/cpp/iiwa_ik/iiwa_analytic_ik.h` for more details on the implementation of the analytic IK function itself. We make it available from C++ through an [`IrisParameterizationFunction`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1planning_1_1_iris_parameterization_function.html) object, which wraps the callable and maintains some additional necessary information (input dimension and thread safety). Because we are almost entirely dealing with Python references to C++ objects, we do not lose thread safety.

The parameterization itself takes as input the configuration of the controlled arm and the self motion parameter of the subordinate arm, and outputs the configuration of the controlled arm concatenated with the configuration of the follower arm.

[`IrisZo`](https://drake.mit.edu/doxygen_cxx/group__planning__iris.html#ga9b44245010bfdc8163645f0c62f9e9ab) and [`IrisNp2`](https://drake.mit.edu/doxygen_cxx/group__planning__iris.html#gaf5bc571d0ee3753c976d3b521de397c4) take in different options classes ([`IrisZoOptions`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1planning_1_1_iris_zo_options.html) and [`IrisNp2Options`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1planning_1_1_iris_np2_options.html), respectively), although most of the options are shared in the `sampled_iris_options` field, which has type [`CommonSampledIrisOptions`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1planning_1_1_common_sampled_iris_options.html).

Here, we use the notion of being "in collision" to refer to configurations which are actually in collision, as well as those which are kinematically invalid. In a sense, we treat non-reachable configurations and configurations where the subordinate arm's joint limits are violated as "virtual" obstacles.

The counterexample search program that IrisNp2 (and IrisNp before it) solve can have numerical difficulties or challenging optimization landscapes, causing the solve to fail, even when given a feasible initial guess. The `add_hyperplane_if_solve_fails` option will add a trivial hyperplane at a sample in-collision which leads to a solve failure, to ensure the algorithm doesn't get stuck. We also cap the number of solver iterations for these programs, as most solve quickly, and those that do not solve quickly tend to ultimately fail.

In [None]:
iris_zo_options = IrisZoOptions()
iris_np2_options = IrisNp2Options()
for iris_options in [iris_zo_options, iris_np2_options]:
    iris_options.parameterization = MakeParameterization(shoulder_up,
                                                         elbow_up,
                                                         wrist_up,
                                                         grasp_distance)

    iris_options.sampled_iris_options.random_seed = 2
    iris_options.sampled_iris_options.verbose = True
    iris_options.sampled_iris_options.max_iterations = 1  # Only run a single outer iteration.
    iris_options.sampled_iris_options.relax_margin = True  # Needed if seed points are very close to obstacles.

    # The maximum allowable fraction in collision.
    iris_options.sampled_iris_options.epsilon = 0.01
    
    # The maximum probability the region does not satisfy the above threshold.
    iris_options.sampled_iris_options.delta = 0.01 

    # If you are asking for a very high fraction of the region to be valid, or a very high probability threshold,
    # this option can speed up the runtime by a lot. We leave it off by default.
    iris_options.sampled_iris_options.sample_particles_in_parallel = False

iris_np2_options.add_hyperplane_if_solve_fails = True
iris_np2_options.solver_options.SetOption(SnoptSolver().solver_id(), "Major iterations limit", 50)

## Visualize the Key Configurations

In [None]:
context = diagram.CreateDefaultContext()
plant_context = plant.GetMyContextFromRoot(context)
plant.SetPositions(plant_context,
                   iris_options.parameterization.get_parameterization_double()(q_tilde_bottom))
diagram.ForcedPublish(context)

In [None]:
context = diagram.CreateDefaultContext()
plant_context = plant.GetMyContextFromRoot(context)
plant.SetPositions(plant_context,
                   iris_options.parameterization.get_parameterization_double()(q_tilde_middle))
diagram.ForcedPublish(context)

In [None]:
context = diagram.CreateDefaultContext()
plant_context = plant.GetMyContextFromRoot(context)
plant.SetPositions(plant_context,
                   iris_options.parameterization.get_parameterization_double()(q_tilde_top))
diagram.ForcedPublish(context)

In [None]:
idx = 4

context = diagram.CreateDefaultContext()
plant_context = plant.GetMyContextFromRoot(context)
plant.SetPositions(plant_context,
                   iris_options.parameterization.get_parameterization_double()(seeds[idx]))
diagram.ForcedPublish(context)

## Set Up the Additional Constraints

Additional constraints that a region should satisfy (beyond just being collision-free) are specified by defining a [`MathematicalProgram`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1solvers_1_1_mathematical_program.html), whose only variables correspond to a configuration in the parameterized space, and then adding the constraints to that program.

The constraints used are subclasses of the Drake `Constraint` class, implemented in `cpp_parameterization/cpp/iiwa_ik/constraints.h`. This ensures that they are threadsafe, allowing `IrisZo` to fully take advantage of parallelism. Once the constraints have been created, they can be directly added to the `MathematicalProgram`.

In [None]:
iris_prog = MathematicalProgram()
# Our parameterization is 8 dimensional, and the number of variables must match.
q_tilde_vars = iris_prog.NewContinuousVariables(8, "q_tilde")
for iris_options in [iris_zo_options, iris_np2_options]:
    iris_options.sampled_iris_options.prog_with_additional_constraints = iris_prog

reachability_constraint = IiwaBimanualReachableConstraint(
    shoulder_up,
    elbow_up,
    wrist_up,
    grasp_distance
)
iris_prog.AddConstraint(reachability_constraint, q_tilde_vars)
    
subordinate_arm_joint_limit_constraint = IiwaBimanualJointLimitConstraint(
    iiwa_limits_lower,
    iiwa_limits_upper,
    shoulder_up,
    elbow_up,
    wrist_up,
    grasp_distance
)
iris_prog.AddConstraint(subordinate_arm_joint_limit_constraint, q_tilde_vars)

## Grow the Regions

Regions are grown in the parameterized space. Explicitly, they're grown within a domain polytope (which must be bounded). In the standard case without a parameterization, we frequently just use joint limits. For the parameterization, this may be more challenging. But in general, a bounded superset of the (probably difficult to describe) feasible set in the parameterized space is sufficient.

Explicitly, we use the domain limits of the controlled arm, and take the Cartesian product with the interval $[0,2\pi]$ for the self-motion parameter. The self-motion parameter is actually circle-valued (wraps around at 2pi), but we ignore that for now. You can check out some of our [other](https://www.roboticsproceedings.org/rss19/p057.html) [work](https://journals.sagepub.com/doi/full/10.1177/02783649241302419) for some details on how you might handle circle-valued configuration dimensions.

In [None]:
domain_lower = np.hstack((iiwa_limits_lower, [0.0]))
domain_upper = np.hstack((iiwa_limits_upper, [2.0 * np.pi]))
domain = HPolyhedron.MakeBox(domain_lower, domain_upper)

In [None]:
def grow_region(seed):
    region = IrisNp2(checker,
                     Hyperellipsoid.MakeHypersphere(1e-2,
                                                    seed),
                     domain,
                     iris_np2_options)
#     region = IrisZo(checker,
#                     Hyperellipsoid.MakeHypersphere(1e-2,
#                                                    seed),
#                     domain,
#                     iris_zo_options)
    return region

Although we do not do so here, you can restrict the region generation to use a single thread with the `iris_np2_options.sampled_iris_options.parallelism` field, and then grow multiple regions concurrently (with one thread per region). I've personally used the [Python multiprocessing library](https://docs.python.org/3/library/multiprocessing.html) for this in the past. Note that this doesn't make a huge difference with IrisZo, since it is relying on massive parallelization throughout the entire algorithm.

In [None]:
regions = []
for i in tqdm(range(len(seeds))):
    region = grow_region(seeds[i])
    regions.append(region.ReduceInequalities())
print("Average number of faces:", np.mean([len(region.b()) for region in regions]))

## Introspection

Explicitly computing the overlap graph is not strictly necessary, as GCS will automatically compute the overlaps behind the scenes. But it is nice for checking graph connectivity as you're picking your region seed points.

Similarly, the random walk around a region is just for visualization purposes.

In [None]:
intersections, _ = ComputePairwiseIntersections(regions, [])
mat = np.zeros((len(regions), len(regions)))
if intersections:
    rows, cols = zip(*intersections)
    mat[rows, cols] = 1

plt.imshow(mat)
plt.show()

print("Number of connected components", nx.number_connected_components(nx.from_numpy_array(mat)))

In [None]:
# Optionally visualize a random walk around a region.

# context = diagram.CreateDefaultContext()
# plant_context = plant.GetMyContextFromRoot(context)

# region = regions[0]
# q_tilde = seeds[0].copy()
# rng = RandomGenerator()
# np.random.seed(0)
# for _ in range(20):
#     q_tilde_next = region.UniformSample(rng)
#     for t in np.linspace(0, 1, 50):
#         q = parameterization(t * q_tilde_next  + (1 - t) * q_tilde)
#         plant.SetPositions(plant_context, q)
#         diagram.ForcedPublish(context)
#         time.sleep(0.02)
#     q_tilde = q_tilde_next.copy()

# Planning!

Now, we show how to do each of the three planning approaches. Note that all planning occurs in the parameterized space, and the output should be a trajectory in the parameterized space.

In [None]:
start = q_tilde_bottom.copy()
goal = q_tilde_top.copy()

## Planning with GcsTrajectoryOptimization

The [`GcsTrajectoryOptimization`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1planning_1_1trajectory__optimization_1_1_gcs_trajectory_optimization.html) class implements the approach from the paper [Motion Planning around Obstacles with Convex Optimization](https://www.science.org/doi/10.1126/scirobotics.adf7843). Here, we minimize the path length in the parameterized space, although it is possible to handle the ideal (nonconvex) cost of path length in the full configuration space, using either [NGCSTrajOpt](https://ieeexplore.ieee.org/abstract/document/10802426/) or [PGD-GCS](https://ieeexplore.ieee.org/abstract/document/10976384/).

In [None]:
order = 2
continuity = 1

gcs = GcsTrajectoryOptimization(8)
if continuity >= 1:
    for continuity_order in range(1, continuity + 1):
        gcs.AddPathContinuityConstraints(continuity_order)

# Load the convex sets into GcsTrajectoryOptimization.
main_graph = gcs.AddRegions(regions, order, h_min=0.1, h_max=100, name="")

# Specify the start and goal, and link them to the main graph.
start_graph = gcs.AddRegions([Point(start)], 0)
goal_graph = gcs.AddRegions([Point(goal)], 0)
gcs.AddEdges(start_graph, main_graph)
gcs.AddEdges(main_graph, goal_graph)

# gcs.AddPathEnergyCost()
gcs.AddPathLengthCost()
gcs.AddTimeCost()
gcs.AddVelocityBounds(-np.ones(8), np.ones(8))

options = GraphOfConvexSetsOptions()
options.max_rounding_trials = 100
options.max_rounded_paths = 100
options.convex_relaxation = True

gcs_traj, result = gcs.SolvePath(start_graph, goal_graph, options)
print(result.is_success())

## Planning with Bidirectional RRT

I've included in this repository a simple Python implementation of RRT and BiRRT. They rely on two oracles: a random configuration generator, and a validity checker. (The metric is assumed to be the L2 norm.) In the parameterized space, it is easy to generate a random configuration given the domain that we used for IrisNp2 and IrisZo. And we can check validity by simply checking for reachability, subordinate arm joint limit violations, and collisions. We follow this up by running randomized shortcutting (also a simple Python implementation) to improve the path a bit.

Finally, we construct a formal Drake [`Trajectory`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1trajectories_1_1_trajectory.html) object. We construct a twice-differentiable path for each segment with zero initial and final velocity using [`PiecewisePolynomial::CubicWithContinuousSecondDerivatives`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1trajectories_1_1_piecewise_polynomial.html#aba4275b536c162df6d8e2c06b4036f3a), before concatenating them as a [`CompositeTrajectory`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1trajectories_1_1_composite_trajectory.html).

In [None]:
def RandomConfig():
    return np.random.uniform(low=domain_lower, high=domain_upper)

def ValidityChecker(q_tilde):
    if np.any(reachability_constraint.Eval(q_tilde) > np.ones(4)):
        return False
    if np.any(reachability_constraint.Eval(q_tilde) < -np.ones(4)):
        return False
    
    q_full = iris_np2_options.parameterization.get_parameterization_double()(q_tilde)
    q_sub = q_full[7:]
    if np.any(q_sub < iiwa_limits_lower):
        return False
    if np.any(q_sub > iiwa_limits_upper):
        return False
    
    if np.any(np.abs(q_sub[[1, 3, 5]]) < 1e-2):
        return False
    
    return checker.CheckConfigCollisionFree(q_full)

In [None]:
rrt_options = rrt.RRTOptions(
    step_size = 2e-1,
    check_size = 1e-2,
    max_vertices = 1e4,
    max_iters = 1e6,
    goal_sample_frequency = 0.01,
    always_swap = False
)
rrt_planner = rrt.BiRRT(RandomConfig, ValidityChecker)

np.random.seed(0)
path = rrt_planner.plan(start, goal, rrt_options)

In [None]:
# We also run randomized shortcutting to improve the paths at least a little bit.
np.random.seed(0)
shortcut_path = shortcut.shortcut(path.copy(), ValidityChecker, num_tries=1e2, check_size=rrt_options.check_size)

In [None]:
rrt_traj_segments = [
    PiecewisePolynomial.CubicWithContinuousSecondDerivatives(
        np.array([float(i) - 1.0, float(i)]),
        np.array([shortcut_path[i-1], shortcut_path[i]]).T,
        np.zeros(8),
        np.zeros(8)
    )
    for i in range(1, len(shortcut_path))
]
rrt_traj = CompositeTrajectory(rrt_traj_segments)

## (Optional) Grow Regions along RRT Path

The following code demonstrates how to use the `sampled_iris_options.containment_points` field to specify points which must be contained by the region (in addition to the seed point). This can be used to grow regions to cover a graph, or in this case, the piecewise-linear path that was produced by the BiRRT + shortcutting. [Recent](https://arxiv.org/abs/2504.18978) [approaches](https://arxiv.org/abs/2504.10783) have especially considered the problem of planning through a sequence of convex sets.

In [None]:
# regions = []
# for i in tqdm(range(1, len(shortcut_path))):
#     q_tilde_0 = shortcut_path[i-1]
#     q_tilde_1 = shortcut_path[i]
#     if regions:
#         skip = False
#         for idx, region in enumerate(regions):
#             if region.PointInSet(q_tilde_1):
#                 print("Point is in region %d, skipping" % idx)
#                 skip = True
#                 break
#         if skip:
#             continue
#     for iris_options in [iris_zo_options, iris_np2_options]:
#         iris_options.sampled_iris_options.containment_points = np.array([
#             q_tilde_0,
#             q_tilde_1,
#         ]).T
#     q_tilde = (q_tilde_0 + q_tilde_1) / 2
# #     region = IrisZo(checker, Hyperellipsoid.MakeHypersphere(1e-2, q_tilde), domain, iris_zo_options)
#     region = IrisNp2(checker, Hyperellipsoid.MakeHypersphere(1e-2, q_tilde), domain, iris_np2_options)
#     regions.append(region.ReduceInequalities())

In [None]:
# intersections, _ = ComputePairwiseIntersections(regions, [])
# mat = np.zeros((len(regions), len(regions)))
# if intersections:
#     rows, cols = zip(*intersections)
#     mat[rows, cols] = 1

# plt.imshow(mat)
# plt.show()

# print("Number of connected components", nx.number_connected_components(nx.from_numpy_array(mat)))

## Planning with Kinematic Trajectory Optimization

Here, we show how to use Drake's [`KinematicTrajectoryOptimization`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1planning_1_1trajectory__optimization_1_1_kinematic_trajectory_optimization.html) to make plans with general (nonconvex) optimization.

We begin by constructing an initial trajectory that either approximately or exactly follows the BiRRT + shortcutting path. Some of the complexity is due to representations -- we initially produced a piecewise-linear path, then a sequence of `PiecewisePolynomial` trajectories, and then a `CompositeTrajectory`. But `KinematicTrajectoryOptimization` uses a [`BsplineTrajectory`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1trajectories_1_1_bspline_trajectory.html).

Next, we set up the optimization problem constraints. These are similar to the constraints used for region generation, although we explicitly use the all-in-one `FullFeasibilityConstraint` object, which checks subordinate arm reachability, subordinate arm joint limits, and collisions with a single parameterization call. (One could also use `IiwaBimanualReachableConstraint`, `IiwaBimanualJointLimitConstraint`, and `IiwaBimanualCollisionFreeConstraint` separately. We construct the constraint object, and then apply it at many points along the path. We also need to explicitly constrain the start and end configurations, and enforce that the path be collision-free.

Note that `IiwaBimanualCollisionFreeConstraint` and `FullFeasibilityConstraint` are not threadsafe, even though they are written in C++. (This is because they have to query a [`MultibodyPlant`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1multibody_1_1_multibody_plant.html), and making such an operation threadsafe is more complicated.

Finally, we show a few possible costs, including path length and path energy in the parameterized space, as well as custom costs that measure the path length and path energy in the full configuration space.

In [None]:
spline_order = 4  # Cubic spline
t0 = rrt_traj.start_time()
t1 = rrt_traj.end_time()

In [None]:
# Option 1: approximately follow the PL trajectory by directly using the vertices as the knot points.
# This doesn't guarantee feasibility, but is generally easier for the optimizer to improve past the initial guess.
control_points_matrix = np.array([
    point
    for point in shortcut_path
]).T

In [None]:
# # Option 2: exactly follow the PL trajectory by duplicating control points.
# # This better guarantees feasibility, but is harder for the optimizer to improve past the initial guess.
# control_points_matrix = np.array([
#     point
#     for point in shortcut_path
#     for _ in range(spline_order - 1)
# ]).T

# # You can also perturb the control points along the path a little bit, to avoid making
# # the initial guess super numerically unstable.
# eps = 1e-4

# num_points = len(shortcut_path)
# block_size = spline_order - 1

# for i in range(num_points):
#     start_idx = i * block_size
#     end_idx = start_idx + block_size

#     if i == 0:
#         direction = shortcut_path[1] - shortcut_path[0]
#     elif i == num_points - 1:
#         direction = shortcut_path[-1] - shortcut_path[-2]
#     else:
#         direction = shortcut_path[i + 1] - shortcut_path[i - 1]

#     norm = np.linalg.norm(direction)
#     direction = direction / norm if norm > 1e-12 else np.zeros_like(direction)

#     if i == 0:
#         offsets = eps * np.arange(block_size)
#     elif i == num_points - 1:
#         offsets = -eps * np.arange(block_size)
#     else:
#         offsets = eps * (np.arange(block_size) - (block_size - 1) / 2)

#     control_points_matrix[:, start_idx:end_idx] += np.outer(direction, offsets)

In [None]:
# Construct the initial guess, and the trajectory optimization object
basis = BsplineBasis(spline_order,
                     control_points_matrix.shape[1],
                     initial_parameter_value=t0,
                     final_parameter_value=t1)
initial_traj = BsplineTrajectory(basis, control_points_matrix)

trajopt = KinematicTrajectoryOptimization(initial_traj)

In [None]:
# Start and end points
trajopt.AddPathPositionConstraint(initial_traj.value(initial_traj.start_time()),
                                  initial_traj.value(initial_traj.start_time()),
                                  0)
trajopt.AddPathPositionConstraint(initial_traj.value(initial_traj.end_time()),
                                  initial_traj.value(initial_traj.end_time()),
                                  1)
pass

For collision avoidance, we use Drake's [`MinimumDistanceLowerBoundConstraint`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1multibody_1_1_minimum_distance_lower_bound_constraint.html), which (roughly speaking) takes the pairwise distances between the geometries in the world and requires the minimum one be above the given threshold. It's really doing something a lot more complicated, but if the output from evaluating the constraint is less than or equal to 1, then the configuration satisfies the requested distance bound.

Note that the `MinimumDistanceLowerBoundConstraint` object is being wrapped by `IiwaBimanualCollisionFreeConstraint` or `FullFeasibilityConstraint`. But because these are all just Python references to Drake C++ objects, the optimizer ultimately doesn't have to call into any Python code.

In [None]:
# Define full feasibility constraints
minimum_distance = 0.001 # 1mm
influence_distance = 0.05 # 5cm
minimum_distance_constraint_diagram_context = diagram.CreateDefaultContext()
minimum_distance_constraint_plant_context = plant.GetMyContextFromRoot(minimum_distance_constraint_diagram_context)
minimum_distance_lower_bound_constraint = MinimumDistanceLowerBoundConstraint(
    plant,
    minimum_distance,
    minimum_distance_constraint_plant_context,
    None,
    influence_distance - minimum_distance
)

full_feasibility_constraint = FullFeasibilityConstraint(
    iiwa_limits_lower,
    iiwa_limits_upper,
    shoulder_up,
    elbow_up,
    wrist_up,
    grasp_distance,
    minimum_distance_lower_bound_constraint
)

In [None]:
# Add the constraints. The AddPathPositionConstraint method applies the constraint
# at a given point along the length of the trajectory, where 0 is the start, and 1
# is the end.
num_constraints_to_apply = 100
for s in np.linspace(0, 1, num_constraints_to_apply):
    trajopt.AddPathPositionConstraint(full_feasibility_constraint, s)

For the parameterized path length and parameterized path energy costs, we take the control points in the parameterized space, map them to the full configuration space, and then take the length (or energy) of the implied piecewise linear path. Although this is only approximate (and not even a strict upper bound, as it would be without the parameterization due to nice properties of Bsplines), this works well in practice. But it is slower than the parameterized path length and energy costs. Check out `cpp_parameterization/cpp/iiwa_ik/costs.h` for more details.

Observe that we add these costs directly to the underlying `MathematicalProgram` within `trajopt` -- in general, you can make any change you want to the underlying program. Although doing things like removing decision variables is probably a bad idea!

In [None]:
# Various builtin path costs that can be added.
# trajopt.AddPathEnergyCost()
# trajopt.AddPathLengthCost()
pass

In [None]:
# Parameterized path costs

parameterized_path_energy = IiwaBimanualPathCost(
    trajopt.num_positions(),
    trajopt.num_control_points(),
    shoulder_up,
    elbow_up,
    wrist_up,
    grasp_distance,
    True
)
trajopt.prog().AddCost(parameterized_path_energy,
                       trajopt.control_points().flatten())

parameterized_path_length = IiwaBimanualPathCost(
    trajopt.num_positions(),
    trajopt.num_control_points(),
    shoulder_up,
    elbow_up,
    wrist_up,
    grasp_distance,
    False
)
# trajopt.prog().AddCost(parameterized_path_length,
#                        trajopt.control_points().flatten())

In [None]:
# Check constraint satisfaction at initial guess
satisfied = True
for binding in trajopt.prog().GetAllConstraints():
    c = binding.evaluator()
    val = c.Eval(trajopt.prog().GetInitialGuess(binding.variables()))
    lb, ub = c.lower_bound(), c.upper_bound()
    ok = np.all(val >= lb - 1e-8) and np.all(val <= ub + 1e-8)
    if not ok:
        mag1 = np.max(lb - val)
        mag2 = np.max(val - ub)
        mag = np.max([mag1, mag2])
        print(f"{c.__class__.__name__}: value={val}, bounds=[{lb}, {ub}], ok={ok}")
        print(f"    violation magnitude={mag:.2e}")
    satisfied &= ok

print("\nAll constraints satisfied at initial guess?", satisfied)

### Solving the Trajectory Optimization Problem

This is generally a slow optimization problem, although it does get a decent solution very quickly. You can play with the optimality tolerance.

Also, for nonconvex optimization, one must be careful about solver options. In particular, IPOPT seems to stray too far from feasibility and get stuck on these problems. Switching the [line search method](https://coin-or.github.io/Ipopt/OPTIONS.html#OPT_Line_Search) from filter to penalty seems to help. SNOPT did not require significant fiddling with options to get decent results, although your mileage may vary.

Finally, we have enabled logging to console for IPOPT, but this is unavailable for SNOPT. We've instead directed SNOPT to log to a file, `snopt.log`, which will be created automatically.

**Warning:** Drake includes SNOPT when you install via pip, but not when you build from source. If you are an academic, you can get a [SNOPT trial license](https://ccom.ucsd.edu/~optimizers/downloads/request/academic/).

In [None]:
trajopt_options = SolverOptions()

In [None]:
# IPOPT Settings
trajopt_options.SetOption(CommonSolverOption.kPrintToConsole, True)
trajopt_options.SetOption(IpoptSolver().solver_id(), "print_level", 5)
trajopt_options.SetOption(IpoptSolver().solver_id(), "max_wall_time", 60)

# Accept if feasible and cost decreases by less than 1%, 5 iterations in a row.
trajopt_options.SetOption(IpoptSolver().solver_id(), "acceptable_tol", 1e-2)
trajopt_options.SetOption(IpoptSolver().solver_id(), "acceptable_iter", 5)
trajopt_options.SetOption(IpoptSolver().solver_id(), "acceptable_constr_viol_tol", 1e-6)

# This option tries to prevent IPOPT from relaxing the constraint bounds very much.
trajopt_options.SetOption(IpoptSolver().solver_id(), "bound_relax_factor", 1e-12)

# Project the final solution onto the original bounds. (Unclear how essential this is.)
trajopt_options.SetOption(IpoptSolver().solver_id(), "honor_original_bounds", "yes")

# This option strongly encourages IPOPT to maintain feasibility, at the cost of worse globalization.
trajopt_options.SetOption(IpoptSolver().solver_id(), "line_search_method", "penalty")

# Needed temporarily due to an issue in drake or IPOPT.
trajopt_options.SetOption(IpoptSolver().solver_id(), "watchdog_shortened_iter_trigger", 0)

In [None]:
# SNOPT Settings
trajopt_options.SetOption(CommonSolverOption.kPrintToConsole, False)
trajopt_options.SetOption(CommonSolverOption.kPrintFileName, "snopt.log")
trajopt_options.SetOption(SnoptSolver().solver_id(), "Major print level", 1)  # 1 = summary, 0 = none, >1 = verbose
trajopt_options.SetOption(SnoptSolver().solver_id(), "Timing level", 3)  # Need to enable timing for time limits to work
trajopt_options.SetOption(SnoptSolver().solver_id(), "Time Limit", 60)
trajopt_options.SetOption(SnoptSolver().solver_id(), "Major optimality tolerance", 1e-1)

In [None]:
# solver = IpoptSolver()
solver = SnoptSolver()

# We pass in None for the initial guess, so it falls back to the already-specified initial guess.
result = solver.Solve(trajopt.prog(), None, trajopt_options)

In [None]:
trajopt_traj = trajopt.ReconstructTrajectory(result)
plt.plot(trajopt_traj.vector_values(np.linspace(trajopt_traj.start_time(), trajopt_traj.end_time(), 100)).T)
plt.show()

# Check constraint satisfaction at solution
satisfied = True
for binding in trajopt.prog().GetAllConstraints():
    c = binding.evaluator()
    val = c.Eval(result.GetSolution(binding.variables()))
    lb, ub = c.lower_bound(), c.upper_bound()
    ok = np.all(val >= lb - 1e-8) and np.all(val <= ub + 1e-8)
    if not ok:
        mag1 = np.max(lb - val)
        mag2 = np.max(val - ub)
        mag = np.max([mag1, mag2])
        print(f"{c.__class__.__name__}: value={val}, bounds=[{lb}, {ub}], ok={ok}")
        print(f"    violation magnitude={mag:.2e}")
    satisfied &= ok

print("\nAll constraints satisfied at solution?", satisfied)

# Retiming for Dynamics Limits

The trajectories produced by the planners generally do not satisfy the dynamics limits of the robots. (There are builtin methods to impose dynamics limits in `KinematicTrajectoryOptimization`, but they do not know about the parameterization.) But given a path, we can compute the best-possible speed at which we can follow the path using [Drake's implementation](https://drake.mit.edu/doxygen_cxx/classdrake_1_1multibody_1_1_toppra.html) of [Time-Optimal Path Parameterization based on Reachability Analysis (TOPPRA)](https://ieeexplore.ieee.org/iel7/8860/4359257/08338417.pdf).

To apply this class, we must first lift the trajectory into the full configuration space. We do this with Drake's [`FunctionHandleTrajectory`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1trajectories_1_1_function_handle_trajectory.html). We then must explicitly specify its first and second derivative. The first derivative is computed with automatic differentiation and a simple application of the chain rule. The second is calculated in terms of the first using a [five-point stencil](https://en.wikipedia.org/wiki/Five-point_stencil), since [Drake's automatic differentiation does not support Hessians](https://stackoverflow.com/a/71028053/9796174).

In [None]:
# Specify which planner's trajectory you want to retime and visualize.

# traj_to_show = GcsTrajectoryOptimization.NormalizeSegmentTimes(gcs_traj)
# traj_to_show = rrt_traj.Clone()
traj_to_show = trajopt_traj.Clone()

In [None]:
traj_function = lambda t : iris_np2_options.parameterization.get_parameterization_double()(traj_to_show.value(t).flatten())
full_traj = FunctionHandleTrajectory(traj_function, 14, 1, traj_to_show.start_time(), traj_to_show.end_time())

In [None]:
# ChatGPT wrote this. Can you tell? ;)

def full_traj_derivative(t, order, dt=1e-6):
    """
    Compute trajectory derivatives using finite differences.
    
    Args:
        t: time at which to evaluate.
        order: 1 for first derivative, 2 for second derivative.
        dt: small finite difference step for second derivative.
        
    Returns:
        dydt: (14,1) derivative vector.
    """
    if order == 1:
        # First derivative: use original AutoDiff approach
        x_val = traj_to_show.value(t).flatten()           # (8,)
        xdot_val = traj_to_show.EvalDerivative(t, 1).flatten()  # (8,)

        x_ad = InitializeAutoDiff(x_val).flatten()  # AutoDiff vector
        y_ad = iris_np2_options.parameterization.get_parameterization_autodiff()(x_ad)  # (14,)
        J = ExtractGradient(y_ad)  # (14x8)
        dydt = J @ xdot_val.reshape(-1, 1)  # (14,1)
        return dydt

    elif order == 2:
        return second_derivative_stencil(t, dt)

    else:
        raise RuntimeError(f"Only first and second derivatives supported (requested order={order})")

def second_derivative_stencil(t, dt=1e-6):
    """
    Compute second derivative using a 5-point central difference (more stable).
    """
    f0 = full_traj_derivative(t, order=1)
    f1 = full_traj_derivative(t + dt, order=1)
    f2 = full_traj_derivative(t - dt, order=1)
    f3 = full_traj_derivative(t + 2*dt, order=1)
    f4 = full_traj_derivative(t - 2*dt, order=1)

    d2y_dt2 = (-f3 + 16*f1 - 30*f0 + 16*f2 - f4) / (12 * dt**2)
    return d2y_dt2

# We have to explicitly specify the derivative function as another callable.
full_traj.set_derivative(full_traj_derivative)

In [None]:
# Check derivatives

def finite_difference_derivative(traj_function, t, eps=1e-6):
    """Return dy/dt via centered finite differencing."""
    y_plus = traj_function(t + eps)
    y_minus = traj_function(t - eps)
    dydt_fd = (y_plus - y_minus) / (2 * eps)
    return dydt_fd.reshape(-1)

t_test = (full_traj.start_time() + full_traj.end_time()) / 2

y = full_traj.value(t_test).flatten()
ydot_analytic = full_traj.EvalDerivative(t_test, 1).flatten()
ydot_fd = finite_difference_derivative(traj_function, t_test)

diff = ydot_analytic - ydot_fd
rel_err = np.linalg.norm(diff) / np.linalg.norm(ydot_fd)

print("y(t):", y)
print("Analytic ẏ(t):", ydot_analytic)
print("FD approx ẏ(t):", ydot_fd)
print("‖error‖ =", np.linalg.norm(diff))
print("‖error‖ / ‖ẏ_fd‖ =", rel_err)

Choosing grid points for TOPPRA can be a bit frustrating in Drake, and [there are some potential numerical issues in the implementation](https://github.com/RobotLocomotion/drake/issues/20619). Hopefully, some of the possibilities below work for you. Definitely check out [`CalcGridPointsOptions`](https://drake.mit.edu/doxygen_cxx/structdrake_1_1multibody_1_1_calc_grid_points_options.html).

For piecewise trajectories, it is often helpful (or even necessary) to add grid points at precisely the time when the segments meet. [`GcsTrajectoryOptimization::NormalizeSegmentTimes`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1planning_1_1trajectory__optimization_1_1_gcs_trajectory_optimization.html#af905be61c4d0357e5240da1af044fd72) will retime a path to spend one second per segment to make this easier, if you're planning on retiming with TOPPRA anyways.

In [None]:
# n_segments = traj_to_show.get_number_of_segments()
# n_gridpoints = 30 * n_segments + 1
# gridpoints = np.linspace(full_traj.start_time(), full_traj.end_time(), n_gridpoints)

gridpoints = Toppra.CalcGridPoints(full_traj, CalcGridPointsOptions(max_iter=2, min_points=200))
print(len(gridpoints))

# Add extra gridpoints on the boundary of each trajectory segment.
# n_segments = traj_to_show.get_number_of_segments()
# extra_gridpoints = np.arange(n_segments + 1, dtype=float)

# print(full_traj.end_time())

# gridpoints = np.unique(np.concatenate((gridpoints, extra_gridpoints)))
# print(gridpoints)
# print(len(gridpoints))

In [None]:
toppra = Toppra(full_traj, plant, gridpoints)
toppra.AddJointVelocityLimit(plant.GetVelocityLowerLimits(), plant.GetVelocityUpperLimits())
toppra.AddJointAccelerationLimit(plant.GetAccelerationLowerLimits(), plant.GetAccelerationUpperLimits())
toppra.AddJointTorqueLimit(plant.GetEffortLowerLimits(), plant.GetEffortUpperLimits())
time_traj = toppra.SolvePathParameterization()

In [None]:
retimed_full_traj = PathParameterizedTrajectory(full_traj, time_traj)
print("Trajectory duration:", (retimed_full_traj.end_time() - retimed_full_traj.start_time()))
# If this is `inf`, then try setting the `max_iter` or `min_points` options higher in Toppra.CalcGridPoints

# Visualize the Plan

You're done! Assuming everything worked, you now have a collision-free, kinematically valid trajectory that obeys the dynamic limits of your robot. The following code will visualize it, approximately in realtime!

In [None]:
# qs = [retimed_full_traj.value(t).flatten() for t in np.linspace(retimed_full_traj.start_time(), retimed_full_traj.end_time(), 1000)]
# plt.figure(figsize=(20, 12)) # Creates a figure with 10 inches width and 6 inches height
# plt.plot(np.linspace(retimed_full_traj.start_time(), retimed_full_traj.end_time(), 1000), qs)
# plt.show()

In [None]:
# context = diagram.CreateDefaultContext()
# plant_context = plant.GetMyContextFromRoot(context)

# frame_delay = 1.0 / 60
# for t in np.arange(retimed_full_traj.start_time(), retimed_full_traj.end_time(), frame_delay):
#     q = retimed_full_traj.value(t).flatten()
#     plant.SetPositions(plant_context, q)
#     diagram.ForcedPublish(context)
#     time.sleep(frame_delay)

You can also set up a small `Diagram` with a [`TrajectorySource`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1systems_1_1_trajectory_source.html) in order to record the plan and store it as a static HTML file. To actually simulate the trajectory, we use an [`InverseDynamicsController`](https://drake.mit.edu/doxygen_cxx/classdrake_1_1systems_1_1controllers_1_1_inverse_dynamics_controller.html). Uncomment the diagram visualization block below to see this.

In [None]:
simulation_diagram_builder = DiagramBuilder()

trajectory_source = TrajectorySource(retimed_full_traj, 2)
simulation_diagram_builder.AddSystem(trajectory_source)

nq = diagram.plant().num_positions()
kp = np.full(nq, 1000.0)
ki = np.full(nq, 1.0)
kd = np.full(nq, 20.0)
controller = InverseDynamicsController(
    diagram.plant(),
    kp, ki, kd,
    has_reference_acceleration=True
)
simulation_diagram_builder.AddSystem(controller)

state_acceleration_demultiplexer = Demultiplexer([28, 14])
simulation_diagram_builder.AddSystem(state_acceleration_demultiplexer)

simulation_diagram_builder.AddSystem(diagram)

simulation_diagram_builder.Connect(
    trajectory_source.get_output_port(),
    state_acceleration_demultiplexer.get_input_port()
)
simulation_diagram_builder.Connect(
    state_acceleration_demultiplexer.get_output_port(0),
    controller.get_input_port_desired_state()
)
simulation_diagram_builder.Connect(
    state_acceleration_demultiplexer.get_output_port(1),
    controller.get_input_port_desired_acceleration()
)
simulation_diagram_builder.Connect(
    diagram.GetOutputPort("state"),
    controller.get_input_port_estimated_state()
)
simulation_diagram_builder.Connect(
    controller.get_output_port_control(),
    diagram.GetInputPort("actuation")
)

simulation_diagram = simulation_diagram_builder.Build()

In [None]:
# # Optionally visualize the plant and diagram. You'll need pydot for this.
# import pydot
# from IPython.display import display, Image, SVG

# def plot_dot(dot_str):
#     display(
#         Image(
#             pydot.graph_from_dot_data(dot_str)[0].create_png()
#         )
#     )

# def notebook_plot_plant(plant):
#     plot_dot(plant.GetTopologyGraphvizString())

# def notebook_plot_diagram(diagram):
#     plot_dot(diagram.GetGraphvizString())

# notebook_plot_plant(plant)
# notebook_plot_diagram(simulation_diagram)

In [None]:
from pydrake.all import Simulator

simulator_diagram_context = simulation_diagram.CreateDefaultContext()
plant.SetPositions(
    plant.GetMyContextFromRoot(simulator_diagram_context),
    retimed_full_traj.value(retimed_full_traj.start_time())
)

recorder = diagram.GetSubsystemByName("meshcat_visualizer(visual)")

simulator = Simulator(simulation_diagram, simulator_diagram_context)
simulator.set_target_realtime_rate(1.0)

recorder.StartRecording()
simulator.AdvanceTo(retimed_full_traj.end_time() + 1.0)
recorder.PublishRecording()

In [None]:
# Save the simulation playback into a standalone HTML file.
with open("trajectory.html", "w") as f:
    f.write(meshcat.StaticHtml())