Skip to content
Open
184 changes: 182 additions & 2 deletions docs/discovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ function(estop) fleet emergency-st

## Tools

### `discover(selector, offset=0, limit=200)`
### Discovery

#### `discover(selector, offset=0, limit=200)`

Resolves a selector to matched entities. Returns devices, function tuples,
or event tuples depending on the selector scope. The response includes a
Expand All @@ -108,7 +110,7 @@ and switches to a name-and-labels summary above
`DEVICE_CONNECT_FUNCTION_THRESHOLD` (default 20). The threshold is
configurable via environment variable.

### `discover_labels(key=None, offset=0, limit=50)`
#### `discover_labels(key=None, offset=0, limit=50)`

Returns the fleet label vocabulary. Use this first when you do not know
which dimensions are available.
Expand All @@ -118,6 +120,84 @@ which dimensions are available.
- With a `key` like `"device.location"` or `"function.direction"`:
paginates the full value list for that one key.

### Operations

Calling a function on devices is one logical operation; the only choice
is whether the caller waits for replies and how they arrive.

| Tool | Selector resolves to | Reply mode |
| --- | --- | --- |
| `invoke(selector, params)` | exactly one (device, function) tuple | sync, single result |
| `invoke_many(selector, params, timeout=)` | any number of (device, function) tuples | sync, aggregated |
| `broadcast(selector, params, where=, bindings=, fire_at=, on_late=)` | any number of (device, function) tuples | async; correlation-tagged replies stream as events |
| `subscribe(selector)` | events, or `"correlation:<id>"` for broadcast replies | live stream (`Subscription` handle) |
| `await_replies(correlation_id, timeout=, until=)` | replies for one broadcast | sync helper that subscribes, collects, returns |

`invoke_many` runs every target's call in parallel and returns when each
target has finished or hit its per-target timeout (30 s default). Partial
failures do not abort siblings; the response carries both `results` and
`errors` lists.

`broadcast` does the same fan-out asynchronously: the caller gets a
`correlation_id` immediately and replies stream back on a per-device
subject keyed by that id. Subscribe with `subscribe("correlation:<id>")`
or block with `await_replies(correlation_id, timeout=...)`.

### Edge-side `where` predicate

`broadcast` accepts an optional `where` expression that runs at each
candidate device. The predicate is a CEL (Common Expression Language)
string and sees four variables:

- `identity` — device-local identity dict (`device_id`, `device_type`, ...)
- `labels` — device labels (the same labels selectors filter on)
- `status` — device status (heartbeat-updated: `location`, `availability`,
`battery`, `online`, ...)
- `bindings` — the shared payload passed to `broadcast` (selection masks,
thresholds, lookup tables)

```python
broadcast(
"device(category:camera).function(capture_image)",
params={"resolution": "4k"},
where="status.battery > 50 && labels.location == 'lab-A'",
)
```

The `where` predicate is sandboxed by CEL (no I/O, no filesystem). The
predicate evaluator is an optional install:

```
pip install device-connect-agent-tools[predicate]
```

Without the extra, calling `broadcast(..., where=...)` returns an
`invalid_predicate` error immediately at the dispatcher; calls without a
`where` work unchanged.

### Synchronized fan-out (`fire_at` + `on_late`)

`broadcast` accepts an optional `fire_at` (wall-clock epoch seconds).
Each device holds the message and fires from its own clock at the
deadline. `on_late` controls behaviour when a device receives the
message past the deadline:

- `"skip"` (default) — drop the call to preserve coherence.
- `"fire"` — execute immediately.

```python
broadcast(
"device(category:phone).function(set_flashlight)",
params={"on": True, "color": "white"},
fire_at=time.time() + 0.500, # 500 ms in the future
on_late="skip",
)
```

With NTP-synced devices the achieved spread is typically 5-10 ms
(clock-sync residual) rather than the 50-150 ms a naive fire-on-receipt
broadcast would produce.

## Response envelope

`discover` returns a stable envelope:
Expand Down Expand Up @@ -229,3 +309,103 @@ while True:
break
offset = page["next_offset"]
```

### Invoke a single function

```python
from device_connect_agent_tools import invoke

result = invoke(
"device(robot-001).function(grip_close)",
{"force_n": 10},
)
# {"success": True, "device_id": "robot-001", "function": "grip_close",
# "result": {...}}
```

### Fan out across every camera in lab-A

```python
from device_connect_agent_tools import invoke_many

result = invoke_many(
"device(category:camera, location:lab-A).function(capture_image)",
{"resolution": "4k"},
)
# {"candidates": 12, "matched": 12, "succeeded": 12, "failed": 0,
# "results": [...], "errors": []}
```

### Async fleet emergency stop

```python
from device_connect_agent_tools import broadcast, await_replies

result = broadcast("function(estop)")
# {"correlation_id": "br-7f3a91", "candidates": 240, ...}

replies = await_replies(result["correlation_id"], timeout=5.0)
# list of {device_id, success, result|error, actually_fired_at}
```

### Synchronized actuation across a phone fleet

```python
import time
from device_connect_agent_tools import broadcast

mask = build_mask_from_scores(threshold=0.8) # caller-side selection
broadcast(
"device(category:phone, location:auditorium-A).function(set_flashlight)",
params={"on": True, "color": "white"},
where="mask[seat_row][seat_col] == 1 && status.battery > 30",
bindings={"mask": mask},
fire_at=time.time() + 0.5,
on_late="skip",
)
```

### Subscribe to motion events in lab-A

```python
from device_connect_agent_tools import subscribe

with subscribe("device(location:lab-A/*).event(modality:motion)") as sub:
for event in sub.iter(timeout=60.0):
handle(event)
```

## CLI

The same selector syntax drives the operator CLIs. Every CLI command
maps to the matching Python tool call.

```
# Discovery (devctl)
devctl discover "<selector>" [--offset N] [--limit M]
devctl discover-labels [--key K] [--offset N] [--limit M]

# Operations (statectl)
statectl invoke "<selector>" [--param k=v ...]
statectl invoke-many "<selector>" [--param k=v ...] [--timeout T]
statectl broadcast "<selector>" [--param k=v ...] [--where E]
[--bindings JSON] [--fire-at T]
[--on-late skip|fire]
statectl subscribe "<selector>" [--timeout T] [--until N]
statectl await <correlation_id> [--timeout T] [--until N]
```

`--param k=v` accepts JSON-shaped values (numbers, booleans, arrays,
objects); everything else passes through as a string. So
`--param resolution=4k` and `--param zones='[1,2,3]'` both work
without quoting heroics.

Each verb exits non-zero on tool-side errors so the verbs compose into
shell pipelines:

```
statectl broadcast "device(category:camera).function(capture_image)" \
--param resolution=4k \
| jq -r .correlation_id \
| xargs statectl await --timeout 5
```
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@

"""Device Connect Tools — framework-agnostic SDK for Device Connect IoT.

Selector-driven discovery keeps LLM context small:
Selector-driven discovery and invocation keep LLM context small:

from device_connect_agent_tools import connect, discover, discover_labels
from device_connect_agent_tools import connect, discover, discover_labels, invoke

connect()
vocab = discover_labels() # fleet vocabulary
cams = discover("device(category:camera, location:zone-A/*)") # device roster
writes = discover("device(*).function(direction:write)") # function tuples
result = invoke_device("camera-001", "capture_image", {"resolution": "1080p"})
result = invoke("device(camera-001).function(capture_image)",
{"resolution": "1080p"})

The older ``describe_fleet`` / ``list_devices`` / ``get_device_functions``
trio remains available for one release as advisory-deprecated wrappers --
prefer ``discover`` / ``discover_labels`` for new code.
The older ``describe_fleet`` / ``list_devices`` / ``get_device_functions`` /
``invoke_device`` family remains available for one release as
advisory-deprecated wrappers -- prefer ``discover`` / ``discover_labels`` /
``invoke`` / ``invoke_many`` for new code.
"""

from device_connect_agent_tools.agent import DeviceConnectAgent
Expand All @@ -25,14 +27,22 @@
# Selector-driven discovery (preferred)
discover,
discover_labels,
# Invocation
invoke_device,
# Selector-driven invocation (preferred)
invoke,
invoke_many,
broadcast,
# Selector-driven subscription
Subscription,
subscribe,
await_replies,
# Other invocation helpers
invoke_device_with_fallback,
get_device_status,
# Advisory-deprecated discovery wrappers (one-release transition)
# Advisory-deprecated wrappers (one-release transition)
describe_fleet,
list_devices,
get_device_functions,
invoke_device,
discover_devices,
)

Expand All @@ -46,13 +56,21 @@
# Selector-driven discovery (preferred)
"discover",
"discover_labels",
# Invocation
"invoke_device",
# Selector-driven invocation (preferred)
"invoke",
"invoke_many",
"broadcast",
# Selector-driven subscription
"Subscription",
"subscribe",
"await_replies",
# Other invocation helpers
"invoke_device_with_fallback",
"get_device_status",
# Advisory-deprecated -- use discover() / discover_labels() instead
# Advisory-deprecated -- use discover / discover_labels / invoke instead
"describe_fleet",
"list_devices",
"get_device_functions",
"invoke_device",
"discover_devices",
]
Loading