Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,22 @@

All notable changes to cueapi-cli will be documented here.

## [0.2.0] - 2026-05-01

### Added
- `cueapi fire <cue-id>` for ad-hoc one-shot triggers and for using cues as a messaging channel between agents. Optional `--payload-override` (JSON) and `--merge-strategy` (`merge` default, `replace` opt-in). Wraps `POST /v1/cues/{id}/fire`.
- `cueapi executions` subgroup with seven subcommands closing the receive-claim-process-complete loop for worker-transport executions:
- `executions list` historical executions across all cues.
- `executions list-claimable [--task] [--agent]` unclaimed executions ready for processing, server-side filtered. Required for single-purpose workers; without a filter, sibling tasks ahead in the LIMIT 50 window starve your handler.
- `executions get <id>` fetch one execution by ID.
- `executions claim <id> --worker-id ID` atomically claim a specific execution.
- `executions claim-next --worker-id ID [--task]` claim the next available execution. With `--task`, the CLI internally fans out (filtered list, pick oldest, claim by ID) since the server's claim endpoint does not accept a task filter today.
- `executions heartbeat <id> --worker-id ID` extend the claim lease. Sends `worker_id` via the `X-Worker-Id` request header (the server's actual transport for that field).
- `executions report-outcome <id> --success/--failure [...]` report a write-once outcome with optional `--external-id`, `--result-url`, `--summary`.

### Changed
- `__version__` in `cueapi/__init__.py` had drifted to 0.1.3 while `pyproject.toml` was at 0.1.5. Both now aligned at 0.2.0.

## [0.1.0] - 2025-03-28

### Added
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ Cron fires a job. That is it. No retries. No delivery confirmation. No outcome t
| `cueapi pause <id>` | Pause a cue |
| `cueapi resume <id>` | Resume a cue |
| `cueapi delete <id>` | Delete a cue |
| `cueapi fire <id>` | Fire an existing cue immediately, optional `--payload-override` |
| `cueapi executions list` | List historical executions across all cues |
| `cueapi executions list-claimable` | List unclaimed worker executions, filter by `--task` / `--agent` |
| `cueapi executions get <id>` | Fetch one execution by ID |
| `cueapi executions claim <id> --worker-id ID` | Atomically claim an execution |
| `cueapi executions claim-next --worker-id ID [--task]` | Claim the next available execution |
| `cueapi executions heartbeat <id> --worker-id ID` | Extend the claim lease |
| `cueapi executions report-outcome <id> --success/--failure` | Report a write-once outcome |
| `cueapi usage` | Show current usage and limits |
| `cueapi key regenerate` | Regenerate API key |
| `cueapi upgrade` | Open billing |
Expand Down
2 changes: 1 addition & 1 deletion cueapi/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""CueAPI CLI — Your Agents' Cue to Act."""

__version__ = "0.1.3"
__version__ = "0.2.0"
329 changes: 329 additions & 0 deletions cueapi/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,45 @@ def delete(ctx: click.Context, cue_id: str, yes: bool) -> None:
click.echo(str(e))


# --- Fire (ad-hoc trigger / messaging via cues) ---


@main.command()
@click.argument("cue_id")
@click.option("--payload-override", "payload_override", default=None, help="JSON payload override for this fire only")
@click.option("--merge-strategy", "merge_strategy", type=click.Choice(["merge", "replace"]), default=None, help="How payload-override combines with the cue's stored payload (default: merge, server-side)")
@click.pass_context
def fire(ctx: click.Context, cue_id: str, payload_override: Optional[str], merge_strategy: Optional[str]) -> None:
"""Fire an existing cue immediately, optionally overriding its payload."""
body: dict = {}
if payload_override:
try:
body["payload_override"] = json.loads(payload_override)
except json.JSONDecodeError:
raise click.UsageError("--payload-override must be valid JSON")
if merge_strategy:
body["merge_strategy"] = merge_strategy

try:
with CueAPIClient(api_key=ctx.obj.get("api_key"), profile=ctx.obj.get("profile")) as client:
resp = client.post(f"/cues/{cue_id}/fire", json=body)
if resp.status_code in (200, 201, 202):
data = resp.json()
exec_id = data.get("id") or data.get("execution_id", "?")
scheduled = data.get("scheduled_for", "")
echo_success(f"Fired: {cue_id}")
echo_info("Execution:", exec_id)
if scheduled:
echo_info("Scheduled:", scheduled)
elif resp.status_code == 404:
echo_error(f"Cue not found: {cue_id}")
else:
error = resp.json().get("detail", {}).get("error", {})
echo_error(error.get("message", f"Failed (HTTP {resp.status_code})"))
except click.ClickException as e:
click.echo(str(e))


# --- Billing commands ---


Expand Down Expand Up @@ -508,6 +547,296 @@ def usage(ctx: click.Context) -> None:
click.echo(str(e))


# --- Executions ---


@main.group()
def executions() -> None:
"""Manage executions (worker-transport claim / heartbeat / outcome lifecycle)."""
pass


@executions.command(name="list")
@click.option("--cue-id", "cue_id", default=None, help="Filter to a specific cue")
@click.option("--status", default=None, help="Filter by execution status")
@click.option("--limit", default=20, type=int, help="Max results")
@click.option("--offset", default=0, type=int, help="Offset for pagination")
@click.pass_context
def executions_list(ctx: click.Context, cue_id: Optional[str], status: Optional[str], limit: int, offset: int) -> None:
"""List historical executions across all cues."""
try:
with CueAPIClient(api_key=ctx.obj.get("api_key"), profile=ctx.obj.get("profile")) as client:
params: dict = {"limit": limit, "offset": offset}
if cue_id:
params["cue_id"] = cue_id
if status:
params["status"] = status
resp = client.get("/executions", params=params)
if resp.status_code != 200:
echo_error(f"Failed (HTTP {resp.status_code})")
return
data = resp.json()
execs = data.get("executions", [])
if not execs:
click.echo("\nNo executions found.\n")
return
click.echo()
rows = []
for ex in execs:
ts = (ex.get("scheduled_for") or "")[:16].replace("T", " ")
rows.append([ex.get("id", "?"), ex.get("cue_id", "?"), format_status(ex.get("status", "?")), ts])
echo_table(["ID", "CUE", "STATUS", "SCHEDULED"], rows, widths=[26, 22, 14, 22])
click.echo()
except click.ClickException as e:
click.echo(str(e))


@executions.command(name="list-claimable")
@click.option("--task", default=None, help="Filter by payload.task (server-side SQL filter)")
@click.option("--agent", default=None, help="Filter by payload.agent (server-side SQL filter)")
@click.pass_context
def executions_list_claimable(ctx: click.Context, task: Optional[str], agent: Optional[str]) -> None:
"""List unclaimed worker-transport executions ready for processing.

Filters server-side via task / agent query params. Required for single-purpose
workers; without --task, sibling tasks ahead in the LIMIT 50 window starve
your handler.
"""
try:
with CueAPIClient(api_key=ctx.obj.get("api_key"), profile=ctx.obj.get("profile")) as client:
params: dict = {}
if task:
params["task"] = task
if agent:
params["agent"] = agent
resp = client.get("/executions/claimable", params=params)
if resp.status_code != 200:
echo_error(f"Failed (HTTP {resp.status_code})")
return
data = resp.json()
execs = data.get("executions", [])
if not execs:
filt = []
if task:
filt.append(f"task={task}")
if agent:
filt.append(f"agent={agent}")
qual = f" ({', '.join(filt)})" if filt else ""
click.echo(f"\nNo claimable executions{qual}.\n")
return
click.echo()
rows = []
for ex in execs:
ts = (ex.get("scheduled_for") or "")[:16].replace("T", " ")
rows.append([
ex.get("execution_id", "?"),
ex.get("cue_name", "?"),
ex.get("task") or "",
ts,
str(ex.get("attempt", 1)),
])
echo_table(["ID", "CUE", "TASK", "SCHEDULED", "ATTEMPT"], rows, widths=[26, 22, 22, 18, 8])
click.echo()
except click.ClickException as e:
click.echo(str(e))


@executions.command(name="get")
@click.argument("execution_id")
@click.pass_context
def executions_get(ctx: click.Context, execution_id: str) -> None:
"""Fetch a single execution by ID."""
try:
with CueAPIClient(api_key=ctx.obj.get("api_key"), profile=ctx.obj.get("profile")) as client:
resp = client.get(f"/executions/{execution_id}")
if resp.status_code == 404:
echo_error(f"Execution not found: {execution_id}")
return
if resp.status_code != 200:
echo_error(f"Failed (HTTP {resp.status_code})")
return
ex = resp.json()
click.echo()
echo_info("ID:", ex.get("id", execution_id))
echo_info("Cue:", ex.get("cue_id", "?"))
echo_info("Status:", format_status(ex.get("status", "?")))
if ex.get("scheduled_for"):
echo_info("Scheduled:", ex["scheduled_for"])
if ex.get("started_at"):
echo_info("Started:", ex["started_at"])
if ex.get("claimed_by_worker"):
echo_info("Claimed by:", ex["claimed_by_worker"])
if ex.get("attempts") is not None:
echo_info("Attempts:", str(ex["attempts"]))
if ex.get("http_status") is not None:
echo_info("HTTP status:", str(ex["http_status"]))
if ex.get("error_message"):
echo_info("Error:", ex["error_message"])
click.echo()
except click.ClickException as e:
click.echo(str(e))


@executions.command(name="claim")
@click.argument("execution_id")
@click.option("--worker-id", "worker_id", required=True, help="Stable identifier for this worker")
@click.pass_context
def executions_claim(ctx: click.Context, execution_id: str, worker_id: str) -> None:
"""Atomically claim a specific worker-transport execution.

Returns 409 if already claimed or not eligible.
"""
try:
with CueAPIClient(api_key=ctx.obj.get("api_key"), profile=ctx.obj.get("profile")) as client:
resp = client.post(f"/executions/{execution_id}/claim", json={"worker_id": worker_id})
if resp.status_code == 200:
data = resp.json()
echo_success(f"Claimed: {execution_id}")
if data.get("lease_seconds") is not None:
echo_info("Lease:", f"{data['lease_seconds']}s")
elif resp.status_code == 409:
echo_error("Not claimable (already claimed, wrong status, or wrong owner)")
elif resp.status_code == 404:
echo_error(f"Execution not found: {execution_id}")
else:
error = resp.json().get("detail", {}).get("error", {})
echo_error(error.get("message", f"Failed (HTTP {resp.status_code})"))
except click.ClickException as e:
click.echo(str(e))


@executions.command(name="claim-next")
@click.option("--worker-id", "worker_id", required=True, help="Stable identifier for this worker")
@click.option("--task", default=None, help="Filter to a specific task. Without it, the server picks the oldest pending across any of your worker cues.")
@click.pass_context
def executions_claim_next(ctx: click.Context, worker_id: str, task: Optional[str]) -> None:
"""Claim the next available worker-transport execution.

With --task, fans out (list-claimable filtered, pick oldest, claim by ID).
The server's claim endpoint does not accept a task filter today.
"""
try:
with CueAPIClient(api_key=ctx.obj.get("api_key"), profile=ctx.obj.get("profile")) as client:
if task:
# Fan-out: filtered list + pick oldest + claim by ID. Tiny race
# window between list and claim is bounded by the atomic claim
# returning 409, in which case the caller retries.
lr = client.get("/executions/claimable", params={"task": task})
if lr.status_code != 200:
echo_error(f"Failed to list claimable (HTTP {lr.status_code})")
return
execs = lr.json().get("executions", [])
if not execs:
click.echo(f"\nNo claimable executions for task={task}.\n")
return
next_id = execs[0].get("execution_id")
resp = client.post(f"/executions/{next_id}/claim", json={"worker_id": worker_id})
if resp.status_code == 200:
data = resp.json()
echo_success(f"Claimed: {next_id}")
if data.get("lease_seconds") is not None:
echo_info("Lease:", f"{data['lease_seconds']}s")
elif resp.status_code == 409:
echo_error(f"Lost the race on {next_id} (another worker beat us). Retry.")
else:
echo_error(f"Failed (HTTP {resp.status_code})")
return

resp = client.post("/executions/claim", json={"worker_id": worker_id})
if resp.status_code == 200:
data = resp.json()
echo_success(f"Claimed: {data.get('execution_id', '?')}")
if data.get("lease_seconds") is not None:
echo_info("Lease:", f"{data['lease_seconds']}s")
elif resp.status_code == 409:
click.echo("\nNo executions available for claiming.\n")
else:
error = resp.json().get("detail", {}).get("error", {})
echo_error(error.get("message", f"Failed (HTTP {resp.status_code})"))
except click.ClickException as e:
click.echo(str(e))


@executions.command(name="heartbeat")
@click.argument("execution_id")
@click.option("--worker-id", "worker_id", required=True, help="Same worker-id used at claim time. Sent as X-Worker-Id header.")
@click.pass_context
def executions_heartbeat(ctx: click.Context, execution_id: str, worker_id: str) -> None:
"""Extend the claim lease on an in-flight execution.

Returns 403 if worker-id does not match the worker that claimed.
Returns 409 if the execution is no longer in 'delivering' state.
"""
try:
with CueAPIClient(api_key=ctx.obj.get("api_key"), profile=ctx.obj.get("profile")) as client:
# X-Worker-Id is a request header on the heartbeat endpoint, not body.
resp = client.post(
f"/executions/{execution_id}/heartbeat",
headers={"X-Worker-Id": worker_id},
)
if resp.status_code == 200:
data = resp.json()
echo_success(f"Heartbeat acknowledged: {execution_id}")
if data.get("lease_extended_until"):
echo_info("Lease until:", data["lease_extended_until"])
elif resp.status_code == 403:
echo_error("Worker-id does not match the worker that claimed this execution")
elif resp.status_code == 409:
echo_error("Execution is no longer in 'delivering' state")
elif resp.status_code == 404:
echo_error(f"Execution not found: {execution_id}")
else:
error = resp.json().get("detail", {}).get("error", {})
echo_error(error.get("message", f"Failed (HTTP {resp.status_code})"))
except click.ClickException as e:
click.echo(str(e))


@executions.command(name="report-outcome")
@click.argument("execution_id")
@click.option("--success/--failure", "success", required=True, help="Outcome: success or failure")
@click.option("--external-id", "external_id", default=None, help="ID from the downstream system")
@click.option("--result-url", "result_url", default=None, help="Public URL proving the work happened")
@click.option("--summary", default=None, help="Short human summary (max 500 chars)")
@click.pass_context
def executions_report_outcome(
ctx: click.Context,
execution_id: str,
success: bool,
external_id: Optional[str],
result_url: Optional[str],
summary: Optional[str],
) -> None:
"""Report the outcome of an execution. Write-once; the outcome is immutable."""
body: dict = {"success": success}
if external_id:
body["external_id"] = external_id
if result_url:
body["result_url"] = result_url
if summary:
body["summary"] = summary

try:
with CueAPIClient(api_key=ctx.obj.get("api_key"), profile=ctx.obj.get("profile")) as client:
resp = client.post(f"/executions/{execution_id}/outcome", json=body)
if resp.status_code in (200, 201):
echo_success(f"Outcome recorded: {execution_id}")
if not success:
echo_info("Marked:", "failure")
elif resp.status_code == 404:
echo_error(f"Execution not found: {execution_id}")
elif resp.status_code == 409:
echo_error("Outcome already recorded (write-once)")
else:
error = resp.json().get("detail", {}).get("error", {})
echo_error(error.get("message", f"Failed (HTTP {resp.status_code})"))
except click.ClickException as e:
click.echo(str(e))


main.add_command(executions)


# --- Key management ---


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "cueapi"
version = "0.1.5"
version = "0.2.0"
description = "Official CLI for CueAPI — open-source execution accountability primitive for AI agents. Schedule agent work, require evidence-backed outcomes, and gate execution with write-once verification from your terminal."
readme = "README.md"
license = { text = "MIT" }
Expand Down
Loading
Loading