feat(discovery): operations stack — invoke, broadcast, subscribe, where, CLI#29
Open
soupat wants to merge 8 commits into
Open
feat(discovery): operations stack — invoke, broadcast, subscribe, where, CLI#29soupat wants to merge 8 commits into
soupat wants to merge 8 commits into
Conversation
Add two selector-driven invocation tools that replace the legacy
invoke_device(device_id, function, params) shape:
- invoke(selector, params, llm_reasoning) resolves a function-scoped
selector to exactly one (device, function) tuple and calls it. Returns
{success, device_id, function, result|error}. Returns no_match,
ambiguous_match, invalid_invoke_scope, or invalid_selector errors as
structured envelopes when the selector does not resolve cleanly.
- invoke_many(selector, params, timeout, max_concurrency, llm_reasoning)
resolves to N (device, function) tuples and fans out the calls in
parallel via a thread pool. Partial-failure semantics: a single
target's failure does not abort siblings. Returns {candidates, matched,
succeeded, failed, results, errors} with per-target structured errors.
Per-target timeout defaults to 30s.
invoke_device gains a DeprecationWarning pointing to invoke(); the
function still works for one release while callers migrate. Adapters
(Claude Agent SDK, Strands, LangChain, the in-tree
StrandsOpenAIDeviceConnectAgent, and the operator-facing AGENT_SCRIPT
template) drop invoke_device and expose invoke / invoke_many instead.
invoke_device_with_fallback stays unchanged -- it covers a different
ergonomic case (try a list of device ids in order) with no selector
equivalent.
22 unit tests cover scope rejection, ambiguous and zero matches,
JSON-RPC error mapping, partial failure, per-target timeout
propagation, and llm_reasoning stripping. 9 integration tests cover
single-target invoke, robot dispatch through to event emission,
fan-out across multiple cameras, partial failure, and zero-candidate
empty envelopes.
Add device_connect_edge.predicate, a thin wrapper around cel-python that
compiles where expressions into reusable WherePredicate objects and
evaluates them against device-local context (identity, labels, status,
shared bindings).
CEL was chosen over JSONLogic because the v4 design's mask-indexing
pattern (mask[seat_row][seat_col] == 1) needs computed array indices,
which JSONLogic's literal-path var operator cannot express without
flattening the mask to 1D and indexing arithmetically. CEL handles it
natively.
cel-python is an optional dependency. Importing the module without it
installed succeeds; compiling or evaluating a predicate raises a clear
PredicateCompileError pointing at the [predicate] extra:
pip install device-connect-edge[predicate]
pip install device-connect-agent-tools[predicate]
The evaluator is shared by the dispatcher (validates expressions before
sending them out) and the device runtime (evaluates per-call to decide
whether to execute a fan-out). 16 unit tests cover compilation,
evaluation, the mask-indexing regression case, missing-variable and
type-mismatch error surfaces, and evaluator reusability.
Add the async selector-driven fan-out path so callers do not have to
block on the slowest device:
- broadcast(selector, params, where=, bindings=, fire_at=, on_late=)
publishes a single envelope to a fanout subject keyed by tenant.
Returns immediately with a correlation_id and the candidate count.
Compile-validates the optional CEL where predicate at the dispatcher
so syntax errors short-circuit before reaching the wire.
- DeviceRuntime._broadcast_subscription receives envelopes on
``device-connect.<tenant>.broadcast``. Each candidate self-elects via
the target_device_ids gate (pre-resolved by the dispatcher from the
selector), then evaluates the optional where predicate against its
own context (identity, labels, status, shared bindings). On match the
device executes the function and emits a reply on
``device-connect.<tenant>.<device_id>.event.async_reply.<correlation_id>``
carrying {success, result|error, actually_fired_at}.
- fire_at + on_late synchronized fan-out: the edge holds the message
until the wall-clock deadline and fires from its own clock.
on_late=skip drops late arrivals (preserves coherence for
card-stunt / light-show style workloads); on_late=fire executes
immediately. The achieved spread depends on NTP residual (~5-10 ms
typical) rather than network jitter (~50-150 ms).
- subscribe(selector) returns a Subscription handle. Two selector
forms: ``correlation:<id>`` for broadcast replies, and event-scoped
selectors (``event(<name>)`` or ``device(...).event(<name>)``) for
live event streams. The handle exposes sync read() and a yielding
iter() with idle-timeout reset.
- await_replies(correlation_id, timeout, until) sync helper for the
common broadcast-then-collect pattern; subscribes, drains, returns
the list of reply payloads.
The edge predicate context mirrors DeviceStatus.location into
labels["location"] when the driver did not declare a labels.location
itself, matching the dispatcher-side flatten_device contract so the
same selector and predicate strings work on both sides.
Test coverage: 38 unit tests across broadcast (12), subscribe (12),
and existing modules; 5 NATS integration tests cover end-to-end
broadcast + reply, where filter at the edge, fire_at synchronization
spread, on_late=skip late-arrival drop, and subscribe(correlation:<id>)
streaming.
Add the operator-facing shell surface for selector-driven discovery and
operations:
devctl verbs (read-side):
- devctl discover "<selector>" [--offset N] [--limit M]
- devctl discover-labels [--key K] [--offset N] [--limit M]
statectl verbs (write-side):
- statectl invoke "<selector>" [--param k=v ...]
- statectl invoke-many "<selector>" [--param k=v ...] [--timeout T]
[--max-concurrency N]
- statectl broadcast "<selector>" [--param k=v ...] [--where E]
[--bindings JSON] [--fire-at T]
[--on-late skip|fire]
- statectl subscribe "<selector>" [--timeout T] [--until N]
- statectl await <correlation_id> [--timeout T] [--until N]
Each verb is a thin wrapper over the Python tool of the same name and
exits non-zero on tool-side errors so they compose into shell pipelines
naturally. Parameter values are decoded as JSON when they look like
JSON (numbers, booleans, arrays, objects, quoted strings) and pass
through as strings otherwise, so common shapes (--param resolution=4k,
--param zones='[1,2,3]') work without quoting heroics.
The historical ``devctl discover`` verb (mDNS scan for uncommissioned
devices) is renamed to ``mdns-scan`` with ``scan`` as an alias, so
``discover`` is free for the selector-driven sense. Existing scripts
should switch from ``devctl discover`` to ``devctl scan`` if they were
exercising the mDNS path.
22 parser-shape unit tests guard against argument drift; the underlying
tools already have full unit and integration coverage from earlier
phases.
Add the operations layer (invoke / invoke_many / broadcast / subscribe / await_replies) to docs/discovery.md, with the edge-side ``where`` predicate, synchronized fan-out via ``fire_at`` / ``on_late``, worked examples that exercise each tool, and the corresponding devctl / statectl CLI verbs. The guide now covers everything the discovery API ships: labels schema, selector grammar, the five scope shapes, response envelope, error codes, all seven tools, and the CLI surface.
Applies findings from the pre-merge review of the operations stack:
Edge runtime (device.py):
- Hand the broadcast envelope off to a tracked task so the subscription
callback returns immediately. A long fire_at hold or slow driver
function no longer blocks subsequent broadcasts from being received.
- Extract _handle_broadcast_envelope and _evaluate_where so the where
self-election step is isolated, unit-testable, and the callback body
stays flat.
- Splice device_id into the predicate's identity context so the natural
``identity.device_id == "..."`` form works (DeviceIdentity itself
does not carry device_id; that lives on the runtime).
Wire format (tools.py + device.py):
- Rename the broadcast envelope's ``target_device_ids`` field to
``targets`` before any edge ships. Shorter, less prescriptive, and
matches the dispatcher-side ``candidates`` naming.
Subscription handle (tools.py):
- Fix a race in Subscription.read(): truncate by the snapshot length
captured BEFORE iteration, not by clearing post-iteration. A message
appended by the messaging callback during draining now survives to
the next read instead of being silently dropped.
- Add __iter__ so ``for msg in sub:`` works with a sensible 30s idle
timeout, matching the standard Python iteration protocol.
CLI (statectl/operations_cli.py):
- statectl subscribe now catches KeyboardInterrupt cleanly (exit 130),
distinguishes "got messages" (exit 0) from "idle timeout with no
messages" (exit 4), so shell pipelines can branch on either outcome.
- statectl invoke-many exits 3 when any target failed (alongside the
existing 1 for top-level errors), so partial failure is visible to
callers without parsing JSON.
ASCII compliance (predicate.py, tools.py):
- Drop a banned-vocabulary token from a docstring.
- Replace an em-dash in invoke_device's docstring with ASCII text.
New tests:
- Unit: __iter__ protocol + race-safety guard for Subscription.read.
- Integration: broadcast where=identity.device_id in bindings.allow
(exercises the new identity context + bindings path),
await_replies(until=) early-return timing, ``for msg in sub:``
iteration end-to-end, and subscribe(event(...)) live-event capture.
…ters Phases 4-5 added broadcast() and await_replies() to the agent-tools surface but the adapter migration in feat(invoke) only carried invoke / invoke_many across. The flashlight-auditorium demo needs the LLM to issue selector-driven broadcasts with where + bindings + fire_at, so broadcast and await_replies both need to be Strands/LangChain/Claude tools as well. Tool descriptions for the Claude adapter spell out the broadcast + await_replies pairing (caller fires broadcast, then awaits replies by correlation_id) so agents discover the workflow from the tool docs. subscribe() is intentionally NOT exposed via the adapters: it returns a Subscription handle that does not serialise cleanly as a tool result and is more natural to call from operator code or the CLI than from an LLM. Agents needing the same shape use broadcast + await_replies.
The broadcast handler built the where-predicate context from
``caps.identity`` -- but DeviceCapabilities does not carry an
``identity`` field; that lives on the driver as a separate
DeviceIdentity model. The ``getattr(caps, "identity", None)`` fallback
masked the bug: identity_dict was always just ``{"device_id": ...}``
with none of the driver's extra fields (seat_row, seat_col, x-mhp slot
metadata, ...) reaching the predicate.
Symptom: a where predicate like
``bindings.mask[identity.seat_row][identity.seat_col] == 1`` failed at
every candidate (CEL surfaces undefined field access as CELEvalError,
fail-closed fires, nobody self-elects).
Fix: read identity from ``self._driver.identity`` and splice in
``device_id`` from the runtime. Backwards-compatible with drivers that
don't expose an identity property (driver_identity is None -> only
device_id is present, same as before for those drivers).
Surfaced while building the flashlight-auditorium demo, where each
phone exposes its seat coordinates as extra fields on DeviceIdentity
and the spell-CMU broadcast indexes a 2D mask by those coordinates.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Stacked on top of PR #28. Completes the discovery design's operations
layer (phases 4-6 of the spec) and the optional CEL
wherepredicate.With this PR merged, the discovery API ships end-to-end: labels,
selectors, discovery, invocation, async fan-out, and operator CLI.
This PR targets
updated-hierarchical-discoveryso the diff shows onlythe operations work. GitHub auto-retargets to
mainonce #28 merges.What's new
invoke(selector, params, llm_reasoning)— selector-driven synccall. Selector must resolve to one (device, function) tuple; zero or
multiple matches return structured errors (
no_match,ambiguous_match).invoke_devicebecomes advisory-deprecated.invoke_many(selector, params, timeout, max_concurrency)— syncparallel fan-out. Partial-failure semantics: per-target results and
errors are returned even if some fail. Per-target 30s timeout default.
broadcast(selector, params, where=, bindings=, fire_at=, on_late=)— async fan-out. Returns
correlation_idimmediately; replies streamon
device-connect.<zone>.<device_id>.event.async_reply.<correlation_id>.wherepredicate evaluated at each candidate device against{identity, labels, status, bindings}. Self-deselection is silent;compile-validated at the dispatcher before publication. Optional
dependency:
pip install device-connect-agent-tools[predicate].fire_at(wall-clock epoch seconds) +on_late(skip|fire). Each device holds the message and firesfrom its own clock at the deadline. NTP-typical spread is 5-10 ms.
subscribe(selector)returns aSubscriptionhandle for liveevents. Two selector forms:
"correlation:<id>"for broadcastreplies, or event-scoped selectors (
event(<name>)/device(...).event(<name>)). Supportsread(),iter(timeout),context-manager
with, and the standardfor msg in sub:protocol.await_replies(correlation_id, timeout, until)— sync helper forthe common broadcast-then-collect pattern.
devctl discover/discover-labels— selector-driven discoveryon the CLI. Historical
devctl discover(mDNS scan) renamed todevctl mdns-scanwithscanalias.statectl invoke/invoke-many/broadcast/subscribe/await— operator-facing wrappers for the new tools. JSON-shaped--param k=v, structured exit codes for pipelines.docs/discovery.mdextended with operations, edge-sidewhere,synchronized fan-out, worked examples, and CLI reference.
Commits
The final commit is the result of running the same three reviews used
on PR #28 (
code-review-and-quality,code-simplification,api-and-interface-design) on this branch and addressing 11 findings:race fix in
Subscription.read, broadcast handler no longer blocks thesubscription callback on
fire_at, wire-format rename fromtarget_device_idstotargets,__iter__protocol on Subscription,CLI SIGINT handling, structured exit codes,
identity.device_idinpredicate context, ASCII compliance.
Backwards compatibility
invoke_device(device_id, function, ...)still works, now emits aDeprecationWarningpointing atinvoke('device(<id>).function(<name>)', params).in-tree
StrandsOpenAIDeviceConnectAgent) migrated toinvoke/invoke_many;invoke_deviceis removed from their tool surfaces.invoke_device_with_fallbackis kept unchanged — no selectorequivalent exists for "try a list of devices in order".
devctl discovermDNS-scan verb is renamed todevctl mdns-scan(withscanalias). Scripts that calleddevctl discoverfor the mDNS path should switch todevctl scan.Test plan
tests/tests/test_tools_invoke.py(9 tests) andtests/tests/test_tools_broadcast.py(9 tests) exercise:function-only selector
correlation_idfire_atsynchronization (spread under 0.5s with on_late=skip),late-arrival drop with on_late=skip
await_replies(until=K)early returnsubscribe("correlation:<id>")streamingsubscribe(event(<name>))live event capturefor msg in sub:iter protocol end-to-end