diff --git a/data/.lfs/g1_wholebody_replay.json.tar.gz b/data/.lfs/g1_wholebody_replay.json.tar.gz new file mode 100644 index 0000000000..1ad3ea8da3 --- /dev/null +++ b/data/.lfs/g1_wholebody_replay.json.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:8bffca753401f78587ba743d19fc3493aac7ffdb9a42c3863299082e62da2788 +size 1147342 diff --git a/dimos/control/components.py b/dimos/control/components.py index be12aa4fb1..04ca6f03a7 100644 --- a/dimos/control/components.py +++ b/dimos/control/components.py @@ -37,6 +37,7 @@ def split_joint_name(joint_name: str) -> tuple[str, str]: class HardwareType(Enum): MANIPULATOR = "manipulator" BASE = "base" + WHOLE_BODY = "whole_body" @dataclass(frozen=True) @@ -135,6 +136,58 @@ def make_twist_base_joints( return [f"{hardware_id}/{s}" for s in suffixes] +_HUMANOID_29DOF_JOINTS = [ + # Left leg (0-5) + "left_hip_pitch", + "left_hip_roll", + "left_hip_yaw", + "left_knee", + "left_ankle_pitch", + "left_ankle_roll", + # Right leg (6-11) + "right_hip_pitch", + "right_hip_roll", + "right_hip_yaw", + "right_knee", + "right_ankle_pitch", + "right_ankle_roll", + # Waist (12-14) + "waist_yaw", + "waist_roll", + "waist_pitch", + # Left arm (15-21) + "left_shoulder_pitch", + "left_shoulder_roll", + "left_shoulder_yaw", + "left_elbow", + "left_wrist_roll", + "left_wrist_pitch", + "left_wrist_yaw", + # Right arm (22-28) + "right_shoulder_pitch", + "right_shoulder_roll", + "right_shoulder_yaw", + "right_elbow", + "right_wrist_roll", + "right_wrist_pitch", + "right_wrist_yaw", +] + + +def make_humanoid_joints(hardware_id: HardwareId) -> list[JointName]: + """Create joint names for a 29-DOF humanoid. + + Covers 6-DOF legs, 3-DOF waist, and 7-DOF arms. + + Args: + hardware_id: The hardware identifier (e.g., "g1") + + Returns: + List of 29 joint names like ["g1/left_hip_pitch", ..., "g1/right_wrist_yaw"] + """ + return [f"{hardware_id}/{j}" for j in _HUMANOID_29DOF_JOINTS] + + __all__ = [ "TWIST_SUFFIX_MAP", "HardwareComponent", @@ -144,6 +197,7 @@ def make_twist_base_joints( "JointState", "TaskName", "make_gripper_joints", + "make_humanoid_joints", "make_joints", "make_twist_base_joints", "split_joint_name", diff --git a/dimos/control/coordinator.py b/dimos/control/coordinator.py index 555db13522..13c959cd89 100644 --- a/dimos/control/coordinator.py +++ b/dimos/control/coordinator.py @@ -40,7 +40,11 @@ TaskName, split_joint_name, ) -from dimos.control.hardware_interface import ConnectedHardware, ConnectedTwistBase +from dimos.control.hardware_interface import ( + ConnectedHardware, + ConnectedTwistBase, + ConnectedWholeBody, +) from dimos.control.task import ControlTask from dimos.control.tick_loop import TickLoop from dimos.core.core import rpc @@ -50,6 +54,7 @@ TwistBaseAdapter, ) from dimos.hardware.manipulators.spec import ManipulatorAdapter +from dimos.hardware.whole_body.spec import WholeBodyAdapter from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Twist import Twist from dimos.msgs.sensor_msgs.JointState import JointState @@ -210,8 +215,10 @@ def _setup_from_config(self) -> None: def _setup_hardware(self, component: HardwareComponent) -> None: """Connect and add a single hardware adapter.""" - adapter: ManipulatorAdapter | TwistBaseAdapter - if component.hardware_type == HardwareType.BASE: + adapter: ManipulatorAdapter | TwistBaseAdapter | WholeBodyAdapter + if component.hardware_type == HardwareType.WHOLE_BODY: + adapter = self._create_whole_body_adapter(component) + elif component.hardware_type == HardwareType.BASE: adapter = self._create_twist_base_adapter(component) else: adapter = self._create_adapter(component) @@ -252,6 +259,30 @@ def _create_twist_base_adapter(self, component: HardwareComponent) -> TwistBaseA **component.adapter_kwargs, ) + def _create_whole_body_adapter(self, component: HardwareComponent) -> WholeBodyAdapter: + """Create a whole-body adapter from component config. + + ``component.address`` carries the DDS network interface — int (CAN port) + or str ("enp60s0"); cyclonedds requires the right type, so try int() first + and fall back to keeping the original string. + """ + from dimos.hardware.whole_body.registry import whole_body_adapter_registry + + addr: int | str | None = component.address + if addr is not None: + try: + addr = int(addr) + except ValueError: + pass # keep as string (e.g. "enp60s0") + + return whole_body_adapter_registry.create( + component.adapter_type, + dof=len(component.joints), + hardware_id=component.hardware_id, + network_interface=addr if addr is not None else "", + **component.adapter_kwargs, + ) + def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask: """Create a control task from config.""" task_type = cfg.type.lower() @@ -334,13 +365,21 @@ def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask: @rpc def add_hardware( self, - adapter: ManipulatorAdapter | TwistBaseAdapter, + adapter: ManipulatorAdapter | TwistBaseAdapter | WholeBodyAdapter, component: HardwareComponent, ) -> bool: """Register a hardware adapter with the coordinator.""" is_base = component.hardware_type == HardwareType.BASE + is_whole_body = component.hardware_type == HardwareType.WHOLE_BODY + + if is_base and not isinstance(adapter, TwistBaseAdapter): + raise TypeError( + f"Hardware type / adapter mismatch for '{component.hardware_id}': " + f"hardware_type={component.hardware_type.value} but got " + f"{type(adapter).__name__}" + ) - if is_base != isinstance(adapter, TwistBaseAdapter): + if is_whole_body and not isinstance(adapter, WholeBodyAdapter): raise TypeError( f"Hardware type / adapter mismatch for '{component.hardware_id}': " f"hardware_type={component.hardware_type.value} but got " @@ -352,8 +391,13 @@ def add_hardware( logger.warning(f"Hardware {component.hardware_id} already registered") return False - if isinstance(adapter, TwistBaseAdapter): - connected: ConnectedHardware = ConnectedTwistBase( + if isinstance(adapter, WholeBodyAdapter): + connected: ConnectedHardware = ConnectedWholeBody( + adapter=adapter, + component=component, + ) + elif isinstance(adapter, TwistBaseAdapter): + connected = ConnectedTwistBase( adapter=adapter, component=component, ) diff --git a/dimos/control/hardware_interface.py b/dimos/control/hardware_interface.py index 3925116f97..0169321600 100644 --- a/dimos/control/hardware_interface.py +++ b/dimos/control/hardware_interface.py @@ -33,6 +33,7 @@ if TYPE_CHECKING: from dimos.control.components import HardwareComponent, HardwareId, JointName, JointState from dimos.hardware.drive_trains.spec import TwistBaseAdapter + from dimos.hardware.whole_body.spec import MotorCommand, WholeBodyAdapter logger = setup_logger() @@ -54,15 +55,6 @@ def __init__( adapter: ManipulatorAdapter, component: HardwareComponent, ) -> None: - """Initialize hardware interface. - - Args: - adapter: ManipulatorAdapter instance (XArmAdapter, PiperAdapter, etc.) - component: Hardware component with joints config - """ - if not isinstance(adapter, ManipulatorAdapter): - raise TypeError("adapter must implement ManipulatorAdapter") - self._adapter = adapter self._component = component self._arm_joint_names: list[JointName] = list(component.joints) @@ -248,11 +240,6 @@ def __init__( adapter: TwistBaseAdapter, component: HardwareComponent, ) -> None: - from dimos.hardware.drive_trains.spec import TwistBaseAdapter as TwistBaseAdapterProto - - if not isinstance(adapter, TwistBaseAdapterProto): - raise TypeError("adapter must implement TwistBaseAdapter") - self._twist_adapter = adapter self._component = component self._joint_names = component.joints @@ -319,7 +306,117 @@ def write_command(self, commands: dict[str, float], _mode: ControlMode) -> bool: return self._twist_adapter.write_velocities(ordered) +_DEFAULT_KP: float = 40.0 +_DEFAULT_KD: float = 3.0 + + +class ConnectedWholeBody(ConnectedHardware): + """Runtime wrapper for a whole-body motor platform connected to the coordinator. + + Wraps a WholeBodyAdapter for joint-level motor control (any DOF count). + + Key differences from ConnectedHardware: + - Reads joint state from MotorState (q, dq, tau) + - write_command converts position commands to MotorCommand with PD gains + - write_motor_commands provides direct pass-through to adapter + """ + + _wb_adapter: WholeBodyAdapter + + def __init__( + self, + adapter: WholeBodyAdapter, + component: HardwareComponent, + ) -> None: + self._wb_adapter = adapter + self._component = component + self._joint_names = component.joints + + self._last_commanded: dict[str, float] = {} + self._initialized = False + self._warned_unknown_joints: set[str] = set() + self._current_mode: ControlMode | None = None + + @property + def adapter(self) -> WholeBodyAdapter: # type: ignore[override] + """The underlying whole-body adapter.""" + return self._wb_adapter + + def disconnect(self) -> None: + """Disconnect the underlying adapter.""" + self._wb_adapter.disconnect() + + def read_state(self) -> dict[JointName, JointState]: + """Read motor states as {joint_name: JointState}.""" + from dimos.control.components import JointState + + motor_states = self._wb_adapter.read_motor_states() + return { + name: JointState( + position=motor_states[i].q, + velocity=motor_states[i].dq, + effort=motor_states[i].tau, + ) + for i, name in enumerate(self._joint_names) + } + + def write_command(self, commands: dict[str, float], mode: ControlMode) -> bool: + """Write position commands — converts to MotorCommand with PD gains. + + Only POSITION / SERVO_POSITION are supported; other modes are warned + and dropped (matches ConnectedHardware's warn-and-skip pattern). + """ + from dimos.hardware.whole_body.spec import MotorCommand + + if mode not in (ControlMode.POSITION, ControlMode.SERVO_POSITION): + logger.warning( + f"WholeBody {self.hardware_id} only supports POSITION/SERVO_POSITION; " + f"got {mode.name} — skipping" + ) + return False + + if not self._initialized and not self._try_initialize_last_commanded(): + return False + + for joint_name, value in commands.items(): + if joint_name in self._joint_names: + self._last_commanded[joint_name] = value + elif joint_name not in self._warned_unknown_joints: + logger.warning( + f"WholeBody {self.hardware_id} received command for unknown joint " + f"{joint_name}. Valid joints: {self._joint_names}" + ) + self._warned_unknown_joints.add(joint_name) + + motor_cmds = [ + MotorCommand( + q=self._last_commanded[name], + dq=0.0, + kp=_DEFAULT_KP, + kd=_DEFAULT_KD, + tau=0.0, + ) + for name in self._joint_names + ] + return self._wb_adapter.write_motor_commands(motor_cmds) + + def write_motor_commands(self, commands: list[MotorCommand]) -> bool: + """Direct pass-through to adapter for full MotorCommand control.""" + return self._wb_adapter.write_motor_commands(commands) + + def _try_initialize_last_commanded(self) -> bool: + """Non-blocking init. Returns True once motor_states is cached.""" + if not self._wb_adapter.has_motor_states(): + return False + states = self._wb_adapter.read_motor_states() + for i, name in enumerate(self._joint_names): + self._last_commanded[name] = states[i].q + self._initialized = True + return True + + __all__ = [ "ConnectedHardware", "ConnectedTwistBase", + "ConnectedWholeBody", ] diff --git a/dimos/hardware/whole_body/registry.py b/dimos/hardware/whole_body/registry.py new file mode 100644 index 0000000000..ddab15b0fc --- /dev/null +++ b/dimos/hardware/whole_body/registry.py @@ -0,0 +1,86 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""WholeBodyAdapter registry with auto-discovery. + +Mirrors the TwistBaseAdapterRegistry pattern: each subpackage provides a +``register(registry)`` function in its ``adapter.py`` module. + +Usage: + from dimos.hardware.whole_body.registry import whole_body_adapter_registry + + adapter = whole_body_adapter_registry.create("unitree_go2") + print(whole_body_adapter_registry.available()) # ["unitree_go2"] +""" + +from __future__ import annotations + +from collections.abc import Callable +import importlib +import os +from typing import TYPE_CHECKING, Any + +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from dimos.hardware.whole_body.spec import WholeBodyAdapter + +logger = setup_logger() + + +class WholeBodyAdapterRegistry: + """Registry for whole-body motor adapters with auto-discovery.""" + + def __init__(self) -> None: + # Factory may be a class or any other callable (e.g. functools.partial + # binding transport_cls). Store as Callable so `register("transport_lcm", + # partial(TransportWholeBodyAdapter, ...))` typechecks. + self._adapters: dict[str, Callable[..., WholeBodyAdapter]] = {} + + def register(self, name: str, cls: Callable[..., WholeBodyAdapter]) -> None: + """Register an adapter factory (class or callable).""" + self._adapters[name.lower()] = cls + + def create(self, name: str, **kwargs: Any) -> WholeBodyAdapter: + """Create an adapter instance by name.""" + key = name.lower() + if key not in self._adapters: + raise KeyError(f"Unknown whole-body adapter: {name}. Available: {self.available()}") + return self._adapters[key](**kwargs) + + def available(self) -> list[str]: + """List available adapter names.""" + return sorted(self._adapters.keys()) + + def discover(self) -> None: + """Discover and register adapters from subpackages.""" + import dimos.hardware.whole_body as pkg + + pkg_dir = pkg.__path__[0] + for entry in sorted(os.listdir(pkg_dir)): + entry_path = os.path.join(pkg_dir, entry) + if not os.path.isdir(entry_path) or entry.startswith(("_", ".")): + continue + try: + mod = importlib.import_module(f"dimos.hardware.whole_body.{entry}.adapter") + if hasattr(mod, "register"): + mod.register(self) + except ImportError as e: + logger.warning(f"Skipping whole-body adapter {entry}: {e}") + + +whole_body_adapter_registry = WholeBodyAdapterRegistry() +whole_body_adapter_registry.discover() + +__all__ = ["WholeBodyAdapterRegistry", "whole_body_adapter_registry"] diff --git a/dimos/hardware/whole_body/spec.py b/dimos/hardware/whole_body/spec.py new file mode 100644 index 0000000000..4645e8c44e --- /dev/null +++ b/dimos/hardware/whole_body/spec.py @@ -0,0 +1,77 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""WholeBodyAdapter protocol for joint-level (q/dq/kp/kd/tau) motor control.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Protocol, runtime_checkable + +# Unitree SDK sentinels meaning "no command" for that DOF. +POS_STOP: float = 2.146e9 +VEL_STOP: float = 16000.0 + + +@dataclass(frozen=True) +class MotorCommand: + """Command for a single motor.""" + + q: float = POS_STOP # target position (rad) + dq: float = VEL_STOP # target velocity (rad/s) + kp: float = 0.0 # position gain + kd: float = 0.0 # velocity gain + tau: float = 0.0 # feedforward torque (Nm) + + +@dataclass(frozen=True) +class MotorState: + """Feedback from a single motor.""" + + q: float = 0.0 # position (rad) + dq: float = 0.0 # velocity (rad/s) + tau: float = 0.0 # estimated torque (Nm) + + +@dataclass(frozen=True) +class IMUState: + """IMU feedback.""" + + quaternion: tuple[float, float, float, float] = (1.0, 0.0, 0.0, 0.0) + gyroscope: tuple[float, float, float] = (0.0, 0.0, 0.0) + accelerometer: tuple[float, float, float] = (0.0, 0.0, 0.0) + rpy: tuple[float, float, float] = (0.0, 0.0, 0.0) + + +@runtime_checkable +class WholeBodyAdapter(Protocol): + """Joint-level whole-body motor IO. SI units (rad, rad/s, Nm).""" + + def connect(self) -> bool: ... + def disconnect(self) -> None: ... + def is_connected(self) -> bool: ... + def read_motor_states(self) -> list[MotorState]: ... + def has_motor_states(self) -> bool: ... + def read_imu(self) -> IMUState: ... + def write_motor_commands(self, commands: list[MotorCommand]) -> bool: ... + + +__all__ = [ + "POS_STOP", + "VEL_STOP", + "IMUState", + "MotorCommand", + "MotorState", + "WholeBodyAdapter", +] diff --git a/dimos/hardware/whole_body/transport/adapter.py b/dimos/hardware/whole_body/transport/adapter.py new file mode 100644 index 0000000000..8950803bb3 --- /dev/null +++ b/dimos/hardware/whole_body/transport/adapter.py @@ -0,0 +1,199 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Transport-based whole-body adapter: bridges coordinator ↔ Module via pub/sub. + +Subscribes /{hardware_id}/motor_states + /{hardware_id}/imu, publishes +/{hardware_id}/motor_command. ``network_interface`` is accepted but ignored. +""" + +from __future__ import annotations + +from functools import partial +import threading +from typing import TYPE_CHECKING, Any + +from dimos.core.transport import LCMTransport +from dimos.hardware.whole_body.spec import IMUState, MotorCommand, MotorState +from dimos.msgs.sensor_msgs.Imu import Imu +from dimos.msgs.sensor_msgs.JointState import JointState +from dimos.msgs.sensor_msgs.MotorCommandArray import MotorCommandArray +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from dimos.hardware.whole_body.registry import WholeBodyAdapterRegistry + +logger = setup_logger() + + +class TransportWholeBodyAdapter: + """WholeBodyAdapter that bridges to a robot-side Module via pub/sub.""" + + def __init__( + self, + dof: int = 29, + hardware_id: str = "wholebody", + transport_cls: type = LCMTransport, + network_interface: int | str = "", # accepted-and-ignored — see module docstring + **_: object, + ) -> None: + self._dof = dof + self._prefix = hardware_id + self._transport_cls = transport_cls + + self._lock = threading.Lock() + self._latest_motor_states: list[MotorState] | None = None + self._latest_imu: IMUState | None = None + + self._motor_states_transport: Any = None + self._imu_transport: Any = None + self._motor_command_transport: Any = None + self._motor_states_unsub: Any = None + self._imu_unsub: Any = None + self._connected = False + + def connect(self) -> bool: + ms_topic = f"/{self._prefix}/motor_states" + imu_topic = f"/{self._prefix}/imu" + cmd_topic = f"/{self._prefix}/motor_command" + + self._motor_states_transport = self._transport_cls(ms_topic, JointState) + self._imu_transport = self._transport_cls(imu_topic, Imu) + self._motor_command_transport = self._transport_cls(cmd_topic, MotorCommandArray) + + self._motor_states_unsub = self._motor_states_transport.subscribe(self._on_motor_states) + self._imu_unsub = self._imu_transport.subscribe(self._on_imu) + + self._connected = True + logger.info( + f"TransportWholeBodyAdapter connected: motor_states={ms_topic}, " + f"imu={imu_topic}, motor_command={cmd_topic}" + ) + return True + + def disconnect(self) -> None: + if self._motor_states_unsub is not None: + self._motor_states_unsub() + self._motor_states_unsub = None + if self._imu_unsub is not None: + self._imu_unsub() + self._imu_unsub = None + + for t in ( + self._motor_states_transport, + self._imu_transport, + self._motor_command_transport, + ): + if t is not None: + t.stop() + self._motor_states_transport = None + self._imu_transport = None + self._motor_command_transport = None + + with self._lock: + self._latest_motor_states = None + self._latest_imu = None + + self._connected = False + logger.info("TransportWholeBodyAdapter disconnected") + + def is_connected(self) -> bool: + return self._connected + + def read_motor_states(self) -> list[MotorState]: + with self._lock: + if self._latest_motor_states is None: + return [MotorState() for _ in range(self._dof)] + return list(self._latest_motor_states) + + def has_motor_states(self) -> bool: + """True once the first motor_states frame has been received.""" + with self._lock: + return self._latest_motor_states is not None + + def read_imu(self) -> IMUState: + with self._lock: + if self._latest_imu is None: + return IMUState() + return self._latest_imu + + def write_motor_commands(self, commands: list[MotorCommand]) -> bool: + if self._motor_command_transport is None: + logger.warning("write_motor_commands called before connect()") + return False + + msg = MotorCommandArray( + q=[c.q for c in commands], + dq=[c.dq for c in commands], + kp=[c.kp for c in commands], + kd=[c.kd for c in commands], + tau=[c.tau for c in commands], + ) + self._motor_command_transport.publish(msg) + return True + + def _on_motor_states(self, msg: JointState) -> None: + # Drop short frames; downstream code indexes range(_dof) directly. + if ( + len(msg.position) < self._dof + or len(msg.velocity) < self._dof + or len(msg.effort) < self._dof + ): + return + states = [ + MotorState(q=msg.position[i], dq=msg.velocity[i], tau=msg.effort[i]) + for i in range(self._dof) + ] + with self._lock: + self._latest_motor_states = states + + def _on_imu(self, msg: Imu) -> None: + # dimos Imu Quaternion is (x,y,z,w); IMUState.quaternion is (w,x,y,z). + with self._lock: + self._latest_imu = IMUState( + quaternion=( + msg.orientation.w, + msg.orientation.x, + msg.orientation.y, + msg.orientation.z, + ), + gyroscope=( + msg.angular_velocity.x, + msg.angular_velocity.y, + msg.angular_velocity.z, + ), + accelerometer=( + msg.linear_acceleration.x, + msg.linear_acceleration.y, + msg.linear_acceleration.z, + ), + rpy=(0.0, 0.0, 0.0), + ) + + +def register(registry: WholeBodyAdapterRegistry) -> None: + """Auto-discovered by ``whole_body_adapter_registry.discover()``.""" + from dimos.core.transport import ROSTransport + + registry.register( + "transport_lcm", + partial(TransportWholeBodyAdapter, transport_cls=LCMTransport), + ) + registry.register( + "transport_ros", + partial(TransportWholeBodyAdapter, transport_cls=ROSTransport), + ) + + +__all__ = ["TransportWholeBodyAdapter"] diff --git a/dimos/msgs/sensor_msgs/MotorCommandArray.py b/dimos/msgs/sensor_msgs/MotorCommandArray.py new file mode 100644 index 0000000000..577b03c765 --- /dev/null +++ b/dimos/msgs/sensor_msgs/MotorCommandArray.py @@ -0,0 +1,133 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""LCM type for low-level motor commands: q, dq, kp, kd, tau per motor.""" + +from io import BytesIO +import struct +import time + + +class MotorCommandArray: + """Per-motor q/dq/kp/kd/tau. Joint order is implicit (caller-defined).""" + + msg_name = "sensor_msgs.MotorCommandArray" + + __slots__ = ["dq", "kd", "kp", "num_joints", "q", "tau", "timestamp"] + + def __init__( + self, + q: list[float] | None = None, + dq: list[float] | None = None, + kp: list[float] | None = None, + kd: list[float] | None = None, + tau: list[float] | None = None, + timestamp: float | None = None, + ) -> None: + if q is None: + q = [] + if timestamp is None: + timestamp = time.time() + + n = len(q) + if dq is None: + dq = [0.0] * n + if kp is None: + kp = [0.0] * n + if kd is None: + kd = [0.0] * n + if tau is None: + tau = [0.0] * n + + if not (len(dq) == len(kp) == len(kd) == len(tau) == n): + raise ValueError( + f"All arrays must have length {n}; got " + f"dq={len(dq)} kp={len(kp)} kd={len(kd)} tau={len(tau)}" + ) + + self.timestamp = timestamp + self.num_joints = n + self.q = list(q) + self.dq = list(dq) + self.kp = list(kp) + self.kd = list(kd) + self.tau = list(tau) + + def lcm_encode(self) -> bytes: + return self.encode() + + def encode(self) -> bytes: + buf = BytesIO() + buf.write(MotorCommandArray._get_packed_fingerprint()) + self._encode_one(buf) + return buf.getvalue() + + def _encode_one(self, buf: BytesIO) -> None: + buf.write(struct.pack(">d", self.timestamp)) + buf.write(struct.pack(">i", self.num_joints)) + for arr in (self.q, self.dq, self.kp, self.kd, self.tau): + for v in arr: + buf.write(struct.pack(">d", v)) + + @classmethod + def lcm_decode(cls, data: bytes) -> "MotorCommandArray": + return cls.decode(data) + + @classmethod + def decode(cls, data: bytes | BytesIO) -> "MotorCommandArray": + buf: BytesIO = data if isinstance(data, BytesIO) else BytesIO(data) + if buf.read(8) != cls._get_packed_fingerprint(): + raise ValueError("Decode error") + return cls._decode_one(buf) + + @classmethod + def _decode_one(cls, buf: BytesIO) -> "MotorCommandArray": + self = MotorCommandArray.__new__(MotorCommandArray) + self.timestamp = struct.unpack(">d", buf.read(8))[0] + self.num_joints = struct.unpack(">i", buf.read(4))[0] + n = self.num_joints + arrays = [] + for _ in range(5): + arrays.append([struct.unpack(">d", buf.read(8))[0] for _ in range(n)]) + self.q, self.dq, self.kp, self.kd, self.tau = arrays + return self + + @classmethod + def _get_hash_recursive(cls, parents: list[type]) -> int: + if cls in parents: + return 0 + # Distinct fingerprint from JointCommand (0x8A3D2E1C5F4B6A9D) + tmphash = (0x9B4E3F2D6A5C7B8E) & 0xFFFFFFFFFFFFFFFF + tmphash = (((tmphash << 1) & 0xFFFFFFFFFFFFFFFF) + (tmphash >> 63)) & 0xFFFFFFFFFFFFFFFF + return tmphash + + _packed_fingerprint: bytes | None = None + + @classmethod + def _get_packed_fingerprint(cls) -> bytes: + if cls._packed_fingerprint is None: + cls._packed_fingerprint = struct.pack(">Q", cls._get_hash_recursive([])) + return cls._packed_fingerprint + + def get_hash(self) -> int: + return int(struct.unpack(">Q", MotorCommandArray._get_packed_fingerprint())[0]) + + def __str__(self) -> str: + return f"MotorCommandArray(timestamp={self.timestamp:.6f}, num_joints={self.num_joints})" + + def __repr__(self) -> str: + return ( + f"MotorCommandArray(q={self.q}, dq={self.dq}, kp={self.kp}, " + f"kd={self.kd}, tau={self.tau}, timestamp={self.timestamp})" + ) diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 96ce1c6784..f1f23da4da 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -70,6 +70,7 @@ "unitree-g1-agentic-sim": "dimos.robot.unitree.g1.blueprints.agentic.unitree_g1_agentic_sim:unitree_g1_agentic_sim", "unitree-g1-basic": "dimos.robot.unitree.g1.blueprints.basic.unitree_g1_basic:unitree_g1_basic", "unitree-g1-basic-sim": "dimos.robot.unitree.g1.blueprints.basic.unitree_g1_basic_sim:unitree_g1_basic_sim", + "unitree-g1-coordinator": "dimos.robot.unitree.g1.blueprints.basic.unitree_g1_coordinator:unitree_g1_coordinator", "unitree-g1-detection": "dimos.robot.unitree.g1.blueprints.perceptive.unitree_g1_detection:unitree_g1_detection", "unitree-g1-full": "dimos.robot.unitree.g1.blueprints.agentic.unitree_g1_full:unitree_g1_full", "unitree-g1-joystick": "dimos.robot.unitree.g1.blueprints.basic.unitree_g1_joystick:unitree_g1_joystick", @@ -125,6 +126,7 @@ "g1-connection": "dimos.robot.unitree.g1.connection.G1Connection", "g1-connection-base": "dimos.robot.unitree.g1.connection.G1ConnectionBase", "g1-sim-connection": "dimos.robot.unitree.g1.sim.G1SimConnection", + "g1-whole-body-connection": "dimos.robot.unitree.g1.wholebody_connection.G1WholeBodyConnection", "go2-connection": "dimos.robot.unitree.go2.connection.GO2Connection", "go2-fleet-connection": "dimos.robot.unitree.go2.fleet_connection.Go2FleetConnection", "go2-memory": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2.Go2Memory", diff --git a/dimos/robot/unitree/g1/blueprints/basic/unitree_g1_coordinator.py b/dimos/robot/unitree/g1/blueprints/basic/unitree_g1_coordinator.py new file mode 100644 index 0000000000..55991fa25d --- /dev/null +++ b/dimos/robot/unitree/g1/blueprints/basic/unitree_g1_coordinator.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""G1 ControlCoordinator: G1WholeBodyConnection Module + servo task via LCM bridge. + +Mirrors `unitree_go2_coordinator.py`. Run with `ROBOT_INTERFACE= dimos run unitree-g1-coordinator`. +""" + +from __future__ import annotations + +import os + +from dimos.control.components import HardwareComponent, HardwareType, make_humanoid_joints +from dimos.control.coordinator import ControlCoordinator, TaskConfig +from dimos.core.coordination.blueprints import autoconnect +from dimos.core.transport import LCMTransport +from dimos.msgs.sensor_msgs.Imu import Imu +from dimos.msgs.sensor_msgs.JointState import JointState +from dimos.msgs.sensor_msgs.MotorCommandArray import MotorCommandArray +from dimos.robot.unitree.g1.wholebody_connection import G1WholeBodyConnection + +_g1_joints = make_humanoid_joints("g1") + +# ROBOT_INTERFACE pins cyclonedds to a NIC; required on multi-NIC hosts. +unitree_g1_coordinator = ( + autoconnect( + G1WholeBodyConnection.blueprint( + release_sport_mode=True, + network_interface=os.getenv("ROBOT_INTERFACE", ""), + ), + ControlCoordinator.blueprint( + tick_rate=500, + hardware=[ + HardwareComponent( + hardware_id="g1", + hardware_type=HardwareType.WHOLE_BODY, + joints=_g1_joints, + adapter_type="transport_lcm", + ), + ], + tasks=[ + TaskConfig( + name="servo_g1", + type="servo", + joint_names=_g1_joints, + priority=10, + ), + ], + ), + ) + # No remappings: Module stream names (motor_states/imu/motor_command) don't + # collide with ControlCoordinator's (joint_state/joint_command/...). + .transports( + { + ("motor_states", JointState): LCMTransport("/g1/motor_states", JointState), + ("imu", Imu): LCMTransport("/g1/imu", Imu), + ("motor_command", MotorCommandArray): LCMTransport( + "/g1/motor_command", MotorCommandArray + ), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + ("joint_command", JointState): LCMTransport("/g1/joint_command", JointState), + } + ) +) + + +__all__ = ["unitree_g1_coordinator"] diff --git a/dimos/robot/unitree/g1/wholebody_connection.py b/dimos/robot/unitree/g1/wholebody_connection.py new file mode 100644 index 0000000000..f4fb762bae --- /dev/null +++ b/dimos/robot/unitree/g1/wholebody_connection.py @@ -0,0 +1,306 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""G1 low-level DDS Module: rt/lowstate (sub) + rt/lowcmd (pub), unitree_hg IDL. + +Streams: motor_states (Out[JointState]), imu (Out[Imu]), +motor_command (In[MotorCommandArray]). 29 motors, ordering from +make_humanoid_joints("g1") (left leg → right leg → waist → left arm → right arm). +""" + +from __future__ import annotations + +import threading +from threading import Thread +import time +from typing import TYPE_CHECKING, Any + +from pydantic import Field +from reactivex.disposable import Disposable + +if TYPE_CHECKING: + from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber + from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_, LowState_ + from unitree_sdk2py.utils.crc import CRC + +from dimos.constants import DEFAULT_THREAD_JOIN_TIMEOUT +from dimos.control.components import make_humanoid_joints +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import In, Out +from dimos.hardware.whole_body.spec import POS_STOP, VEL_STOP +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Vector3 import Vector3 +from dimos.msgs.sensor_msgs.Imu import Imu +from dimos.msgs.sensor_msgs.JointState import JointState +from dimos.msgs.sensor_msgs.MotorCommandArray import MotorCommandArray +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +_NUM_MOTORS = 29 +_NUM_MOTOR_SLOTS = 35 # G1 hg LowCmd has 35 slots; only 29 are used +_MODE_MACHINE_WAIT_S = 10.0 + +# Joint names sourced from the canonical helper. Order matches the motor index +# convention above. Single-source-of-truth so any coordinator-side adapter built +# from make_humanoid_joints("g1") agrees on the wire-level name → motor-index mapping. +G1_JOINT_NAMES: list[str] = make_humanoid_joints("g1") +assert len(G1_JOINT_NAMES) == _NUM_MOTORS + + +class G1WholeBodyConnectionConfig(ModuleConfig): + network_interface: str = Field(default="") + release_sport_mode: bool = True + publish_rate_hz: float = 500.0 + frame_id: str = "g1_pelvis" + + +class G1WholeBodyConnection(Module): + """G1 humanoid Module — owns the DDS connection in its own worker.""" + + config: G1WholeBodyConnectionConfig + + motor_command: In[MotorCommandArray] + motor_states: Out[JointState] + imu: Out[Imu] + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + + self._publisher: ChannelPublisher | None = None + self._subscriber: ChannelSubscriber | None = None + self._low_cmd: LowCmd_ | None = None + self._low_state: LowState_ | None = None + self._crc: CRC | None = None + # mode_machine: read from first LowState, echoed back in every LowCmd. + self._mode_machine: int | None = None + # Guards _low_cmd / _low_state / _mode_machine across DDS, publish, and LCM threads. + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._publish_thread: Thread | None = None + + @rpc + def start(self) -> None: + super().start() + + # Lazy SDK imports — file must import cleanly outside the [unitree-dds] extra. + from unitree_sdk2py.core.channel import ( + ChannelFactoryInitialize, + ChannelPublisher, + ChannelSubscriber, + ) + from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ + from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_, LowState_ + from unitree_sdk2py.utils.crc import CRC + + nic = self.config.network_interface + logger.info(f"Initializing DDS (G1 wholebody) interface={nic!r}...") + try: + if nic: + ChannelFactoryInitialize(0, nic) + else: + ChannelFactoryInitialize(0) + except Exception as e: + # Idempotent — already initialised by a sibling participant is fine. + logger.debug(f"ChannelFactoryInitialize raised (likely already init'd): {e}") + + self._publisher = ChannelPublisher("rt/lowcmd", LowCmd_) + self._publisher.Init() + + self._subscriber = ChannelSubscriber("rt/lowstate", LowState_) + self._subscriber.Init(self._on_low_state, 10) + + # POS_STOP/VEL_STOP + zero gains so the robot can't twitch pre-command. + self._low_cmd = unitree_hg_msg_dds__LowCmd_() + self._low_cmd.mode_pr = 0 # PR (pitch/roll) mode + for i in range(_NUM_MOTOR_SLOTS): + self._low_cmd.motor_cmd[i].mode = 0x01 # enable + self._low_cmd.motor_cmd[i].q = POS_STOP + self._low_cmd.motor_cmd[i].kp = 0 + self._low_cmd.motor_cmd[i].dq = VEL_STOP + self._low_cmd.motor_cmd[i].kd = 0 + self._low_cmd.motor_cmd[i].tau = 0 + + self._crc = CRC() + + if self.config.release_sport_mode: + logger.info("Releasing sport mode...") + self._release_sport_mode() + else: + logger.info("Skipping sport mode release (release_sport_mode=False)") + + logger.info("Waiting for first LowState to capture mode_machine...") + deadline = time.time() + _MODE_MACHINE_WAIT_S + while self._mode_machine is None and time.time() < deadline: + time.sleep(0.1) + if self._mode_machine is None: + raise RuntimeError( + f"Timed out after {_MODE_MACHINE_WAIT_S:.1f}s waiting for " + f"first LowState — mode_machine never captured" + ) + + logger.info(f"G1WholeBodyConnection connected (mode_machine={self._mode_machine})") + + self.register_disposable(Disposable(self.motor_command.subscribe(self._on_motor_command))) + + self._publish_thread = Thread( + target=self._publish_loop, name="g1-wholebody-pump", daemon=True + ) + self._publish_thread.start() + + @rpc + def stop(self) -> None: + self._stop_event.set() + if self._publish_thread is not None and self._publish_thread.is_alive(): + self._publish_thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT) + self._publish_thread = None + + # Close DDS endpoints explicitly — GC-based cleanup races with in-flight + # callbacks and segfaults on process exit (mirrors the Go2 adapter). + if self._subscriber is not None: + try: + self._subscriber.Close() + except (OSError, RuntimeError) as e: + logger.warning(f"ChannelSubscriber Close raised: {e}") + if self._publisher is not None: + try: + self._publisher.Close() + except (OSError, RuntimeError) as e: + logger.warning(f"ChannelPublisher Close raised: {e}") + + self._publisher = None + self._subscriber = None + self._low_cmd = None + self._low_state = None + self._mode_machine = None + self._crc = None + + logger.info("G1WholeBodyConnection disconnected") + super().stop() + + def _publish_loop(self) -> None: + period = 1.0 / float(self.config.publish_rate_hz) + next_tick = time.perf_counter() + frame_id = self.config.frame_id + + # Identity quaternion + zeros while LowState hasn't arrived (start() blocks + # for it, but the publish loop may also see _low_state cleared during stop()). + zero_quat = (1.0, 0.0, 0.0, 0.0) + zero_vec3 = (0.0, 0.0, 0.0) + + while not self._stop_event.is_set(): + with self._lock: + ls = self._low_state + if ls is None: + positions: list[float] = [0.0] * _NUM_MOTORS + velocities: list[float] = [0.0] * _NUM_MOTORS + efforts: list[float] = [0.0] * _NUM_MOTORS + quat = zero_quat + gyro = zero_vec3 + accel = zero_vec3 + else: + positions = [ls.motor_state[i].q for i in range(_NUM_MOTORS)] + velocities = [ls.motor_state[i].dq for i in range(_NUM_MOTORS)] + efforts = [ls.motor_state[i].tau_est for i in range(_NUM_MOTORS)] + quat = tuple(ls.imu_state.quaternion) + gyro = tuple(ls.imu_state.gyroscope) + accel = tuple(ls.imu_state.accelerometer) + + now = time.time() + self.motor_states.publish( + JointState( + ts=now, + frame_id=frame_id, + name=G1_JOINT_NAMES, + position=positions, + velocity=velocities, + effort=efforts, + ) + ) + + # Unitree quat is (w,x,y,z); dimos Quaternion is (x,y,z,w). + self.imu.publish( + Imu( + ts=now, + frame_id=frame_id, + orientation=Quaternion(quat[1], quat[2], quat[3], quat[0]), + angular_velocity=Vector3(gyro[0], gyro[1], gyro[2]), + linear_acceleration=Vector3(accel[0], accel[1], accel[2]), + ) + ) + + next_tick += period + sleep_for = next_tick - time.perf_counter() + if sleep_for > 0: + time.sleep(sleep_for) + else: + next_tick = time.perf_counter() + + def _on_motor_command(self, msg: MotorCommandArray) -> None: + if msg.num_joints != _NUM_MOTORS: + logger.warning(f"Expected {_NUM_MOTORS} motor commands, got {msg.num_joints}; ignoring") + return + + with self._lock: + if ( + self._low_cmd is None + or self._crc is None + or self._publisher is None + or self._mode_machine is None + ): + # Pre-start or post-stop — drop silently. + return + + # Echo mode_machine from the latest LowState — required by G1 firmware. + self._low_cmd.mode_machine = self._mode_machine + + for i in range(_NUM_MOTORS): + self._low_cmd.motor_cmd[i].q = msg.q[i] + self._low_cmd.motor_cmd[i].dq = msg.dq[i] + self._low_cmd.motor_cmd[i].kp = msg.kp[i] + self._low_cmd.motor_cmd[i].kd = msg.kd[i] + self._low_cmd.motor_cmd[i].tau = msg.tau[i] + + self._low_cmd.crc = self._crc.Crc(self._low_cmd) + self._publisher.Write(self._low_cmd) + + def _on_low_state(self, msg: Any) -> None: + """rt/lowstate callback — captures mode_machine and the latest snapshot.""" + with self._lock: + self._low_state = msg + if self._mode_machine is None: + self._mode_machine = msg.mode_machine + + def _release_sport_mode(self) -> None: + """Loop ReleaseMode until MotionSwitcher reports no active controller.""" + from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import ( + MotionSwitcherClient, + ) + + msc = MotionSwitcherClient() + msc.SetTimeout(5.0) + msc.Init() + + # CheckMode returns (status, None) once nothing is active — null-tolerant. + _status, result = msc.CheckMode() + while result and result.get("name"): + msc.ReleaseMode() + _status, result = msc.CheckMode() + time.sleep(1) + + logger.info("Sport mode released — low-level control active") + + +__all__ = ["G1_JOINT_NAMES", "G1WholeBodyConnection", "G1WholeBodyConnectionConfig"] diff --git a/scripts/g1_replay.py b/scripts/g1_replay.py new file mode 100644 index 0000000000..03891c938f --- /dev/null +++ b/scripts/g1_replay.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Replay a 29-DOF G1 joint trajectory through unitree-g1-coordinator. + +Publishes JointState to /g1/joint_command. Trajectory file: positional 29-DOF +(joint_names ignored, zipped onto make_humanoid_joints("g1")). +""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path +import threading +import time + +from dimos.control.components import make_humanoid_joints +from dimos.core.transport import LCMTransport +from dimos.msgs.sensor_msgs.JointState import JointState +from dimos.utils.data import get_data +from dimos.utils.logging_config import setup_logger + +# get_data() auto-pulls + decompresses data/.lfs/.tar.gz on first use. +_DEFAULT_TRAJECTORY_NAME = "g1_wholebody_replay.json" + +NUM_DOF = 29 +CANONICAL_JOINTS = make_humanoid_joints("g1") +assert len(CANONICAL_JOINTS) == NUM_DOF + +logger = setup_logger() + + +def load_trajectory(path: Path) -> tuple[list[float], list[list[float]]]: + data = json.loads(path.read_text()) + joint_names = data.get("joint_names", []) + samples = data.get("samples", []) + if len(joint_names) != NUM_DOF: + raise ValueError(f"trajectory has {len(joint_names)} joint_names, expected {NUM_DOF}") + if not samples: + raise ValueError("trajectory has zero samples") + for i, s in enumerate(samples): + if len(s["position"]) != NUM_DOF: + raise ValueError(f"sample {i} has {len(s['position'])} positions, expected {NUM_DOF}") + t0 = samples[0]["ts"] + rel_ts = [s["ts"] - t0 for s in samples] + positions = [list(s["position"]) for s in samples] + return rel_ts, positions + + +def make_joint_state(positions: list[float]) -> JointState: + return JointState( + name=list(CANONICAL_JOINTS), + position=list(positions), + velocity=[0.0] * NUM_DOF, + effort=[0.0] * NUM_DOF, + ) + + +def main() -> None: + p = argparse.ArgumentParser(description=__doc__) + p.add_argument( + "--file", + default=None, + help=f"trajectory JSON path (defaults to LFS-bundled {_DEFAULT_TRAJECTORY_NAME})", + ) + p.add_argument( + "--ramp", + type=float, + default=3.0, + help="seconds to interpolate from current pose to first sample", + ) + p.add_argument("--loop", action="store_true", help="repeat trajectory until Ctrl+C") + p.add_argument( + "--dry-run", action="store_true", help="subscribe + log but do NOT publish commands" + ) + args = p.parse_args() + + traj_path = Path(args.file) if args.file else get_data(_DEFAULT_TRAJECTORY_NAME) + rel_ts, positions = load_trajectory(traj_path) + native_rate = len(rel_ts) / rel_ts[-1] if rel_ts[-1] > 0 else 0.0 + logger.info( + f"loaded {len(rel_ts)} samples, duration={rel_ts[-1]:.2f}s, native_rate={native_rate:.1f}Hz" + ) + logger.info(f"first sample[0:3]={positions[0][:3]} last sample[0:3]={positions[-1][:3]}") + logger.info( + f"will publish to /g1/joint_command using canonical joint names ({CANONICAL_JOINTS[0]} ...)" + ) + + # Subscribe to coordinator's joint_state output to capture the current pose. + # Coordinator joint state names are {hardware_id}/{joint} so they align with + # CANONICAL_JOINTS, lookup is straightforward. + state_lock = threading.Lock() + current_q: dict[str, float] = {} + state_event = threading.Event() + + def on_state(msg: JointState) -> None: + with state_lock: + current_q.clear() + for n, q in zip(msg.name, msg.position, strict=True): + current_q[n] = q + state_event.set() + + state_sub: LCMTransport[JointState] = LCMTransport("/coordinator/joint_state", JointState) + # subscribe() returns an unsub callable; signature claims None (see transport.py). + state_unsub = state_sub.subscribe(on_state) # type: ignore[func-returns-value] + cmd_pub: LCMTransport[JointState] = LCMTransport("/g1/joint_command", JointState) + + try: + logger.info("waiting up to 10s for /coordinator/joint_state ...") + if not state_event.wait(timeout=10.0): + logger.error("no joint_state received — is `dimos run unitree-g1-coordinator` running?") + return + with state_lock: + start_q = [current_q.get(n, 0.0) for n in CANONICAL_JOINTS] + missing = [n for n in CANONICAL_JOINTS if n not in current_q] + if missing: + logger.warning( + f"coordinator joint_state missing {len(missing)} of {NUM_DOF} canonical joints: {missing[:3]}..." + ) + logger.warning("will treat missing joints as 0.0 — check joint name conventions") + logger.info(f"current pose[0:3]={start_q[0:3]}") + + if args.dry_run: + logger.info("--dry-run set — exiting before publish phase") + return + + # ramp <= 0: snap to trajectory[0] (only safe when robot already there). + target_q = positions[0] + if args.ramp <= 0: + logger.warning( + f"--ramp={args.ramp} ≤ 0; snapping directly to trajectory[0] (no interpolation)" + ) + cmd_pub.publish(make_joint_state(target_q)) + else: + logger.info(f"ramping current → trajectory[0] over {args.ramp:.2f}s") + ramp_period = 1.0 / 100.0 # 100 Hz during ramp + ramp_start = time.perf_counter() + while True: + elapsed = time.perf_counter() - ramp_start + a = min(elapsed / args.ramp, 1.0) + interp = [start_q[i] + a * (target_q[i] - start_q[i]) for i in range(NUM_DOF)] + cmd_pub.publish(make_joint_state(interp)) + if a >= 1.0: + break + time.sleep(ramp_period) + + passes = 0 + while True: + passes += 1 + logger.info(f"playback pass {passes} (Ctrl+C to stop)") + replay_start = time.perf_counter() + last_log = replay_start + for i, (sample_t, q) in enumerate(zip(rel_ts, positions, strict=True)): + wait = sample_t - (time.perf_counter() - replay_start) + if wait > 0: + time.sleep(wait) + cmd_pub.publish(make_joint_state(q)) + now = time.perf_counter() + if now - last_log >= 2.0: + logger.info(f" t={sample_t:6.2f}s sample={i}/{len(rel_ts)}") + last_log = now + if not args.loop: + break + + logger.info("trajectory complete (last pose held by coordinator's last command)") + + except KeyboardInterrupt: + logger.info("Ctrl+C — exiting; coordinator holds last published target") + finally: + state_unsub() + state_sub.stop() + cmd_pub.stop() + + +if __name__ == "__main__": + main()