mcp: add wait_for_event tool + resource subscriptions to bridge #23
Open
Conversation
Two new event-delivery surfaces on the device-connect-agent-tools MCP
bridge, both backed by the same EventSubscriptionManager. Either gives
a dispatcher agent a way to know when a remote device emits an event
(progress / work_done / work_failed / etc.) without polling a status
function in a loop.
wait_for_event tool
- One-call wait for a Device Connect event from a worker. Filter by
  event_name (e.g. "work_done") and/or match_params (e.g.
  {"task_id": "T-42"}). Returns the matched event payload
  ({device_id, event_name, params}) or {"timeout": true}; see the call
  sketch after this list.
- Race-safe: a 32-event-per-device ring buffer holds recent events.
If a matching event already fired before the wait call lands (the
common dispatch→wait race for fast tasks), the tool returns
immediately from the buffer. Otherwise blocks up to timeout_seconds
for a future event.
- Pre-warm: every invoke_device call ensures a fabric subscription
exists for the target device before the RPC reply, so events fired
during the invocation always land in the ring buffer for a
subsequent wait. Pinned subs survive waiter timeouts and
resource unsubscribes — released only on bridge shutdown.
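For concreteness, here is what a call might look like from the MCP Python SDK client side. This is a sketch, not the PR's own example: it assumes an already-initialized `ClientSession` named `session`, and the `device_id` argument is an assumption, since this summary doesn't spell out how the target device is scoped.

```python
# Sketch: wait for a worker's terminal event via the bridge tool.
# `session` is assumed to be an initialized mcp.ClientSession connected
# to the bridge. The "device_id" argument is an assumption (not shown in
# this summary); the other argument names come from the tool description.
result = await session.call_tool(
    "wait_for_event",
    arguments={
        "device_id": "jetson-01",             # hypothetical target device
        "event_name": "work_done",            # optional event-name filter
        "match_params": {"task_id": "T-42"},  # optional param matcher
        "timeout_seconds": 120,
    },
)
# The tool's JSON result is either the matched event payload,
#   {"device_id": ..., "event_name": "work_done", "params": {...}},
# or {"timeout": true} if nothing matched within timeout_seconds.
```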
Resource subscriptions
- New resource template events://devices/{device_id}/latest exposes
the most recent event payload for a given device.
- Bridge handles resources/subscribe and pushes
  notifications/resources/updated to subscribed sessions when events
  arrive on the fabric (client-side sketch after this list). Refcounted:
  one fabric subscription per device, shared across multiple session
  subscribers.
- get_capabilities patched to advertise resources.subscribe=true (the
underlying MCP SDK hardcodes False; FastMCP doesn't expose a clean
override path).
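On the client side, the subscribe flow looks roughly like the sketch below, using the MCP Python SDK. The bridge launch command and device id are hypothetical, and the notification hook shown is the SDK's generic `message_handler`; treat it as a shape, not the PR's verified client code.

```python
import asyncio

from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client

URI = "events://devices/jetson-01/latest"  # hypothetical device id


async def on_message(message) -> None:
    # The SDK routes server notifications here; react to resource updates.
    if isinstance(message, types.ServerNotification) and isinstance(
        message.root, types.ResourceUpdatedNotification
    ):
        print("resource updated:", message.root.params.uri)


async def main() -> None:
    # Hypothetical launch command for the bridge; adjust to the real one.
    params = StdioServerParameters(command="device-connect-mcp-bridge")
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write, message_handler=on_message) as session:
            await session.initialize()
            await session.subscribe_resource(URI)
            # Read the latest event payload on demand:
            result = await session.read_resource(URI)
            print(result)


asyncio.run(main())
```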
Tool description tightening
- invoke_device: nudges agents to prefer wait_for_event over
re-invoking a status function in a poll loop.
- wait_for_event: spells out everyday triggers ("wait", "tell me
when", "block until"), explains race-safety, and includes a TIP
for waiting on terminal events on coding workers.
EventSubscriptionManager (mcp/event_subscriptions.py) is the underlying
implementation — encapsulates the ring buffer, waiter queues, fabric
subscription lifecycle, and resource-subscribe state. Reusable for
clients that want the push surface without going through the bridge.
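The core pattern is easy to picture. Below is a condensed, illustrative sketch of the ring-buffer-plus-waiters half of the manager; names and signatures are assumptions based on this description, and the real class additionally manages fabric subscription lifecycle and resource-subscribe state.

```python
import asyncio
from collections import defaultdict, deque


class _WaitCore:
    """Illustrative core: per-device ring buffer plus waiter queue."""

    def __init__(self, buffer_size: int = 32) -> None:
        # Last N events per device; absorbs the dispatch→wait race.
        self._buffers = defaultdict(lambda: deque(maxlen=buffer_size))
        # Pending waiters per device: (future, event_name, match_params).
        self._waiters = defaultdict(list)

    @staticmethod
    def _matches(event: dict, event_name, match_params) -> bool:
        if event_name and event.get("event_name") != event_name:
            return False
        params = event.get("params") or {}
        return all(params.get(k) == v for k, v in (match_params or {}).items())

    def on_event(self, device_id: str, event: dict) -> None:
        # Called from the fabric subscription callback.
        self._buffers[device_id].append(event)
        for fut, name, mp in list(self._waiters[device_id]):
            if not fut.done() and self._matches(event, name, mp):
                fut.set_result(event)

    async def wait_for_event(self, device_id, event_name=None,
                             match_params=None, timeout_seconds=30.0):
        # Check the buffer first: an event that fired before this call
        # lands is returned immediately instead of blocking.
        for event in self._buffers[device_id]:
            if self._matches(event, event_name, match_params):
                return event
        fut = asyncio.get_running_loop().create_future()
        entry = (fut, event_name, match_params)
        self._waiters[device_id].append(entry)
        try:
            return await asyncio.wait_for(fut, timeout_seconds)
        except asyncio.TimeoutError:
            return None  # the tool layer maps this to {"timeout": true}
        finally:
            self._waiters[device_id].remove(entry)
```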
Tests: 24 unit cases cover URI parse/build, JSON-RPC + bare-dict +
malformed event payloads, Zenoh-style separator parsing, subscribe +
fire + notify, multi-session refcounting, unsubscribe-then-tear-down,
no-op for unrecognized URIs, dispatch→wait race resolution via ring
buffer, ring-buffer match skipping non-matching events, pinned fabric
sub surviving waiter timeouts and released on close().
soupat added a commit that referenced this pull request on Apr 26, 2026:
Two issues surfaced by PR #23 CI:

1. Lint: `device_id_from_uri` was imported into bridge.py but never used after refactoring the resource handler. Removed.

2. Fuzz tests: collection failed with ModuleNotFoundError on `mcp`. The fuzz CI job installs [dev,fuzz] but not [mcp]. Importing any submodule under device_connect_agent_tools.mcp triggers mcp/__init__.py → bridge.py → event_subscriptions.py, which eagerly imported `mcp.server.lowlevel.server.request_ctx` at module top level. Mirror bridge.py's existing fastmcp guard: wrap the mcp imports in try/except, set a `_MCP_AVAILABLE` flag, and let the bridge's own fastmcp guard (which is checked before EventSubscriptionManager is ever instantiated) be the gatekeeper at runtime. The fuzz tests that exercise the schema/JSON-RPC parsers have no functional dependency on the mcp module — they just need the imports not to blow up at collection time. `ServerSession` references in type hints rely on `from __future__ import annotations` (already present), so they stay as forward refs and don't need their own guard.
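The guard itself is small. A sketch under the assumptions above (surrounding module contents elided; only the import shape matters):

```python
# event_subscriptions.py: guard the optional mcp import so that merely
# importing this module doesn't require the [mcp] extra to be installed.
from __future__ import annotations

from typing import TYPE_CHECKING

try:
    from mcp.server.lowlevel.server import request_ctx
    _MCP_AVAILABLE = True
except ImportError:  # [mcp] extra not installed, e.g. the fuzz CI job
    request_ctx = None  # type: ignore[assignment]
    _MCP_AVAILABLE = False

if TYPE_CHECKING:
    # ServerSession appears only in type hints; with `from __future__
    # import annotations` they stay as strings, so no runtime guard needed.
    from mcp.server.session import ServerSession
```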
soupat added a commit that referenced this pull request on Apr 26, 2026:
Pre-existing flake on main (run 24551372160 on 2026-04-17 hit the same assertion) — surfaced again on PR #23.

The test waited a fixed 0.5s after spawning two D2D sensors and then called discover_devices once. Under Zenoh CI load, both peers' presence announcements don't always propagate within that window, so discover_devices sees only one of the two. The miss is non-deterministic — different runs miss different sensors.

Replace the single fixed-sleep + single discover with a poll-until-both-visible loop with a 10s deadline:

- pass refresh=True so the agent-tools 30s cache doesn't mask peers that show up between polls
- 0.5s between polls is plenty given Zenoh peer convergence is sub-second once the first announcement lands
- assertion messages now show what's actually in the device list when a real failure happens

Test scope unchanged: still asserts both sensors are eventually visible. Just gives Zenoh enough time to converge.
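The loop shape, as a sketch (`discover_devices` and the device-record shape here are stand-ins for the real test helpers; the refresh flag and the intervals are from the commit message):

```python
import time


def wait_for_sensors(discover_devices, expected_ids, deadline_s=10.0):
    """Poll until every expected sensor is visible, or fail loudly."""
    deadline = time.monotonic() + deadline_s
    devices = []
    while time.monotonic() < deadline:
        # refresh=True bypasses the agent-tools 30s discovery cache.
        devices = discover_devices(refresh=True)
        visible = {d["device_id"] for d in devices}  # record shape assumed
        if expected_ids <= visible:
            return devices
        time.sleep(0.5)  # Zenoh convergence is sub-second once announced
    raise AssertionError(
        f"expected {sorted(expected_ids)}, discovered only: {devices!r}"
    )
```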
Why both surfaces

CLI MCP clients (codex CLI, Claude Code) don't actively use resources/subscribe today — they call tools. wait_for_event covers that case with a single tool call. The resource subscription path is useful for MCP clients that do subscribe natively (Claude Desktop) and is future-proofing for CLI clients as their support catches up. Both surfaces share the same fabric subscription and ring buffer, so adding the second surface costs nothing on the wire.
Test plan

24 unit tests in tests/test_mcp_bridge_subscriptions.py cover:

- URI helpers (uri_for_device, device_id_from_uri), including 5 garbage-input rejections
- event payload parsing for JSON-RPC, bare-dict, and malformed payloads, with Zenoh-style (/) separators
- subscribe + fire + notify delivery
- multi-session refcounting; unsubscribe-then-tear-down; no-op for unrecognized URIs
- wait_for_event returns a matching event
- wait_for_event filters by event_name
- wait_for_event filters by match_params
- wait_for_event returns None on timeout
- wait_for_event releases the fabric sub on a lone-waiter timeout
- wait_for_event keeps the fabric sub when a session subscription still holds it
- ensure_fabric_sub pins the sub; survives waiter timeout; released on close()
- event fires before wait_for_event is called → ring-buffer hit returns immediately (sketch after this list)

Live-verified end-to-end against a Jetson worker:

- wait_for_event resolves immediately from the ring buffer for fast tasks (3/3 race-repro runs pass).
- notifications/resources/updated arrives at a Python MCP client subscribed to events://devices/jetson-01/latest; resources/read returns the payload.
wait_for_eventresolves immediately from the ring buffer for fast tasks (3/3 race-repro runs pass).notifications/resources/updatedarrives at a Python MCP client subscribed toevents://devices/jetson-01/latest;resources/readreturns the payload.