In [19]:
from pydrake.all import (DiagramBuilder, RigidTransform, RotationMatrix, RollPitchYaw, ProximityProperties, 
                         AddContactMaterial, UnitInertia, CoulombFriction,
                         SpatialInertia, Parser, FindResourceOrThrow, Simulator, Cylinder,
                         FixedOffsetFrame, Sphere, BasicVector, AbstractValue, JacobianWrtVariable,
                         LeafSystem, PiecewisePose, LogVectorOutput, RollPitchYaw,
                         Quaternion, EventStatus, DrakeVisualizer,
                         MultibodyPlantConfig, AddMultibodyPlant, AddCompliantHydroelasticProperties,
                         DrakeVisualizerParams, Role, ConnectContactResultsToDrakeVisualizer
                         )

import numpy as np

In [20]:
# peg parameter
peg_length = 0.07
peg_width = 0.02
peg_mass = 0.1
peg_props = ProximityProperties()
mu = 1.0
dissipation = 1.0
hydroelastic_modulus = 5e7
resolution_hint_factor = 1e-15

AddContactMaterial(dissipation=dissipation,
                   friction=CoulombFriction(mu, mu), properties=peg_props)
AddCompliantHydroelasticProperties(peg_width*resolution_hint_factor, hydroelastic_modulus, peg_props)


plant_config = MultibodyPlantConfig()
plant_config.time_step = 1e-3
plant_config.penetration_allowance = 1e-5
# plant_config.contact_model = "hydroelastic_with_fallback"
plant_config.contact_model = "hydroelastic"
plant_config.contact_surface_representation = "polygon"

config = {"peg_length": peg_length, "peg_width": peg_width, "peg_mass": peg_mass, "peg_props": peg_props}

In [21]:
def MultiplyQuaternion(q1, q2):
    assert q1.shape == (4,) and q2.shape == (4,)
    s1 = q1[0]
    s2 = q2[0]
    u1 = q1[1:4]
    u2 = q2[1:4]
    q1q2 = np.hstack([s1*s2 - np.dot(u1, u2), s1*u2 + s2*u1 + np.cross(u1, u2)])
    return q1q2

def AddAndWeldModelFrom(model_path, model_name, parent, child_frame_name, X_PC, plant):
    parser = Parser(plant)
    new_model = parser.AddModelFromFile(model_path, model_name)
    child_frame = plant.GetFrameByName(child_frame_name, new_model)
    plant.WeldFrames(parent, child_frame, X_PC)
    return new_model

def normalize(vec):
    norm = np.linalg.norm(vec)
    assert norm != 0
    return vec / np.linalg.norm(vec)

def VecToQuaternion(vec):
    assert vec.shape == (4,)
    return Quaternion(normalize(vec))

def setup_plant(builder, plant, config):
    peg_length = config["peg_length"]
    peg_width = config["peg_width"]
    peg_mass = config["peg_mass"]
    peg_props = config["peg_props"]

    # add kuka iiwa to the scene
    iiwa_sdf = FindResourceOrThrow(
        "drake/manipulation/models/iiwa_description/sdf/iiwa14_no_collision.sdf")
    iiwa_index = AddAndWeldModelFrom(iiwa_sdf, "iiwa", plant.world_frame(),
                                     "iiwa_link_0", RigidTransform(), plant)

    # add table to the scene
    table_sdf = FindResourceOrThrow(
        "drake/examples/kuka_iiwa_arm/models/table/extra_heavy_duty_table_surface_only_collision.sdf")
    table_height = 0.7645
    robot_table_index = AddAndWeldModelFrom(table_sdf, "robot_table", plant.world_frame(),
                                            "link", RigidTransform([0, 0, -table_height]), plant)
    work_table_index = AddAndWeldModelFrom(table_sdf, "work_table", plant.world_frame(),
                                           "link", RigidTransform([0.75, 0, -table_height]), plant)
    robot_table_index = None
    work_table_index = None

    # add hole to the scene
    hole_urdf = "./model/hole.urdf"
    hole_index = AddAndWeldModelFrom(hole_urdf, "hole", plant.world_frame(),
                                     "base_link", RigidTransform(RotationMatrix.MakeXRotation(np.pi/2), [0.5, 0, 0.08]), plant)
    # AddMultibodyTriad(plant.GetFrameByName("base_link", hole), scene_graph)
    plant.RegisterVisualGeometry(plant.GetBodyByName("base_link", hole_index), RigidTransform(),
                                 Sphere(0.002), "hole_center", [1.0, 0., 0., 1.])
    hole_frame = plant.AddFrame(FixedOffsetFrame("hole_frame", plant.GetFrameByName("base_link", hole_index),
                                                 RigidTransform(RotationMatrix.MakeXRotation(-np.pi/2))))

    # add peg to the scene
    G_PPcm_P = UnitInertia.SolidCylinder(peg_width*0.5, peg_length)
    M_PPcm_P = SpatialInertia(peg_mass, [0, 0, 0], G_PPcm_P)
    peg_geom = Cylinder(peg_width*0.5, peg_length)

    peg = plant.AddRigidBody('peg', iiwa_index, M_PPcm_P)

    plant.RegisterVisualGeometry(peg, RigidTransform(), peg_geom, "peg", [
        31./255., 151./255., 217./255., .5])

    plant.RegisterCollisionGeometry(
        peg, RigidTransform(), peg_geom, "peg", peg_props)

    # plant.RegisterCollisionGeometry(
    #     peg, RigidTransform([-peg_width/2, 0, peg_width/2]), Sphere(1e-5), "collision_sphere", peg_props)

    plant.WeldFrames(plant.GetFrameByName("iiwa_link_7"),
                     peg.body_frame(), RigidTransform([0, 0, 0.1]))
    stiffness_frame = plant.AddFrame(FixedOffsetFrame(
        "stiffness_frame", peg.body_frame(), RigidTransform(RotationMatrix.MakeXRotation(-np.pi), [0, 0, peg_length/2])))

    plant.RegisterVisualGeometry(peg, RigidTransform(
        [0, 0, peg_length/2]), Sphere(0.002), "peg_tip", [1.0, 0., 0., 1.])

    # AddMultibodyTriad(plant.GetFrameByName("iiwa_link_0"), scene_graph)
    plant.Finalize()
    model_index = {"iiwa": iiwa_index, "hole": hole_index,
                   "robot_table": robot_table_index, "work_table": work_table_index}
    return plant, model_index

In [22]:
class CartesianImpedanceController(LeafSystem):
    def __init__(self, plant, model_index):
        LeafSystem.__init__(self)
        self._plant = plant
        self._plant_context = plant.CreateDefaultContext()
        self._iiwa_index = model_index["iiwa"]
        self._S = plant.GetFrameByName("stiffness_frame", self._iiwa_index)
        self._W = plant.world_frame()
        self._Kp_v = np.diag([300, 300, 300])
        self._Kd_v = np.diag([100, 100, 100])
        self._Kp_omega = np.diag([0.5, 0.5, 0.5])
        self._Kd_omega = np.diag([0.7, 0.7, 0.7])

        self.DeclareVectorInputPort("r_theta_f", BasicVector(3))
        self.DeclareVectorInputPort("q_v", BasicVector(14))
        self.DeclareAbstractInputPort(
            "X_WSd", AbstractValue.Make(RigidTransform()))

        self.DeclareVectorOutputPort("tau", BasicVector(7), self.TauCalcOutput)

        # Log of trajectory tracking result. The output is a (14,) vector. With output[0:4] as Quaternion(X_WS),
        # output[4:7] as translation(X_WS), output[7:11] as Quaternion(X_WSd) and output[11:14] as translation(X_WSd)
        self.DeclareVectorOutputPort(
            "log", BasicVector(14), self.LogCalcOutput)
        # Log of trajectory tracking result. Similar format with `log`, except that the rotation is represented in rpy
        self.DeclareVectorOutputPort(
            "log_rpy", BasicVector(12), self.LogRPYCalcOutput)

    def TauCalcOutput(self, context, output):
        r_theta_f = self.get_input_port(0).Eval(context)
        r = r_theta_f[0]
        theta = r_theta_f[1]
        f = r_theta_f[2]

        q_v = self.get_input_port(1).Eval(context)
        if not np.allclose(q_v, plant.GetPositionsAndVelocities(self._plant_context)):
            self._plant.SetPositionsAndVelocities(self._plant_context, q_v)

        X_WS = self._S.CalcPose(self._plant_context, self._W)
        X_WSd = self.get_input_port(2).Eval(context)

        tau_g = self._plant.CalcGravityGeneralizedForces(
            self._plant_context).reshape(-1)
        tau_bias = self._plant.CalcBiasTerm(self._plant_context).reshape(-1)

        J_v = self._plant.CalcJacobianTranslationalVelocity(self._plant_context, JacobianWrtVariable.kQDot,
                                                            self._S, [0, 0, 0], self._W, self._W)
        J_omega = self._plant.CalcJacobianAngularVelocity(self._plant_context, JacobianWrtVariable.kQDot,
                                                          self._S, self._W, self._W)

        V_WS_W = self._S.CalcSpatialVelocity(
            self._plant_context, self._W, self._W)
        v_WS_W = V_WS_W.translational()
        omega_WS_W = V_WS_W.rotational()

        p_WS = X_WS.translation()
        p_WSd = X_WSd.translation()
        R_WS = X_WS.rotation()
        R_WSd = X_WSd.rotation()
        R_SdS = R_WSd.inverse() @ R_WS
        R_SdS_angle_axis = R_SdS.ToAngleAxis()
        R_SdS_omegaHat = R_SdS_angle_axis.axis()
        R_SdS_theta = R_SdS_angle_axis.angle()

        tau_pd_v = J_v.T @ (self._Kp_v @ (p_WSd - p_WS) - self._Kd_v @ v_WS_W)
        tau_pd_omega = J_omega.T @ (- self._Kp_omega @
                                    (R_SdS_omegaHat * R_SdS_theta) - self._Kd_omega @ omega_WS_W)

        p_SSa = np.array([r*np.cos(theta), r*np.sin(theta), 0.])
        J_a_v = self._plant.CalcJacobianTranslationalVelocity(self._plant_context, JacobianWrtVariable.kQDot,
                                                              self._S, p_SSa, self._W, self._S)
        F_ff = np.array([0., 0., -f])
        tau_ff = J_a_v.T @ F_ff

        tau = -tau_g + tau_bias + tau_pd_v + tau_pd_omega + tau_ff
        output.SetFromVector(tau)

    def LogCalcOutput(self, context, output):
        q_v = self.get_input_port(1).Eval(context)
        if not np.allclose(q_v, plant.GetPositionsAndVelocities(self._plant_context)):
            self._plant.SetPositionsAndVelocities(self._plant_context, q_v)
        X_WS = self._S.CalcPose(self._plant_context, self._W)
        X_WSd = self.GetInputPort("X_WSd").Eval(context)
        log = np.hstack([X_WS.rotation().ToQuaternion().wxyz(), X_WS.translation(),
                        X_WSd.rotation().ToQuaternion().wxyz(), X_WSd.translation()])
        output.set_value(log)

    def LogRPYCalcOutput(self, context, output):
        q_v = self.get_input_port(1).Eval(context)
        if not np.allclose(q_v, plant.GetPositionsAndVelocities(self._plant_context)):
            self._plant.SetPositionsAndVelocities(self._plant_context, q_v)
        X_WS = self._S.CalcPose(self._plant_context, self._W)
        X_WSd = self.GetInputPort("X_WSd").Eval(context)

        log = np.hstack([RollPitchYaw(X_WS.rotation()).vector(), X_WS.translation(),
                        RollPitchYaw(X_WSd.rotation()).vector(), X_WSd.translation()])

        output.set_value(log)


class PoseTrajectorySource(LeafSystem):
    def __init__(self, traj):
        LeafSystem.__init__(self)
        self.traj = traj

        self.DeclareAbstractOutputPort("X_WSd", lambda: AbstractValue.Make(
            RigidTransform()), self.PoseCalcOutput)

    def PoseCalcOutput(self, context, output):
        t = context.get_time()
        output.set_value(self.traj.GetPose(t))


def make_trajectory(X_WS, times):
    sample_times = []
    poses = []
    for name in ["initial", "middle", "goal"]:
        sample_times.append(times[name])
        poses.append(X_WS[name])
    return PiecewisePose.MakeLinear(sample_times, poses)


class PoseIntegrator(LeafSystem):
    def __init__(self):
        LeafSystem.__init__(self)
        self.x_index = self.DeclareContinuousState(7)
        self.state_ticket = self.all_state_ticket()

        self.DeclareVectorInputPort("omega", BasicVector(3))
        self.DeclareVectorInputPort("v", BasicVector(3))

        self.DeclareAbstractOutputPort("pose", lambda: AbstractValue.Make(RigidTransform()), self.PoseCalcOutput,
                                       prerequisites_of_calc=set([self.all_state_ticket()]))

    def SetInitialPose(self, context, pose):
        q = pose.rotation().ToQuaternion().wxyz()
        p = pose.translation()
        x_init = np.hstack([q, p])
        x = context.get_mutable_continuous_state_vector()
        x.SetFromVector(x_init)
        # print(x)

    def DoCalcTimeDerivatives(self, context, derivatives):
        omega = self.GetInputPort("omega").Eval(context)
        assert omega.shape == (3,)
        v = self.GetInputPort("v").Eval(context)

        x = context.get_continuous_state_vector().value()
        assert x.shape == (7,)
        q = x[0:4]
        p = x[4:7]
        omega_quaternion = np.hstack([0, omega])
        qdot = 0.5 * MultiplyQuaternion(omega_quaternion, q)
        xdot = np.hstack([qdot, v])

        derivatives.get_mutable_vector().SetFromVector(xdot)

    def PoseCalcOutput(self, context, output):
        x = context.get_continuous_state_vector().value()
        q = VecToQuaternion(x[0:4])
        p = x[4:7]
        pose = RigidTransform(q, p)

        output.set_value(pose)

In [23]:
class VelocitySource(LeafSystem):
    def __init__(self):
        LeafSystem.__init__(self)

        self.DeclareVectorOutputPort("omega", BasicVector(3), self.OmegaCalcOutput)
        self.DeclareVectorOutputPort("v", BasicVector(3), self.VCalcOutput)

    def VCalcOutput(self, context, output):
        t = context.get_time()
        if t < 2.0:
            v = [0, 0, -0.05]
        else:
            v = [0, 0, 0]
        output.SetFromVector(v)
    
    def OmegaCalcOutput(self, context, output):
        omega = [0, 0, 0]
        output.SetFromVector(omega)

class FeedForwardForceSource(LeafSystem):
    def __init__(self):
        LeafSystem.__init__(self)
        self.DeclareVectorOutputPort("r_theta_f", BasicVector(3), self.CalcOutput)
    
    def CalcOutput(self, context, output):
        t = context.get_time()
        if t < 4.0:
            r = 0
            theta = 0
            f = 0
        else:
            r = peg_width / 2
            theta = np.pi
            # theta = 0
            # theta = np.pi / 4
            f = 20
        output.SetFromVector([r, theta, f])

In [24]:
builder = DiagramBuilder()
plant, scene_graph = AddMultibodyPlant(plant_config, builder)
plant, model_index = setup_plant(builder, plant, config)

vis_params = DrakeVisualizerParams()
# vis_params.role = Role.kProximity
vis_params.role = Role.kIllustration
visualizer = DrakeVisualizer.AddToBuilder(builder, scene_graph, params=vis_params)
ConnectContactResultsToDrakeVisualizer(builder, plant, scene_graph)


controller = builder.AddSystem(CartesianImpedanceController(plant, model_index))
pose_integrator = builder.AddSystem(PoseIntegrator())
velocity_source = builder.AddSystem(VelocitySource())
force_source = builder.AddSystem(FeedForwardForceSource())

hole_index = model_index["hole"]
H = plant.GetFrameByName("hole_frame", hole_index)
iiwa_index = model_index["iiwa"]
S = plant.GetFrameByName("stiffness_frame", iiwa_index)

builder.Connect(controller.GetOutputPort("tau"), plant.get_actuation_input_port())
builder.Connect(plant.get_state_output_port(), controller.GetInputPort("q_v"))
builder.Connect(velocity_source.GetOutputPort("omega"), pose_integrator.GetInputPort("omega"))
builder.Connect(velocity_source.GetOutputPort("v"), pose_integrator.GetInputPort("v"))
builder.Connect(pose_integrator.GetOutputPort("pose"), controller.GetInputPort("X_WSd"))
builder.Connect(force_source.GetOutputPort("r_theta_f"), controller.GetInputPort("r_theta_f"))
tracking_logger = LogVectorOutput(controller.GetOutputPort("log"), builder, 1e-2)

diagram = builder.Build()
simulator = Simulator(diagram)
context = simulator.get_context()
plant_context = plant.GetMyContextFromRoot(context)
init_q = np.load("./init_q_left.npy")
init_q_v = np.hstack([init_q, np.zeros(7)])
plant.SetPositionsAndVelocities(plant_context, init_q_v)

X_WS = S.CalcPoseInWorld(plant_context)
integrator_context = pose_integrator.GetMyContextFromRoot(context)
pose_integrator.SetInitialPose(integrator_context, X_WS)


# reseted = False
X_WS_before = None
X_WS_after = None
def monitor_reset(context):
    # global reseted
    global X_WS_before
    global X_WS_after
    t = context.get_time()
    # if not reseted and t > 4.2:
    if t == 3.0:
        X_WS = S.CalcPoseInWorld(plant_context)
        X_WSd = RigidTransform(RotationMatrix(), X_WS.translation())
        pose_integrator.SetInitialPose(integrator_context, X_WSd)
        reseted = True
        print("do reset")
    if t == 4.0:
        X_WS_before = S.CalcPoseInWorld(plant_context)
    if t == 8.0:
        X_WS_after = S.CalcPoseInWorld(plant_context)
    return EventStatus.Succeeded()

simulator.set_monitor(monitor_reset)
simulator.set_target_realtime_rate(1.0)
simulator.AdvanceTo(8.0)

do reset


<pydrake.systems.analysis.SimulatorStatus at 0x7f8d9ce406f0>