# Monday Demo: Adaptive Admittance Control on Human Manikin

UR5e slides along the manikin's curved chest while maintaining contact force using:
- **Sensor-based normal estimation** (proprioception from contact forces)
- **Admittance control** (force error → velocity command, hardware-compatible)

In [None]:
import numpy as np
from pathlib import Path
from pydrake.all import (
    DiagramBuilder, Integrator, JacobianWrtVariable, LeafSystem, Multiplexer,
    RigidTransform, RotationMatrix, Simulator, StartMeshcat, TrajectorySource,
    PiecewisePolynomial, PiecewisePose, Rgba, AbstractValue, ContactResults, BasicVector
)
from manipulation import running_as_notebook
from manipulation.station import LoadScenario, MakeHardwareStation
from ur5e_description.ur5e_driver import UR5E_PACKAGE_XML

# Package paths
HUMAN_GAZEBO_PKG = str(Path.cwd() / "human-gazebo" / "package.xml")

# Control parameters
FORCE_DES = 10.0         # Desired force into chest (N)
K_ADMITTANCE = 0.005     # Admittance gain (m/s per Newton)
KP_POS = 4.0             # Position tracking gain (1/s)
KP_ORI = 2.0             # Orientation alignment gain (1/s)
ALPHA_FILTER = 0.1       # Normal estimate filter smoothing

# Contact positions - adjusted to be on chest (sternum area)
X_CONTACT = 0.80         # X position
Z_CONTACT = 0.18         # Z height
Y_START, Y_END = -0.3, -0.2  # Y-axis travel range

# FSM Timing
T_APPROACH = 3.0         # Time to reach pre-contact position
T_DESCEND = 4.0          # Time to descend to contact
T_SLIDE_END = 8.0        # Time when slide ends
T_LIFT = 9.0             # Time when lift completes
T_FINAL = 12.0           # Total duration

In [None]:
meshcat = StartMeshcat()

## Adaptive Admittance Controller

**Key Innovation: Sensor-Based Normal Estimation (MIT Ch. 9)**

> "The geometry of a constraint is revealed by the forces it exerts."

- **In contact:** Surface normal ≈ direction of reaction force (proprioception)
- **Approaching:** Fallback to cylinder model (geometric heuristic)

**Control Law:**
- **Normal direction (force):** Admittance control → velocity into/out of surface
- **Tangent direction (motion):** Position tracking → follow purple line
- **Output:** Joint velocities (compatible with UR5e speedJ)

In [None]:
class AdaptiveAdmittanceController(LeafSystem):
    """FSM-Aware Adaptive Admittance Controller.
    
    Control modes based on FSM phase:
    - Non-contact phases (0-4s, 8-12s): Pure position tracking (pseudoinverse IK)
    - Contact phase (4-8s): Admittance control with sensor-based normal estimation
    
    Uses proprioception (contact forces) to estimate surface normal in real-time.
    """
    
    def __init__(self, plant, trajectory, hand_body_name="tcp", force_des=FORCE_DES):
        LeafSystem.__init__(self)
        self._plant = plant
        self._plant_context = plant.CreateDefaultContext()
        self._trajectory = trajectory
        self._force_des = force_des
        
        # Frame lookups - use tcp from netft_probe
        self._ur5e = plant.GetModelInstanceByName("ur5e")
        self._hand_body = plant.GetBodyByName(hand_body_name, plant.GetModelInstanceByName("netft_probe"))
        self._hand_body_index = self._hand_body.index()
        self._ee_frame = self._hand_body.body_frame()
        self._W = plant.world_frame()
        
        # Get UR5e velocity indices for Jacobian column extraction
        v_start = plant.GetVelocitiesFromArray(self._ur5e, np.arange(plant.num_velocities()))
        self._v_indices = [int(i) for i in v_start]
        
        # Gains
        self._k_admittance = K_ADMITTANCE  # (m/s per Newton) - "Compliance"
        self._kp_pos = KP_POS              # (1/s) - Position tracking
        self._kp_ori = KP_ORI              # (1/s) - Orientation alignment
        self._alpha = ALPHA_FILTER         # Filter smoothing (0.0=freeze, 1.0=raw)
        
        # DISCRETE STATE: Estimated Normal [nx, ny, nz]
        # Initial guess: Pointing back along -X (toward mannequin)
        self._normal_idx = self.DeclareDiscreteState([-1.0, 0.0, 0.0])
        
        # I/O Ports
        self.DeclareVectorInputPort("robot_state", 12)  # UR5e: [q(6), v(6)]
        self.DeclareAbstractInputPort("contact_results", AbstractValue.Make(ContactResults()))
        # OUTPUT: Joint Velocities (Safe for UR5e hardware)
        self.DeclareVectorOutputPort("joint_velocities", 6, self.CalcOutput)
        
        # Update Normal Estimate at 1kHz
        self.DeclarePeriodicDiscreteUpdateEvent(0.001, 0.0, self.UpdateNormal)
    
    def UpdateNormal(self, context, discrete_state):
        """Estimate surface normal from contact forces (proprioception)."""
        # Only update during contact phase
        t_now = context.get_time()
        if not (T_DESCEND <= t_now <= T_SLIDE_END):
            return  # Skip during non-contact phases
        
        # 1. Sense Force (Proprioception)
        contact_results = self.get_input_port(1).Eval(context)
        measured_n = np.zeros(3)
        found_contact = False
        
        for i in range(contact_results.num_point_pair_contacts()):
            info = contact_results.point_pair_contact_info(i)
            if info.bodyB_index() == self._hand_body_index:
                measured_n = np.array(info.contact_force())
                found_contact = True
                break
            elif info.bodyA_index() == self._hand_body_index:
                measured_n = -np.array(info.contact_force())
                found_contact = True
                break
        
        # 2. Fuse Sensor + Model
        if found_contact and np.linalg.norm(measured_n) > 0.5:
            # SENSOR MODE: Normal is aligned with the reaction force
            target_n = measured_n / np.linalg.norm(measured_n)
        else:
            # FALLBACK MODE: Cylinder Heuristic (Chest approximation)
            state = self.get_input_port(0).Eval(context)
            self._plant.SetPositions(self._plant_context, self._ur5e, state[:6])
            p_curr = self._plant.CalcRelativeTransform(
                self._plant_context, self._W, self._ee_frame
            ).translation()
            
            center_axis = np.array([0.8, p_curr[1], 0.2])
            vec = p_curr - center_axis
            vec[1] = 0.0  # Force normal to be in X-Z plane
            target_n = vec / (np.linalg.norm(vec) + 1e-6)
        
        # 3. Low-Pass Filter
        current_n = discrete_state.get_value(self._normal_idx).copy()
        new_n = (1 - self._alpha) * current_n + self._alpha * target_n
        new_n = new_n / (np.linalg.norm(new_n) + 1e-6)
        discrete_state.set_value(self._normal_idx, new_n)
    
    def CalcOutput(self, context, output):
        """Compute joint velocities - FSM-aware control."""
        t_now = context.get_time()
        
        # 1. Update Context
        state = self.get_input_port(0).Eval(context)
        q = state[:6]
        self._plant.SetPositions(self._plant_context, self._ur5e, q)
        
        # 2. Kinematics
        X_WE = self._plant.CalcRelativeTransform(self._plant_context, self._W, self._ee_frame)
        p_curr = X_WE.translation()
        R_curr = X_WE.rotation()
        
        J_full = self._plant.CalcJacobianSpatialVelocity(
            self._plant_context, JacobianWrtVariable.kQDot,
            self._ee_frame, [0, 0, 0], self._W, self._W
        )
        J_G = J_full[:, self._v_indices]  # Extract UR5e columns
        
        # 3. Get desired pose from trajectory
        p_des = self._trajectory.value(t_now).flatten()[:3]
        v_des = self._trajectory.derivative(1).value(t_now).flatten()[:3]
        
        # ===== FSM PHASE DETECTION =====
        in_contact_phase = T_DESCEND <= t_now <= T_SLIDE_END
        
        if in_contact_phase:
            # ===== ADMITTANCE CONTROL (Contact Phase) =====
            n = context.get_discrete_state(self._normal_idx).get_value().copy()
            
            # Project trajectory velocity onto surface tangent
            v_tangent = v_des - np.dot(v_des, n) * n
            
            # Admittance Law: Force -> Velocity
            contact_results = self.get_input_port(1).Eval(context)
            F_meas = 0.0
            for i in range(contact_results.num_point_pair_contacts()):
                info = contact_results.point_pair_contact_info(i)
                if info.bodyB_index() == self._hand_body_index or \
                   info.bodyA_index() == self._hand_body_index:
                    F_meas = np.linalg.norm(info.contact_force())
                    break
            
            F_err = self._force_des - F_meas
            v_force = -n * (self._k_admittance * F_err)
            
            # Position correction (tangent only)
            p_err = p_des - p_curr
            p_err_tangent = p_err - np.dot(p_err, n) * n
            v_pos_corr = self._kp_pos * p_err_tangent
            
            # Orientation: Align Tool-Z with -n
            z_tool = R_curr.matrix()[:, 2]
            ori_err = np.cross(z_tool, -n)
            v_ang = self._kp_ori * ori_err
            
            v_linear = v_tangent + v_force + v_pos_corr
        else:
            # ===== POSITION TRACKING (Non-Contact Phases) =====
            # Simple position + feedforward velocity tracking
            p_err = p_des - p_curr
            v_linear = v_des + self._kp_pos * p_err
            
            # Orientation: Align with trajectory (Z pointing down for contact pose)
            R_contact = RotationMatrix.MakeXRotation(np.pi)
            z_des = R_contact.matrix()[:, 2]  # [0, 0, -1]
            z_tool = R_curr.matrix()[:, 2]
            ori_err = np.cross(z_tool, z_des)
            v_ang = self._kp_ori * ori_err
        
        # 4. Differential IK (Damped Least Squares)
        V_spatial = np.hstack([v_ang, v_linear])
        lambda_sq = 0.01
        q_dot = J_G.T @ np.linalg.inv(J_G @ J_G.T + lambda_sq * np.eye(6)) @ V_spatial
        
        output.SetFromVector(q_dot)

In [None]:
def MakeFullTrajectory(X_TCP_initial):
    """Creates full FSM trajectory starting from robot's initial pose.
    
    Phases:
    - 0 → T_APPROACH:  HOME → PRE-CONTACT (move to above contact start)
    - T_APPROACH → T_DESCEND: Slow Z descent to contact
    - T_DESCEND → T_SLIDE_END: SLIDE along Y (admittance active)
    - T_SLIDE_END → T_LIFT: Lift up (post-contact)
    - T_LIFT → T_FINAL: Return to HOME
    
    Returns:
        trajectory: PiecewisePose trajectory
        position_traj: PiecewisePolynomial for position only (for controller)
    """
    # Contact orientation: Z pointing down (into chest)
    R_contact = RotationMatrix.MakeXRotation(np.pi)
    
    # Keyframe poses
    X_TCP = {
        "initial": X_TCP_initial,
        "pre_contact": RigidTransform(R_contact, [X_CONTACT, Y_START, Z_CONTACT + 0.05]),
        "contact_start": RigidTransform(R_contact, [X_CONTACT, Y_START, Z_CONTACT]),
        "contact_end": RigidTransform(R_contact, [X_CONTACT, Y_END, Z_CONTACT]),
        "post_contact": RigidTransform(R_contact, [X_CONTACT, Y_END, Z_CONTACT + 0.05]),
        "final": X_TCP_initial,
    }
    
    # FSM timing (from constants)
    times = {
        "initial": 0.0,
        "pre_contact": T_APPROACH,
        "contact_start": T_DESCEND,
        "contact_end": T_SLIDE_END,
        "post_contact": T_LIFT,
        "final": T_FINAL,
    }
    
    # Build PiecewisePose trajectory
    order = ["initial", "pre_contact", "contact_start", "contact_end", "post_contact", "final"]
    pose_trajectory = PiecewisePose.MakeLinear(
        [times[k] for k in order],
        [X_TCP[k] for k in order]
    )
    
    # Also create position-only trajectory for controller
    positions = np.array([X_TCP[k].translation() for k in order]).T  # Shape: (3, 6)
    position_trajectory = PiecewisePolynomial.FirstOrderHold(
        [times[k] for k in order], positions
    )
    
    return pose_trajectory, position_trajectory, times

## Setup: UR5e + Human Manikin

In [None]:
# Keep InverseDynamicsDriver - it handles torque control internally
# We feed it desired_state (positions + velocities)
scenario_data = """
directives:
- add_directives:
    file: package://ur5e_description/ur5e_netft_probe.dmd.yaml
- add_model:
    name: human
    file: package://human-gazebo/humanSubjectWithMeshes/humanSubjectWithMesh.urdf
- add_weld:
    parent: world
    child: human::Pelvis
    X_PC:
      translation: [0.8, 0.0, 0.1]
      rotation: !Rpy { deg: [0, -90, 90] }
model_drivers:
    ur5e: !InverseDynamicsDriver {}
visualization:
  publish_contacts: true
"""

meshcat.Delete()
builder = DiagramBuilder()
scenario = LoadScenario(data=scenario_data)

# Custom parser callback to weld ALL human joints before finalize
def weld_human_joints(parser):
    plant = parser.plant()
    human = plant.GetModelInstanceByName("human")
    
    # Set arm positions first
    plant.GetJointByName("jRightShoulder_rotx", human).set_default_angle(np.pi/2)
    plant.GetJointByName("jLeftShoulder_rotx", human).set_default_angle(-np.pi/2)
    
    # Weld all human joints to make manikin completely rigid
    from pydrake.all import WeldJoint, RigidTransform
    joints_to_weld = []
    for joint_idx in plant.GetJointIndices(human):
        joint = plant.get_joint(joint_idx)
        if joint.type_name() != "weld" and joint.num_positions() > 0:
            joints_to_weld.append(joint)
    
    # Remove and replace with welds
    for joint in joints_to_weld:
        parent_frame = joint.frame_on_parent()
        child_frame = joint.frame_on_child()
        joint_name = joint.name()
        
        # Get current default position for the joint
        X_PJ = RigidTransform()  # Identity - weld at default pose
        
        # Remove actuators first
        for actuator_idx in plant.GetJointActuatorIndices():
            actuator = plant.get_joint_actuator(actuator_idx)
            if actuator.joint() == joint:
                plant.RemoveJointActuator(actuator)
        
        plant.RemoveJoint(joint)
        plant.AddJoint(WeldJoint(joint_name + "_welded", parent_frame, child_frame, X_PJ))

station = builder.AddSystem(MakeHardwareStation(
    scenario, meshcat=meshcat, package_xmls=[UR5E_PACKAGE_XML, HUMAN_GAZEBO_PKG],
    parser_prefinalize_callback=weld_human_joints))
plant = station.GetSubsystemByName("plant")
ur5e_instance = plant.GetModelInstanceByName("ur5e")

# Get initial TCP pose from robot's default configuration
temp_context = station.CreateDefaultContext()
tcp_body = plant.GetBodyByName("tcp", plant.GetModelInstanceByName("netft_probe"))
X_TCP_initial = plant.EvalBodyPoseInWorld(plant.GetMyContextFromRoot(temp_context), tcp_body)
print(f"Initial TCP position: {X_TCP_initial.translation()}")
print("Manikin loaded - ALL joints welded (fully rigid body)")

## Wire Control Diagram

In [None]:
# Create FSM trajectory starting from robot's actual pose
pose_trajectory, position_trajectory, times = MakeFullTrajectory(X_TCP_initial)
print(f"Trajectory: HOME → PRE-CONTACT → SLIDE → LIFT → HOME")
print(f"  Slide phase (admittance): {T_DESCEND}s - {T_SLIDE_END}s")
print(f"  Total duration: {T_FINAL}s")

# 1. Admittance Controller (outputs joint velocities)
controller = builder.AddSystem(AdaptiveAdmittanceController(
    plant, position_trajectory, hand_body_name="tcp", force_des=FORCE_DES
))

# 2. Integrator: joint velocities -> joint positions
integrator = builder.AddSystem(Integrator(6))

# 3. Mux: [q_des, v_des] -> desired_state for InverseDynamicsDriver
state_mux = builder.AddSystem(Multiplexer([6, 6]))

# --- CONNECTIONS ---
# Controller Inputs
builder.Connect(station.GetOutputPort("ur5e.state_estimated"), controller.get_input_port(0))
builder.Connect(station.GetOutputPort("contact_results"), controller.get_input_port(1))

# Velocity -> Position (Integration)
builder.Connect(controller.get_output_port(0), integrator.get_input_port())

# [q_des, v_des] -> InverseDynamicsDriver
builder.Connect(integrator.get_output_port(), state_mux.get_input_port(0))      # q_des
builder.Connect(controller.get_output_port(0), state_mux.get_input_port(1))     # v_des (Feedforward)
builder.Connect(state_mux.get_output_port(), station.GetInputPort("ur5e.desired_state"))

diagram = builder.Build()
print("Diagram built with FSM-aware Adaptive Admittance Controller")

## Run Simulation

In [None]:
simulator = Simulator(diagram)
context = simulator.get_mutable_context()

# Initialize integrator to current robot position
q0 = plant.GetPositions(plant.GetMyContextFromRoot(context), ur5e_instance)
integrator.set_integral_value(integrator.GetMyContextFromRoot(context), q0)

# Visualize full trajectory path
traj_times = np.linspace(0, T_FINAL, 100)
traj_pts = np.array([position_trajectory.value(t).flatten() for t in traj_times]).T
meshcat.SetLine("full_trajectory", traj_pts, 2.0, rgba=Rgba(0.3, 0.3, 0.3))  # Gray for full path

# Highlight slide phase (purple) - where admittance is active
slide_times = np.linspace(T_DESCEND, T_SLIDE_END, 50)
slide_pts = np.array([position_trajectory.value(t).flatten() for t in slide_times]).T
meshcat.SetLine("slide_path", slide_pts, 4.0, rgba=Rgba(0.5, 0, 0.5))  # Purple for contact phase

# Run
diagram.ForcedPublish(context)
meshcat.StartRecording(set_visualizations_while_recording=False)
simulator.AdvanceTo(T_FINAL if running_as_notebook else 0.1)
meshcat.PublishRecording()
print(f"Done! Total duration: {T_FINAL}s")
print("  - Gray line: Full trajectory")
print("  - Purple line: Slide phase (admittance active)")
print("Check meshcat for playback.")