# Custom Python Arm Actions

## Class Definitions

In [None]:
from dataclasses import dataclass
import logging
from typing import List, Literal, Optional, Tuple
import time
import math

from booster_robotics_sdk_python import (
    B1LowCmdPublisher,
    LowCmd,
    MotorCmd,
    LowCmdType,
    B1JointIndex,
    B1LowStateSubscriber,
    ChannelFactory,
    MotorState
)

DEG_TO_RAD = math.pi / 180.0

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("BoosterLowLevelController")

NUM_MOTORS = 23

KP_HARD = 80.0
KP_MED = 40.0
KP_SOFT = 20.0
KD_MED = 2.0
WEIGHT_MED = 1.0

ARM_JOINTS_RIGHT = [
    B1JointIndex.kRightShoulderPitch,  # forward/back
    B1JointIndex.kRightShoulderRoll,  # lateral
    B1JointIndex.kRightElbowPitch,  # extend/flex
    B1JointIndex.kRightElbowYaw,
]
ARM_JOINTS_LEFT = [
    B1JointIndex.kLeftShoulderPitch,  # forward/back
    B1JointIndex.kLeftShoulderRoll,  # lateral
    B1JointIndex.kLeftElbowPitch,  # extend/flex
    B1JointIndex.kLeftElbowYaw,
]

@dataclass
class MotorCommandMaker:
    @staticmethod
    def create_neutral_motor_cmds() -> List[MotorCmd]:
        cmds = []
        for _ in range(NUM_MOTORS):
            mc = MotorCmd()
            mc.mode = 0
            mc.q = 0.0
            mc.dq = 0.0
            mc.tau = 0.0
            mc.kp = 0.0
            mc.kd = 0.0
            mc.weight = 0.0
            cmds.append(mc)
        return cmds

    @staticmethod
    def _get_pd_gains(speed: str):
        "Returns (kp, kd, weight) tuple based on speed setting"
        if speed == "fast":
            return KP_HARD, KD_MED, WEIGHT_MED
        elif speed == "medium":
            return KP_MED, KD_MED, WEIGHT_MED
        else:  # slow
            return KP_SOFT, KD_MED, WEIGHT_MED

    @staticmethod
    def set_targets_rel(
        prev_cmds: List[MotorCmd],
        cmds: List[MotorCmd],
        targets: dict[B1JointIndex, float],
        speed: Optional[Literal["fast", "medium", "slow"]] = "medium",
    ):
        """targets: dict{joint_enum: abs_angle_rad}"""
        kp, kd, weight = MotorCommandMaker._get_pd_gains(speed)
        for j, q_rel in targets.items():
            idx = int(j)
            if 0 <= idx < len(cmds):
                prev_cmds[idx].q = prev_cmds[idx].q + q_rel
                prev_cmds[idx].kp = kp
                prev_cmds[idx].kd = kd
                prev_cmds[idx].weight = weight

    @staticmethod
    def set_targets_abs(
        prev_cmds: List[MotorCmd],
        targets: dict[B1JointIndex, float],
        speed: Optional[Literal["fast", "medium", "slow"]] = "medium",
    ):
        """targets: dict{joint_enum: abs_angle_rad}"""
        kp, kd, weight = MotorCommandMaker._get_pd_gains(speed)
        for j, q_abs in targets.items():
            idx = int(j)
            prev_cmds[idx].q = q_abs
            prev_cmds[idx].kp = kp
            prev_cmds[idx].kd = kd
            prev_cmds[idx].weight = weight


@dataclass
class ArmMotorState:
    arm_side: Literal["right", "left"]  # "left" or "right"
    shoulder_pitch: float  # forward/back
    shoulder_roll: float  # lateral
    elbow_pitch: float  # extend/flex
    elbow_yaw: float  # pronate/supinate
    units: Literal["degrees", "radians"] = "degrees"  # "degrees" or "radians"
    speed: Literal["fast", "medium", "slow"] = "medium"  # "fast", "medium", or "slow"
    target: Literal["relative", "absolute"] = "absolute"  # "relative" or "absolute"

    def get_motor_command(self, prev_motor_commands: List[MotorCmd]) -> List[MotorCmd]:
        if not self.target == "absolute":
            raise NotImplementedError("Relative target not implemented yet")
        if self.units == "degrees":
            self.shoulder_pitch *= DEG_TO_RAD
            self.shoulder_roll *= DEG_TO_RAD
            self.elbow_pitch *= DEG_TO_RAD
            self.elbow_yaw *= DEG_TO_RAD
            self.units = "radians"  # avoid double-conversion if called again

        arm_joints = ARM_JOINTS_RIGHT if self.arm_side == "right" else ARM_JOINTS_LEFT
        targets = {
            arm_joints[0]: self.shoulder_pitch,
            arm_joints[1]: self.shoulder_roll,
            arm_joints[2]: self.elbow_pitch,
            arm_joints[3]: self.elbow_yaw,
        }
        MotorCommandMaker.set_targets_abs(prev_motor_commands, targets)
        return prev_motor_commands

def stringify_motor_cmds(cmds: List[MotorCmd]) -> str:
    s = ""
    for i, cmd in enumerate(cmds):
        s += f"  Motor {i}: mode={cmd.mode}, q={cmd.q:.2f}, dq={cmd.dq:.2f}, tau={cmd.tau:.2f}, kp={cmd.kp:.1f}, kd={cmd.kd:.1f}, weight={cmd.weight:.1f}\n"
    return s

def stringify_motor_states(states: List[MotorState], start: int, stop: int) -> str:
    s = ""
    for i, state in enumerate(states):
        if i < start or i >= stop:
            continue
        s += f"  Motor {i}: q={state.q:.2f}, dq={state.dq:.2f}, mode={state.mode:.2f}, temperature={state.temperature:.1f}\n"
    return s

def stringify_q_values(start: int, stop: int, states: List[MotorState]) -> str:
    s = f"{start} : "
    for i in range(start, stop):
        s += f"{states[i].q:.2f}, "
    return s

@dataclass
class BoosterLowLevelController:
    low_state_msg = None

    def init(self, network_domain=0, network_interface=""):
        # initialize publisher
        logger.info("Initializing BoosterLowLevelController")
        ChannelFactory.Instance().Init(
            domain_id=network_domain, network_interface=network_interface
        )  # set your domain/interface as needed
        self.pub = B1LowCmdPublisher()
        self.pub.InitChannel()
        self.sub = B1LowStateSubscriber(handler=self._grab_low_state_handler)
        self.sub.InitChannel()
        self._prev_motor_cmds = (
            MotorCommandMaker.create_neutral_motor_cmds()
        )  # TODO Replace with a subscriber read

    def close(self):
        logger.info("Closing BoosterLowLevelController")
        self.pub.CloseChannel()

    def _grab_low_state_handler(self, msg):
        self.low_state_msg = msg

    def read_latest_low_state(self, motor_start: int, motor_end: int):
        # wait up to ~1s to get one packet
        for _ in range(500):
            if self.low_state_msg is not None:
                break
            time.sleep(0.01)

        if self.low_state_msg is None:
            raise RuntimeError(
                "Did not receive LowState; check channels / domain / wiring"
            )

        serial_states = self.low_state_msg.motor_state_serial  # list[MotorState]
        serial_cnt = len(serial_states)
        parallel_cnt = len(self.low_state_msg.motor_state_parallel)
        q_values = stringify_q_values(motor_start, motor_end, serial_states)
        print(q_values)
        return q_values
        # logger.info(f"serial motors: {serial_cnt}, parallel motors: {parallel_cnt}")

    def _send_motor_cmds(self, cmds: List[MotorCmd]):
        msg = LowCmd()
        msg.cmd_type = LowCmdType.SERIAL
        msg.motor_cmd = cmds
        if not self.pub.Write(msg):
            raise RuntimeError("Publish LowCmd failed")

    def send_neutral_pose(self):
        cmds = MotorCommandMaker.create_neutral_motor_cmds()
        self._send_motor_cmds(cmds)

    def send_arm_command(self, arm_states: List[Tuple], arm: Literal["right", "left"], time_gap_s=1, units: Literal["radians", "degrees"]="radians", speed: Literal["fast", "medium", "slow"]="medium"):
        "Send a list of (shoulder_pitch, shoulder_roll, elbow_pitch, elbow_yaw) tuples"
        for sp, sr, ep, ey in arm_states:
            arm_state = ArmMotorState(speed=speed, arm_side=arm, shoulder_pitch=sp, shoulder_roll=sr, elbow_pitch=ep, elbow_yaw=ey, units=units)  # validate
            cmds = arm_state.get_motor_command(self._prev_motor_cmds)
            logger.info(f"Sending arm command:\n{stringify_motor_cmds(cmds)}")
            self._send_motor_cmds(cmds)
            self._prev_motor_cmds = cmds
            time.sleep(time_gap_s)

## Usage

In [None]:
from booster_robotics_sdk_python import (
    B1LocoClient,
    RobotMode,
    ChannelFactory
)
ChannelFactory.Instance().Init(domain_id=0, network_interface="")  # set your domain/interface as needed
loco = B1LocoClient()
loco.Init()

In [None]:
ret = loco.ChangeMode(RobotMode.kPrepare)

In [None]:
loco.SwitchHandEndEffectorControlMode(True)

In [None]:
robot = BoosterLowLevelController()
robot.init(network_interface="")

In [None]:
q_str = robot.read_latest_low_state(6, 10)
with open("q_values-right.txt", "a") as f:
    f.write(q_str + "\n")

q_str = robot.read_latest_low_state(2, 6)
with open("q_values-left.txt", "a") as f:
    f.write(q_str + "\n")


In [None]:
import math
DEG_180 = math.pi
DEG_90 = math.pi / 2
DEG_60 = math.pi / 3
DEG_45 = math.pi / 4
DEG_30 = math.pi / 6
DEG_15 = math.pi / 12

# shoulder_pitch, shoulder_roll, elbow_pitch, elbow_yaw
right_arm_states = [
(-0.70, 1.64, 0.56, 1.88), 
( -1.02, 1.47, 0.47, 1.27), 
 (-1.22, 1.47, 0.47, 0.57), 
 (-0.91, 1.44, 0.47, 1.56), 
 (-0.61, 1.50, 0.73, 2.03), 
 (-0.58, 1.50, 0.73, 2.03), 
( -0.58, 1.61, 0.73, 2.03), 
( -0.51, 1.61, 0.73, 2.03), 
( -0.51, 1.61, 0.73, 2.03), 
( -0.51, 1.61, 0.73, 2.03), 
]

left_arm_states = [
  ( -0.57, -1.48, 0.47, -2.04), 
 (-0.64, -1.59, 0.73, -2.04), 
 (-0.64, -1.64, 0.74, -2.04), 
 (-0.64, -1.55, 0.74, -2.04), 
 (-0.65, -1.55, 0.74, -2.04), 
 (-1.00, -1.55, 0.73, -1.61), 
 (-1.13, -1.48, 0.44, -1.16), 
 (-1.35, -1.23, 0.44, -0.58), 
 (-0.94, -1.32, 0.44, -1.48), 
 (-0.65, -1.32, 0.44, -1.91), 

]


In [None]:
robot.send_arm_command(arm_states=right_arm_states, arm="right", time_gap_s=.2, units="radians")
robot.send_arm_command(arm_states=left_arm_states, arm="left", time_gap_s=.05, units="radians")

In [None]:
from booster_robotics_sdk_python import B1RemoteControllerStateSubscriber, RemoteControllerState


EVENT_AXIS, EVENT_HAT, EVENT_BTN_DN, EVENT_BTN_UP, EVENT_REMOVE = (
    0x600,
    0x602,
    0x603,
    0x604,
    0x606,
)


def on_remote(rc: RemoteControllerState):
    ev = rc.event
    # if ev == EVENT_AXIS:
    #     print(f"AXIS lx={rc.lx:+.2f} ly={rc.ly:+.2f} rx={rc.rx:+.2f} ry={rc.ry:+.2f}")
    # elif ev in (EVENT_BTN_DN, EVENT_BTN_UP):
    #     names = [n for n in ("a","b","x","y","lb","rb","lt","rt","ls","rs","back","start")
    #              if getattr(rc, n)]
    #     print(("BTN_DOWN" if ev==EVENT_BTN_DN else "BTN_UP"), names)
    # elif ev == EVENT_HAT:
    #     hats = [n for n in ("hat_c","hat_u","hat_d","hat_l","hat_r","hat_lu","hat_ld","hat_ru","hat_rd")
    #             if getattr(rc, n)]
    #     print("HAT", hats)
    # elif ev == EVENT_REMOVE:
    #     print("CONTROLLER REMOVED")

    if ev == EVENT_BTN_DN:
        if rc.x:
            robot.send_arm_command(arm_states=right_arm_states, arm="right", time_gap_s=.04, units="radians")
        if rc.y:
            robot.send_arm_command(arm_states=left_arm_states, arm="left", time_gap_s=.04, units="radians")



sub = B1RemoteControllerStateSubscriber(on_remote)
sub.InitChannel()
print("Subscribed:", sub.GetChannelName())

In [None]:
sub.CloseChannel()

# Auxiliary: Low Level Controller for Booster Robot

In [None]:
# pip install the wheel that exposes: booster_robotics_sdk_python
from time import sleep, perf_counter

from booster_robotics_sdk_python import (
    ChannelFactory,
    B1LowStateSubscriber,
    B1LowCmdPublisher,
    LowCmd, MotorCmd, LowCmdType,
    B1LocoClient, RobotMode,
    B1JointIndex as J  # enums provided in your binding
)

# ---- 1) Bring up DDS and switch to Custom mode (so low-level cmds are accepted)
ChannelFactory.Instance().Init(domain_id=0, network_interface="")  # set your domain/interface as needed

loco = B1LocoClient()
loco.Init()
# ret = loco.ChangeMode(RobotMode.kPrepare)
# time.sleep(1.0)  # wait a bit after switching to Prepare
ret = loco.ChangeMode(RobotMode.kCustom)
if ret != 0:
    raise RuntimeError("ChangeMode(kCustom) failed")

# ---- 2) Read ONE LowState to learn SERIAL motor count and current poses
latest_low_state = {"msg": None}

def _grab_once(ls):
    latest_low_state["msg"] = ls

sub = B1LowStateSubscriber(handler=_grab_once)
sub.InitChannel()

# wait up to ~1s to get one packet
for _ in range(100):
    if latest_low_state["msg"] is not None:
        break
    sleep(0.01)

if latest_low_state["msg"] is None:
    raise RuntimeError("Did not receive LowState; check channels / domain / wiring")

low_state = latest_low_state["msg"]
serial_states = low_state.motor_state_serial  # list[MotorState]
serial_cnt = len(serial_states)
parallel_cnt = len(low_state.motor_state_parallel)
print(f"serial motors: {serial_cnt}, parallel motors: {parallel_cnt}")

In [None]:

# ---- 3) Prepare a helper to build a neutral MotorCmd array from current state
def neutral_cmds_from(low_state_list):
    cmds = []
    for s in low_state_list:
        mc = MotorCmd()
        mc.mode = 0  # leave as 0 if your firmware ignores it in PD+weight mode
        mc.q = s.q      # hold current position by default
        mc.dq = 0.0
        mc.tau = 0.0
        mc.kp = 0.0
        mc.kd = 0.0
        mc.weight = 0.0  # weight==0 → joint ignored; >0 → joint considered
        cmds.append(mc)
    return cmds

# ---- 4) Choose which SERIAL joints are the arm (LEFT or RIGHT) and set a punch profile
# Adjust these to your wiring/order if needed. The enum gives semantic names.
ARM_JOINTS_RIGHT = [
    J.kRightShoulderPitch,  # forward/back
    J.kRightShoulderRoll,   # lateral
    J.kRightElbowPitch,     # extend/flex
    # J.kRightElbowYaw,     # optional; include if you want pronation/supination
]

# Target deltas (radians) relative to current posture for a forward punch
PUNCH_DELTA = {
    J.kRightShoulderPitch: +0.45,  # shoulder pitch forward
    J.kRightElbowPitch:    -0.60,  # elbow extension (negative if flexion was positive)
    J.kRight
    # keep roll / yaw small to avoid side swing
    J.kRightShoulderRoll:   0.00,
}

# PD gains for a snappy but controlled hit — start modest!
KP_HARD = 80.0
KD_MED  = 2.0

# ---- 5) Build publisher
pub = B1LowCmdPublisher()
pub.InitChannel()

def send_serial_cmds(cmds):
    msg = LowCmd()
    msg.cmd_type = LowCmdType.SERIAL
    msg.motor_cmd = cmds
    ok = pub.Write(msg)
    if not ok:
        raise RuntimeError("Publish LowCmd failed")

# ---- 6) Compose the motion: stiffen → extend fast → hold briefly → retract
serial_cmds = neutral_cmds_from(serial_states)

# Helper to apply weights/gains to selected joints by enum index
def set_pd_for(j_indices, kp, kd, weight=1.0):
    for j in j_indices:
        idx = int(j)  # enums are ints in the binding
        if 0 <= idx < len(serial_cmds):
            serial_cmds[idx].kp = kp
            serial_cmds[idx].kd = kd
            serial_cmds[idx].weight = weight

def set_targets(delta_map, scale=1.0):
    # Targets are absolute q = q0 + scale*delta
    for j, dq in delta_map.items():
        idx = int(j)
        if 0 <= idx < len(serial_cmds):
            q0 = serial_states[idx].q
            serial_cmds[idx].q = q0 + scale * dq

# 6a) Stiffen arm at current pose
set_pd_for(ARM_JOINTS_RIGHT, kp=40.0, kd=1.0, weight=1.0)
set_targets({j: 0.0 for j in ARM_JOINTS_RIGHT}, scale=0.0)
send_serial_cmds(serial_cmds)
sleep(0.08)

# 6b) Fast extend (the “punch”)
set_pd_for(ARM_JOINTS_RIGHT, kp=KP_HARD, kd=KD_MED, weight=1.0)
set_targets(PUNCH_DELTA, scale=1.0)
send_serial_cmds(serial_cmds)
sleep(0.20)  # short dwell at extension

# 6c) Retract quickly back to start pose
set_targets({j: 0.0 for j in PUNCH_DELTA.keys()}, scale=0.0)  # back to q0
send_serial_cmds(serial_cmds)
sleep(0.25)

# 6d) Relax gains
set_pd_for(ARM_JOINTS_RIGHT, kp=10.0, kd=0.5, weight=1.0)
send_serial_cmds(serial_cmds)

# ---- 7) Cleanup (optional; keep alive if you’ll stream more)
pub.CloseChannel()
sub.CloseChannel()