Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
24cbba8
cherrry picked old low level adapter implementatio for the g1
mustafab0 Apr 29, 2026
d8e82dc
MotorCommand LCM data type added
mustafab0 Apr 29, 2026
446838d
g1 low level control module
mustafab0 Apr 29, 2026
752a131
absorbed adapter into wholebody_connection module
mustafab0 Apr 30, 2026
8bfd47e
Updated control coordinator to support WholeBodyAdapter
mustafab0 Apr 30, 2026
f0b90e2
TransportWholeBody adapter
mustafab0 Apr 30, 2026
7c152f0
low level g1 coordinator blueprint
mustafab0 Apr 30, 2026
cdaded1
Close dds endpoints to clean exit
mustafab0 Apr 30, 2026
72bea79
fixed network adapter field missing
mustafab0 Apr 30, 2026
88c26a3
hardware id added to whole body adapter
mustafab0 Apr 30, 2026
b3eb8ec
coordinator blueprint added with TransportWholeBodyAdapter
mustafab0 Apr 30, 2026
2ab8e42
replay script added to test wholebodyadapter
mustafab0 May 1, 2026
bbb01b1
ruff and mypy fixes
mustafab0 May 1, 2026
726bdc5
delete mock scripts
mustafab0 May 1, 2026
33de772
changed to use dimos logger
mustafab0 May 1, 2026
dddcdbc
replay joint commands for the whole body controller testing
mustafab0 May 1, 2026
63a0aef
g1 replay now pulls trawjectory from lfs
mustafab0 May 1, 2026
39ab104
skipo zero division when ramp is set to 0
mustafab0 May 1, 2026
e8bea3f
states now publish only for smallest message type to prevent crash in…
mustafab0 May 1, 2026
bfbd286
removed section headers
mustafab0 May 1, 2026
26e5df2
command current position to prevent robot jerking when starting
mustafab0 May 1, 2026
cc86432
drop state messages for buggy frame and use previous good frame
mustafab0 May 1, 2026
6679a85
forward dof for the wholebody adapter object creation
mustafab0 May 1, 2026
4cb34e9
replaced Any annotations on the DDS/SDK fields
mustafab0 May 2, 2026
47b25db
reduced verbose docstring
mustafab0 May 2, 2026
7d06666
removed getattr check for motor states and defined it as a bool in Pr…
mustafab0 May 2, 2026
ffb59b9
ControlMode validated name warns and returns
mustafab0 May 2, 2026
98eddaf
removed redundant isinstance checks
mustafab0 May 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions data/.lfs/g1_wholebody_replay.json.tar.gz
Git LFS file not shown
54 changes: 54 additions & 0 deletions dimos/control/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def split_joint_name(joint_name: str) -> tuple[str, str]:
class HardwareType(Enum):
MANIPULATOR = "manipulator"
BASE = "base"
WHOLE_BODY = "whole_body"


@dataclass(frozen=True)
Expand Down Expand Up @@ -135,6 +136,58 @@ def make_twist_base_joints(
return [f"{hardware_id}/{s}" for s in suffixes]


_HUMANOID_29DOF_JOINTS = [
# Left leg (0-5)
"left_hip_pitch",
"left_hip_roll",
"left_hip_yaw",
"left_knee",
"left_ankle_pitch",
"left_ankle_roll",
# Right leg (6-11)
"right_hip_pitch",
"right_hip_roll",
"right_hip_yaw",
"right_knee",
"right_ankle_pitch",
"right_ankle_roll",
# Waist (12-14)
"waist_yaw",
"waist_roll",
"waist_pitch",
# Left arm (15-21)
"left_shoulder_pitch",
"left_shoulder_roll",
"left_shoulder_yaw",
"left_elbow",
"left_wrist_roll",
"left_wrist_pitch",
"left_wrist_yaw",
# Right arm (22-28)
"right_shoulder_pitch",
"right_shoulder_roll",
"right_shoulder_yaw",
"right_elbow",
"right_wrist_roll",
"right_wrist_pitch",
"right_wrist_yaw",
]


def make_humanoid_joints(hardware_id: HardwareId) -> list[JointName]:
"""Create joint names for a 29-DOF humanoid.

Covers 6-DOF legs, 3-DOF waist, and 7-DOF arms.

Args:
hardware_id: The hardware identifier (e.g., "g1")

Returns:
List of 29 joint names like ["g1/left_hip_pitch", ..., "g1/right_wrist_yaw"]
"""
return [f"{hardware_id}/{j}" for j in _HUMANOID_29DOF_JOINTS]
Comment thread
mustafab0 marked this conversation as resolved.


__all__ = [
"TWIST_SUFFIX_MAP",
"HardwareComponent",
Expand All @@ -144,6 +197,7 @@ def make_twist_base_joints(
"JointState",
"TaskName",
"make_gripper_joints",
"make_humanoid_joints",
"make_joints",
"make_twist_base_joints",
"split_joint_name",
Expand Down
58 changes: 51 additions & 7 deletions dimos/control/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@
TaskName,
split_joint_name,
)
from dimos.control.hardware_interface import ConnectedHardware, ConnectedTwistBase
from dimos.control.hardware_interface import (
ConnectedHardware,
ConnectedTwistBase,
ConnectedWholeBody,
)
from dimos.control.task import ControlTask
from dimos.control.tick_loop import TickLoop
from dimos.core.core import rpc
Expand All @@ -50,6 +54,7 @@
TwistBaseAdapter,
)
from dimos.hardware.manipulators.spec import ManipulatorAdapter
from dimos.hardware.whole_body.spec import WholeBodyAdapter
from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped
from dimos.msgs.geometry_msgs.Twist import Twist
from dimos.msgs.sensor_msgs.JointState import JointState
Expand Down Expand Up @@ -210,8 +215,10 @@ def _setup_from_config(self) -> None:

def _setup_hardware(self, component: HardwareComponent) -> None:
"""Connect and add a single hardware adapter."""
adapter: ManipulatorAdapter | TwistBaseAdapter
if component.hardware_type == HardwareType.BASE:
adapter: ManipulatorAdapter | TwistBaseAdapter | WholeBodyAdapter
if component.hardware_type == HardwareType.WHOLE_BODY:
adapter = self._create_whole_body_adapter(component)
elif component.hardware_type == HardwareType.BASE:
adapter = self._create_twist_base_adapter(component)
else:
adapter = self._create_adapter(component)
Expand Down Expand Up @@ -252,6 +259,30 @@ def _create_twist_base_adapter(self, component: HardwareComponent) -> TwistBaseA
**component.adapter_kwargs,
)

def _create_whole_body_adapter(self, component: HardwareComponent) -> WholeBodyAdapter:
"""Create a whole-body adapter from component config.

``component.address`` carries the DDS network interface — int (CAN port)
or str ("enp60s0"); cyclonedds requires the right type, so try int() first
and fall back to keeping the original string.
"""
from dimos.hardware.whole_body.registry import whole_body_adapter_registry

addr: int | str | None = component.address
if addr is not None:
try:
addr = int(addr)
except ValueError:
pass # keep as string (e.g. "enp60s0")

return whole_body_adapter_registry.create(
component.adapter_type,
dof=len(component.joints),
hardware_id=component.hardware_id,
network_interface=addr if addr is not None else "",
**component.adapter_kwargs,
)

def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask:
Comment thread
mustafab0 marked this conversation as resolved.
"""Create a control task from config."""
task_type = cfg.type.lower()
Expand Down Expand Up @@ -334,13 +365,21 @@ def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask:
@rpc
def add_hardware(
self,
adapter: ManipulatorAdapter | TwistBaseAdapter,
adapter: ManipulatorAdapter | TwistBaseAdapter | WholeBodyAdapter,
component: HardwareComponent,
) -> bool:
"""Register a hardware adapter with the coordinator."""
is_base = component.hardware_type == HardwareType.BASE
is_whole_body = component.hardware_type == HardwareType.WHOLE_BODY

if is_base and not isinstance(adapter, TwistBaseAdapter):
raise TypeError(
f"Hardware type / adapter mismatch for '{component.hardware_id}': "
f"hardware_type={component.hardware_type.value} but got "
f"{type(adapter).__name__}"
)

if is_base != isinstance(adapter, TwistBaseAdapter):
if is_whole_body and not isinstance(adapter, WholeBodyAdapter):
raise TypeError(
f"Hardware type / adapter mismatch for '{component.hardware_id}': "
f"hardware_type={component.hardware_type.value} but got "
Expand All @@ -352,8 +391,13 @@ def add_hardware(
logger.warning(f"Hardware {component.hardware_id} already registered")
return False

if isinstance(adapter, TwistBaseAdapter):
connected: ConnectedHardware = ConnectedTwistBase(
if isinstance(adapter, WholeBodyAdapter):
connected: ConnectedHardware = ConnectedWholeBody(
adapter=adapter,
component=component,
)
elif isinstance(adapter, TwistBaseAdapter):
connected = ConnectedTwistBase(
adapter=adapter,
component=component,
)
Expand Down
125 changes: 111 additions & 14 deletions dimos/control/hardware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
if TYPE_CHECKING:
from dimos.control.components import HardwareComponent, HardwareId, JointName, JointState
from dimos.hardware.drive_trains.spec import TwistBaseAdapter
from dimos.hardware.whole_body.spec import MotorCommand, WholeBodyAdapter

logger = setup_logger()

Expand All @@ -54,15 +55,6 @@ def __init__(
adapter: ManipulatorAdapter,
component: HardwareComponent,
) -> None:
"""Initialize hardware interface.

Args:
adapter: ManipulatorAdapter instance (XArmAdapter, PiperAdapter, etc.)
component: Hardware component with joints config
"""
if not isinstance(adapter, ManipulatorAdapter):
raise TypeError("adapter must implement ManipulatorAdapter")

self._adapter = adapter
self._component = component
self._arm_joint_names: list[JointName] = list(component.joints)
Expand Down Expand Up @@ -248,11 +240,6 @@ def __init__(
adapter: TwistBaseAdapter,
component: HardwareComponent,
) -> None:
from dimos.hardware.drive_trains.spec import TwistBaseAdapter as TwistBaseAdapterProto

if not isinstance(adapter, TwistBaseAdapterProto):
raise TypeError("adapter must implement TwistBaseAdapter")

self._twist_adapter = adapter
self._component = component
self._joint_names = component.joints
Expand Down Expand Up @@ -319,7 +306,117 @@ def write_command(self, commands: dict[str, float], _mode: ControlMode) -> bool:
return self._twist_adapter.write_velocities(ordered)


_DEFAULT_KP: float = 40.0
_DEFAULT_KD: float = 3.0


class ConnectedWholeBody(ConnectedHardware):
"""Runtime wrapper for a whole-body motor platform connected to the coordinator.

Wraps a WholeBodyAdapter for joint-level motor control (any DOF count).

Key differences from ConnectedHardware:
- Reads joint state from MotorState (q, dq, tau)
- write_command converts position commands to MotorCommand with PD gains
- write_motor_commands provides direct pass-through to adapter
"""

_wb_adapter: WholeBodyAdapter

def __init__(
self,
adapter: WholeBodyAdapter,
component: HardwareComponent,
) -> None:
self._wb_adapter = adapter
self._component = component
self._joint_names = component.joints

self._last_commanded: dict[str, float] = {}
self._initialized = False
self._warned_unknown_joints: set[str] = set()
self._current_mode: ControlMode | None = None

@property
def adapter(self) -> WholeBodyAdapter: # type: ignore[override]
Comment thread
paul-nechifor marked this conversation as resolved.
"""The underlying whole-body adapter."""
return self._wb_adapter

def disconnect(self) -> None:
"""Disconnect the underlying adapter."""
self._wb_adapter.disconnect()

def read_state(self) -> dict[JointName, JointState]:
"""Read motor states as {joint_name: JointState}."""
from dimos.control.components import JointState

motor_states = self._wb_adapter.read_motor_states()
return {
name: JointState(
position=motor_states[i].q,
velocity=motor_states[i].dq,
effort=motor_states[i].tau,
)
for i, name in enumerate(self._joint_names)
}

def write_command(self, commands: dict[str, float], mode: ControlMode) -> bool:
"""Write position commands — converts to MotorCommand with PD gains.

Only POSITION / SERVO_POSITION are supported; other modes are warned
and dropped (matches ConnectedHardware's warn-and-skip pattern).
"""
from dimos.hardware.whole_body.spec import MotorCommand

if mode not in (ControlMode.POSITION, ControlMode.SERVO_POSITION):
logger.warning(
f"WholeBody {self.hardware_id} only supports POSITION/SERVO_POSITION; "
f"got {mode.name} — skipping"
)
return False

if not self._initialized and not self._try_initialize_last_commanded():
return False

for joint_name, value in commands.items():
if joint_name in self._joint_names:
self._last_commanded[joint_name] = value
elif joint_name not in self._warned_unknown_joints:
logger.warning(
f"WholeBody {self.hardware_id} received command for unknown joint "
f"{joint_name}. Valid joints: {self._joint_names}"
)
self._warned_unknown_joints.add(joint_name)

motor_cmds = [
MotorCommand(
q=self._last_commanded[name],
dq=0.0,
kp=_DEFAULT_KP,
kd=_DEFAULT_KD,
tau=0.0,
)
for name in self._joint_names
]
return self._wb_adapter.write_motor_commands(motor_cmds)

def write_motor_commands(self, commands: list[MotorCommand]) -> bool:
"""Direct pass-through to adapter for full MotorCommand control."""
return self._wb_adapter.write_motor_commands(commands)

def _try_initialize_last_commanded(self) -> bool:
"""Non-blocking init. Returns True once motor_states is cached."""
if not self._wb_adapter.has_motor_states():
return False
states = self._wb_adapter.read_motor_states()
for i, name in enumerate(self._joint_names):
self._last_commanded[name] = states[i].q
self._initialized = True
return True


__all__ = [
"ConnectedHardware",
"ConnectedTwistBase",
"ConnectedWholeBody",
]
Loading
Loading