In [6]:
from PandaStation import *
from PandaGrasping import *

# Start a single meshcat server instance to use for the remainder of this notebook.
from meshcat.servers.zmqserver import start_zmq_server_as_subprocess
proc, zmq_url, web_url = start_zmq_server_as_subprocess(server_args=[])

# Let's do all of our imports here, too.
import numpy as np
import ipywidgets
import pydot
import pydrake.all
import os
from IPython.display import display, SVG



import pydrake.all
from pydrake.geometry import Cylinder, Box
from pydrake.all import (
    RigidTransform, RotationMatrix, AngleAxis, RollPitchYaw, InverseKinematics, MultibodyPlant, Parser,
    FindResourceOrThrow, Solve, PiecewisePolynomial, TrajectorySource, SceneGraph, DiagramBuilder,
    AddMultibodyPlantSceneGraph, LinearBushingRollPitchYaw, MathematicalProgram, AutoDiffXd, GenerateHtml, Role,
    LeafSystem, AbstractValue, PublishEvent, TriggerType, BasicVector, PiecewiseQuaternionSlerp,
    LoadModelDirectives, ProcessModelDirectives, ConnectMeshcatVisualizer, Diagram
    )
import pydrake.perception as mut
import open3d as o3d
from ompl import base as ob
from ompl import geometric as og
import time
from enum import Enum

def add_ycb(env, name, pose):
    ycb = {"cracker": "drake/manipulation/models/ycb/sdf/003_cracker_box.sdf", 
        "sugar": "drake/manipulation/models/ycb/sdf/004_sugar_box.sdf", 
        "soup": "drake/manipulation/models/ycb/sdf/005_tomato_soup_can.sdf", 
        "mustard": "drake/manipulation/models/ycb/sdf/006_mustard_bottle.sdf", 
        "gelatin": "drake/manipulation/models/ycb/sdf/009_gelatin_box.sdf", 
        "meat": "drake/manipulation/models/ycb/sdf/010_potted_meat_can.sdf",
        "brick": "drake/examples/manipulation_station/models/061_foam_brick.sdf"}
    env.AddModelFromFile(FindResourceOrThrow(ycb[name]), pose)

In [63]:
def draw_grasp_candidate(X_WG, prefix='gripper'):
    
    builder = DiagramBuilder()
    plant, scene_graph = AddMultibodyPlantSceneGraph(builder, time_step=0.001)
    gripper = AddPandaHand(plant, welded = True)
    plant.WeldFrames(plant.world_frame(), plant.GetFrameByName("panda_hand"), X_WG)
    plant.Finalize()
    
    meshcat = ConnectMeshcatVisualizer(builder, scene_graph, zmq_url=zmq_url, prefix=prefix, delete_prefix_on_load=False)#, frames_to_draw=frames_to_draw)
    diagram = builder.Build()
    context = diagram.CreateDefaultContext()

    meshcat.load()
    diagram.Publish(context)
    

def draw_grasp_and_arm_candidate(X_WG, prefix = 'arm'):
    builder = DiagramBuilder()
    plant, scene_graph = AddMultibodyPlantSceneGraph(builder, time_step=0.001)
    
    
    panda = AddPanda(plant)
    gripper = AddPandaHand(plant, panda_model_instance = panda, welded = True)
    
    
    plant.Finalize()
    
    meshcat = ConnectMeshcatVisualizer(builder, scene_graph, zmq_url=zmq_url, prefix=prefix, delete_prefix_on_load=False)#, frames_to_draw=frames_to_draw)
    diagram = builder.Build()
    context = diagram.CreateDefaultContext()
    plant_context = plant.GetMyContextFromRoot(context)
    
    q = X_WG_to_q(plant, plant_context, X_WG)
    
    plant.SetPositions(plant_context, panda, q)

    meshcat.load()
    diagram.Publish(context)
    
def X_WG_to_q(plant, plant_context, X_WG, warm_start = np.array([ 0., 0.55, 0., -1.45, 0., 1.58, 0.])):
    ik = InverseKinematics(plant, plant_context)
    p_tol = np.ones(3)*0.01
    theta_tol = 0.01
    q_nominal = np.array([ 0., 0.55, 0., -1.45, 0., 1.58, 0.])
    ik.AddPositionConstraint(
            plant.GetFrameByName("panda_hand"),
            np.zeros(3),
            plant.world_frame(),
            X_WG.translation() - p_tol,
            X_WG.translation() + p_tol)
    ik.AddOrientationConstraint(
            plant.GetFrameByName("panda_hand"),
            RotationMatrix(),
            plant.world_frame(),
            X_WG.rotation(),
            theta_tol)
    ik.AddMinimumDistanceConstraint(0, 0.1)
    q = ik.q()
    prog = ik.prog()
    prog.AddQuadraticErrorCost(np.identity(len(q)), q_nominal, q)
    prog.SetInitialGuess(q, warm_start)

    result = Solve(prog)
    #assert result.is_success()
    print("Success?", result.is_success())
    if not result.is_success():
        names = result.GetInfeasibleConstraintNames(prog)
        print("Infeasible Constraints:")
        for n in names:
            print(n)
    return result.GetSolution(q)

def process_table_point_cloud(env, env_context):
    
    plant = env.get_multibody_plant()
    plant_context = env.GetSubsystemContext(plant, env_context)

    margin = 0.001
    # defines space above table
    crop_min = np.array([0.25, -0.5, 0.05+margin])
    crop_max = np.array([1.25, 0.5, 0.5])

    merged_pcd = o3d.geometry.PointCloud()
    
    cameras = ['camera_left', 'camera_middle', 'camera_right']

    for name in cameras:
        pcd = env.GetOutputPort(f"{name}_point_cloud").Eval(env_context)
        pcd = create_open3d_point_cloud(pcd)

        pcd = pcd.crop(o3d.geometry.AxisAlignedBoundingBox(min_bound = crop_min, max_bound = crop_max))
        pcd.estimate_normals(search_param = o3d.geometry.KDTreeSearchParamHybrid(radius = 0.1, max_nn = 30))
        camera = plant.GetModelInstanceByName(name)
        body = plant.GetBodyByName("base", camera)
        X_C = plant.EvalBodyPoseInWorld(plant_context, body)
        pcd.orient_normals_towards_camera_location(X_C.translation())

        merged_pcd +=pcd

    return merged_pcd.voxel_down_sample(voxel_size = 0.005)

    
class Env(Diagram):
    
    def __init__(self, time_step = 0.001):
        Diagram.__init__(self)
        self.time_step = time_step
        self.builder = DiagramBuilder()
        self.plant, self.scene_graph = AddMultibodyPlantSceneGraph(self.builder, time_step = self.time_step)
        self.set_name("env")
        self.plant.set_name("plant")
        self.hand = None   
        self.object_ids = []
        self.object_poses = []
        
    def Finalize(self):
        #assert self.hand.is_valid(), "You forgot to setup the environment"
        self.plant.Finalize()
        
        for i in range(len(self.object_ids)):
            body_index = self.object_ids[i]
            body = self.plant.get_body(body_index)
            self.plant.SetDefaultFreeBodyPose(body, self.object_poses[i])
        
        AddRgbdSensors(self.builder, self.plant, self.scene_graph)
        self.builder.ExportOutput(self.scene_graph.get_query_output_port(), "query_object")
        self.builder.BuildInto(self)
        
        
    def SetupTableEnv(self):
        parser = Parser(self.plant)
        AddPackagePaths(parser)
        directive = FindResource("models/table_top.yaml")
        ProcessModelDirectives(LoadModelDirectives(directive), self.plant, parser)
        
        
    def AddModelFromFile(self, path, X_WO, name = None):
        parser = Parser(self.plant)
        if name is None:
            num = str(len(self.object_ids))
            name = "added_model_" + num
        model = parser.AddModelFromFile(path, name)
        indices = self.plant.GetBodyIndices(model)
        assert len(indices) == 1, "Currently, we only support adding models with one body"
        self.object_ids.append(indices[0])
        self.object_poses.append(X_WO)
        
    def AddHand(self):
        self.hand = AddPandaHand(self.plant, welded = True)
        
    def get_multibody_plant(self):
        return self.plant
    
    def get_scene_graph(self):
        return self.scene_graph

def grasp_candidate_cost(plant_context, cloud, plant, scene_graph, scene_graph_context, adjust_X_G=False):

    body = plant.GetBodyByName("panda_hand")
    X_G = plant.GetFreeBodyPose(plant_context, body)

    X_GW = X_G.inverse()
    pts = np.asarray(cloud.points).T
    p_GC = X_GW.multiply(pts)

    # Crop to a region inside of the finger box.
    crop_min = [-0.009, -0.04, 0.06]
    crop_max = [0.009, 0.04, 0.115]
    indices = np.all((crop_min[0] <= p_GC[0,:], p_GC[0,:] <= crop_max[0],
                      crop_min[1] <= p_GC[1,:], p_GC[1,:] <= crop_max[1],
                      crop_min[2] <= p_GC[2,:], p_GC[2,:] <= crop_max[2]), 
                     axis=0)

    if adjust_X_G and np.sum(indices)>0:
        p_GC_y = p_GC[1, indices]
        p_Gcenter_y = (p_GC_y.min() + p_GC_y.max())/2.0
        X_G.set_translation(X_G.translation() + X_G.rotation().multiply([0, p_Gcenter_y, 0]))
        plant.SetFreeBodyPose(plant_context, body, X_G)
        X_GW = X_G.inverse()

    query_object = scene_graph.get_query_output_port().Eval(scene_graph_context)
    
    if query_object.HasCollisions():
        cost = np.inf
        return cost

    # Check collisions between the gripper and the point cloud
    margin = 0.0  # must be smaller than the margin used in the point cloud preprocessing.
    for pt in cloud.points:
        distances = query_object.ComputeSignedDistanceToPoint(pt, threshold=margin)
        if distances:
            return np.inf

    n_GC = X_GW.rotation().multiply(np.asarray(cloud.normals)[indices,:].T)

    # Penalize deviation of the gripper from vertical.
    # weight * -dot([0, 0, -1], R_G * [0, 1, 0]) = weight * R_G[2,1]
    cost = 20.0*X_G.rotation().matrix()[2, 1]

    # Reward sum |dot product of normals with gripper x|^2
    cost -= np.sum(n_GC[0,:]**2)

    return cost
    
def generate_grasp_candidate_antipodal(env, env_context, cloud, rng): 
    """
    Picks a random point in the cloud, and aligns the robot finger with the normal of that pixel. 
    The rotation around the normal axis is drawn from a uniform distribution over [min_roll, max_roll].
    """
    
    plant = env.get_multibody_plant()
    plant_context = env.GetSubsystemContext(plant, env_context)
    scene_graph = env.get_scene_graph()
    scene_graph_context = env.GetSubsystemContext(scene_graph, env_context)

    body = plant.GetBodyByName("panda_hand")

    index = rng.integers(0,len(cloud.points)-1)

    # Use S for sample point/frame.
    p_WS = np.asarray(cloud.points[index])
    n_WS = np.asarray(cloud.normals[index])

    assert np.isclose(np.linalg.norm(n_WS), 1.0)

    Gy = n_WS # gripper y axis aligns with normal
    # make orthonormal z axis, aligned with world down
    z = np.array([0.0, 0.0, -1.0])
    if np.abs(np.dot(z,Gy)) < 1e-6:
        return np.inf, None

    Gz = z - np.dot(z,Gy)*Gy
    Gx = np.cross(Gy, Gz)
    R_WG = RotationMatrix(np.vstack((Gx, Gy, Gz)).T)
    p_GS_G = [0, 0.04, 0.1]

    # Try orientations from the center out
    min_pitch=-np.pi/3.0
    max_pitch=np.pi/3.0
    alpha = np.array([0.5, 0.65, 0.35, 0.8, 0.2, 1.0, 0.0])
    thetas = (min_pitch + (max_pitch - min_pitch)*alpha)

    for theta in thetas: 
        # Rotate the object in the hand by a rotation (around the normal).
        R_WG2 = R_WG.multiply(RotationMatrix.MakeYRotation(theta))
        # Use G for gripper frame.
        p_SG_W = - R_WG2.multiply(p_GS_G)
        p_WG = p_WS + p_SG_W 

        X_G = RigidTransform(R_WG2, p_WG)
        
        plant.SetFreeBodyPose(plant_context, body, X_G)

        cost= grasp_candidate_cost(plant_context, cloud, plant, scene_graph, scene_graph_context, adjust_X_G=True)
        
        X_G = plant.GetFreeBodyPose(plant_context, body)
        
        if np.isfinite(cost):
            return cost, X_G

    return np.inf, None
    
def make_internal_env():
    builder = DiagramBuilder()
    env = builder.AddSystem(Env())
    env.SetupTableEnv()
    env.AddHand()
    env.Finalize()
    return env
    

In [65]:
"""
Here we:
 - create a plant with the table, gripper, cameras, and manipuland
 - create an "internal plant" without the manipuland
 - get the pointcloud from that environemnt 
 - sample and get the costs of the grasp poses
 - display the top 5 poses
"""
if v is not None:
    v.vis.delete() # clear the current display

rng = np.random.default_rng()

builder = DiagramBuilder()
env = builder.AddSystem(Env())
env.SetupTableEnv()
add_ycb(env, "soup", RigidTransform(RotationMatrix.MakeXRotation(-np.pi/2), [0.6, 0, 0.2]))
env.Finalize()
env_context = env.CreateDefaultContext()
scene_graph = env.get_scene_graph()
v = ConnectMeshcatVisualizer(builder,
                          scene_graph,
                          output_port=env.GetOutputPort("query_object"),
                          delete_prefix_on_load=True,                                      
                          zmq_url=zmq_url, role = Role.kProximity)
v.load()
diagram = builder.Build()
diagram_context = diagram.CreateDefaultContext()
diagram.Publish(diagram_context)
env_context = env.GetMyContextFromRoot(diagram_context)

internal_env = make_internal_env()
internal_env_context = internal_env.CreateDefaultContext()

cloud = process_table_point_cloud(env, env_context)
draw_open3d_point_cloud(v.vis["cloud"], cloud, size=0.003, normals_scale = 0.01)
num_samples = 100

costs = []
X_Gs = []

for i in range(num_samples):
    cost, X_G = generate_grasp_candidate_antipodal(internal_env, internal_env_context, cloud, rng)
    #print(cost)
    if np.isfinite(cost):
        costs.append(cost)
        X_Gs.append(X_G)

indices = np.asarray(costs).argsort()[:3]
for i, index in enumerate(indices):
    #draw_grasp_candidate(X_Gs[index], prefix=f"{i}th best hand")#, draw_frames=False)
    draw_grasp_and_arm_candidate(X_Gs[index], prefix=f"{i}th best")

Connecting to meshcat-server at zmq_url=tcp://127.0.0.1:6048...
You can open the visualizer by visiting the following URL:
http://127.0.0.1:7048/static/
Connected to meshcat-server.
Connecting to meshcat-server at zmq_url=tcp://127.0.0.1:6048...
You can open the visualizer by visiting the following URL:
http://127.0.0.1:7048/static/
Connected to meshcat-server.
Success? False
Infeasible Constraints:
drake::multibody::MinimumDistanceConstraint[0]: -inf <= 1.266931 <= 1.000000
Connecting to meshcat-server at zmq_url=tcp://127.0.0.1:6048...
You can open the visualizer by visiting the following URL:
http://127.0.0.1:7048/static/
Connected to meshcat-server.
Success? False
Infeasible Constraints:
drake::multibody::MinimumDistanceConstraint[0]: -inf <= 1.266932 <= 1.000000
Connecting to meshcat-server at zmq_url=tcp://127.0.0.1:6048...
You can open the visualizer by visiting the following URL:
http://127.0.0.1:7048/static/
Connected to meshcat-server.
Success? False
Infeasible Constraints:
d