In [9]:
import random

from pydrake.all import (
    DiagramBuilder,
    LeafSystem,
    Simulator,
    StartMeshcat,
)

from manipulation.station import (
    LoadScenario,
    MakeHardwareStation,
)

In [10]:
# Start meshcat for visualization
meshcat = StartMeshcat()
print("Click the link above to open Meshcat in your browser!")

INFO:drake:Meshcat listening for connections at http://localhost:7001


Click the link above to open Meshcat in your browser!


In [11]:
class RestingMoleController(LeafSystem):
    """
    Keeps all moles at a fixed position.
    For rest_height = 0.0, the mole base sits flush with the socket.
    """

    def __init__(self, plant, mole_joints, rest_height=0.):
        super().__init__()

        self.plant = plant
        self.mole_joints = mole_joints
        self.rest_height = rest_height   # height for ALL moles (usually 0.0)

        # Input: full plant state [q; v]
        self.state_in = self.DeclareVectorInputPort(
            "x", plant.num_multibody_states()
        )

        # One actuation output per mole
        self.mole_out = {}
        for ij, joint in mole_joints.items():
            self.mole_out[ij] = self.DeclareVectorOutputPort(
                f"mole_{ij[0]}_{ij[1]}_u", 1,
                (lambda ctx, out, ij=ij: self._CalcMoleControl(ctx, out, ij))
            )

    def _CalcMoleControl(self, context, output, ij):
        # Desired height is constant for all moles
        target = self.rest_height

        # Extract plant state
        x = self.state_in.Eval(context)
        nq = self.plant.num_positions()
        q = x[:nq]
        v = x[nq:]

        joint = self.mole_joints[ij]
        qj = q[joint.position_start()]
        vj = v[joint.velocity_start()]

        # PD control
        kp = 600.0
        kd = 12.0
        u = kp * (target - qj) - kd * vj

        output.SetFromVector([u])

In [12]:
class PoppingMoleController(LeafSystem):
    """
    A controller that keeps all moles resting at joint height = 0.0 (base touching socket)
    and repeatedly selects ONE mole at random to rise to a commanded height.
    """

    def __init__(self, plant, mole_joints,
                 rise_height=0.09,
                 rest_height=0.0,
                 min_up=1.5, max_up=1.5):
        super().__init__()

        self.plant = plant
        self.mole_joints = mole_joints
        self.rise_height = rise_height
        self.rest_height = rest_height

        self.min_up = min_up
        self.max_up = max_up

        # ---------------------------
        # State machine
        # ---------------------------
        # Pick the first active mole
        self.active = random.choice(list(mole_joints.keys()))
        # Set the time when we will switch to the next mole
        self.next_switch_time = random.uniform(min_up, max_up)

        # Input: full plant state [q; v]
        self.state_in = self.DeclareVectorInputPort(
            "x", plant.num_multibody_states()
        )

        # Output: one actuation per mole
        self.mole_out = {}
        for ij, joint in mole_joints.items():
            self.mole_out[ij] = self.DeclareVectorOutputPort(
                f"mole_{ij[0]}_{ij[1]}_u", 1,
                (lambda ctx, out, ij=ij: self._CalcMoleControl(ctx, out, ij))
            )

    # --------------------------------------------------------------
    # Helper: pick a new mole different from the current one
    # --------------------------------------------------------------
    def _choose_new_mole(self):
        keys = list(self.mole_joints.keys())
        keys.remove(self.active)
        return random.choice(keys)

    # --------------------------------------------------------------
    # Main PD control calculation
    # --------------------------------------------------------------
    def _CalcMoleControl(self, context, output, ij):

        t = context.get_time()

        # --------------------------------------------
        # Random state switching: one mole at a time
        # --------------------------------------------
        if t >= self.next_switch_time and ij == self.active:
            self.active = self._choose_new_mole()
            self.next_switch_time = t + random.uniform(self.min_up, self.max_up)

        # --------------------------------------------
        # Target height:
        #   active mole → rise_height
        #   others      → rest_height (0.0)
        # --------------------------------------------
        target = self.rise_height if ij == self.active else self.rest_height

        # --------------------------------------------
        # Extract q and v for this mole
        # --------------------------------------------
        x = self.state_in.Eval(context)
        nq = self.plant.num_positions()
        q = x[:nq]
        v = x[nq:]

        joint = self.mole_joints[ij]
        qj = q[joint.position_start()]
        vj = v[joint.velocity_start()]

        # --------------------------------------------
        # PD control
        # --------------------------------------------
        kp = 600.0
        kd = 12.0
        u = kp * (target - qj) - kd * vj

        output.SetFromVector([u])

In [13]:
HAMMER_SDF_PATH = "/workspaces/whackamole/bonkbot/sim/assets/hammer.sdf"
GRID_SDF_PATH = "/workspaces/whackamole/bonkbot/sim/assets/grid_board.sdf"
MOLE_SDF_PATH = "/workspaces/whackamole/bonkbot/sim/assets/mole.sdf"

scenario_string = f"""directives:

# ===============================================================
# IIWA
# ===============================================================
- add_model:
    name: iiwa
    file: package://drake_models/iiwa_description/urdf/iiwa14_primitive_collision.urdf
    default_joint_positions:
      iiwa_joint_1: [-1.57]
      iiwa_joint_2: [0.1]
      iiwa_joint_3: [0]
      iiwa_joint_4: [-1.2]
      iiwa_joint_5: [0]
      iiwa_joint_6: [1.6]
      iiwa_joint_7: [0]

- add_weld:
    parent: world
    child: iiwa::iiwa_link_0

# ===============================================================
# Hammer
# ===============================================================
- add_model:
    name: hammer
    file: file://{HAMMER_SDF_PATH}
    default_free_body_pose:
      hammer_link:
        translation: [0, 0, 0]
        rotation: !Rpy {{ deg: [0, 0, 0] }}

- add_weld:
    parent: iiwa::iiwa_link_7
    child: hammer::hammer_link
    X_PC:
      translation: [0, 0, 0.06]
      rotation: !Rpy {{deg: [0, -90, 0] }}

# ===============================================================
# Grid Board + Moles
# ===============================================================
- add_model:
    name: grid_board
    file: file://{GRID_SDF_PATH}

- add_weld:
    parent: world
    child: grid_board::board
    X_PC:
      translation: [0, -0.75, 0.25]
      rotation: !Rpy {{ deg: [0, 0, 0] }}

# 3x3 mole grid
- add_model:
    name: mole_0_0
    file: file://{MOLE_SDF_PATH}
- add_weld:
    parent: grid_board::board
    child: mole_0_0::socket
    X_PC: {{translation: [-0.2, -0.2, 0.0125]}}

- add_model:
    name: mole_0_1
    file: file://{MOLE_SDF_PATH}
- add_weld:
    parent: grid_board::board
    child: mole_0_1::socket
    X_PC: {{translation: [0.0, -0.2, 0.0125]}}

- add_model:
    name: mole_0_2
    file: file://{MOLE_SDF_PATH}
- add_weld:
    parent: grid_board::board
    child: mole_0_2::socket
    X_PC: {{translation: [0.2, -0.2, 0.0125]}}

- add_model:
    name: mole_1_0
    file: file://{MOLE_SDF_PATH}
- add_weld:
    parent: grid_board::board
    child: mole_1_0::socket
    X_PC: {{translation: [-0.2, 0.0, 0.0125]}}

- add_model:
    name: mole_1_1
    file: file://{MOLE_SDF_PATH}
- add_weld:
    parent: grid_board::board
    child: mole_1_1::socket
    X_PC: {{translation: [0.0, 0.0, 0.0125]}}

- add_model:
    name: mole_1_2
    file: file://{MOLE_SDF_PATH}
- add_weld:
    parent: grid_board::board
    child: mole_1_2::socket
    X_PC: {{translation: [0.2, 0.0, 0.0125]}}

- add_model:
    name: mole_2_0
    file: file://{MOLE_SDF_PATH}
- add_weld:
    parent: grid_board::board
    child: mole_2_0::socket
    X_PC: {{translation: [-0.2, 0.2, 0.0125]}}

- add_model:
    name: mole_2_1
    file: file://{MOLE_SDF_PATH}
- add_weld:
    parent: grid_board::board
    child: mole_2_1::socket
    X_PC: {{translation: [0.0, 0.2, 0.0125]}}

- add_model:
    name: mole_2_2
    file: file://{MOLE_SDF_PATH}
- add_weld:
    parent: grid_board::board
    child: mole_2_2::socket
    X_PC: {{translation: [0.2, 0.2, 0.0125]}}

model_drivers:
  iiwa: !IiwaDriver
    control_mode: position_only
"""

In [14]:
# Load scenario and create station system
scenario = LoadScenario(data=scenario_string)
station_sys = MakeHardwareStation(scenario, meshcat=meshcat)

# Build diagram
builder = DiagramBuilder()
station = builder.AddSystem(station_sys)

# Get plant
plant = station.GetSubsystemByName("plant")

# Build mole joints dictionary
mole_joints = {}
for i in range(3):
    for j in range(3):
        model_name = f"mole_{i}_{j}"
        instance = plant.GetModelInstanceByName(model_name)
        joint = plant.GetJointByName("mole_slider", instance)
        mole_joints[(i, j)] = joint

# Add mole controller
controller = builder.AddSystem(
    PoppingMoleController(
        plant,
        mole_joints,
        rise_height=0.09,
        rest_height=0.0,
        min_up=3,
        max_up=3
    )
)

# Connect plant state → controller
builder.Connect(
    station.GetOutputPort("state"),
    controller.state_in
)

# Connect each mole controller output → station actuation port
for i in range(3):
    for j in range(3):
        builder.Connect(
            controller.mole_out[(i, j)],
            station.GetInputPort(f"mole_{i}_{j}_actuation")
        )

In [15]:
# Build final diagram
diagram = builder.Build()

# Create context
root_context = diagram.CreateDefaultContext()

# Plant context
plant_context = plant.GetMyContextFromRoot(root_context)

# Fix iiwa to default position
iiwa = plant.GetModelInstanceByName("iiwa")
q0 = plant.GetPositions(plant_context, iiwa)

station_context = station.GetMyMutableContextFromRoot(root_context)

# Correct input name for IIWA position driver
if station.HasInputPort("iiwa_position"):
    station.GetInputPort("iiwa_position").FixValue(station_context, q0)
elif station.HasInputPort("iiwa.position"):
    station.GetInputPort("iiwa.position").FixValue(station_context, q0)

# Run simulation
simulator = Simulator(diagram, root_context)
meshcat.StartRecording()

simulator.set_target_realtime_rate(1.0)
simulator.AdvanceTo(10.0)

meshcat.StopRecording()
meshcat.PublishRecording()

In [16]:
# meshcat.Delete()

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=2500cebb-ccf6-40e5-a2c5-6c1beb3b7769' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>