In [32]:
%load_ext autoreload

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [33]:
import numpy as np
from functools import partial
import visualization_utils as vis_utils
from iris_plant_visualizer import IrisPlantVisualizer
import ipywidgets as widgets
from IPython.display import display
from scipy.linalg import block_diag
import matplotlib.pyplot as plt
from pathlib import Path
import os

In [34]:
#pydrake imports
from pydrake.common import FindResourceOrThrow
from pydrake.multibody.parsing import Parser
from pydrake.multibody.plant import AddMultibodyPlantSceneGraph
from pydrake.systems.framework import DiagramBuilder
from pydrake.geometry import Role, GeometrySet, CollisionFilterDeclaration
from pydrake.all import RigidTransform, RollPitchYaw, RevoluteJoint
from pydrake.all import RotationMatrix,MeshcatVisualizer, StartMeshcat, Sphere
import pydrake.symbolic as sym
from pydrake.solvers import MosekSolver, CommonSolverOption, SolverOptions, ScsSolver
from pydrake.all import PointCloud, MeshcatVisualizerParams, Role, HalfSpace, CoulombFriction, Box, Rgba, WeldJoint, GeometrySet
from pydrake.polynomial import Polynomial as PolynomialCommon
import time
from scipy.spatial.transform import Rotation as sp_rot
from sampling_based_motion_planners import StraightLineCollisionChecker, PRM, BiRRT, PRMFixedEdges


from pydrake.all import (RationalForwardKinematics, ModelInstanceIndex,
                         SpatialInertia, RevoluteJoint, FixedOffsetFrame, MultibodyPlant)
from pydrake.geometry.optimization import HPolyhedron, Hyperellipsoid
from pydrake.geometry.optimization_dev import CspaceFreePath
from dataclasses import dataclass


In [35]:
import logging
drake_logger = logging.getLogger("drake")
drake_logger.setLevel(logging.DEBUG)

# Build and set up the visualization the plant and the visualization of the C-space obstacle

Note that running this cell multiple times will establish multiple meshcat instances which can fill up your memory. It is a good idea to call "pkill -f meshcat" from the command line before re-running this cell


In [36]:
meshcat = StartMeshcat()
meshcat.Delete()

INFO:drake:Meshcat listening for connections at http://localhost:7001


In [49]:

def AddPendulumLink(
    plant,
    pendulum_model_instance,
    link_dimensions,
    parent_frame,
    link_index: int,
    color=np.array([1, 0, 0, 1]),
):
    pendulum_box = Box(*link_dimensions)
    pendulum_inertia = SpatialInertia.SolidBoxWithDensity(1.0, *link_dimensions)

    link_name = f"link_{link_index}"
    pend_body = plant.AddRigidBody(link_name, pendulum_model_instance, pendulum_inertia)
    parent_to_child_attach_frame = plant.AddFrame(
        FixedOffsetFrame(
            f"{link_name}_joint_frame",
            pend_body.body_frame(),
            RigidTransform(p=(0, link_dimensions[1] / 2, 0)),
        )
    )
    next_frame = plant.AddFrame(
        FixedOffsetFrame(
            f"{link_name}_joint_frame",
            pend_body.body_frame(),
            RigidTransform(p=(0, -link_dimensions[1] / 2, 0)),
        )
    )
    # add the revolute joint,
    joint_lim = np.pi
    plant.AddJoint(
        RevoluteJoint(
            f"joint_{link_index}",
            parent_frame,
            parent_to_child_attach_frame,
            [0, 0, 1],
            -joint_lim,
            joint_lim,
        )
    )

    geom_id = plant.RegisterCollisionGeometry(
        pend_body, RigidTransform(), pendulum_box, link_name, CoulombFriction()
    )
    plant.RegisterVisualGeometry(
        pend_body, RigidTransform(), pendulum_box, link_name, color
    )

    return next_frame, geom_id


def N_Link_Pendulum(
    n: int,
    plant: MultibodyPlant,
    pendulum_model_instance: ModelInstanceIndex,
    link_dimensions: np.ndarray,
):
    assert n > 0
    parent_frame = plant.world_frame()
    colors = np.array(vis_utils.n_colors(n)) / 255
    colors = np.hstack([colors, np.ones((colors.shape[0], 1))])
    geom_ids = []
    for i in range(n):
        parent_frame, geom_id = AddPendulumLink(
            plant,
            pendulum_model_instance,
            link_dimensions,
            parent_frame,
            i + 1,
            colors[i],
        )
        geom_ids.append(geom_id)
    return geom_ids


def AddRandomBox(
    plant: MultibodyPlant,
    obstacle_model_instance: ModelInstanceIndex,
    index: int,
    pos_limits,
    size_limits,
    color=[0, 0, 0, 1],
):
    link_name = f"obstacle_{index}"
    
    
    def random_center():
        dist = pos_limits[1] * np.sqrt(np.random.uniform(0,1))
        ang = np.random.uniform(0,2*np.pi)
        return dist*np.array([np.cos(ang), np.sin(ang), 0])
    center = random_center()#np.append(np.random.uniform(pos_limits[0], pos_limits[1], 2), 0)

    

    dimensions = np.append(np.random.uniform(size_limits[0], size_limits[1], 2), 0.001)
    box = Box(*dimensions)
    inertia = SpatialInertia.SolidBoxWithDensity(1, *dimensions)

    body = plant.AddRigidBody(link_name, obstacle_model_instance, inertia)
    
    random_rp = np.zeros(3)
    lim = np.pi / 10
    random_rp[2] = np.random.uniform(-lim, lim)
    angle = RotationMatrix(RollPitchYaw(random_rp))
    pose = RigidTransform(p=center) @ RigidTransform(R=angle)
    origin_local = pose.inverse()@RigidTransform()
    while np.all([-dimensions[i]/2 <= origin_local.translation()[i] < dimensions[i]/2 for i in range(3)]): 
        random_rp[2] = np.random.uniform(-lim, lim)
        angle = RotationMatrix(RollPitchYaw(random_rp))
        center = random_center()
        pose = RigidTransform(p=center)@RigidTransform(R=angle)
        origin_local = pose.inverse()@RigidTransform()
        
    plant.AddJoint(WeldJoint(link_name, plant.world_frame(), body.body_frame(), pose))
    col_id = plant.RegisterCollisionGeometry(
        body, RigidTransform(), box, link_name, CoulombFriction()
    )
    plant.RegisterVisualGeometry(body, RigidTransform(), box, link_name, color)
    return col_id

def AddNRandomBoxes(
    n: int,
    plant: MultibodyPlant,
    obstacle_model_instance: ModelInstanceIndex,
    pos_limits=(-10, 10),
    size_limits=(0.1, 1),
    color=[0, 0, 0, 1],
):
    assert n > 0
    joint_sphere = Sphere(0.05)
    plant.RegisterVisualGeometry(plant.world_body(), RigidTransform(),
                                joint_sphere, "pend_start", 
                                np.array([0,0,1,1]))
    geom_ids = []
    for i in range(n):
        geom_ids.append(AddRandomBox(plant, obstacle_model_instance, i, pos_limits, size_limits, color))
    return geom_ids


def make_line_polys(plant: MultibodyPlant, prm):
    polys = np.empty(shape=(plant.num_positions(), len(prm.prm.edges())), dtype=object)
    for i, (s0, s1) in enumerate(prm.prm.edges()):
        for j in range(plant.num_positions()):
            polys[j, i] = PolynomialCommon(np.array([s0[j], s1[j] - s0[j]]))
    return polys

class N_Link_K_Boxes_Experiment:
    def __init__(self, n: int, k: int, exclude_pend_self_collisions=True, plane_order=1, maximum_path_degree=1, 
                meshcat_instance = None):
        builder = DiagramBuilder()
        self.plant, self.scene_graph = AddMultibodyPlantSceneGraph(builder, time_step=0.001)

        self.link_dimensions = (0.1, 0.2, 0.001)
        self.pendulum_model_instance = self.plant.AddModelInstance("pendulum")
        self.obstacle_model_instance = self.plant.AddModelInstance("obstacle")

        num_pend = n
        num_obstacles = k
        np.random.seed(num_pend * num_obstacles)

        self.pend_geom_ids = N_Link_Pendulum(
            num_pend, self.plant, self.pendulum_model_instance, np.array(self.link_dimensions)
        )
        self.pend_geom_set = GeometrySet(self.pend_geom_ids)

        if k > 0:
            obstacle_pos_limits = self.link_dimensions[1] * num_pend * np.array([-1.25, 1.25])
            obstacle_size_limits = (0.01 / np.log(num_obstacles), 0.5 / np.log(num_obstacles))
            self.obstacle_geom_ids=AddNRandomBoxes(
                num_obstacles,
                self.plant,
                self.obstacle_model_instance,
                obstacle_pos_limits,
                obstacle_size_limits,
            )
            self.obstacle_geom_set = GeometrySet(self.obstacle_geom_ids)
        else:
            self.obstacle_geom_ids = None
            self.obstacle_geom_set = None

        self.plant.Finalize()
        if exclude_pend_self_collisions:
            collision_filter_manager = self.scene_graph.collision_filter_manager()
            decl = CollisionFilterDeclaration().ExcludeWithin(self.pend_geom_set)
            collision_filter_manager.Apply(decl)
        
        if meshcat_instance is not None:
            self.visualizer = MeshcatVisualizer.AddToBuilder(builder, self.scene_graph, meshcat_instance)

        self.diagram = builder.Build()

        self.q_star = np.zeros(self.plant.num_positions())

        if maximum_path_degree > 0:
            t0 = time.time()
            self.cspace_free_path = CspaceFreePath(
                self.plant, self.scene_graph, self.q_star, 
                maximum_path_degree=maximum_path_degree, 
                plane_order=plane_order
            )
            t1 = time.time()
            print(f"Time to construct line certifier for {n}-links, {k}-boxes = {t1 - t0}s")
        else:
            print("Did not build cspace free path")
            self.cspace_free_path = None


In [50]:
meshcat.Delete()
n = 12
plane_order = 1
maximum_path_degree = -1
k = 100
experiment = \
    N_Link_K_Boxes_Experiment(n,k,exclude_pend_self_collisions = True,
                                            plane_order=plane_order,
                                            maximum_path_degree=maximum_path_degree,
                             meshcat_instance = meshcat)
diagram_context = experiment.diagram.CreateDefaultContext()
plant_context = experiment.diagram.GetMutableSubsystemContext(experiment.plant, diagram_context)
experiment.diagram.ForcedPublish(diagram_context)

Did not build cspace free path


In [52]:
Ratfk = RationalForwardKinematics(experiment.plant)
diagram_col_context = experiment.diagram.CreateDefaultContext()
plant_col_context = experiment.diagram.GetMutableSubsystemContext(experiment.plant, 
                                                                  diagram_col_context)
scene_graph_col_context = experiment.diagram.GetMutableSubsystemContext(
        experiment.scene_graph, diagram_col_context
    )
query_port = experiment.scene_graph.get_query_output_port()

def check_collision_q_by_query(q):
    if np.all(q >= experiment.plant.GetPositionLowerLimits()) and np.all(
        q <= experiment.plant.GetPositionUpperLimits()
    ):
        experiment.plant.SetPositions(plant_col_context, q)
        query_object = query_port.Eval(scene_graph_col_context)
        return 1 if query_object.HasCollisions() else 0
    else:
        return 1

def check_collision_s_by_query(s):
    s = np.array(s)
    q = Ratfk.ComputeQValue(s, experiment.q_star)
    return check_collision_q_by_query(q)

def sample_col_free_point():
    q = np.random.uniform(
        experiment.plant.GetPositionLowerLimits(), experiment.plant.GetPositionUpperLimits()
    )
    s = Ratfk.ComputeSValue(q, experiment.q_star)
    return s
l = 100
collision_checker = StraightLineCollisionChecker(
                    check_collision_s_by_query, 100
                )
prm = PRMFixedEdges(
                sample_col_free_point, l, collision_checker, dist_thresh=100
            )
diagram_vis_context = experiment.diagram.CreateDefaultContext()
plant_vis_context = experiment.diagram.GetMutableSubsystemContext(experiment.plant, diagram_vis_context)
vis_bundle = vis_utils.VisualizationBundle(
    experiment.diagram, diagram_context, experiment.plant, plant_vis_context,
    Ratfk, meshcat, experiment.q_star
)
end_effector = experiment.plant.GetBodyByName(f"link_{n}")

prm.draw_tree(vis_bundle, end_effector)

PRM has 10 nodes
PRM has 0 edges
PRM has 20 nodes
PRM has 0 edges
PRM has 30 nodes
PRM has 0 edges
PRM has 40 nodes
PRM has 0 edges
PRM has 50 nodes
PRM has 2 edges
PRM has 60 nodes
PRM has 5 edges
PRM has 70 nodes
PRM has 5 edges
PRM has 80 nodes
PRM has 6 edges
PRM has 90 nodes
PRM has 8 edges
PRM has 100 nodes
PRM has 9 edges
PRM has 110 nodes
PRM has 9 edges
PRM has 120 nodes
PRM has 12 edges
PRM has 130 nodes
PRM has 12 edges
PRM has 140 nodes
PRM has 14 edges
PRM has 150 nodes
PRM has 16 edges
PRM has 160 nodes
PRM has 17 edges
PRM has 170 nodes
PRM has 17 edges
PRM has 180 nodes
PRM has 17 edges
PRM has 190 nodes
PRM has 20 edges
PRM has 200 nodes
PRM has 21 edges
PRM has 210 nodes
PRM has 22 edges
PRM has 220 nodes
PRM has 24 edges
PRM has 230 nodes
PRM has 26 edges
PRM has 240 nodes
PRM has 26 edges
PRM has 250 nodes
PRM has 31 edges
PRM has 260 nodes
PRM has 33 edges
PRM has 270 nodes
PRM has 38 edges
PRM has 280 nodes
PRM has 39 edges
PRM has 290 nodes
PRM has 41 edges
PRM h

DEBUG:drake:Meshcat connection closed from 0000:0000:0000:0000:0000:0000:0000:0001
DEBUG:drake:Meshcat connection opened from 0000:0000:0000:0000:0000:0000:0000:0001


PRM has 440 nodes
PRM has 72 edges
PRM has 450 nodes
PRM has 80 edges
PRM has 460 nodes
PRM has 81 edges
PRM has 470 nodes
PRM has 86 edges
PRM has 480 nodes
PRM has 89 edges
PRM has 490 nodes
PRM has 93 edges
PRM has 500 nodes
PRM has 97 edges
PRM has 510 nodes
PRM has 100 edges
PRM has 510 nodes
PRM has 100 edges


In [40]:
def make_line_polys(prm, max_num_edges = -1):
    
    polys = np.empty(shape=(experiment.plant.num_positions(), 
                            len(prm.prm.edges()) if max_num_edges < 0 else max_num_edges), dtype = object)
    for i, (s0, s1) in enumerate(prm.prm.edges()):
        for j in range(experiment.plant.num_positions()):
            if max_num_edges > 0 and not i < max_num_edges:
                break
            polys[j,i] = PolynomialCommon(np.array([s0[j],s1[j]-s0[j]]))
        
    return polys

path_safe = make_line_polys(prm)
# path_unsafe = make_line_polys(PRM_bad)
    

In [10]:
path_safe.shape

(5, 200)

In [11]:
from pydrake.all import PiecewisePolynomial
from tqdm import tqdm
traj = PiecewisePolynomial(path_safe.T,
                           np.arange(path_safe.shape[1]-1))


for idx in tqdm(range(path_safe.shape[1])):
    has_col = False
    for t in np.linspace(0,1,100):
        cur_time = idx + t
        s = traj.value(cur_time)
        q = Ratfk.ComputeQValue(s, experiment.q_star)
        experiment.plant.SetPositions(plant_context, q)
        if check_collision_q_by_query(q):
            print(f"COL at idx {idx}")
            has_col = True
#             break
#     if has_col and idx != 88:
#         experiment.diagram.ForcedPublish(diagram_context)
#         break
#         time.sleep(0.01)

100%|███████████████████████████████████████████████████████████████████████████████████| 200/200 [00:01<00:00, 137.68it/s]


In [12]:
for idx in experiment.plant.GetBodyIndices(experiment.pendulum_model_instance):
    print(experiment.plant.get_body(idx).name())

link_1
link_2
link_3
link_4
link_5


In [13]:
cert_options = CspaceFreePath.FindSeparationCertificateGivenPathOptions()
cert_options.terminate_segment_certification_at_failure = False

cert_options.num_threads = -1
cert_options.verbose = False
cert_options.solver_id = MosekSolver.id()
cert_options.solver_options = SolverOptions()
cert_options.terminate_path_certification_at_failure = False



In [14]:
import pickle
path_to_data_save_folder = Path(
    "/home/amice/Documents/coding_projects/drake/C_Iris_Examples/final_experiment_data/"
)
for num_links in range(2, n+1):
    ignored_col_set = set([elt for elt in zip(experiment.pend_geom_ids[:-1], experiment.pend_geom_ids[1:])])
    for j in range(num_links, experiment.plant.num_positions()): #+1 for num joints +1 to include last joint name
        pend_j_col_geom = experiment.pend_geom_ids[j]
        for obs_id in experiment.obstacle_geom_ids:
            ignored_col_set.add((pend_j_col_geom, obs_id))
    t0 = time.time()
    statistics, cert_result = \
        experiment.cspace_free_path.FindSeparationCertificateGivenPath(path_safe, 
                                                            ignored_col_set,
                                                            cert_options)
    t1 = time.time()
    with open(str(path_to_data_save_folder) + (f"{num_links}_links_{k}_obstacles_{l}_edges.pkl"), "wb") as f:
        pickle.dump(statistics[0], f)    
    print(f"num pairs to certify {len(statistics[0].total_time_to_certify_pair)}")
    print(f"Certification of safe PRM for {num_links} links in {t1-t0}s")
    print(f"Frac edges safe = {sum([1 if s.certified_safe() else 0 for s in statistics])}/{len(statistics)}")
    print(f"Unsafe inds = {[idx for idx, s in enumerate(statistics) if not s.certified_safe()]}")
        

num pairs to certify 200
Certification of safe PRM for 2 links in 41.01732611656189s
Frac edges safe = 198/200
Unsafe inds = [4, 7]
num pairs to certify 300
Certification of safe PRM for 3 links in 80.23141527175903s
Frac edges safe = 198/200
Unsafe inds = [4, 7]
num pairs to certify 400
Certification of safe PRM for 4 links in 122.95765829086304s
Frac edges safe = 193/200
Unsafe inds = [4, 7, 26, 127, 129, 176, 188]
num pairs to certify 500
Certification of safe PRM for 5 links in 199.91900157928467s
Frac edges safe = 181/200
Unsafe inds = [4, 7, 10, 11, 26, 77, 121, 123, 125, 126, 127, 128, 129, 130, 146, 176, 179, 185, 188]


In [15]:
sum_milli = 0
for s in statistics:
    sum_milli +=s.total_time_to_certify()
    print(s.total_time_to_certify())
print(f"Total time = {sum_milli/1000/20}")

14424.502741000026
14157.887852999984
13766.247788000004
12934.161508999998
13281.846458999995
13272.478678999998
13501.053431999997
14019.871520000004
13847.181563999999
13106.661011999993
22111.57580799998
14272.244973000012
13974.257307999997
12692.138853000004
13575.691278999995
12824.262540999993
13461.135281
12411.888934000002
12753.045133999998
12445.86008899999
13563.797626999994
13442.668706000004
14052.530053999997
13389.064289
13924.613913000007
14541.13922499998
14799.986574999994
13475.505764000001
13339.538372000003
13240.087786000002
13473.648512000012
13712.828099999992
13367.138061999996
13928.407122999994
13173.068041999994
13221.488329000002
12474.23599400001
12428.408034999991
13344.636631000025
12768.517033000011
13251.795668000012
14454.278362999996
12898.534838999985
13241.42737200001
14587.99692800001
14430.061253999998
14046.861828999994
13165.015394999991
12528.483374999994
13151.76352
14029.609452999986
13396.608885999982
12934.11378900001
13684.969171000012


In [29]:
inspector = experiment.scene_graph.model_inspector()
for idx,s in enumerate(statistics):
    if not s.certified_safe():
        cert = cert_result[idx]
        for i, (pair, result) in enumerate(cert.items()):
            if not result.result.is_success():
                n0,n1 = inspector.GetName(inspector.GetFrameId(pair[0])), inspector.GetName(inspector.GetFrameId(pair[1]))
                print(f"index= {idx}, pair = {(n0,n1)}, rescode = {s.rescodes[i]}," 
                      f"sol_status = {s.solution_statuses[i]}")
    print()





index= 4, pair = ('pendulum::link_2', 'obstacle::obstacle_1'), rescode = 0,sol_status = 1



index= 7, pair = ('pendulum::link_3', 'obstacle::obstacle_1'), rescode = 0,sol_status = 1
index= 7, pair = ('pendulum::link_3', 'obstacle::obstacle_64'), rescode = 0,sol_status = 1
index= 7, pair = ('pendulum::link_2', 'obstacle::obstacle_1'), rescode = 0,sol_status = 1



index= 10, pair = ('pendulum::link_5', 'obstacle::obstacle_62'), rescode = 0,sol_status = 1
index= 10, pair = ('pendulum::link_5', 'obstacle::obstacle_87'), rescode = 0,sol_status = 1
index= 10, pair = ('pendulum::link_5', 'obstacle::obstacle_37'), rescode = 0,sol_status = 1
index= 10, pair = ('pendulum::link_5', 'obstacle::obstacle_50'), rescode = 0,sol_status = 1
index= 10, pair = ('pendulum::link_5', 'obstacle::obstacle_93'), rescode = 0,sol_status = 1
index= 10, pair = ('pendulum::link_5', 'obstacle::obstacle_96'), rescode = 0,sol_status = 1
index= 10, pair = ('pendulum::link_5', 'obstacle::obstacle_71'), rescode = 0,

In [27]:
statistics[121].solution_statuses

{499: 1,
 498: 1,
 497: 1,
 496: 0,
 495: 1,
 494: 1,
 493: 1,
 492: 1,
 491: 1,
 490: 1,
 489: 1,
 488: 0,
 487: 0,
 486: 1,
 485: 1,
 484: 0,
 483: 1,
 482: 1,
 481: 0,
 480: 0,
 479: 1,
 478: 1,
 477: 1,
 476: 1,
 475: 1,
 474: 1,
 473: 1,
 472: 1,
 471: 0,
 470: 1,
 469: 1,
 468: 1,
 467: 1,
 466: 1,
 465: 1,
 464: 0,
 463: 1,
 462: 1,
 461: 1,
 460: 0,
 459: 0,
 458: 1,
 457: 1,
 456: 1,
 455: 1,
 454: 1,
 453: 1,
 452: 1,
 451: 1,
 450: 1,
 449: 1,
 448: 1,
 447: 0,
 446: 1,
 445: 1,
 444: 1,
 443: 0,
 442: 1,
 441: 0,
 440: 1,
 439: 0,
 438: 1,
 437: 0,
 436: 1,
 435: 1,
 434: 1,
 433: 0,
 432: 0,
 431: 1,
 430: 0,
 429: 1,
 428: 1,
 427: 0,
 426: 0,
 425: 1,
 424: 1,
 423: 1,
 422: 0,
 421: 1,
 420: 1,
 419: 1,
 418: 1,
 417: 1,
 416: 0,
 415: 1,
 414: 1,
 413: 0,
 412: 1,
 411: 1,
 410: 1,
 409: 0,
 408: 1,
 407: 1,
 406: 0,
 405: 1,
 404: 0,
 403: 1,
 402: 1,
 401: 1,
 400: 1,
 399: 1,
 398: 1,
 397: 1,
 396: 1,
 395: 1,
 394: 1,
 393: 1,
 392: 1,
 391: 1,
 390: 1,
 389: 1,
 

In [51]:
cert_result[193]
itm = None
for pair, result in cert.items():
    if not result.result.is_success():
        itm = result
details = itm.result.get_solver_details()

In [55]:
print(details.rescode)
print(details.solution_status)

0
0


In [20]:
for idx in [idx for idx, s in enumerate(statistics) if not s.certified_safe()]:
    cur_traj = PiecewisePolynomial([path_safe[:,idx]], [0,1])
    for t in np.linspace(0,1,1000):
        s = cur_traj.value(t)
        q = Ratfk.ComputeQValue(s, experiment.q_star)
        experiment.plant.SetPositions(plant_context, q)
        experiment.diagram.ForcedPublish(diagram_context)
#         time.sleep(0.01)
        if check_collision_q_by_query(q):
            print("collision detected")
            break
            

collision detected
collision detected


In [None]:
plane_order = 1
maximum_path_degree = 1
t0 = time.time()
cspace_free_path = CspaceFreePath(
    plant, scene_graph, q_star, maximum_path_degree=maximum_path_degree, plane_order=plane_order
)
t1 = time.time()
print(f"Time to construct line certifier for {n}-links, {k}-boxes = {t1 - t0}s")

In [None]:
meshcat.Delete()
builder = DiagramBuilder()
plant, scene_graph = AddMultibodyPlantSceneGraph(builder, time_step=0.001)

visualizer = MeshcatVisualizer.AddToBuilder(
    builder, scene_graph, meshcat)

link_dimensions = (0.1, 0.2, 0.001)
pendulum_box = Box(*link_dimensions)
pendulum_inertia = SpatialInertia.SolidBoxWithDensity(1, *link_dimensions)
pendulum_model_instance = plant.AddModelInstance("pendulum")
def AddPendulumLink(parent_frame, link_index: int, color = np.array([1,0,0,1])):
    link_name = f"link_{link_index}"
    pend_body = plant.AddRigidBody(link_name, pendulum_model_instance, pendulum_inertia)
    parent_to_child_attach_frame = plant.AddFrame(
        FixedOffsetFrame(f"{link_name}_joint_frame", pend_body.body_frame(), 
                                  RigidTransform(p = (0,link_dimensions[1]/2, 0)))
    )
    next_frame = plant.AddFrame(
        FixedOffsetFrame(f"{link_name}_joint_frame", pend_body.body_frame(), 
                                  RigidTransform(p = (0,-link_dimensions[1]/2, 0)))
    )
    # add the revolute joint,
    joint_lim = np.pi
    plant.AddJoint(RevoluteJoint(
        f"joint_{link_index}", parent_frame, parent_to_child_attach_frame, [0,0,1],
        -joint_lim, joint_lim,
    )
    )
    
    geom_id = plant.RegisterCollisionGeometry(pend_body, RigidTransform(),
                                    pendulum_box, link_name, CoulombFriction())
    plant.RegisterVisualGeometry(pend_body, RigidTransform(),
                                pendulum_box, link_name, color)
    
    
    
    return next_frame, geom_id

def N_Link_Pendulum(n: int):
    assert n > 0
    parent_frame = plant.world_frame()
    colors = np.array(vis_utils.n_colors(n))/255
    colors = np.hstack([colors,np.ones((colors.shape[0],1))])
    geom_ids = []
    for i in range(n):
        parent_frame, geom_id = AddPendulumLink(parent_frame, i+1, colors[i])
        geom_ids.append(geom_id)

    return geom_ids
        
obstacle_model_instance = plant.AddModelInstance("obstacle")
def AddRandomBox(index: int, pos_limits, size_limits, color = [0,0,0,1]):
    link_name = f"obstacle_{index}"     
    
    center = np.append(np.random.uniform(pos_limits[0],pos_limits[1], 2),0)
        
    dimensions = np.append(np.random.uniform(size_limits[0],size_limits[1],2),0.001)
    box = Box(*dimensions)
    inertia = SpatialInertia.SolidBoxWithDensity(1, *dimensions)
    
    body = plant.AddRigidBody(link_name, obstacle_model_instance, inertia)
    
    random_rp = np.zeros(3)
    lim = np.pi/10
    random_rp[2] = np.random.uniform(-lim, lim)
    angle = RotationMatrix(RollPitchYaw(random_rp))
    pose = RigidTransform(p=center)@RigidTransform(R=angle)
    origin_local = pose.inverse()@RigidTransform()
    while np.all([-dimensions[i]/2 <= origin_local.translation()[i] < dimensions[i]/2 for i in range(3)]): 
        random_rp[2] = np.random.uniform(-lim, lim)
        angle = RotationMatrix(RollPitchYaw(random_rp))
        pose = RigidTransform(p=center)@RigidTransform(R=angle)
        origin_local = pose.inverse()@RigidTransform()
    plant.AddJoint(WeldJoint(link_name, plant.world_frame(), body.body_frame(),
                            pose))
    plant.RegisterCollisionGeometry(body, RigidTransform(),
                                    box, link_name, CoulombFriction())
    plant.RegisterVisualGeometry(body, RigidTransform(),
                                box, link_name, color)
    
    

def AddNRandomBoxes(n: int, pos_limits=(-10,10),
                    size_limits = (0.1,1), color= [0,0,0,1],
                   filter_all = False):
    assert n > 0
    for i in range(n):
        AddRandomBox(i, pos_limits, size_limits, color)

# child_frame = AddPendulumLink(plant.world_frame(), 1)
num_pend = 3
num_obstacles = 4
np.random.seed(num_pend * num_obstacles)


pend_geom_ids = N_Link_Pendulum(num_pend)

obstacle_pos_limits = link_dimensions[1]*num_pend*np.array([-1,1])
obstacle_size_limits = (0.01/np.log(num_obstacles),0.5/np.log(num_obstacles))
AddNRandomBoxes(num_obstacles, obstacle_pos_limits, obstacle_size_limits)


plant.Finalize()

end_effector = plant.GetBodyByName(f"link_{num_pend}")

def SetMeshcatPerspective2d(meshcat_instance, X_WC):
    meshcat_instance.SetTransform("/Cameras/default", X_WC)
    meshcat_instance.SetProperty("/Background", "visible", False);
    meshcat_instance.SetProperty("/Grid", "visible", False);
    meshcat_instance.SetProperty("/Axes", "visible", False)

SetMeshcatPerspective2d(meshcat, RigidTransform(p=np.array([0,0,1])))
# meshcat.Set2dRenderMode(RigidTransform(p=np.array([0,0,1])))

diagram = builder.Build()
diagram_context = diagram.CreateDefaultContext()
plant_context = diagram.GetMutableSubsystemContext(plant, diagram_context)
scene_graph_context = diagram.GetMutableSubsystemContext(scene_graph, diagram_context)
query_port = scene_graph.get_query_output_port()
diagram.ForcedPublish(diagram_context)

q_star = np.zeros(plant.num_positions())
Ratfk = RationalForwardKinematics(plant)

diagram_vis_context = diagram.CreateDefaultContext()
plant_vis_context =  diagram.GetMutableSubsystemContext(plant, diagram_vis_context)
vis_bundle = vis_utils.VisualizationBundle(
    diagram, diagram_vis_context, plant, plant_vis_context,
    Ratfk, meshcat, q_star
)

In [None]:
#compute limits in s-space
limits_s = []
lower_q = plant.GetPositionLowerLimits()
upper_q = plant.GetPositionUpperLimits()

for q in [lower_q,upper_q]:
    limits_s.append(Ratfk.ComputeSValue(np.array(q), q_star))
limits_s = np.array(limits_s)
print(limits_s)

In [None]:
from pydrake.all import InverseKinematics
ik = InverseKinematics(plant, diagram.GetMutableSubsystemContext(plant, diagram_context))
min_dist = 1e-5
collision_constraint = ik.AddMinimumDistanceConstraint(
            min_dist, 1e-5)
diagram_col_context = diagram.CreateDefaultContext()
plant_col_context = diagram.GetMutableSubsystemContext(plant, diagram_col_context)
scene_graph_col_context = diagram.GetMutableSubsystemContext(scene_graph, diagram_col_context)
def check_collision_q_by_ik(q, min_dist=1e-5):
    if np.all(q >= plant.GetPositionLowerLimits()) and \
            np.all(q <= plant.GetPositionUpperLimits()):
        return 1 - 1 * \
            float(collision_constraint.evaluator().CheckSatisfied(q, min_dist))
    else:
        return 1
def check_collision_s_by_ik(s, min_dist=1e-5):
    s = np.array(s)
    q = Ratfk.ComputeQValue(s, q_star)
    return check_collision_q_by_ik(q, min_dist)

def check_collision_q_by_query(q):
    if np.all(q >= plant.GetPositionLowerLimits()) and \
            np.all(q <= plant.GetPositionUpperLimits()):
        plant.SetPositions(plant_col_context, q)
        query_object = query_port.Eval(scene_graph_col_context)
        return 1 if query_object.HasCollisions() else 0
    else:
        return 1
    
def check_collision_s_by_query(s):
    s = np.array(s)
    q = Ratfk.ComputeQValue(s, q_star)
    return check_collision_q_by_query(q)

## Set up the sliders so we can move the plant around manually

In [None]:
sliders = []
diagram_context = diagram.CreateDefaultContext()
plant_context = diagram.GetMutableSubsystemContext(plant, diagram_context)
for i in range(plant.num_positions()):
    sliders.append(widgets.FloatSlider(min=plant.GetPositionLowerLimits()[i],
                                       max=plant.GetPositionUpperLimits()[i], 
                                       value=plant.GetPositions(plant_context)[i],
                                       description=f'q{i}'))
# sliders.append(widgets.FloatSlider(min=q_low[2], max=q_high[2], value=0, description='q2'))

q = np.zeros(plant.num_positions())
def handle_slider_change(change, idx):
    q[idx] = change['new']
    plant.SetPositions(plant_context, q)
    diagram.ForcedPublish(diagram_context)

    
idx = 0
for slider in sliders:
    slider.observe(partial(handle_slider_change, idx = idx), names='value')
    idx+=1

for slider in sliders:
    display(slider)


# visualizer.jupyter_cell()

In [None]:
col_checker = StraightLineCollisionChecker(check_collision_s_by_query, 100)
def sample_col_free_point():
    q = np.random.uniform(plant.GetPositionLowerLimits(), plant.GetPositionUpperLimits())
    s = Ratfk.ComputeSValue(q, q_star)
    return s
np.random.seed(0)
prm = PRMFixedEdges(sample_col_free_point,
         100,
         col_checker,
         dist_thresh = 100)
prm.draw_tree(vis_bundle, end_effector)

In [None]:
def make_line_polys(prm, max_num_edges = -1):
    
    polys = np.empty(shape=(plant.num_positions(), 
                            len(prm.prm.edges()) if max_num_edges < 0 else max_num_edges), dtype = object)
    for i, (s0, s1) in enumerate(prm.prm.edges()):
        for j in range(plant.num_positions()):
            if max_num_edges > 0 and not i < max_num_edges:
                break
            polys[j,i] = PolynomialCommon(np.array([s0[j],s1[j]-s0[j]]))
        
    return polys

path_safe = make_line_polys(prm,500)
# path_unsafe = make_line_polys(PRM_bad)
    

In [None]:
# The object we will use to perform our certification.
t0 = time.time()
cspace_free_path = CspaceFreePath(plant, scene_graph, q_star,
                                  maximum_path_degree = 1, 
                                  plane_order = 1)
t1 = time.time()
print(f"Time to construct line certifier = {t1-t0}s")


cert_options = CspaceFreePath.FindSeparationCertificateGivenPathOptions()
cert_options.terminate_segment_certification_at_failure = False

cert_options.num_threads = -1
cert_options.verbose = False
cert_options.solver_id = MosekSolver.id()
cert_options.solver_options = SolverOptions()
cert_options.terminate_path_certification_at_failure = False


In [None]:
print(len(cspace_free_path.separating_planes()))

In [None]:
from pydrake.all import GeometrySet, CollisionFilterDeclaration
collision_filter_manager = scene_graph.collision_filter_manager()
pend_geom_set = GeometrySet(pend_geom_ids)
decl = CollisionFilterDeclaration().ExcludeWithin(pend_geom_set)
collision_filter_manager.Apply(decl)
t0 = time.time()
cspace_free_path2 = CspaceFreePath(plant, scene_graph, q_star,
                                  maximum_path_degree = 1, 
                                  plane_order = 1)
t1 = time.time()
print(f"Time to construct line certifier = {t1-t0}s")
print(len(cspace_free_path2.separating_planes()))

In [None]:
ignored_collision_pairs = set()
for g1 in pend_geom_ids[:-1]:
    for g2 in pend_geom_ids[1:]: 
        ignored_collision_pairs.add((g1,g2))

In [None]:
t0 = time.time()
statistics, cert_result = \
    cspace_free_path2.FindSeparationCertificateGivenPath(path_safe, 
                                                        ignored_collision_pairs,
                                                        cert_options)
t1 = time.time()
print(f"Certification of safe PRM in {t1-t0}s")

In [None]:
solve_times = np.array([list(s.time_to_solve_prog.values()) for s in statistics])
build_times = np.array([list(s.time_to_build_prog.values()) for s in statistics])
total_times = np.array([list(s.total_time_to_certify_pair.values()) for s in statistics])


In [None]:
fig, ax = plt.subplots(1,3)
ax[0].stairs(*np.histogram(solve_times))
ax[0].set_title('program solve times')
ax[1].stairs(*np.histogram(build_times))
ax[1].set_title('program build times')
ax[2].stairs(*np.histogram(total_times))
ax[2].set_title('pair certification time')

In [None]:
plt.stairs(*np.histogram(solve_times))
plt.title(f'Distribution of solve times for pairs, {num_pend} links, {num_obstacles} obstacles')
plt.xlabel("ms")
plt.ylabel("number of pairs")

In [None]:
import os
num_cores = os.cpu_count()
print(num_cores)
total_prog_solve_times = np.array([s.total_time_to_solve_progs()/num_cores for s in statistics])
plt.stairs(*np.histogram(total_prog_solve_times))
plt.title(f'Total Certification Times PRM Edges, {num_pend} links, {num_obstacles} obstacles {500} edges')
plt.xlabel("ms")
plt.ylabel("number of edges requiring time")

In [None]:
import pickle
with open("tmp.pkl", "wb") as f:
    pickle.dump(statistics[0], f)

In [None]:
with open("tmp.pkl", "rb") as f:
    tmp = pickle.load(f)

In [None]:
total_prog_solve_times.shape

In [None]:
def plot_pair_safety(pairs, statistics, body,
                                    path_size = 0.01, num_points = 1000, prefix = "prm"):
    meshcat.Delete(f"{prefix}_safe")
    meshcat.Delete(f"{prefix}_unsafe")
    
    safe_options = vis_utils.TrajectoryVisualizationOptions(
        path_color = Rgba(0,1,0,1), path_size = path_size, num_points = num_points
    )
    unsafe_options = vis_utils.TrajectoryVisualizationOptions(
        path_color = Rgba(1,0,0,1), path_size = path_size, num_points = num_points
    )
    
    for idx, (s0, s1) in enumerate(pairs):
        s0, s1 = np.array(s0), np.array(s1)
        if statistics[idx].certified_safe():
            vis_utils.visualize_s_space_segment(vis_bundle,s0, s1, body,
                                        f"{prefix}_safe/seg_{idx}", safe_options)
        else:
            vis_utils.visualize_s_space_segment(vis_bundle,s0, s1, body, 
                                        f"{prefix}_unsafe/seg_{idx}", unsafe_options)
plot_pair_safety(prm.prm.edges(), statistics, end_effector)

In [None]:
path_safe.shape

In [None]:
unsafe_path = None
for idx, stats in enumerate(statistics):
    if not stats.certified_safe():
        unsafe_path = path_safe[:, idx]
col_found = False
for t in np.linspace(0, 1, int(1e3)):
    s = np.array([unsafe_path[i].EvaluateUnivariate(t) for i in range(unsafe_path.shape[0])])
    if check_collision_s_by_query(s):
        print("COLLISION FOUND")
        col_found = True
        plant.SetPositions(plant_context, Ratfk.ComputeQValue(s, q_star))
        diagram.ForcedPublish(diagram_context)
        break
print(col_found)

In [None]:
import numpy as np
num_links = np.arange(2, 12, 1)
num_boxes = np.array([10, 50, 100])
num_edges = np.array([50, 100, 1000])
good_checker = np.array([True, False])

NN, KK = np.meshgrid(num_links, num_boxes,  indexing = 'ij')
# LL,GG, NN,  KK   = np.meshgrid(num_edges,good_checker,num_links,  num_boxes, indexing = 'ij')
for i in range(NN.shape[0]):
    for j in range(NN.shape[1]):
                print(NN[i,j],KK[i,j])