In [14]:
import time
from random import random

import numpy as np
from pydrake.all import (
    DiagramBuilder,
    MeshcatVisualizer,
    MultibodyPlant,
    Parser,
    RigidTransform,
    RollPitchYaw,
    RotationMatrix,
    Simulator,
    SolutionResult,
    Solve,
    StartMeshcat,
)
from pydrake.multibody import inverse_kinematics

from manipulation import running_as_notebook
from manipulation.exercises.trajectories.rrt_planner.robot import (
    ConfigurationSpace,
    Range,
)
from manipulation.exercises.trajectories.rrt_planner.rrt_planning import (
    Problem,
)
from manipulation.meshcat_utils import AddMeshcatTriad
from manipulation.scenarios import MakeManipulationStation
import numpy as np
import random
import math
from pydrake.common import temp_directory
from pydrake.geometry import StartMeshcat
from pydrake.systems.analysis import Simulator
from pydrake.visualization import ModelVisualizer
from pydrake.all import Simulator, StartMeshcat, LogVectorOutput

from manipulation import running_as_notebook
from manipulation.station import MakeHardwareStation, load_scenario
from IPython.display import HTML, display
from matplotlib import pyplot as plt
from pydrake.all import (
    AddMultibodyPlantSceneGraph,
    Box,
    ConnectPlanarSceneGraphVisualizer,
    DiagramBuilder,
    FixedOffsetFrame,
    JointIndex,
    Parser,
    PlanarJoint,
    RandomGenerator,
    RigidTransform,
    RotationMatrix,
    Simulator,
    StartMeshcat,
    UniformlyRandomRotationMatrix,
)

from manipulation import ConfigureParser, running_as_notebook
from manipulation.scenarios import AddShape, ycb
from manipulation.station import MakeHardwareStation, load_scenario
from pydrake.common import temp_directory

# sponana/src/sponana/utils.py
import sponana.utils
from sponana.controller import SpotController
from sponana.debug_logger import DebugLogger
from sponana.perception import (
    add_camera_pose_extractor,
    add_body_pose_extractor,
    BananaSpotter,
)
import sponana.sim

In [15]:
# Start the visualizer.
meshcat = StartMeshcat()

INFO:drake:Meshcat listening for connections at http://localhost:7008


In [16]:
# Clean up the Meshcat instance.
meshcat.Delete()
meshcat.DeleteAddedControls()
rng = np.random.default_rng(145)  # this is for python
generator = RandomGenerator(rng.integers(0, 1000))  # this is for c++

add_spot = True
# simulation_time = -1  # run indefinitely until ESC is pressed
simulation_time = -1
table_height = 0.2
debug = True
add_fixed_cameras = False
enable_arm_ik = True  # turn this off if you find the arm too annoying
use_teleop = False

simulator, diagram = sponana.sim.clutter_gen(
    meshcat,
    rng,
    table_height=table_height,
    debug=debug,
    simulation_time=simulation_time,
    add_spot=add_spot,
    add_fixed_cameras=add_fixed_cameras,
    enable_arm_ik=enable_arm_ik,
    use_teleop=use_teleop,
)

x_sample [-0.1   0.05]
y_sample [-2.00000000e-01  2.77555756e-17]
appended: x_points_append: [0.04999999999999999] y_points_append: [2.7755575615628914e-17]
dist 0.15
appended: x_points_append: [0.04999999999999999, -0.1] y_points_append: [2.7755575615628914e-17, 2.7755575615628914e-17]
dist 0.0
dist 0.2
dist 0.25
appended: x_points_append: [0.04999999999999999, -0.1, 0.04999999999999999] y_points_append: [2.7755575615628914e-17, 2.7755575615628914e-17, -0.19999999999999998]
x_sample [-0.1   0.05]
y_sample [-2.00000000e-01  2.77555756e-17]
appended: x_points_append: [-0.1] y_points_append: [-0.19999999999999998]
x_sample [-0.1   0.05]
y_sample [-2.00000000e-01  2.77555756e-17]
appended: x_points_append: [0.04999999999999999] y_points_append: [2.7755575615628914e-17]
dist 0.2
appended: x_points_append: [0.04999999999999999, 0.04999999999999999] y_points_append: [2.7755575615628914e-17, -0.19999999999999998]
dist 0.15
dist 0.25
appended: x_points_append: [0.04999999999999999, 0.049999999

LCM detected that large packets are being received, but the kernel UDP
receive buffer is very small.  The possibility of dropping packets due to
insufficient buffer space is very high.

For more information, visit:
   http://lcm-proj.github.io/lcm/multicast_setup.html



In [17]:
context = simulator.get_mutable_context()
station = diagram.GetSubsystemByName("station")
#context = station.GetMyContextFromRoot(context)
scene_graph = station.GetSubsystemByName("scene_graph")
plant = station.GetSubsystemByName("plant")

In [18]:
def check_collision_pair_name(pair_name0, pair_name1):
    pair0_is_spot = pair_name0.startswith("spot")
    pair1_is_spot = pair_name1.startswith("spot")
    if pair0_is_spot == False and pair1_is_spot == False:
        return False
    if pair0_is_spot == True and pair1_is_spot == True: 
        return False
    if pair0_is_spot == True and (pair_name1.startswith("table_top") or 
        pair_name1.startswith("back_wall")):
        return True
    if pair1_is_spot == True and (pair_name0.startswith("table_top") or 
        pair_name0.startswith("back_wall")):
        return True
    return False
        


In [19]:

def in_collision(plant, scene_graph, context, print_collisions=True, thresh=1e-12):
    plant_context = plant.GetMyContextFromRoot(context)
    sg_context = scene_graph.GetMyContextFromRoot(context)
    query_object = plant.get_geometry_query_input_port().Eval(plant_context)
    inspector = scene_graph.get_query_output_port().Eval(sg_context).inspector()
    pairs = inspector.GetCollisionCandidates()
    dists = []

    for pair in pairs:
        pair_name0 = inspector.GetName(inspector.GetFrameId(pair[0]))
        pair_name1 = inspector.GetName(inspector.GetFrameId(pair[1]))
        
        if check_collision_pair_name(pair_name0, pair_name1) == False:
            continue
        #print("after check",inspector.GetName(inspector.GetFrameId(pair[0])),
        #          inspector.GetName(inspector.GetFrameId(pair[1])))
        dists.append(query_object.ComputeSignedDistancePairClosestPoints(pair[0], pair[1]).distance)
        if dists[-1] < thresh and print_collisions:
            print(inspector.GetName(inspector.GetFrameId(pair[0])),
                  inspector.GetName(inspector.GetFrameId(pair[1])))
    return np.min(dists) < thresh




In [20]:
collision_check = in_collision(plant, scene_graph,context, print_collisions=True)
print("Check Collision:", collision_check)

Check Collision: False


In [29]:
def ExistsCollision(q_spot):
    context = simulator.get_mutable_context()
    station = diagram.GetSubsystemByName("station")
    context = station.GetMyContextFromRoot(context)
    scene_graph = station.GetSubsystemByName("scene_graph")
    plant = station.GetSubsystemByName("plant")

    spot_20_desired = current_location
    station.GetInputPort("spot.desired_state").FixValue(context, q_spot)
    new_spot_location = station.GetOutputPort("spot.state_estimated").Eval(context)
    print("new_spot_location",new_spot_location)
    collision_check = in_collision(plant, scene_graph,context, print_collisions=True)
    print("Check Collision:", collision_check)
    return collision_check
    

In [30]:
class SpotProblem(Problem):
    def __init__(
        self,
        q_start: np.array,
        q_goal: np.array):
        nq = 3
        joint_limits = np.zeros((nq, 2))
        #x
        joint_limits[0, 0] = -100
        joint_limits[0, 1] = 100
        #y
        joint_limits[1, 0] = -100
        joint_limits[1, 1] = 100
        #z theta
        joint_limits[2, 0] = 0.
        joint_limits[2, 1] = 2 * np.pi

        range_list = []
        for joint_limit in joint_limits:
            range_list.append(Range(joint_limit[0], joint_limit[1]))

        def l2_distance(q: tuple):
            sum = 0
            for q_i in q:
                sum += q_i**2
            return np.sqrt(sum)
        max_steps = nq * 1/4 #random max_steps for now
        #max_steps = nq * [np.pi / 180 * 2]  # three degrees
        cspace_spot = ConfigurationSpace(range_list, l2_distance, [max_steps for _ in range_list])

        # Call base class constructor.
        Problem.__init__(
            self,
            x=10,  # not used.
            y=10,  # not used.
            robot=None,  # not used.
            obstacles=None,  # not used.
            start=tuple(q_start),
            goal=tuple(q_goal),
            cspace=cspace_spot,
        )

    def collide(self, configuration):
        collision = ExistsCollision(np.array(configuration))
        return collision


In [25]:
class TreeNode:
    def __init__(self, value, parent=None):
        self.value = value  # tuple of floats representing a configuration
        self.parent = parent  # another TreeNode
        self.children = []  # list of TreeNodes

In [26]:
class RRT:
    """
    RRT Tree.
    """

    def __init__(self, root: TreeNode, cspace: ConfigurationSpace):
        self.root = root  # root TreeNode
        self.cspace = cspace  # robot.ConfigurationSpace
        self.size = 1  # int length of path
        self.max_recursion = 1000  # int length of longest possible path

    def add_configuration(self, parent_node, child_value):
        child_node = TreeNode(child_value, parent_node)
        parent_node.children.append(child_node)
        self.size += 1
        return child_node

    # Brute force nearest, handles general distance functions
    def nearest(self, configuration):
        """
        Finds the nearest node by distance to configuration in the
             configuration space.

        Args:
            configuration: tuple of floats representing a configuration of a
                robot

        Returns:
            closest: TreeNode. the closest node in the configuration space
                to configuration
            distance: float. distance from configuration to closest
        """
        assert self.cspace.valid_configuration(configuration)

        def recur(node, depth=0):
            closest, distance = node, self.cspace.distance(
                node.value, configuration
            )
            if depth < self.max_recursion:
                for child in node.children:
                    (child_closest, child_distance) = recur(child, depth + 1)
                    if child_distance < distance:
                        closest = child_closest
                        child_distance = child_distance
            return closest, distance

        return recur(self.root)[0]

In [27]:
meshcat.Delete()
meshcat.DeleteAddedControls()

base_pose = np.array([1.00000000e+00, 1.50392176e-12, 3.15001955e+00])
q_start = base_pose
q_goal = np.array([0.20894849, -0.47792893, 0.2475])

In [28]:
spot_problem = SpotProblem(
    q_start=q_start,
    q_goal=q_goal)

RuntimeError: System::FixInputPortTypeCheck(): expected value of type drake::systems::BasicVector<double> with size=20 for input port 'spot.desired_state' (index 0) but the actual type was drake::systems::BasicVector<double> with size=3. (System ::_::station)

In [None]:
class RRT_tools:
    def __init__(self, problem):
        # rrt is a tree
        self.rrt_tree = RRT(TreeNode(problem.start), problem.cspace)
        problem.rrts = [self.rrt_tree]
        self.problem = problem

    def find_nearest_node_in_RRT_graph(self, q_sample):
        nearest_node = self.rrt_tree.nearest(q_sample)
        return nearest_node

    def sample_node_in_configuration_space(self):
        q_sample = self.problem.cspace.sample()
        return q_sample

    def calc_intermediate_qs_wo_collision(self, q_start, q_end):
        """create more samples by linear interpolation from q_start
        to q_end. Return all samples that are not in collision

        Example interpolated path:
        q_start, qa, qb, (Obstacle), qc , q_end
        returns >>> q_start, qa, qb
        """
        print(f"q start: {q_start}")
        print(f"q end: {q_end}")
        return self.problem.safe_path(q_start, q_end)

    def grow_rrt_tree(self, parent_node, q_sample):
        """
        add q_sample to the rrt tree as a child of the parent node
        returns the rrt tree node generated from q_sample
        """
        child_node = self.rrt_tree.add_configuration(parent_node, q_sample)
        return child_node

    def node_reaches_goal(self, node):
        return node.value == self.problem.goal

    def backup_path_from_node(self, node):
        path = [node.value]
        while node.parent is not None:
            node = node.parent
            path.append(node.value)
        path.reverse()
        return path

In [None]:
def rrt_planning(problem, max_iterations=1000, prob_sample_q_goal=0.05):
    """
    Input:
        problem (IiwaProblem): instance of a utility class
        max_iterations: the maximum number of samples to be collected
        prob_sample_q_goal: the probability of sampling q_goal

    Output:
        path (list): [q_start, ...., q_goal].
                    Note q's are configurations, not RRT nodes
    """
    """ Input: q_start, q_goal, max_interation, prob_sample_goal
      Output: path

      G.init(q_start)
      for k = 1 to max_interation:
        q_sample ← Generate Random Configuration
        random number ← random()
        if random_number < prob_sample_goal:
            q_sample ← q_goal
        n_near ← Find the nearest node in the tree(q_sample)
        (q_1, q_2, ... q_N) ← Find intermediate q's from n_near to q_sample
        
        // iteratively add the new nodes to the tree to form a new edge
        last_node ← n_near
        for n = 1 to N:
            last_node ← Grow RRT tree (parent_node, q_{n}) 
        
        if last node reaches the goal:
            path ← backup the path recursively
            return path
        
      return None"""
    rrt_tools = RRT_tools(spot_problem)
    q_goal = problem.goal
    q_start = problem.start
    i = 0 
    while i < max_iterations:
        #generate random
        q_sample = rrt_tools.sample_node_in_configuration_space()
        rand_numb = random.random()
        if rand_numb < prob_sample_q_goal:
            q_sample = q_goal
        n_near = rrt_tools.find_nearest_node_in_RRT_graph(q_sample)
        intermediate_qs = rrt_tools.calc_intermediate_qs_wo_collision(n_near.value, q_sample)

        last_node = n_near
        for n in range(len(intermediate_qs)):
            last_node = rrt_tools.grow_rrt_tree(n_near, intermediate_qs[n])

        if rrt_tools.node_reaches_goal(last_node):
            path = rrt_tools.backup_path_from_node(last_node)
            return path
        i +=1
    return None

In [None]:
path = rrt_planning(spot_problem, 600, 0.05)

q start: (1.0, 1.50392176e-12, 3.15001955)
q end: (-32.981937990959125, 63.237257166440145, 5.274229169950263)
q start: (-32.981937990959125, 63.237257166440145, 5.274229169950263)
q end: (-92.86901350599241, 56.74398142136084, 4.566846861326708)
q start: (-29.34101606335636, 56.46183675575029, 5.046635282098449)
q end: (-37.87070531194294, 15.811851887580744, 1.8926536638859244)
q start: (1.0, 1.50392176e-12, 3.15001955)
q end: (-5.2861348898233445, -51.405694490119515, 4.300137977322869)
q start: (-32.981937990959125, 63.237257166440145, 5.274229169950263)
q end: (0.9425867974927229, 42.232877529721435, 2.512732097794073)
q start: (-1.403522163755985, -19.65511848151536, 3.589770713388156)
q end: (38.88464659710576, -14.904531261899407, 5.985814338627568)
q start: (-92.86901350599241, 56.74398142136084, 4.566846861326708)
q end: (-70.05230902141228, 33.27332780261651, 0.24165210506404536)
q start: (-28.458668019165543, 60.43667321487765, 4.906029560329437)
q end: (-33.802006726946956

In [None]:
# spot_problem.visualize_path(path)

In [None]:
path

[(1.0, 1.50392176e-12, 3.15001955),
 (0.907556839855539, -0.7559660954414523, 3.1669330562841598),
 (0.20894849, -0.47792893, 0.2475)]