Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/pytest-slow
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
set -euo pipefail

. .venv/bin/activate
exec pytest --numprocesses=auto "$@" -m 'not (tool or mujoco)' dimos
exec pytest "$@" -m 'not (tool or mujoco)' dimos
3 changes: 3 additions & 0 deletions dimos/core/coordination/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ class CapturingLogger:
def log_stats(self, coordinator, workers):
captured.append(workers)

def stop(self):
pass

monitor = StatsMonitor(manager, resource_logger=CapturingLogger(), interval=0.5)
monitor.start()
import time
Expand Down
8 changes: 8 additions & 0 deletions dimos/core/resource_monitor/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@
class ResourceLogger(Protocol):
def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None: ...

def stop(self) -> None: ...


class StructlogResourceLogger:
"""Default implementation — logs resource stats via structlog info."""

def stop(self) -> None:
pass

def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None:
logger.info(
"coordinator",
Expand Down Expand Up @@ -65,6 +70,9 @@ def __init__(self, topic: str = "/dimos/resource_stats") -> None:

self._transport: pLCMTransport[dict[str, Any]] = pLCMTransport(topic)

def stop(self) -> None:
self._transport.stop()

def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None:
self._transport.broadcast(
None,
Expand Down
1 change: 1 addition & 0 deletions dimos/core/resource_monitor/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def stop(self) -> None:
if self._thread is not None:
self._thread.join(timeout=5.0)
self._thread = None
self._logger.stop()

def _loop(self) -> None:
while not self._stop.wait(self._interval):
Expand Down
82 changes: 35 additions & 47 deletions dimos/navigation/nav_stack/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@

from __future__ import annotations

import asyncio
import math
from pathlib import Path
import threading
import time

import lcm as lcmlib

from dimos.constants import DEFAULT_THREAD_JOIN_TIMEOUT
from dimos.core.coordination.blueprints import Blueprint
from dimos.core.coordination.module_coordinator import ModuleCoordinator
from dimos.msgs.geometry_msgs.PointStamped import PointStamped
Expand Down Expand Up @@ -76,13 +75,11 @@ def _clear_precomputed_paths() -> None:
path.unlink(missing_ok=True)


def run_cross_wall_test(blueprint: Blueprint, *, label: str, max_z: float | None = None) -> None:
"""Build the coordinator, drive the cross-wall waypoint sequence, tear down."""
async def _run_cross_wall_test(blueprint: Blueprint, *, label: str, max_z: float | None) -> None:
_clear_precomputed_paths()

coordinator = ModuleCoordinator.build(blueprint)

lock = threading.Lock()
odom_count = 0
robot_x = 0.0
robot_y = 0.0
Expand All @@ -94,52 +91,43 @@ def run_cross_wall_test(blueprint: Blueprint, *, label: str, max_z: float | None
def _odom_handler(_channel: str, data: bytes) -> None:
nonlocal odom_count, robot_x, robot_y, robot_z, max_z_seen
msg = Odometry.lcm_decode(data)
with lock:
odom_count += 1
robot_x = msg.x
robot_y = msg.y
robot_z = msg.pose.position.z
if robot_z > max_z_seen:
max_z_seen = robot_z
odom_count += 1
robot_x = msg.x
robot_y = msg.y
robot_z = msg.pose.position.z
if robot_z > max_z_seen:
max_z_seen = robot_z

subscription = lcm.subscribe(ODOM_TOPIC, _odom_handler)

lcm_stop = threading.Event()
loop = asyncio.get_running_loop()
lcm_fd = lcm.fileno()

def _lcm_loop() -> None:
while not lcm_stop.is_set():
try:
lcm.handle_timeout(100)
except Exception:
# Don't spin forever waiting on odom that will never arrive.
logger.exception("LCM handle_timeout failed; stopping loop")
lcm_stop.set()
return
def _on_lcm_readable() -> None:
try:
lcm.handle()
except Exception:
logger.exception("LCM handle failed; removing reader to stop further polling")
loop.remove_reader(lcm_fd)

lcm_thread = threading.Thread(target=_lcm_loop, daemon=True)
lcm_thread.start()
loop.add_reader(lcm_fd, _on_lcm_readable)

try:
logger.info(f"[{label}] Blueprint started, waiting for odom…")

deadline = time.monotonic() + ODOM_WAIT_SEC
while time.monotonic() < deadline:
with lock:
if odom_count > 0:
break
time.sleep(0.5)
while time.monotonic() < deadline and odom_count == 0:
await asyncio.sleep(0.5)

with lock:
assert odom_count > 0, f"No odometry received after {ODOM_WAIT_SEC}s — sim not running?"
initial_x, initial_y = robot_x, robot_y
assert odom_count > 0, f"No odometry received after {ODOM_WAIT_SEC}s — sim not running?"
initial_x, initial_y = robot_x, robot_y

logger.info(f"[{label}] Odom online. Robot at ({initial_x:.2f}, {initial_y:.2f})")
logger.info(f"[{label}] Warming up for {WARMUP_SEC}s…")
time.sleep(WARMUP_SEC)
await asyncio.sleep(WARMUP_SEC)

for name, goal_x, goal_y, goal_z, timeout_sec, threshold in CROSS_WALL_WAYPOINTS:
with lock:
start_x, start_y = robot_x, robot_y
start_x, start_y = robot_x, robot_y

logger.info(
f"[{label}] === {name}: goal ({goal_x}, {goal_y}) | "
Expand All @@ -156,10 +144,9 @@ def _lcm_loop() -> None:
current_x, current_y = start_x, start_y
distance = _distance(current_x, current_y, goal_x, goal_y)
while True:
with lock:
current_x, current_y = robot_x, robot_y
current_z = robot_z
current_max_z = max_z_seen
current_x, current_y = robot_x, robot_y
current_z = robot_z
current_max_z = max_z_seen

if max_z is not None:
assert current_z <= max_z, (
Expand All @@ -176,24 +163,25 @@ def _lcm_loop() -> None:
break
if elapsed >= timeout_sec:
break
time.sleep(GOAL_POLL_INTERVAL_SEC)
await asyncio.sleep(GOAL_POLL_INTERVAL_SEC)

assert reached, (
f"{name}: robot did not reach ({goal_x}, {goal_y}) within {timeout_sec}s. "
f"Final pos=({current_x:.2f}, {current_y:.2f}), dist={distance:.2f}m"
)

if max_z is not None:
with lock:
final_max_z = max_z_seen
assert final_max_z <= max_z, (
f"Robot z peaked at {final_max_z:.2f}m during the run "
assert max_z_seen <= max_z, (
f"Robot z peaked at {max_z_seen:.2f}m during the run "
f"(limit {max_z}m) — went through the ceiling"
)

finally:
lcm_stop.set()
lcm_thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT)
assert not lcm_thread.is_alive(), "LCM loop thread didn't exit cleanly"
loop.remove_reader(lcm_fd)
lcm.unsubscribe(subscription)
coordinator.stop()


def run_cross_wall_test(blueprint: Blueprint, *, label: str, max_z: float | None = None) -> None:
"""Build the coordinator, drive the cross-wall waypoint sequence, tear down."""
asyncio.run(_run_cross_wall_test(blueprint, label=label, max_z=max_z))
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
unitree_g1_nav_sim,
)

pytestmark = [pytest.mark.skipif_in_ci, pytest.mark.skipif_macos]
pytestmark = [pytest.mark.self_hosted, pytest.mark.skipif_in_ci, pytest.mark.skipif_macos]


class TestCrossWallPlanning:
Expand All @@ -36,5 +36,5 @@ def test_cross_wall_sequence(self) -> None:
blueprint = autoconnect(
unitree_g1_nav_sim,
create_nav_stack(**{**nav_config, "planner": "far"}),
).global_config(dtop=True)
).global_config()
run_cross_wall_test(blueprint, label="far")
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
unitree_g1_nav_sim,
)

pytestmark = [pytest.mark.skipif_in_ci, pytest.mark.skipif_macos]
pytestmark = [pytest.mark.self_hosted, pytest.mark.skipif_in_ci, pytest.mark.skipif_macos]


class TestCrossWallPlanningSimple:
Expand All @@ -36,5 +36,5 @@ def test_cross_wall_sequence_simple(self) -> None:
blueprint = autoconnect(
unitree_g1_nav_sim,
create_nav_stack(**{**nav_config, "planner": "simple"}),
).global_config(dtop=True)
).global_config()
run_cross_wall_test(blueprint, label="simple")
2 changes: 2 additions & 0 deletions dimos/simulation/unity/test_unity_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
_is_linux_x86 = platform.system() == "Linux" and platform.machine() in ("x86_64", "AMD64")
_has_display = bool(os.environ.get("DISPLAY"))

pytestmark = pytest.mark.self_hosted


class _MockTransport:
def __init__(self):
Expand Down
4 changes: 3 additions & 1 deletion dimos/visualization/rerun/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ def _default_blueprint() -> Blueprint:
class Config(ModuleConfig):
pubsubs: list[SubscribeAllCapable[Any, Any]] = field(default_factory=lambda: [LCM()])

visual_override: dict[Glob | str, Callable[[Any], Archetype]] = field(default_factory=dict)
visual_override: dict[Glob | str, Callable[[Any], Archetype] | None] = field(
default_factory=dict
)
static: dict[str, Callable[[Any], Archetype]] = field(default_factory=dict)
max_hz: dict[str, float] = field(default_factory=dict)

Expand Down
2 changes: 1 addition & 1 deletion docs/development/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ The default `addopts` in `pyproject.toml` includes a `-m` filter that excludes `
./bin/pytest-slow
```

(Shortcut for `pytest --numprocesses=auto -m 'not (tool or mujoco)' dimos` — runs the default suite *and* self-hosted tests, but not `tool` or `mujoco`.)
(Shortcut for `pytest -m 'not (tool or mujoco)' dimos` — runs the default suite *and* self-hosted tests, but not `tool` or `mujoco`.)

When writing or debugging a specific self-hosted test, override `-m` yourself to run it:

Expand Down
Loading