Skip to content

Add broadcast, subscribe_buffered, and get_inbox to agent-tools#2

Merged
atsyplikhin merged 2 commits into
mainfrom
feat/agent-tools-broadcast-subscribe
Mar 14, 2026
Merged

Add broadcast, subscribe_buffered, and get_inbox to agent-tools#2
atsyplikhin merged 2 commits into
mainfrom
feat/agent-tools-broadcast-subscribe

Conversation

@atsyplikhin
Copy link
Copy Markdown
Collaborator

Summary

  • Adds broadcast() to invoke a function on all discovered devices, collecting results
  • Adds subscribe_buffered() using subscribe_with_subject to buffer messages as (subject, data) tuples — matching Zenoh mesh inbox format
  • Adds unsubscribe_buffered() and get_inbox() for reading/managing buffered subscriptions

These methods close the DX gap between the agent-tools connection and the Zenoh mesh, enabling strands-robots to use Device Connect as a drop-in replacement for fleet coordination (see strands-gtc-nvidia#feat/device-connect-integration).

Test plan

  • Existing unit tests pass (pytest tests/test_connection_unit.py tests/test_tools_unit.py -v)
  • Integration tests with Docker infra (pytest tests/ -v -m "not llm")
  • Verified inbox tuple format matches Zenoh mesh (topic, data) convention

…ction

Adds fleet-level methods to _DeviceConnectConnection matching the Zenoh
mesh DX: broadcast() invokes a function on all discovered devices,
subscribe_buffered() buffers messages as (subject, data) tuples matching
Zenoh mesh inbox format, and get_inbox() returns buffered messages.
- Replace hardcoded nats_url in test_strands_agent.py with messaging_url
  fixture so Zenoh-parameterized runs use the correct URL
- Increase SETTLE_TIME in test_sensor_device.py from 0.3s to 0.5s to
  reduce Zenoh D2D discovery flakiness under load
@atsyplikhin atsyplikhin merged commit 82e03b6 into main Mar 14, 2026
7 checks passed
soupat added a commit that referenced this pull request May 19, 2026
Addresses items #1 and #2 from atsyplikhin's PR #29 review.

#1 — broadcast() now emits a logger.warning when any matched row carries
labels.safety == "critical". Per the discovery design doc this layer is
explicitly advisory, not a gate, because broadcast IS the canonical
async-critical workload (ESTOP) and must not be blocked. The WARN
surfaces typo'd selectors or buggy where-masks that sweep across
critical functions; edge self-election means the agent never sees the
matched set otherwise.

Log line shape (matches the sibling [broadcast::%s::%d targets]
bracket convention used elsewhere in the function):

    [broadcast::<fn>::<correlation_id>] matched N row(s) labeled
    safety:critical (sample devices: [...]); where=... proceeding
    (advisory only)

#2 — docs/discovery.md broadcast worked-example updated from the bare
``where="mask[seat_row][seat_col] == 1 && status.battery > 30"`` form
to the namespaced ``where="bindings.mask[identity.seat_row][identity.seat_col] == 1 && status.battery > 30"``.
The PR's predicate context (identity, labels, status, bindings) is the
right design — collision-safe, mirrors the K8s admission request.object
idiom — so the example, not the implementation, is what was out of step.

Tests: 4 new unit tests in TestBroadcastSafetyCriticalAdvisory cover
the WARN-emission positive case, the no-critical-rows negative case,
the return-envelope-unchanged invariant, and where= truncation. 1 new
integration test in test_tools_broadcast.py spawns a robot (whose
dispatch_robot carries safety:critical) and pins the advisory
end-to-end on real NATS + Zenoh backends, asserting replies still
arrive (advisory does not block).

Review-chain follow-ups applied before commit: symmetric repr() of
where=, square-bracket prefix, caplog filter pinned to the tools
logger by name, sample_ids deduped before slicing, and the
implementation comment trimmed to intent only.
soupat added a commit that referenced this pull request May 19, 2026
Addresses items #1 and #2 from atsyplikhin's PR #29 review.

#1 — broadcast() now emits a logger.warning when any matched row carries
labels.safety == "critical". Per the discovery design doc this layer is
explicitly advisory, not a gate, because broadcast IS the canonical
async-critical workload (ESTOP) and must not be blocked. The WARN
surfaces typo'd selectors or buggy where-masks that sweep across
critical functions; edge self-election means the agent never sees the
matched set otherwise.

Log line shape (matches the sibling [broadcast::%s::%d targets]
bracket convention used elsewhere in the function):

    [broadcast::<fn>::<correlation_id>] matched N row(s) labeled
    safety:critical (sample devices: [...]); where=... proceeding
    (advisory only)

#2 — docs/discovery.md broadcast worked-example updated from the bare
``where="mask[seat_row][seat_col] == 1 && status.battery > 30"`` form
to the namespaced ``where="bindings.mask[identity.seat_row][identity.seat_col] == 1 && status.battery > 30"``.
The PR's predicate context (identity, labels, status, bindings) is the
right design — collision-safe, mirrors the K8s admission request.object
idiom — so the example, not the implementation, is what was out of step.

Tests: 4 new unit tests in TestBroadcastSafetyCriticalAdvisory cover
the WARN-emission positive case, the no-critical-rows negative case,
the return-envelope-unchanged invariant, and where= truncation. 1 new
integration test in test_tools_broadcast.py spawns a robot (whose
dispatch_robot carries safety:critical) and pins the advisory
end-to-end on real NATS + Zenoh backends, asserting replies still
arrive (advisory does not block).

Review-chain follow-ups applied before commit: symmetric repr() of
where=, square-bracket prefix, caplog filter pinned to the tools
logger by name, sample_ids deduped before slicing, and the
implementation comment trimmed to intent only.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant