diff --git a/bin/pytest-slow b/bin/pytest-slow index a0057410fc..9f9d5ae611 100755 --- a/bin/pytest-slow +++ b/bin/pytest-slow @@ -3,4 +3,4 @@ set -euo pipefail . .venv/bin/activate -exec pytest --numprocesses=auto "$@" -m 'not (tool or mujoco)' dimos +exec pytest "$@" -m 'not (tool or mujoco)' dimos diff --git a/dimos/core/coordination/test_worker.py b/dimos/core/coordination/test_worker.py index 2bc2abca8b..41a7c2734f 100644 --- a/dimos/core/coordination/test_worker.py +++ b/dimos/core/coordination/test_worker.py @@ -190,6 +190,9 @@ class CapturingLogger: def log_stats(self, coordinator, workers): captured.append(workers) + def stop(self): + pass + monitor = StatsMonitor(manager, resource_logger=CapturingLogger(), interval=0.5) monitor.start() import time diff --git a/dimos/core/resource_monitor/logger.py b/dimos/core/resource_monitor/logger.py index 88f79b6db8..b4668f7198 100644 --- a/dimos/core/resource_monitor/logger.py +++ b/dimos/core/resource_monitor/logger.py @@ -28,10 +28,15 @@ class ResourceLogger(Protocol): def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None: ... + def stop(self) -> None: ... + class StructlogResourceLogger: """Default implementation — logs resource stats via structlog info.""" + def stop(self) -> None: + pass + def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None: logger.info( "coordinator", @@ -65,6 +70,9 @@ def __init__(self, topic: str = "/dimos/resource_stats") -> None: self._transport: pLCMTransport[dict[str, Any]] = pLCMTransport(topic) + def stop(self) -> None: + self._transport.stop() + def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None: self._transport.broadcast( None, diff --git a/dimos/core/resource_monitor/monitor.py b/dimos/core/resource_monitor/monitor.py index 692c597d43..d03f4b849a 100644 --- a/dimos/core/resource_monitor/monitor.py +++ b/dimos/core/resource_monitor/monitor.py @@ -95,6 +95,7 @@ def stop(self) -> None: if self._thread is not None: self._thread.join(timeout=5.0) self._thread = None + self._logger.stop() def _loop(self) -> None: while not self._stop.wait(self._interval): diff --git a/dimos/navigation/nav_stack/tests/conftest.py b/dimos/navigation/nav_stack/tests/conftest.py index bf687ac7c9..f17df64afd 100644 --- a/dimos/navigation/nav_stack/tests/conftest.py +++ b/dimos/navigation/nav_stack/tests/conftest.py @@ -22,14 +22,13 @@ from __future__ import annotations +import asyncio import math from pathlib import Path -import threading import time import lcm as lcmlib -from dimos.constants import DEFAULT_THREAD_JOIN_TIMEOUT from dimos.core.coordination.blueprints import Blueprint from dimos.core.coordination.module_coordinator import ModuleCoordinator from dimos.msgs.geometry_msgs.PointStamped import PointStamped @@ -76,13 +75,11 @@ def _clear_precomputed_paths() -> None: path.unlink(missing_ok=True) -def run_cross_wall_test(blueprint: Blueprint, *, label: str, max_z: float | None = None) -> None: - """Build the coordinator, drive the cross-wall waypoint sequence, tear down.""" +async def _run_cross_wall_test(blueprint: Blueprint, *, label: str, max_z: float | None) -> None: _clear_precomputed_paths() coordinator = ModuleCoordinator.build(blueprint) - lock = threading.Lock() odom_count = 0 robot_x = 0.0 robot_y = 0.0 @@ -94,52 +91,43 @@ def run_cross_wall_test(blueprint: Blueprint, *, label: str, max_z: float | None def _odom_handler(_channel: str, data: bytes) -> None: nonlocal odom_count, robot_x, robot_y, robot_z, max_z_seen msg = Odometry.lcm_decode(data) - with lock: - odom_count += 1 - robot_x = msg.x - robot_y = msg.y - robot_z = msg.pose.position.z - if robot_z > max_z_seen: - max_z_seen = robot_z + odom_count += 1 + robot_x = msg.x + robot_y = msg.y + robot_z = msg.pose.position.z + if robot_z > max_z_seen: + max_z_seen = robot_z subscription = lcm.subscribe(ODOM_TOPIC, _odom_handler) - lcm_stop = threading.Event() + loop = asyncio.get_running_loop() + lcm_fd = lcm.fileno() - def _lcm_loop() -> None: - while not lcm_stop.is_set(): - try: - lcm.handle_timeout(100) - except Exception: - # Don't spin forever waiting on odom that will never arrive. - logger.exception("LCM handle_timeout failed; stopping loop") - lcm_stop.set() - return + def _on_lcm_readable() -> None: + try: + lcm.handle() + except Exception: + logger.exception("LCM handle failed; removing reader to stop further polling") + loop.remove_reader(lcm_fd) - lcm_thread = threading.Thread(target=_lcm_loop, daemon=True) - lcm_thread.start() + loop.add_reader(lcm_fd, _on_lcm_readable) try: logger.info(f"[{label}] Blueprint started, waiting for odom…") deadline = time.monotonic() + ODOM_WAIT_SEC - while time.monotonic() < deadline: - with lock: - if odom_count > 0: - break - time.sleep(0.5) + while time.monotonic() < deadline and odom_count == 0: + await asyncio.sleep(0.5) - with lock: - assert odom_count > 0, f"No odometry received after {ODOM_WAIT_SEC}s — sim not running?" - initial_x, initial_y = robot_x, robot_y + assert odom_count > 0, f"No odometry received after {ODOM_WAIT_SEC}s — sim not running?" + initial_x, initial_y = robot_x, robot_y logger.info(f"[{label}] Odom online. Robot at ({initial_x:.2f}, {initial_y:.2f})") logger.info(f"[{label}] Warming up for {WARMUP_SEC}s…") - time.sleep(WARMUP_SEC) + await asyncio.sleep(WARMUP_SEC) for name, goal_x, goal_y, goal_z, timeout_sec, threshold in CROSS_WALL_WAYPOINTS: - with lock: - start_x, start_y = robot_x, robot_y + start_x, start_y = robot_x, robot_y logger.info( f"[{label}] === {name}: goal ({goal_x}, {goal_y}) | " @@ -156,10 +144,9 @@ def _lcm_loop() -> None: current_x, current_y = start_x, start_y distance = _distance(current_x, current_y, goal_x, goal_y) while True: - with lock: - current_x, current_y = robot_x, robot_y - current_z = robot_z - current_max_z = max_z_seen + current_x, current_y = robot_x, robot_y + current_z = robot_z + current_max_z = max_z_seen if max_z is not None: assert current_z <= max_z, ( @@ -176,7 +163,7 @@ def _lcm_loop() -> None: break if elapsed >= timeout_sec: break - time.sleep(GOAL_POLL_INTERVAL_SEC) + await asyncio.sleep(GOAL_POLL_INTERVAL_SEC) assert reached, ( f"{name}: robot did not reach ({goal_x}, {goal_y}) within {timeout_sec}s. " @@ -184,16 +171,17 @@ def _lcm_loop() -> None: ) if max_z is not None: - with lock: - final_max_z = max_z_seen - assert final_max_z <= max_z, ( - f"Robot z peaked at {final_max_z:.2f}m during the run " + assert max_z_seen <= max_z, ( + f"Robot z peaked at {max_z_seen:.2f}m during the run " f"(limit {max_z}m) — went through the ceiling" ) finally: - lcm_stop.set() - lcm_thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT) - assert not lcm_thread.is_alive(), "LCM loop thread didn't exit cleanly" + loop.remove_reader(lcm_fd) lcm.unsubscribe(subscription) coordinator.stop() + + +def run_cross_wall_test(blueprint: Blueprint, *, label: str, max_z: float | None = None) -> None: + """Build the coordinator, drive the cross-wall waypoint sequence, tear down.""" + asyncio.run(_run_cross_wall_test(blueprint, label=label, max_z=max_z)) diff --git a/dimos/navigation/nav_stack/tests/test_cross_wall_planning_far.py b/dimos/navigation/nav_stack/tests/test_cross_wall_planning_far.py index e2f22b1356..4b48ab4bbe 100644 --- a/dimos/navigation/nav_stack/tests/test_cross_wall_planning_far.py +++ b/dimos/navigation/nav_stack/tests/test_cross_wall_planning_far.py @@ -26,7 +26,7 @@ unitree_g1_nav_sim, ) -pytestmark = [pytest.mark.skipif_in_ci, pytest.mark.skipif_macos] +pytestmark = [pytest.mark.self_hosted, pytest.mark.skipif_in_ci, pytest.mark.skipif_macos] class TestCrossWallPlanning: @@ -36,5 +36,5 @@ def test_cross_wall_sequence(self) -> None: blueprint = autoconnect( unitree_g1_nav_sim, create_nav_stack(**{**nav_config, "planner": "far"}), - ).global_config(dtop=True) + ).global_config() run_cross_wall_test(blueprint, label="far") diff --git a/dimos/navigation/nav_stack/tests/test_cross_wall_planning_simple.py b/dimos/navigation/nav_stack/tests/test_cross_wall_planning_simple.py index 4fd8ccbe98..7d4e839f9c 100644 --- a/dimos/navigation/nav_stack/tests/test_cross_wall_planning_simple.py +++ b/dimos/navigation/nav_stack/tests/test_cross_wall_planning_simple.py @@ -26,7 +26,7 @@ unitree_g1_nav_sim, ) -pytestmark = [pytest.mark.skipif_in_ci, pytest.mark.skipif_macos] +pytestmark = [pytest.mark.self_hosted, pytest.mark.skipif_in_ci, pytest.mark.skipif_macos] class TestCrossWallPlanningSimple: @@ -36,5 +36,5 @@ def test_cross_wall_sequence_simple(self) -> None: blueprint = autoconnect( unitree_g1_nav_sim, create_nav_stack(**{**nav_config, "planner": "simple"}), - ).global_config(dtop=True) + ).global_config() run_cross_wall_test(blueprint, label="simple") diff --git a/dimos/simulation/unity/test_unity_sim.py b/dimos/simulation/unity/test_unity_sim.py index 4874535e6c..2b0c2572f1 100644 --- a/dimos/simulation/unity/test_unity_sim.py +++ b/dimos/simulation/unity/test_unity_sim.py @@ -54,6 +54,8 @@ _is_linux_x86 = platform.system() == "Linux" and platform.machine() in ("x86_64", "AMD64") _has_display = bool(os.environ.get("DISPLAY")) +pytestmark = pytest.mark.self_hosted + class _MockTransport: def __init__(self): diff --git a/dimos/visualization/rerun/bridge.py b/dimos/visualization/rerun/bridge.py index 0a2cb562ce..5bea65afcb 100644 --- a/dimos/visualization/rerun/bridge.py +++ b/dimos/visualization/rerun/bridge.py @@ -166,7 +166,9 @@ def _default_blueprint() -> Blueprint: class Config(ModuleConfig): pubsubs: list[SubscribeAllCapable[Any, Any]] = field(default_factory=lambda: [LCM()]) - visual_override: dict[Glob | str, Callable[[Any], Archetype]] = field(default_factory=dict) + visual_override: dict[Glob | str, Callable[[Any], Archetype] | None] = field( + default_factory=dict + ) static: dict[str, Callable[[Any], Archetype]] = field(default_factory=dict) max_hz: dict[str, float] = field(default_factory=dict) diff --git a/docs/development/testing.md b/docs/development/testing.md index 1ff6d1631b..bcb2b1f69f 100644 --- a/docs/development/testing.md +++ b/docs/development/testing.md @@ -55,7 +55,7 @@ The default `addopts` in `pyproject.toml` includes a `-m` filter that excludes ` ./bin/pytest-slow ``` -(Shortcut for `pytest --numprocesses=auto -m 'not (tool or mujoco)' dimos` — runs the default suite *and* self-hosted tests, but not `tool` or `mujoco`.) +(Shortcut for `pytest -m 'not (tool or mujoco)' dimos` — runs the default suite *and* self-hosted tests, but not `tool` or `mujoco`.) When writing or debugging a specific self-hosted test, override `-m` yourself to run it: