## Main

In [None]:
import os

import numpy as np

import pydot
from IPython.display import SVG, display
from matplotlib import pyplot as plt

from pydrake.common import temp_directory
from pydrake.geometry import StartMeshcat
from pydrake.systems.analysis import Simulator
from pydrake.visualization import ModelVisualizer
from pydrake.all import (
    ConstantVectorSource,
    DiagramBuilder,
    MeshcatVisualizer,
    MeshcatVisualizerParams,
    MultibodyPlant,
    Parser,
    PiecewisePolynomial,
    PiecewiseQuaternionSlerp,
    RigidTransform,
    RollPitchYaw,
    RotationMatrix,
    Simulator,
    Solve,
    StartMeshcat,
    TrajectorySource,
    PointCloud,
    Rgba, 
)

from scipy.spatial import KDTree

from pydrake.perception import Concatenate

from manipulation import FindResource, running_as_notebook
from manipulation.station import MakeHardwareStation, load_scenario, AddPointClouds
from pydrake.multibody import inverse_kinematics
from pydrake.trajectories import PiecewisePolynomial
from manipulation.meshcat_utils import AddMeshcatTriad
from pydrake.visualization import AddFrameTriadIllustration
from manipulation.scenarios import AddMultibodyTriad, MakeManipulationStation

import trimesh

In [None]:
# Start the visualizer.
meshcat = StartMeshcat()

INFO:drake:Meshcat listening for connections at https://50b7e7e2-5bbe-4c2a-b831-ce519bfa4464.deepnoteproject.com/7000/
Installing NginX server for MeshCat on Deepnote...


In [None]:
def CreateIiwaControllerPlant():
    """creates plant that includes only the robot and gripper, used for controllers."""
    robot_sdf_path = "package://drake/manipulation/models/iiwa_description/iiwa7/iiwa7_with_box_collision.sdf"
    gripper_sdf_path = "package://drake/manipulation/models/wsg_50_description/sdf/schunk_wsg_50_no_tip.sdf"
    sim_timestep = 1e-3
    plant_robot = MultibodyPlant(sim_timestep)
    parser = Parser(plant=plant_robot)
    parser.AddModelsFromUrl(robot_sdf_path)
    parser.AddModelsFromUrl(gripper_sdf_path)
    plant_robot.WeldFrames(
        frame_on_parent_F=plant_robot.world_frame(),
        frame_on_child_M=plant_robot.GetFrameByName("iiwa_link_0"),
    )
    plant_robot.WeldFrames(
        frame_on_parent_F=plant_robot.GetFrameByName("iiwa_link_7"),
        frame_on_child_M=plant_robot.GetFrameByName("body"),
        X_FM=RigidTransform(
            RollPitchYaw(np.pi / 2, 0, np.pi / 2), np.array([0, 0, 0.114])
        ),
    )
    plant_robot.mutable_gravity_field().set_gravity_vector([0, 0, 0])
    plant_robot.Finalize()

    link_frame_indices = []
    for i in range(8):
        link_frame_indices.append(
            plant_robot.GetFrameByName("iiwa_link_" + str(i)).index()
        )

    return plant_robot, link_frame_indices

In [None]:
def create_q_knots(pose_lst):
    """Convert end-effector pose list to joint position list using series of
    InverseKinematics problems. Note that q is 9-dimensional because the last 2 dimensions
    contain gripper joints, but these should not matter to the constraints.
    @param: pose_lst (python list): post_lst[i] contains keyframe X_WG at index i.
    @return: q_knots (python_list): q_knots[i] contains IK solution that will give f(q_knots[i]) \approx pose_lst[i].
    """
    q_knots = []
    plant, _ = CreateIiwaControllerPlant()
    world_frame = plant.world_frame()
    gripper_frame = plant.GetFrameByName("body")
    q_nominal = np.array(
        [0.0, 0.6, 0.0, -1.75, 0.0, 1.0, 0.0, 0.0, 0.0]
    )  # nominal joint angles for joint-centering.

    def AddOrientationConstraint(ik, R_WG, bounds):
        """Add orientation constraint to the ik problem. Implements an inequality
        constraint where the axis-angle difference between f_R(q) and R_WG must be
        within bounds. Can be translated to:
        ik.prog().AddBoundingBoxConstraint(angle_diff(f_R(q), R_WG), -bounds, bounds)
        """
        ik.AddOrientationConstraint(
            frameAbar=world_frame,
            R_AbarA=R_WG,
            frameBbar=gripper_frame,
            R_BbarB=RotationMatrix(),
            theta_bound=bounds,
        )

    def AddPositionConstraint(ik, p_WG_lower, p_WG_upper):
        """Add position constraint to the ik problem. Implements an inequality
        constraint where f_p(q) must lie between p_WG_lower and p_WG_upper. Can be
        translated to
        ik.prog().AddBoundingBoxConstraint(f_p(q), p_WG_lower, p_WG_upper)
        """
        ik.AddPositionConstraint(
            frameA=world_frame,
            frameB=gripper_frame,
            p_BQ=np.zeros(3),
            p_AQ_lower=p_WG_lower,
            p_AQ_upper=p_WG_upper,
        )

    for i in range(len(pose_lst)):
        ik = inverse_kinematics.InverseKinematics(plant)
        q_variables = ik.q()  # Get variables for MathematicalProgram
        prog = ik.prog()  # Get MathematicalProgram

        AddOrientationConstraint(ik, pose_lst[i].rotation(), 0.01)
        p_WG_lower = [pose_lst[i].translation()[0] - 0.05, pose_lst[i].translation()[1] - 0.05, pose_lst[i].translation()[2] - 0.05]
        p_WG_upper = [pose_lst[i].translation()[0] + 0.05, pose_lst[i].translation()[1] + 0.05, pose_lst[i].translation()[2] + 0.05]
        AddPositionConstraint(ik, p_WG_lower, p_WG_upper)

        prog.AddCost((q_variables - q_nominal).dot((q_variables - q_nominal)))

        if i == 0:
            for j in range(len(q_nominal)):
                prog.SetInitialGuess(q_variables[j], q_nominal[j])
        else:
            for j in range(len(q_knots[i-1])):
                prog.SetInitialGuess(q_variables[j], q_knots[i-1][j])

        result = Solve(prog)

        assert result.is_success()

        q_knots.append(result.GetSolution(q_variables))

    return q_knots

In [None]:
def fit_plane(xyzs):
    """
    Args:
      xyzs is (3, N) numpy array
    Returns:
      (4,) numpy array
    """
    center = np.mean(xyzs, axis=1)
    cxyzs = xyzs.T - center
    U, S, V = np.linalg.svd(cxyzs)
    normal = V[-1]  # last row of V
    d = -center.dot(normal)
    plane_equation = np.hstack([normal, d])
    return plane_equation

In [None]:
def ransac(
    point_cloud, model_fit_func, rng, tolerance=1e-3, max_iterations=500
):
    """
    Args:
      point_cloud is (3, N) numpy array
      tolerance is a float
      rng is a random number generator
      max_iterations is a (small) integer
      model_fit_func: the function to fit the model (point clouds)

    Returns:
      (4,) numpy array
    """
    best_ic = 0  # inlier count
    best_model = np.ones(4)  # plane equation ((4,) array)
    iter_count = 0
    N = point_cloud.shape[1]

    ##################
    while iter_count < max_iterations:
        iter_count += 1
        point_indices = rng.integers(low=0, high=N, size=3)

        plane = model_fit_func(point_cloud[:, point_indices])
        ic = 0
        for point in point_cloud.T:
            if abs(plane[0]*point[0] + plane[1]*point[1] + plane[2]*point[2] + plane[3]) <= tolerance:
                ic += 1
        if ic > best_ic:
            best_model = plane
            best_ic = ic

    ##################
    return best_ic, best_model

In [None]:
def remove_plane(point_cloud, ransac, rng, tol=1e-4):
    """
    Args:
        point_cloud: 3xN numpy array of points
        ransac: The RANSAC function to use (call ransac(args))
        rng is a random number generator
        tol: points less than this distance tolerance should be removed
    Returns:
        point_cloud_wo_plane: 3xN numpy array of points
    """
    bunny_points = []
    inlier_count, ransac_plane = ransac(point_cloud, fit_plane, rng, tol, 500)
    for point in point_cloud.T:
        if abs(ransac_plane[0]*point[0] + ransac_plane[1]*point[1] + ransac_plane[2]*point[2] + ransac_plane[3]) > tol:
            bunny_points.append(point)
    point_cloud_wo_plane = np.array(bunny_points).T
    return point_cloud_wo_plane

In [None]:
def least_squares_transform(scene, model) -> RigidTransform:
    """
    Calculates the least-squares best-fit transform that maps corresponding
    points scene to model.
    Args:
      scene: 3xN numpy array of corresponding points
      model: 3xM numpy array of corresponding points
    Returns:
      X_BA: A RigidTransform object that maps point_cloud_A on to point_cloud_B
            such that
                        X_BA.multiply(model) ~= scene,
    """
    # X_BA = RigidTransform()
    ##################
    p_Ombar = model.mean(axis=1)
    p_sbar = scene.mean(axis=1)
    merr = (model.transpose() - p_Ombar).transpose()
    serr = (scene.transpose() - p_sbar).transpose()
    W = np.matmul(serr, merr.T)
    U, Sigma, Vt = np.linalg.svd(W)
    R = np.matmul(U, Vt)
    if np.linalg.det(R) < 0:
        Vt[-1, :] *= -1
        R = np.matmul(U, Vt)
    p = p_sbar - np.matmul(R, p_Ombar)
    X_BA = RigidTransform(RotationMatrix(R), p)
    ##################
    return X_BA

In [None]:
def nearest_neighbors(scene, model):
    """
    Find the nearest (Euclidean) neighbor in model for each
    point in scene
    Args:
        scene: 3xN numpy array of points
        model: 3xM numpy array of points
    Returns:
        distances: (N, ) numpy array of Euclidean distances from each point in
            scene to its nearest neighbor in model.
        indices: (N, ) numpy array of the indices in model of each
            scene point's nearest neighbor - these are the c_i's
    """
    distances = np.empty(scene.shape[1], dtype=float)
    indices = np.empty(scene.shape[1], dtype=int)

    kdtree = KDTree(model.T)
    for i in range(model.shape[1]):
        distances[i], indices[i] = kdtree.query(scene[:, i], 1)

    return distances, indices

In [None]:
def icp(scene, model, max_iterations=20, tolerance=1e-3):
    """
    Perform ICP to return the correct relative transform between two set of points.
    Args:
        scene: 3xN numpy array of points
        model: 3xM numpy array of points
        max_iterations: max amount of iterations the algorithm can perform.
        tolerance: tolerance before the algorithm converges.
    Returns:
      X_BA: A RigidTransform object that maps point_cloud_A on to point_cloud_B
            such that
                        X_BA.multiply(model) ~= scene,
      mean_error: Mean of all pairwise distances.
      num_iters: Number of iterations it took the ICP to converge.
    """
    X_BA = RigidTransform()

    mean_error = 0
    num_iters = 0
    prev_error = 0

    N = model.shape[1]


    while True:
        num_iters += 1

        # your code here
        ##################
        m_corr = np.zeros(shape=(3, N))

        distances, indices = nearest_neighbors(scene, X_BA.multiply(model))
        for i in range(N):
            m_corr[:, [i]] = model[:, [indices[i]]]

        X_BA = least_squares_transform(scene, m_corr)

        mean_error = distances.mean()
        ##################

        if (
            abs(mean_error - prev_error) < tolerance
            or num_iters >= max_iterations
        ):
            break

        prev_error = mean_error

    return X_BA, mean_error, num_iters

In [None]:
class RobotStation:
    """
    Class containing scenario builder and simulator. Can call simulate_trajectory consecutively for replanning.
    """
    def __init__(self, yaml_filename):
        self.q_traj_points = 10
        self.g_traj_points = 4
        builder = DiagramBuilder()
        with open(yaml_filename, 'r') as file:
            scenario_data = file.read()
        scenario = load_scenario(data=scenario_data)
        station = builder.AddSystem(
            MakeHardwareStation(scenario, meshcat=meshcat)
        )
        self.plant = station.GetSubsystemByName("plant")
        scene_graph = station.GetSubsystemByName("scene_graph")
        AddMultibodyTriad(self.plant.GetFrameByName("body"), scene_graph)
        AddMultibodyTriad(self.plant.GetFrameByName("cleaver"), scene_graph)

        t_lst = np.linspace(0, 10, 10)
        q_knots = np.array(create_q_knots([RigidTransform(RotationMatrix(), [0, 0, 0])] * self.q_traj_points))
        q_traj = PiecewisePolynomial.CubicShapePreserving(t_lst, q_knots[:, 0:7].T)

        self.q_traj_system = builder.AddSystem(TrajectorySource(q_traj))
        self.g_traj_system = builder.AddSystem(TrajectorySource(PiecewisePolynomial.FirstOrderHold(np.array([0.0, 0.1, 0.2, 0.3]), np.array([0.0, 0.0, 0.0, 0.0]).reshape(1, 4))))

        # AddFrameTriadIllustration(scene_graph=scene_graph, body="cleaver")

        self.visualizer = MeshcatVisualizer.AddToBuilder(
        builder,
        station.GetOutputPort("query_object"),
        meshcat,
        MeshcatVisualizerParams(delete_on_initialization_event=False),
        )

        # Export the camera outputs
        builder.ExportOutput(
            station.GetOutputPort("camera0.rgb_image"), "rgb_image"
        )
        builder.ExportOutput(
            station.GetOutputPort("camera0.depth_image"), "depth_image"
        )

        to_point_cloud = AddPointClouds(
            scenario=scenario, station=station, builder=builder, meshcat=meshcat
        )

        builder.Connect(
            self.q_traj_system.get_output_port(), station.GetInputPort("iiwa.position")
        )
        builder.Connect(
            self.g_traj_system.get_output_port(), station.GetInputPort("wsg.position")
        )

        builder.ExportOutput(
            to_point_cloud['camera0'].point_cloud_output_port(), "point_cloud0"
        ) #need to have "to_point_cloud['string of camera name']" for this to work

        builder.ExportOutput(
            to_point_cloud['camera1'].point_cloud_output_port(), "point_cloud1"
        ) #need to have "to_point_cloud['string of camera name']" for this to work

        self.diagram = builder.Build()

        self.context = self.diagram.CreateDefaultContext()
        self.diagram.ForcedPublish(self.context)

        self.simulator = Simulator(self.diagram)
        self.visualizer.StartRecording(False)

    def simulate_trajectory(self, q_traj, g_traj, time_to_advance=0.01):
        self.q_traj_system.UpdateTrajectory(q_traj)
        self.g_traj_system.UpdateTrajectory(g_traj)
        
        self.simulator.AdvanceTo(time_to_advance)
        self.visualizer.PublishRecording()

    def get_point_cloud(self):
        pc_image0 = self.diagram.GetOutputPort("point_cloud0").Eval(self.context)
        pc_image1 = self.diagram.GetOutputPort("point_cloud1").Eval(self.context)

        pc_image = Concatenate([pc_image0, pc_image1])

        xprime = pc_image.xyzs()[0]
        yprime = pc_image.xyzs()[1]
        zprime = pc_image.xyzs()[2]

        x = np.where(xprime == np.inf, 0, xprime)[0::150]
        y = np.where(yprime == np.inf, 0, yprime)[0::150]
        z = np.where(zprime == np.inf, 0, zprime)[0::150]

        return np.array([x, y, z])


In [None]:
t_lst = np.linspace(0, 10, 10)
pose_lst = [RigidTransform(RotationMatrix(), [0.5, 0, 0.5])] * 10

# Create gripper trajectory.
gripper_t_lst = np.array([0.0, 5.0, 6.0, 10.0])
gripper_knots = np.array([0.05, 0.05, 0.0, 0.0]).reshape(1, 4)
g_traj = PiecewisePolynomial.FirstOrderHold(gripper_t_lst, gripper_knots)

In [None]:
q_knots = np.array(create_q_knots(pose_lst))
q_traj = PiecewisePolynomial.CubicShapePreserving(t_lst, q_knots[:, 0:7].T)

In [None]:
t_lst_2 = np.linspace(10, 20, 10)
pose_lst_2 = [RigidTransform(RotationMatrix(), [0, 0.5, 0.5])] * 10

# Create gripper trajectory.
gripper_t_lst_2 = np.array([10.0, 15.0, 16.0, 20.0])
gripper_knots_2 = np.array([0.05, 0.05, 0.0, 0.0]).reshape(1, 4)
g_traj_2 = PiecewisePolynomial.FirstOrderHold(gripper_t_lst, gripper_knots)

In [None]:
q_knots_2 = np.array(create_q_knots(pose_lst_2))
q_traj_2 = PiecewisePolynomial.CubicShapePreserving(t_lst_2, q_knots_2[:, 0:7].T)

In [None]:
robot = RobotStation('/work/scene.dmd.yaml')

INFO:drake:PackageMap: Downloading https://github.com/RobotLocomotion/models/archive/fe5326c5ffc36fda12c58883d22d29dc86009d65.tar.gz


In [None]:
robot.simulate_trajectory(q_traj, g_traj, 10)

In [None]:
rng = np.random.default_rng(135)  # random number generator
point_cloud_wo_table = remove_plane(robot.get_point_cloud(), ransac, rng, 1e-3)

In [None]:
cleaver_mesh = trimesh.load('/work/cleaver.obj')
cleaver_hull = cleaver_mesh.convex_hull
cleaver_sample_points = cleaver_hull.sample(point_cloud_wo_table.shape[1])
cleaver_point_cloud = np.transpose(np.array(cleaver_sample_points))
cleaver_point_cloud *= 0.001

In [None]:
icp(point_cloud_wo_table, cleaver_point_cloud, max_iterations=30, tolerance=1e-5)

(RigidTransform(
   R=RotationMatrix([
     [0.16370920128972216, -0.9777250865544246, 0.13135049499421342],
     [0.9863017462537642, 0.1649443176465196, -0.001495798415273276],
     [-0.020203038134312103, 0.12979609854794386, 0.9913348627239315],
   ]),
   p=[2.456705959816547, 0.01801972770935442, -0.4949989134460297],
 ),
 2.671241560033635,
 10)

In [None]:
# def draw_diagram():
#     system = RobotStation('/work/scene.dmd.yaml').diagram
#     display(
#         SVG(
#             pydot.graph_from_dot_data(system.GetGraphvizString(max_depth=2))[
#                 0
#             ].create_svg()
#         )
#     )


# draw_diagram()

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=50b7e7e2-5bbe-4c2a-b831-ce519bfa4464' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>