## **Movement and Static Setup**

In [1]:
import os

import numpy as np
from pydrake.all import (
    AddDefaultVisualization,
    AddMultibodyPlantSceneGraph,
    DiagramBuilder,
    LoadModelDirectives,
    LoadModelDirectivesFromString,
    Parser,
    ProcessModelDirectives,
    RigidTransform,
    RollPitchYaw,
    Simulator,
    StartMeshcat,
    MeshcatVisualizer
)
from pydrake.common import temp_directory
from pydrake.geometry import StartMeshcat
from pydrake.systems.analysis import Simulator
from pydrake.visualization import ModelVisualizer


from manipulation.utils import RenderDiagram
from manipulation.meshcat_utils import AddMeshcatTriad
from manipulation import running_as_notebook
from manipulation.station import LoadScenario, MakeHardwareStation
from manipulation.utils import ConfigureParser

from pydrake.all import (
    DiagramBuilder, Simulator, RigidTransform, RotationMatrix,
    InverseKinematics, Solve, MultibodyPlant, Parser, DirectCollocation, MathematicalProgram,
)

from manipulation.meshcat_utils import AddMeshcatTriad
from pydrake.all import RotationMatrix, FixedOffsetFrame, MultibodyPlant, InverseKinematics, Solve, RotationMatrix
from pydrake.all import PiecewisePolynomial, TrajectorySource, ConstantVectorSource, Box, Rgba
from pydrake.symbolic import Formula
from pydrake.all import PiecewisePolynomial, TrajectorySource, ConstantVectorSource, Box, Rgba
import pydot
from scipy.spatial import KDTree
import random
import matplotlib.pyplot as plt

import os

import numpy as np
import tempfile
from copy import deepcopy
from urllib.request import urlretrieve
from pydrake.all import (
    AddDefaultVisualization,
    AddMultibodyPlantSceneGraph,
    DiagramBuilder,
    LoadModelDirectives,
    LoadModelDirectivesFromString,
    Parser,
    ProcessModelDirectives,
    RigidTransform,
    RollPitchYaw,
    Simulator,
    StartMeshcat,
    ProcessModelDirectives, 
    LoadModelDirectives,
    RgbdSensor, 
    RigidTransform, 
    RollPitchYaw,
    ColorRenderCamera,
    DepthRenderCamera,
    MeshcatVisualizer,
    MeshcatVisualizerParams
)
from pydrake.common import temp_directory
from pydrake.geometry import StartMeshcat
from pydrake.systems.analysis import Simulator
from pydrake.visualization import ModelVisualizer
from manipulation import running_as_notebook
from manipulation.station import AddPointClouds, LoadScenario, MakeHardwareStation, Scenario
from manipulation.utils import ConfigureParser
from manipulation.scenarios import AddMultibodyTriad
from manipulation.meshcat_utils import AddMeshcatTriad
from pydrake.all import RotationMatrix, FixedOffsetFrame, MultibodyPlant, InverseKinematics, Solve, RotationMatrix
from pydrake.all import PiecewisePolynomial, TrajectorySource, ConstantVectorSource, Box, Rgba
from pydrake.symbolic import Formula
from pydrake.all import PiecewisePolynomial, TrajectorySource, ConstantVectorSource, Box, Rgba
import pydot
from scipy.spatial import KDTree
from scipy.stats import mode
import random
import matplotlib.pyplot as plt
from torchvision.models.detection import MaskRCNN_ResNet50_FPN_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
import os
from copy import deepcopy
from urllib.request import urlretrieve

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.utils.data
import torchvision
import torchvision.transforms.functional as Tf
from pydrake.all import (
    BaseField,
    Concatenate,
    Fields,
    MeshcatVisualizer,
    MeshcatVisualizerParams,
    PointCloud,
    StartMeshcat,
)
from pydrake.multibody.parsing import Parser
from pydrake.multibody.plant import AddMultibodyPlantSceneGraph
from pydrake.systems.framework import DiagramBuilder
from torchvision.models.detection import MaskRCNN_ResNet50_FPN_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

from manipulation import running_as_notebook
from manipulation.clutter import GenerateAntipodalGraspCandidate
from manipulation.scenarios import AddRgbdSensors
from manipulation.utils import ConfigureParser, FindDataResource
from manipulation.meshcat_utils import AddMeshcatTriad

In [2]:
# Start the visualizer. The cell will output an HTTP link after the execution.
# Click the link and a MeshCat tab should appear in your browser.
meshcat = StartMeshcat()

INFO:drake:Meshcat listening for connections at http://localhost:7000


## Load The Masking Model

In [3]:
if running_as_notebook:
    model_file = "clutter_maskrcnn_model.pt"
    if not os.path.exists(model_file):
        urlretrieve(
            "https://groups.csail.mit.edu/locomotion/clutter_maskrcnn_model.pt",
            model_file,
        )
# ycb = [
#     "003_cracker_box.sdf",
#     "004_sugar_box.sdf",
#     "005_tomato_soup_can.sdf",
#     "006_mustard_bottle.sdf",
#     "009_gelatin_box.sdf",
#     "010_potted_meat_can.sdf",
# ]

In [4]:
if running_as_notebook:

    def get_instance_segmentation_model(num_classes):
        # load an instance segmentation model pre-trained on COCO
        model = torchvision.models.detection.maskrcnn_resnet50_fpn(
            weights=MaskRCNN_ResNet50_FPN_Weights.DEFAULT, progress=False
        )

        # get the number of input features for the classifier
        in_features = model.roi_heads.box_predictor.cls_score.in_features
        # replace the pre-trained head with a new one
        model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

        # now get the number of input features for the mask classifier
        in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
        hidden_layer = 256
        # and replace the mask predictor with a new one
        model.roi_heads.mask_predictor = MaskRCNNPredictor(
            in_features_mask, hidden_layer, num_classes
        )

        return model

    num_classes = 7
    model = get_instance_segmentation_model(num_classes)
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    model.load_state_dict(torch.load("clutter_maskrcnn_model.pt", map_location=device))
    model.eval()

    model.to(device)

Use the get_merged_masked_pcd function below to get the voxelized point cloud configuration from prediction data via the model and the image data from the cameras

In [5]:
def get_merged_masked_pcd(
   predictions,
   rgb_ims,
   depth_ims,
   project_depth_to_pC_funcs,
   X_WCs,
   label,
   mask_threshold=150,
):
   """
   predictions: The output of the trained network (one for each camera)
   rgb_ims: RGBA images from each camera
   depth_ims: Depth images from each camera
   project_depth_to_pC_funcs: Functions that perform the pinhole camera operations to convert pixels
       into points. See the analogous function in problem 5.2 to see how to use it.
   X_WCs: Poses of the cameras in the world frame
   mask_threshold: Use this to determine which pixels in
   """

   pcd = []
   for prediction, rgb_im, depth_im, project_depth_to_pC_func, X_WC in zip(
       predictions, rgb_ims, depth_ims, project_depth_to_pC_funcs, X_WCs
   ):
       if len(prediction[0]["labels"]) == 0:
           continue
       # These arrays aren't the same size as the correct outputs, but we're
       # just initializing them to something valid for now.
       spatial_points = np.zeros((3, 1))  # 3xN: (x,y,z) x Number of masked points
       rgb_points = np.zeros((3, 1))  # 3xN: Color channels x Number of masked points

       ######################################
       # Your code here (populate spatial_points and rgb_points)

       mask_idx = np.argmax(prediction[0]["labels"] == label)
       mask = prediction[0]["masks"][mask_idx, 0]
       idxs = mask > mask_threshold

       # Mask pixels, then get points in camera frame
       u_range = np.arange(depth_im.shape[0])
       v_range = np.arange(depth_im.shape[1])
       depth_v, depth_u = np.meshgrid(v_range, u_range)
       depth_pnts = np.dstack([depth_u, depth_v, depth_im])
       masked_depth_pnts = depth_pnts[idxs]
       pC = np.expand_dims(project_depth_to_pC_func(masked_depth_pnts), 2)

       # Convert to world frame
       R = np.expand_dims(X_WC.rotation().matrix(), 0)
       p = np.expand_dims(X_WC.translation(), [0, 2])
       pC_in_world = np.matmul(R, pC) + p
       spatial_points = np.squeeze(pC_in_world).T

       color_pnts = rgb_im
       color_pnts = color_pnts[idxs]
       rgb_points = color_pnts[:, :3].T

       ######################################

       # You get an unhelpful RunTime error if your arrays are the wrong
       # shape, so we'll check beforehand that they're the correct shapes.
       assert (
           len(spatial_points.shape) == 2
       ), "Spatial points is the wrong size -- should be 3 x N"
       assert (
           spatial_points.shape[0] == 3
       ), "Spatial points is the wrong size -- should be 3 x N"
       assert (
           len(rgb_points.shape) == 2
       ), "RGB points is the wrong size -- should be 3 x N"
       assert (
           rgb_points.shape[0] == 3
       ), "RGB points is the wrong size -- should be 3 x N"
       assert rgb_points.shape[1] == spatial_points.shape[1]

       N = spatial_points.shape[1]
       pcd.append(PointCloud(N, Fields(BaseField.kXYZs | BaseField.kRGBs)))
       pcd[-1].mutable_xyzs()[:] = spatial_points
       pcd[-1].mutable_rgbs()[:] = rgb_points
       # Estimate normals
       pcd[-1].EstimateNormals(radius=0.1, num_closest=30)
       # Flip normals toward camera
       pcd[-1].FlipNormalsTowardPoint(X_WC.translation())

   # Merge point clouds.
   merged_pcd = Concatenate(pcd)

   # Voxelize down-sample.  (Note that the normals still look reasonable)
   return merged_pcd.VoxelizedDownSample(voxel_size=0.005)

Use the find_antipodal_grasp function below to get the optimal gripper position based on the isolated point cloud of the object of interest. This function assumes no obstacles.

In [6]:
def find_antipodal_grasp(environment_diagram, environment_context, cameras, label, predictions):
    rng = np.random.default_rng()

    # Another diagram for the objects the robot "knows about": gripper, cameras, bins.  Think of this as the model in the robot's head.
    builder = DiagramBuilder()
    plant, scene_graph = AddMultibodyPlantSceneGraph(builder, time_step=0.001)
    parser = Parser(plant)
    ConfigureParser(parser)
    parser.AddModelsFromUrl(
        "package://manipulation/schunk_wsg_50_welded_fingers.dmd.yaml"
    )
    plant.Finalize()

    params = MeshcatVisualizerParams()
    params.prefix = "planning"
    visualizer = MeshcatVisualizer.AddToBuilder(builder, scene_graph, meshcat, params)
    diagram = builder.Build()
    context = diagram.CreateDefaultContext()

    #    # Set the gripper fingers to the open position
    # plant_context = plant.GetMyMutableContextFromRoot(context)
    # left_finger = plant.GetJointByName('left_finger_sliding_joint')
    # right_finger = plant.GetJointByName('right_finger_sliding_joint')
    # open_position = 0.054  # Maximum opening width of the WSG 50 gripper

    # left_finger.set_translation(plant_context, open_position / 2.0)
    # right_finger.set_translation(plant_context, -open_position / 2.0)
    
    diagram.ForcedPublish(context)

    for c in cameras:
        c.compute_camera_data()
    rgb_ims = [c.rgb_im for c in cameras]
    depth_ims = [c.depth_im for c in cameras]
    project_depth_to_pC_funcs = [c.project_depth_to_pC for c in cameras]
    X_WCs = [c.X_WC for c in cameras]

    cloud = get_merged_masked_pcd(
        predictions, rgb_ims, depth_ims, project_depth_to_pC_funcs, X_WCs, label
    )

    plant_context = plant.GetMyContextFromRoot(context)
    scene_graph_context = scene_graph.GetMyContextFromRoot(context)

    min_cost = np.inf
    best_X_G = RigidTransform()
    for i in range(100):
        cost, X_G = GenerateAntipodalGraspCandidate(diagram, context, cloud, rng)
        if np.isfinite(cost) and cost < min_cost:
            min_cost = cost
            best_X_G = X_G

    plant.SetFreeBodyPose(plant_context, plant.GetBodyByName("body"), best_X_G)
    diagram.ForcedPublish(context)
    print("Antipodal grasp:", best_X_G)

    return best_X_G


# if running_as_notebook:
#     find_antipodal_grasp(environment_diagram, environment_context, cameras)

In [7]:
class CameraSystem:
    def __init__(self, idx, meshcat, diagram, context):
        self.idx = idx
        self.context = context
        self.diagram = diagram

        # # Read images
        # depth_im_read = (
        #     diagram.GetOutputPort("camera{}_depth_image".format(idx))
        #     .Eval(context)
        #     .data.squeeze()
        # )
        # self.depth_im = deepcopy(depth_im_read)
        # self.depth_im[self.depth_im == np.inf] = 10.0
        # self.rgb_im = (
        #     diagram.GetOutputPort("camera{}_rgb_image".format(idx)).Eval(context).data
        # )

        # # Get other info about the camera
        # # cam = diagram.GetSubsystemByName("camera" + str(idx))
        # # cam_context = cam.GetMyMutableContextFromRoot(context)
        # pose_subsystem = diagram.GetSubsystemByName(f"camera{idx}.pose")
        # pose_subsystem_context = pose_subsystem.GetMyContextFromRoot(context)  # Get specific context for ExtractPose subsystem
        # pose_output = pose_subsystem.GetOutputPort("pose")
        # pose_data = pose_output.Eval(pose_subsystem_context)
        # #camera_frame = plant.GetFrameByName("base", plant.GetModelInstanceByName(f"camera{idx}_model"))
        # cam = diagram.GetSubsystemByName("station").GetSubsystemByName("rgbd_sensor_camera" + str(idx))
        # self.X_WC = pose_data
        # self.cam_info = cam.default_depth_render_camera().core().intrinsics()
        # # self.fx = 525  # Focal length in pixels
        # self.fy = 525  # Focal length in pixels
        # self.cx = 320  # Principal point x-coordinate for a 640x480 image
        # self.cy = 240  # Principal point y-coordinate for a 640x480 image


        

    def project_depth_to_pC(self, depth_pixel):
        """
        project depth pixels to points in camera frame
        using pinhole camera model
        Input:
            depth_pixels: numpy array of (nx3) or (3,)
        Output:
            pC: 3D point in camera frame, numpy array of (nx3)
        """
        # switch u,v due to python convention
        v = depth_pixel[:, 0]
        u = depth_pixel[:, 1]
        Z = depth_pixel[:, 2]
        cx = self.cam_info.center_x()
        cy = self.cam_info.center_y()
        fx = self.cam_info.focal_x()
        fy = self.cam_info.focal_y()
        X = (u - cx) * Z / fx
        Y = (v - cy) * Z / fy
        pC = np.c_[X, Y, Z]
        return pC
    
    def compute_camera_data(self):
        # Read images
        context = self.context
        diagram = self.diagram
        depth_im_read = (
            diagram.GetOutputPort("camera{}_depth_image".format(self.idx))
            .Eval(context)
            .data.squeeze()
        )
        self.depth_im = deepcopy(depth_im_read)
        self.depth_im[self.depth_im == np.inf] = 10.0
        self.rgb_im = (
            diagram.GetOutputPort("camera{}_rgb_image".format(self.idx)).Eval(context).data
        )

        # Get other info about the camera
        # cam = diagram.GetSubsystemByName("camera" + str(idx))
        # cam_context = cam.GetMyMutableContextFromRoot(context)
        pose_subsystem = diagram.GetSubsystemByName(f"camera{self.idx}.pose")
        pose_subsystem_context = pose_subsystem.GetMyContextFromRoot(context)  # Get specific context for ExtractPose subsystem
        pose_output = pose_subsystem.GetOutputPort("pose")
        pose_data = pose_output.Eval(pose_subsystem_context)
        #camera_frame = plant.GetFrameByName("base", plant.GetModelInstanceByName(f"camera{idx}_model"))
        cam = diagram.GetSubsystemByName("station").GetSubsystemByName("rgbd_sensor_camera" + str(self.idx))
        self.X_WC = pose_data
        self.cam_info = cam.default_depth_render_camera().core().intrinsics()


In [8]:
# - add_model:
#     name: table_top
#     file: package://dummy_project/table_top.sdf

# - add_weld:
#     parent: world
#     child: table_top::table_top_center

# - add_model:
#         name: dual_disk_model
#         file: package://dummy_project/belt.sdf
#         default_free_body_pose:
#             disk_2:
#                 translation: [-5.15, 0, 0.8]
#                 rotation: !Rpy { deg: [0, 0, 0] }

# - add_weld:
#     parent: table_top::table_top_center
#     child: dual_disk_model::disk_2_center
#     X_PC:
#         rotation: !Rpy { deg: [0.0, 0.0, 0.0] }
#         translation: [-5.15, 0, 0.25]

# - add_model:
#         name: bin_model
#         file: package://dummy_project/recyclable_bin.sdf
#         default_free_body_pose:
#             bin_base:
#                 translation: [-4.65, -0.5, 0.8]
#                 rotation: !Rpy { deg: [0, 0, 0] }

# - add_weld:
#     parent: table_top::table_top_center
#     child: bin_model::bin_front_top_center
#     X_PC:
#         rotation: !Rpy { deg: [0.0, 0.0, 0.0] }
#         translation: [0.35, 0.8, 0.25]

# - add_model:
#     name: non_bin_model
#     file: package://dummy_project/non_recyclable_bin.sdf
#     default_free_body_pose:
#         bin_base:
#             translation: [-5.65, 0.5, 0.8]
#             rotation: !Rpy { deg: [0, 0, 0] }

# - add_weld:
#     parent: table_top::table_top_center
#     child: non_bin_model::bin_front_top_center
#     X_PC:
#         rotation: !Rpy { deg: [0.0, 0.0, 0.0] }
#         translation: [0.35, -0.8, 0.25]

# - add_model:
#     name: cube_model
#     file: package://drake_models/ycb/004_sugar_box.sdf
#     default_free_body_pose:
#         base_link_sugar:
#             translation: [-0.45, 0, 0.8]
#             rotation: !Rpy { deg: [0, 0, 0] }

scenario_data = """
directives:
- add_model:
    name: table_top
    file: package://dummy_project/table_top.sdf

- add_weld:
    parent: world
    child: table_top::table_top_center

- add_model:
    name: dual_disk_model
    file: package://dummy_project/belt.sdf
    default_free_body_pose:
        disk_2:
            translation: [-5.15, 0, 0.8]
            rotation: !Rpy { deg: [0, 0, 0] }

- add_weld:
    parent: table_top::table_top_center
    child: dual_disk_model::disk_2_center
    X_PC:
        rotation: !Rpy { deg: [0.0, 0.0, 0.0] }
        translation: [-5.15, 0, 0.25]

- add_model:
    name: bin_model
    file: package://dummy_project/recyclable_bin.sdf
    default_free_body_pose:
        bin_base:
            translation: [-4.65, -0.5, 0.8]
            rotation: !Rpy { deg: [0, 0, 0] }

- add_weld:
    parent: table_top::table_top_center
    child: bin_model::bin_front_top_center
    X_PC:
        rotation: !Rpy { deg: [0.0, 0.0, 0.0] }
        translation: [0.35, 0.8, 0.25]

- add_model:
    name: non_bin_model
    file: package://dummy_project/non_recyclable_bin.sdf
    default_free_body_pose:
        bin_base:
            translation: [-5.65, 0.5, 0.8]
            rotation: !Rpy { deg: [0, 0, 0] }

- add_weld:
    parent: table_top::table_top_center
    child: non_bin_model::bin_front_top_center
    X_PC:
        rotation: !Rpy { deg: [0.0, 0.0, 0.0] }
        translation: [0.35, -0.8, 0.25]

- add_model:
    name: cube_model
    file: package://drake_models/ycb/004_sugar_box.sdf
    default_free_body_pose:
        base_link_sugar:
            translation: [-0.7, 0, 0.35]
            rotation: !Rpy { deg: [0, 0, 0] }

- add_model:
    name: iiwa
    file: package://drake_models/iiwa_description/sdf/iiwa7_no_collision.sdf
    default_joint_positions:
        iiwa_joint_1: [-1.57]
        iiwa_joint_2: [0.1]
        iiwa_joint_3: [0]
        iiwa_joint_4: [-1.2]
        iiwa_joint_5: [0]
        iiwa_joint_6: [1.6]
        iiwa_joint_7: [0]

- add_weld:
    parent: world
    child: iiwa::iiwa_link_0
    X_PC:
        translation: [0, 0, 0]
        rotation: !Rpy { deg: [0, 0, -90] }

- add_model:
    name: wsg
    file: package://drake_models/wsg_50_description/sdf/schunk_wsg_50_with_tip.sdf

- add_weld:
    parent: iiwa::iiwa_link_7
    child: wsg::body
    X_PC:
        translation: [0, 0, 0.09]
        rotation: !Rpy { deg: [90, 0, 90] }

- add_frame:
    name: cameras_frame
    X_PF:
        base_frame: world
        translation: [-0.55, 0, 0.4]
- add_frame:
    name: camera0_frame
    X_PF:
        base_frame: cameras_frame
        translation: [0.4, 0, 0.2]
- add_frame:
    name: camera1_frame
    X_PF:
        base_frame: cameras_frame
        translation: [-0.4, 0, 0.2]

- add_frame:
    name: camera2_frame
    X_PF:
        base_frame: cameras_frame
        translation: [0, 0.4, 0.2]


- add_model:
    name: camera0_model
    file: package://manipulation/camera_box.sdf

- add_weld:
    parent: camera0_frame
    child: camera0_model::base
    X_PC:
        rotation: !Rpy { deg: [-120, 0, 90] }

- add_model:
    name: camera1_model
    file: package://manipulation/camera_box.sdf

- add_weld:
    parent: camera1_frame
    child: camera1_model::base
    X_PC:
        rotation: !Rpy { deg: [-120, 0, -90] }

- add_model:
    name: camera2_model
    file: package://manipulation/camera_box.sdf

- add_weld:
    parent: camera2_frame
    child: camera2_model::base
    X_PC:
        rotation: !Rpy { deg: [-120, 0, 180] }
plant_config:
    time_step: 1e-2
    contact_model: "hydroelastic_with_fallback"
    discrete_contact_approximation: "sap"


model_drivers:
    iiwa: !IiwaDriver
        control_mode: position_only
        hand_model_name: wsg
    wsg: !SchunkWsgDriver {}
        
cameras:
    main_camera:
        name: camera0
        depth: True
        X_PB:
            base_frame: camera0_model::base
    secondary_camera:
        name: camera1
        depth: True
        X_PB:
            base_frame: camera1_model::base
    third_camera:
        name: camera2
        depth: True
        X_PB:
            base_frame: camera2_model::base
         
"""

class IIWA_ARM:
    def __init__(self, builder, scenario, meshcat):
        self.builder = builder
        self.meshcat = meshcat
        self.station = self.builder.AddSystem(
            MakeHardwareStation(
                scenario, 
                self.meshcat, 
                package_xmls=[os.getcwd() + "/package.xml"])
        )
        self.plant = self.station.GetSubsystemByName("plant")

        self.max_tries = 10

        self.iiwa_model_instance = self.plant.GetModelInstanceByName("iiwa")

        self.gripper_frame = self.plant.GetFrameByName("body")
        self.world_frame = self.plant.world_frame()

        self.visualizer = MeshcatVisualizer.AddToBuilder(
            self.builder, self.station.GetOutputPort("query_object"), self.meshcat
        )

        self.context = self.station.CreateDefaultContext()
        self.station_context = self.station.GetMyMutableContextFromRoot(self.context)
        self.plant_context = self.plant.GetMyMutableContextFromRoot(self.context)

    def get_station(self):
        return self.station
    
    def get_plant(self):
        return self.plant

    def num_input_ports(self):
        return self.station.num_input_ports()

    def visualize_frame(self, name, X_WF, length=0.15, radius=0.006):
        AddMeshcatTriad(
            self.meshcat, name, length=length, radius=radius, X_PT=X_WF
        )
    
    def visualize_gripper_at_target(self, target_joint_positions):
        context = self.plant.CreateDefaultContext()
        self.plant.SetPositions(context, self.iiwa_model_instance, target_joint_positions)

        X_WG_target = self.plant.CalcRelativeTransform(
            context, frame_A=self.world_frame, frame_B=self.gripper_frame
        )
        self.visualize_frame("gripper_target_pose", X_WG_target)

    def clear_all_triads(self):
        self.meshcat.Delete()

    def get_current_gripper_joint_position(self):
        return self.plant.GetPositions(self.plant_context, self.plant.GetModelInstanceByName("iiwa"))

    def get_X_WG(self):
        return self.plant.CalcRelativeTransform(
            self.plant_context, self.world_frame, self.gripper_frame
        )

    def get_X_WTarget(self, translation):
        X_WG = self.get_X_WG()
        rotation_matrix = RotationMatrix.Identity()
        X_GTarget = RigidTransform(
            rotation_matrix, 
            translation
        )

        X_WTarget = X_WG @ X_GTarget
        return X_WTarget

    def find_joint_positions(self, X_WTarget):
        context = self.plant.CreateDefaultContext()
        end_effector_frame = self.plant.GetFrameByName('body')

        ik = InverseKinematics(self.plant, context)
        ik.AddPositionConstraint(
            frameB=end_effector_frame,
            p_BQ=np.zeros((3, 1)),
            frameA=self.plant.world_frame(),
            p_AQ_lower=X_WTarget.translation().reshape((3, 1)) - 0.01,
            p_AQ_upper=X_WTarget.translation().reshape((3, 1)) + 0.01
        )

        last_joint_positions = None
        for _ in range (self.max_tries):
            q0 = self.plant.GetPositions(context).copy()
            ik.prog().SetInitialGuess(ik.q(), q0)

            # Solve the inverse kinematics problem
            result = Solve(ik.prog())
            # print(result.GetSolution(ik.q()[1:8]))
            # last_joint_positions = result.GetSolution(ik.q()[:7])
            last_joint_positions = result.GetSolution(ik.q()[1:8])
            if result.is_success():
                print("Success!")
                joint_positions = last_joint_positions
                return joint_positions

        return last_joint_positions

        # # Solve the inverse kinematics problem
        # result = Solve(ik.prog())
        # joint_positions = result.GetSolution(ik.q()[:7])
        # return joint_positions

    def move_arm(self, trajectory_source, wsg_src = None):
        self.builder.AddSystem(trajectory_source)

        iiwa_position_input = self.station.GetInputPort("iiwa.position")
        self.builder.Connect(trajectory_source.get_output_port(), iiwa_position_input)

        wsg_input = self.station.GetInputPort("wsg.position")

        if wsg_src is None:
            wsg_src = ConstantVectorSource(np.zeros(wsg_input.size()))

        self.builder.AddSystem(wsg_src)
        self.builder.Connect(wsg_src.get_output_port(), wsg_input)

In [9]:
class TrajOptPlanner:
    def __init__(self, num_time_samples=20):
        self.num_time_samples = num_time_samples

    def plan(self, start, goal):
        prog = MathematicalProgram()
        n = len(start)
        h = 1.0 / (self.num_time_samples - 1)

        # Decision variables for joint positions and velocities
        q = prog.NewContinuousVariables(self.num_time_samples, n, "q")
        v = prog.NewContinuousVariables(self.num_time_samples, n, "v")

        # Initial and final state constraints
        prog.AddBoundingBoxConstraint(start, start, q[0])
        prog.AddBoundingBoxConstraint(goal, goal, q[-1])
        prog.AddBoundingBoxConstraint(np.zeros(n), np.zeros(n), v[0])
        prog.AddBoundingBoxConstraint(np.zeros(n), np.zeros(n), v[-1])

        for i in range(self.num_time_samples - 1):
            q_next = q[i] + h * v[i]
            for j in range(n):
                prog.AddLinearEqualityConstraint(q_next[j] == q[i + 1][j])

        R = 10  # Cost on input "effort".
        for i in range(self.num_time_samples - 1):
            u = (v[i + 1] - v[i]) / h
            prog.AddQuadraticCost(R * u.dot(u))

        result = Solve(prog)
        if not result.is_success():
            raise RuntimeError("Trajectory optimization failed")

        q_trajectory = result.GetSolution(q)
        v_trajectory = result.GetSolution(v)
        times = np.linspace(0, 4, self.num_time_samples)

        return times, q_trajectory, v_trajectory

    

In [10]:
class Desired_Pose:
    def __init__(self, desired_translation, desired_rotation, desired_wsg = 0.0):
        self.desired_translation = desired_translation
        self.desired_rotation = desired_rotation
        self.desired_wsg = desired_wsg

    def __repr__(self):
        return f"Desired_Pose({self.desired_translation}, {self.desired_rotation}, {self.desired_wsg})"

In [11]:
from pydrake.all import (
    LeafSystem,
    AbstractValue,
    BasicVector,
    DiscreteUpdateEvent,
    PiecewisePose
)

class DynamicTrajectorySource(LeafSystem):
    def __init__(self, initial_trajectory):
        LeafSystem.__init__(self)
        self.DeclareVectorOutputPort("trajectory_output", initial_trajectory.rows(), self.CalcOutput)
        self.trajectory = initial_trajectory

    def CalcOutput(self, context, output):
        time = context.get_time()
        # print(self.trajectory.value(time))
        output.SetFromVector(self.trajectory.value(time))

    def update_trajectory(self, new_trajectory):
        self.trajectory = new_trajectory

def update_trajectory(trajectory_source: DynamicTrajectorySource, new_trajectory: PiecewisePolynomial):
    trajectory_source.update_trajectory(new_trajectory)

In [12]:
class SelfMadeArmDriver:
    def __init__(
        self, 
        IIWA: IIWA_ARM, 
        planner: TrajOptPlanner, 
        dynamic_joint_trajectory_source: DynamicTrajectorySource,
        dynamic_wsg_trajectory_source: DynamicTrajectorySource
    ):
        self.iiwa = IIWA
        self.planner = planner
        self.dynamic_joint_trajectory_source = dynamic_joint_trajectory_source
        self.dynamic_wsg_trajectory_source = dynamic_wsg_trajectory_source

    def set_desired_poses_and_follow(self, desired_poses: list[Desired_Pose], start_time = 0.0, dynamic_joint_trajectory_source = None):
        pose_positions = [self.iiwa.get_X_WG()]
        # print(desired_poses)

        # self.iiwa.clear_all_triads()
        self.iiwa.visualize_frame("current_gripper", pose_positions[0])
        desired_rotation = [0]
        desired_wsg_grip = [0.0]
        for i, d in enumerate(desired_poses):
            # x_w_target = self.iiwa.get_X_WTarget(d.desired_translation)
            x_w_target = RigidTransform(RotationMatrix().Identity(), d.desired_translation)
            self.iiwa.visualize_frame(f"gripper_target_{i}", x_w_target)
            pose_positions.append(x_w_target)
            desired_rotation.append(d.desired_rotation)
            desired_wsg_grip.append(d.desired_wsg)

        # print(desired_wsg_grip)
        return self.follow_positions(pose_positions, desired_rotation, desired_wsg_grip, start_time)


    def generate_grip_values(self, times, initial_wsg_grip, desired_grip, transition_steps = 7):
        num_steps = len(times)
        grip_values = [initial_wsg_grip] * (num_steps - transition_steps)
        transition_values = [initial_wsg_grip + (desired_grip - initial_wsg_grip) * (i / (transition_steps - 1)) for i in range(transition_steps)]
        grip_values.extend(transition_values)
        return grip_values

    def follow_positions_works(
        self, 
        pose_positions: list[RigidTransform], 
        desired_rotation: list[float], 
        desired_wsg_grips: list[float],
        start_time: float
    ):
        times, joint_positions, wsg_positions = [], [], []

        # print(desired_wsg_grips)

        for i, pose in enumerate(pose_positions[:-1]):
            start_joint_positions = self.iiwa.find_joint_positions(pose)

            start_joint_positions[-1] = desired_rotation[i]

            end_joint_positions = self.iiwa.find_joint_positions(pose_positions[i + 1])
            end_joint_positions[-1] = desired_rotation[i + 1]

            t, q, _ = self.planner.plan(start_joint_positions, end_joint_positions)

            wsg_gripper_position = self.generate_grip_values(
                t, 
                desired_wsg_grips[i], 
                desired_wsg_grips[i+1]
            )

            times.append(t)
            joint_positions.append(q)
            wsg_positions.append(wsg_gripper_position)
            # print(q[-1])

        total_t, total_q, grip_values = self.combine_all_trajectories(times, joint_positions, wsg_positions, start_time)
        total_t, total_q= np.array(total_t), np.array(total_q).T
        trajectory = PiecewisePolynomial.FirstOrderHold(total_t, total_q)

        total_wsg = [np.array([[value]]) for value in grip_values]
        wsg_trajectory = PiecewisePolynomial.FirstOrderHold(total_t, total_wsg)

        trajectory_source = TrajectorySource(trajectory)
        wsg_trajectory_source = TrajectorySource(wsg_trajectory)

        self.iiwa.move_arm(trajectory_source, wsg_trajectory_source)

        return trajectory.end_time()
        
    def follow_positions(
        self, 
        pose_positions: list[RigidTransform], 
        desired_rotation: list[float], 
        desired_wsg_grips: list[float],
        start_time: float
    ):
        times, joint_positions, wsg_positions = [], [], []

        # print(desired_wsg_grips)

        for i, pose in enumerate(pose_positions[:-1]):
            start_joint_positions = self.iiwa.find_joint_positions(pose)

            start_joint_positions[-1] = desired_rotation[i]

            end_joint_positions = self.iiwa.find_joint_positions(pose_positions[i + 1])
            end_joint_positions[-1] = desired_rotation[i + 1]

            # print(start_joint_positions, end_joint_positions)
            # print()
            t, q, _ = self.planner.plan(start_joint_positions, end_joint_positions)

            wsg_gripper_position = self.generate_grip_values(
                t, 
                desired_wsg_grips[i], 
                desired_wsg_grips[i+1]
            )

            times.append(t)
            joint_positions.append(q)
            wsg_positions.append(wsg_gripper_position)
            # print(q[-1])

        total_t, total_q, grip_values = self.combine_all_trajectories(times, joint_positions, wsg_positions, start_time)
        # for t, q in zip(total_t, total_q):
        #     print(f"{t}, {q}")
        
        total_t, total_q= np.array(total_t), np.array(total_q).T
        trajectory = PiecewisePolynomial.FirstOrderHold(total_t, total_q)

        total_wsg = [np.array([[value]]) for value in grip_values]
        wsg_trajectory = PiecewisePolynomial.FirstOrderHold(total_t, total_wsg)

        update_trajectory(self.dynamic_joint_trajectory_source, trajectory)
        update_trajectory(self.dynamic_wsg_trajectory_source, wsg_trajectory)

        return trajectory.end_time()

    def combine_all_trajectories(self, times, joint_positions, wsg_positions,start_time: float):
        t_res, q_res, wsg_res = times[0] + start_time, joint_positions[0], wsg_positions[0]

        for i in range (1, len(times)):
            next_time = times[i] + t_res[-1]
            t_res = np.concatenate([t_res, next_time[1:]])
            q_res = np.concatenate([q_res, joint_positions[i][1:]])
            wsg_res = np.concatenate([wsg_res, wsg_positions[i][1:]])

        # for t, q in zip(t_res, q_res):
        #     print(f"{t}, {q}")

        return t_res, q_res, wsg_res
        


In [13]:
class PerceptionModule:
    def __init__ (self, diagram, context):
        self.cameras = [CameraSystem(i, meshcat, diagram, context) for i in range(3)]
        self.diagram = diagram
        self.context = context
    def update_cameras(self):
        for c in self.cameras:
            c.compute_camera_data()
    def valid_prediction(self, predictions):
        return (predictions[0][0]["boxes"].size != 0)
    def get_camera_data(self):
        cameras = self.cameras
        rgb_ims = [c.rgb_im for c in cameras]
        depth_ims = [c.depth_im for c in cameras]
        project_depth_to_pC_funcs = [c.project_depth_to_pC for c in cameras]
        X_WCs = [c.X_WC for c in cameras]
        return rgb_ims, depth_ims, project_depth_to_pC_funcs, X_WCs
    def predict_mask(self, current_time):
        cameras = self.cameras
        if running_as_notebook and current_time == 1.0:
            with torch.no_grad():
                predictions = []
                predictions.append(
                    model([Tf.to_tensor(cameras[0].rgb_im[:, :, :3]).to(device)])
                )
                predictions.append(
                    model([Tf.to_tensor(cameras[1].rgb_im[:, :, :3]).to(device)])
                )
                predictions.append(
                    model([Tf.to_tensor(cameras[2].rgb_im[:, :, :3]).to(device)])
                )
                for i in range(3):
                    for k in predictions[i][0].keys():
                        if k == "masks":
                            predictions[i][0][k] = (
                                predictions[i][0][k].mul(255).byte().cpu().numpy()
                            )
                        else:
                            predictions[i][0][k] = predictions[i][0][k].cpu().numpy()
            return predictions
        else:
            return [[{"boxes": np.empty((0, 4), dtype=np.float32)}]]
    def empty_mask(self):
        return [[{"boxes": np.empty((0, 4), dtype=np.float32)}]]
    def get_antipodal_grasps(self, diagram, context, predictions, X_WG_curr):
        modes = np.concatenate((predictions[0][0]['labels'], predictions[1][0]['labels'], predictions[2][0]['labels']))
        label = mode(modes)[0]

        pcd = get_merged_masked_pcd(
            predictions, *self.get_camera_data(), label
        )    

        antipodal_grasp = find_antipodal_grasp(self.diagram, self.context, self.cameras, label, predictions)
        X_WG = antipodal_grasp
        translation = X_WG.translation()
        # translation_1 = translation + np.array([0, 0, 0.2])
        # translation_2 = translation + np.array([0, 0, 0.4])
        rotation = (X_WG.inverse() @ X_WG_curr).rotation().ToRollPitchYaw()
        #rotation = abs(rotation)
        print("yaw:", rotation.yaw_angle())
        print("pitch:", rotation.pitch_angle())
        print("roll:", rotation.roll_angle())
        #print("position:", rotation)
        return  translation, abs(rotation.pitch_angle())
        
    

# State Machine

In [None]:
from pydrake.all import (
    LeafSystem
)

from enum import (
    Enum
)

class STATES(int, Enum):
    IDLE = 0
    FOLLOW = 1
    

class StateMachine(LeafSystem):
    def __init__(
        self,
        builder: DiagramBuilder,
        meshcat: MeshcatVisualizer,
    ):
        LeafSystem.__init__(self)
        self.builder = builder
        self.meshcat = meshcat
        self.DeclarePeriodicDiscreteUpdateEvent(0.1, 0.0, self.Update)

        self.iiwa = IIWA_ARM(builder, scenario, meshcat)
        self.station = self.iiwa.get_station()
        self.plant = self.iiwa.get_plant()
        self.planner = TrajOptPlanner()

        self.start_time = 0
        self.state = STATES.IDLE
        self.initial_joint_position = self.iiwa.get_current_gripper_joint_position()
        self.predictions = [[{"boxes": np.empty((0, 4), dtype=np.float32)}]]

        self.initial_gripper_pose = self.iiwa.get_X_WG()
        #print(self.initial_gripper_pose)

        # Setup initial trajectory
        times = np.array([0, 0.1])
        positions = np.tile(self.initial_joint_position, (2, 1)).T
        self.initial_trajectory = PiecewisePolynomial.FirstOrderHold(times, positions)
        self.iiwa_position_input = self.station.GetInputPort("iiwa.position")
        self.dynamic_trajectory_source = self.builder.AddSystem(DynamicTrajectorySource(self.initial_trajectory))

        # Setup initial wsg trajectory
        wsg_positions = np.zeros((1, 2))  # Single joint position, 2 time points
        self.initial_wsg_trajectory = PiecewisePolynomial.FirstOrderHold(times, wsg_positions)
        self.wsg_input = self.station.GetInputPort("wsg.position")
        self.dynamic_wsg_trajectory_source = self.builder.AddSystem(DynamicTrajectorySource(self.initial_wsg_trajectory))

        self.ARM = SelfMadeArmDriver(
            self.iiwa, 
            self.planner, 
            self.dynamic_trajectory_source, 
            self.dynamic_wsg_trajectory_source
        )

        self.builder.Connect(self.dynamic_trajectory_source.get_output_port(0), self.iiwa_position_input)
        self.builder.Connect(self.dynamic_wsg_trajectory_source.get_output_port(0), self.wsg_input)

        self.is_moving = False
        self.end_move_time = None
        self.cams = None

    def set_perception_module(self, diagram, context):
        self.perception = PerceptionModule(diagram, context)
        self.diagram = diagram
        self.context = context
        self.predictions = self.perception.empty_mask()
    def Update(self, context, event):
        diagram = self.diagram
        current_time = context.get_time()
        #Update the cameras with the current time data
        self.perception.update_cameras()
        
        #Run the masking model and get the predictions
        predicts = self.perception.predict_mask(current_time)

        #If not a valid prediction, don't do anything  
        if (not self.perception.valid_prediction(predicts) and not self.perception.valid_prediction(self.predictions)):
            curr_times_arr = np.array([current_time, current_time + 0.1])

            current_position = self.iiwa.get_current_gripper_joint_position()
            positions = np.tile(current_position, (2, 1)).T
            curr_joint_traj = PiecewisePolynomial.FirstOrderHold(curr_times_arr, positions)

            wsg_positions = np.zeros((1, 2))  # Single joint position, 2 time points
            curr_wsg_traj = PiecewisePolynomial.FirstOrderHold(curr_times_arr, wsg_positions)

        
            update_trajectory(self.dynamic_trajectory_source, self.initial_trajectory)
            update_trajectory(self.dynamic_wsg_trajectory_source, self.initial_wsg_trajectory)
        else:
            self.predictions = self.predictions if self.perception.valid_prediction(self.predictions) else predicts

            X_WG_curr = state_machine.iiwa.get_X_WG()


            curr_times_arr = np.array([current_time, current_time + 0.1])

            current_position = self.iiwa.get_current_gripper_joint_position()
            positions = np.tile(current_position, (2, 1)).T
            curr_joint_traj = PiecewisePolynomial.FirstOrderHold(curr_times_arr, positions)

            wsg_positions = np.zeros((1, 2))  # Single joint position, 2 time points
            curr_wsg_traj = PiecewisePolynomial.FirstOrderHold(curr_times_arr, wsg_positions)

        
            if current_time <= 2.0:
                update_trajectory(self.dynamic_trajectory_source, self.initial_trajectory)
                update_trajectory(self.dynamic_wsg_trajectory_source, self.initial_wsg_trajectory)
            elif not self.is_moving:
                self.is_moving = True
                translation, rotation = self.perception.get_antipodal_grasps(self.diagram, context, self.predictions, X_WG_curr)
                translation_1 = translation + np.array([0, 0, 0.2])
                translation_2 = translation + np.array([0, 0, 0.4])
            
                follow_poses = [

                    Desired_Pose(
                        translation_1,
                        rotation,
                        1.0,
                    ),
                    Desired_Pose(
                        translation + np.array([0.03, 0, 0.0]),
                        rotation,
                        1.0,
                    ),
                    Desired_Pose(
                        translation,
                        rotation,
                        0.0,
                    ),
                    Desired_Pose(
                        translation_2,
                        rotation,
                        0.0,
                    ),
                    Desired_Pose(
                        np.array([0.15, -0.8, 0.6]),
                        rotation,
                        0.0,
                    ),
                    Desired_Pose(
                        np.array([0.15, -0.8, 0.6]),
                        rotation,
                        1.0,
                    ),
                ]
                self.end_move_time = self.ARM.set_desired_poses_and_follow(follow_poses, current_time)
            elif current_time >= self.end_move_time:
                self.is_moving = False
                update_trajectory(self.dynamic_trajectory_source, curr_joint_traj)
                update_trajectory(self.dynamic_wsg_trajectory_source, curr_wsg_traj)

In [15]:
from pydrake.all import (
    ConstantValueSource
)

scenario = LoadScenario(data=scenario_data)
builder = DiagramBuilder()

state_machine = StateMachine(builder, meshcat)
builder.AddSystem(state_machine)

# Export the camera outputs
for idx in range(3):  # Assuming three cameras defined in directive
    builder.ExportOutput(state_machine.station.GetOutputPort(f"camera{idx}.rgb_image"), f"camera{idx}_rgb_image")
    builder.ExportOutput(state_machine.station.GetOutputPort(f"camera{idx}.depth_image"), f"camera{idx}_depth_image")

to_point_cloud = AddPointClouds(
    scenario=scenario, station=state_machine.station, builder=builder, meshcat=meshcat
)

# Access plant and scene graph to add visual aids (optional)
plant = state_machine.station.GetSubsystemByName("plant")
scene_graph = state_machine.station.GetSubsystemByName("scene_graph")
for idx in range(3):  # Assuming three cameras defined in directive
    camera_instance = plant.GetModelInstanceByName(f"camera{idx}_model")
    AddMultibodyTriad(
        plant.GetFrameByName("base", camera_instance),
        scene_graph,
        length=0.1,
        radius=0.005,
    )

#Export the point cloud output.
# for idx in range(3):  # Assuming three cameras defined in directive
#     builder.ExportOutput(
#         to_point_cloud[f"camera{idx}"].point_cloud_output_port(), f"camera{idx}_point_cloud"
#     )

diagram = builder.Build()
simulator = Simulator(diagram)
context = simulator.get_mutable_context()

state_machine.set_perception_module(diagram, context)

diagram.ForcedPublish(context)
meshcat.StartRecording()

simulator.AdvanceTo(30.0)
meshcat.PublishRecording()

#point_cloud_subsystem_context = to_point_cloud['camera0'].GetMyContextFromRoot(context)
#print(to_point_cloud['camera0'].point_cloud_output_port().Eval(point_cloud_subsystem_context))



  img = torch.from_numpy(pic.transpose((2, 0, 1))).contiguous()


Antipodal grasp: RigidTransform(
  R=RotationMatrix([
    [-0.9618902147692217, 0.00729971318928091, 0.2733384878105807],
    [-0.273330389589753, 0.002104251938649493, -0.9619179124283384],
    [-0.007596897915241624, -0.9999711427391957, -2.8825899276552214e-05],
  ]),
  p=[-0.669394119897552, 0.009695721373195985, 0.4241087518412503],
)
yaw: -0.007698721626953134
pitch: -1.2947306656192092
roll: -0.241486898571507
Success!
Success!
Success!
Success!
Success!
Success!
Success!
Success!
Success!
Success!
Success!
Success!
Antipodal grasp: RigidTransform(
  R=RotationMatrix([
    [1.0, 0.0, 0.0],
    [0.0, 1.0, 0.0],
    [0.0, 0.0, 1.0],
  ]),
  p=[0.0, 0.0, 0.0],
)
yaw: -1.5699999999999996
pitch: -1.8685069714410054e-16
roll: -1.8123889803846895
Success!
Success!
Success!
Success!
Success!
Success!
Success!
Success!
Success!
Success!
Success!
Success!


<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=4929c60d-9325-4a04-bca1-550c19632d0a' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>