From c09a408c0251c065c596561dc3f5ff25d2f1c35b Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Fri, 8 May 2026 02:51:37 -0400 Subject: [PATCH] Expose launcher tag primitives --- CHANGELOG.md | 7 + README.md | 125 ++---- crates/relayburn-cli/Cargo.toml | 27 +- crates/relayburn-cli/src/cli.rs | 38 -- crates/relayburn-cli/src/commands/ingest.rs | 2 + crates/relayburn-cli/src/commands/mod.rs | 2 - crates/relayburn-cli/src/commands/run.rs | 358 ------------------ crates/relayburn-cli/src/commands/summary.rs | 303 +++++++++++++-- crates/relayburn-cli/src/harnesses/mod.rs | 50 +-- .../relayburn-cli/src/harnesses/registry.rs | 4 +- crates/relayburn-cli/src/lib.rs | 9 +- crates/relayburn-cli/src/main.rs | 1 - crates/relayburn-cli/src/util/time.rs | 4 +- crates/relayburn-cli/tests/smoke.rs | 56 +-- crates/relayburn-sdk-node/src/lib.rs | 279 ++++++++++++++ crates/relayburn-sdk/src/ingest/gap.rs | 7 +- crates/relayburn-sdk/src/ingest/ingest.rs | 66 +++- .../src/ingest/orchestration_tests.rs | 64 +++- .../src/ingest/pending_stamps.rs | 14 +- .../src/ingest/pending_stamps_compat_tests.rs | 21 + .../src/ingest/watch_loop_tests.rs | 1 + crates/relayburn-sdk/src/query_verbs.rs | 124 +++++- crates/relayburn-sdk/tests/integration.rs | 28 +- packages/relayburn/CHANGELOG.md | 3 + packages/sdk-node/CHANGELOG.md | 4 + packages/sdk-node/README.md | 3 + packages/sdk-node/src/binding.d.ts | 1 + packages/sdk-node/src/index.cjs | 1 + packages/sdk-node/src/index.d.ts | 40 ++ packages/sdk-node/src/index.js | 4 + packages/sdk-node/test/conformance.test.js | 43 ++- packages/sdk-node/test/esbuild-smoke.test.js | 2 + tests/fixtures/cli-golden/README.md | 4 +- tests/fixtures/cli-golden/invocations.json | 6 - .../cli-golden/snapshots/run-help.stdout.txt | 11 - .../snapshots/top-level-help.stdout.txt | 10 +- 36 files changed, 1032 insertions(+), 690 deletions(-) delete mode 100644 crates/relayburn-cli/src/commands/run.rs delete mode 100644 tests/fixtures/cli-golden/snapshots/run-help.stdout.txt diff 
--git a/CHANGELOG.md b/CHANGELOG.md index 93fd0f82..3dc814c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,10 +6,17 @@ Cross-package release notes for relayburn. Package changelogs contain package-le ### Changed +- `relayburn-cli` / `relayburn-sdk`: `burn summary` now accepts repeatable + `--tag k=v` filters and `--group-by-tag ` to report cost/tokens by + generic folded enrichment tags; Claude, Codex, and OpenCode pending stamps + can now be written by external launchers. - `relayburn-sdk` (Rust): reader hot loops in `claude.rs` and `codex.rs` now stream JSONL line-by-line via `BufReader::read_until` instead of pre-allocating a `(size - start_offset)`-byte buffer up front; only the longest single line stays resident. `memchr_newline` in the codex parser now actually uses the `memchr` crate for SIMD-accelerated newline scanning. The main `parse_claude_session` loop also drops `BufReader::lines()` in favor of `read_line` into a reused `String`. (#323) ### Removed +- `relayburn-cli`: removed the `burn run` launcher wrapper from the CLI + surface. Launchers should write attribution with `writePendingStamp()` and + ingest through `burn ingest` / SDK `ingest()`. - Removed the old TypeScript implementation packages from the workspace. The Rust crates now own the SDK and CLI implementation, with npm packages kept for the Node SDK facade, MCP server, and prebuilt CLI wrappers. diff --git a/README.md b/README.md index 107e9842..c674096d 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,6 @@ Burn stores data under `~/.agentworkforce/burn/` by default. Set | [`burn hotspots`](#burn-hotspots) | Find expensive files, commands, and subagents. | | [`burn overhead`](#burn-overhead) | Attribute cached prompt cost to `CLAUDE.md`, `.claude/CLAUDE.md`, and `AGENTS.md`. | | [`burn compare`](#burn-compare) | Compare observed model performance by activity: cost per turn, one-shot rate, and sample size. 
| -| [`burn run`](#burn-run) | Spawn Claude, Codex, or OpenCode with attribution tags and automatic ingest. | | [`burn ingest`](#burn-ingest) | Import existing or live session logs without wrapping the harness. | | [`burn mcp-server`](#burn-mcp-server) | Expose read-only cost queries to an agent through stdio MCP. | | [`burn state`](#burn-state) | Inspect, rebuild, and prune derived ledger artifacts. | @@ -38,6 +37,8 @@ tokens they used, and what they cost. | `--since ` | Limit to a relative range like `24h`, `7d`, or `4w`, or an ISO timestamp. | | `--project ` | Limit to a project path or git-canonical project key. | | `--session ` | Limit to one session. | +| `--tag k=v` | Filter by folded enrichment tag. Repeatable; all tags must match. | +| `--group-by-tag ` | Group totals by a folded enrichment tag value. | | `--by-provider` | Group totals by provider instead of model. | | `--json` | Emit machine-readable output. | @@ -46,6 +47,8 @@ tokens they used, and what they cost. | `burn summary` | All-time cost by model. | | `burn summary --since 24h` | Cost from the last 24 hours. | | `burn summary --by-provider` | Cost grouped by effective provider. | +| `burn summary --tag persona=code-reviewer` | Cost for sessions stamped with that persona tag. | +| `burn summary --group-by-tag persona` | Cost grouped by persona value. | Synthetic-routed models are recognized from `hf:*`, `accounts/fireworks/models/*`, and `synthetic/*`. @@ -123,38 +126,6 @@ testing, review, exploration, docs, and refactoring. Run `burn summary --by-provider` to discover model IDs present in your ledger. -## `burn run` - -Use `burn run` when you want Burn to launch the harness, stamp metadata, watch -for session logs, and ingest turns as the child exits. Supported harnesses: -`claude`, `codex`, and `opencode`. - -| Option | What it does | -|---|---| -| `` | Required harness: `claude`, `codex`, or `opencode`. | -| `--tag k=v` | Stamp metadata onto the session. Repeatable. 
| -| `-- ` | Pass everything after `--` to the underlying harness. | - -| Example | Result | -|---|---| -| `burn run claude --tag workflow=refactor -- --resume` | Resume Claude and stamp `workflow=refactor`. | -| `burn run codex --tag workflow=refactor` | Run Codex with workflow attribution. | -| `burn run opencode --tag agentId=ag-42 --tag tier=best` | Run OpenCode with agent and tier tags. | - -Burn also reads these attribution environment variables and re-exports them to -the child process: - -| Env var | Stamp key | -|---|---| -| `RELAYBURN_WORKFLOW_ID` | `workflowId` | -| `RELAYBURN_STEP_ID` | `stepId` | -| `RELAYBURN_AGENT_ID` | `agentId` | -| `RELAYBURN_PARENT_AGENT_ID` | `parentAgentId` | -| `RELAYBURN_PERSONA` | `persona` | -| `RELAYBURN_TIER` | `tier` | - -Explicit `--tag k=v` values win over environment-derived tags. - ## `burn ingest` Use `burn ingest` when sessions already exist, or when another process owns the @@ -223,7 +194,7 @@ content/search data in `content.sqlite`. | `~/.agentworkforce/burn/burn.sqlite` | Events, stamps, sessions, relationships, and archive metadata. | | `~/.agentworkforce/burn/content.sqlite` | Content blobs and the FTS5 search index. | | `~/.agentworkforce/burn/config.json` | Content-storage and retention configuration. | -| `~/.agentworkforce/burn/pending-stamps/` | Temporary manifests used by harnesses that do not expose a session ID before spawn. | +| `~/.agentworkforce/burn/pending-stamps/` | Temporary manifests used by launchers that do not expose a session ID before spawn. | | `RELAYBURN_HOME` | Override the whole Burn data directory. | | `RELAYBURN_SQLITE_PATH` | Override the events database path. | | `RELAYBURN_CONTENT_PATH` | Override the content database path. | @@ -311,9 +282,12 @@ ID, and the tier. Burn accepts that context, attaches it to the session, and makes it queryable later alongside the usage data from the session log. 
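As an aside on the filter contract documented for `burn summary --tag k=v` ("all tags must match"): the matching semantics can be sketched as below. This is an illustrative sketch only, not SDK code — `matches_tag_filters` and the sample enrichment bag are hypothetical names for this example.

```rust
use std::collections::BTreeMap;

// Illustrative sketch (not the SDK implementation): repeatable `--tag k=v`
// filters match a session only when EVERY requested key/value pair is
// present and equal in the session's folded enrichment bag. An empty
// filter set matches everything, i.e. no `--tag` flags means no filtering.
fn matches_tag_filters(
    enrichment: &BTreeMap<String, String>,
    filters: &BTreeMap<String, String>,
) -> bool {
    filters.iter().all(|(k, v)| enrichment.get(k) == Some(v))
}

fn main() {
    let session = BTreeMap::from([
        ("persona".to_string(), "code-reviewer".to_string()),
        ("tier".to_string(), "best".to_string()),
    ]);

    let one = BTreeMap::from([("persona".to_string(), "code-reviewer".to_string())]);
    assert!(matches_tag_filters(&session, &one)); // single matching filter

    let two = BTreeMap::from([
        ("persona".to_string(), "code-reviewer".to_string()),
        ("tier".to_string(), "cheap".to_string()),
    ]);
    assert!(!matches_tag_filters(&session, &two)); // tier mismatch → excluded
    println!("ok");
}
```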
The primitive is **stamping**: attach metadata to a session by ID, before or -after any turns have been recorded. For command-line use, prefer `burn run - --tag k=v`; direct Rust embedders can use `relayburn_sdk::Stamp` -and `relayburn_sdk::StampSelector` against a `LedgerHandle`. +after any turns have been recorded. Launchers that do not know the session ID +before spawn should call `@relayburn/sdk` `writePendingStamp()` before +starting the agent, then run `burn ingest` / `ingest()` to fold the tags onto +the discovered turns. Direct Rust embedders with an exact session ID can use +`relayburn_sdk::Stamp` and `relayburn_sdk::StampSelector` against a +`LedgerHandle`. Stamp selectors: @@ -330,33 +304,35 @@ context and decides what to attach. ### Spawner-integrated ingest -All three supported harnesses - Claude, Codex, and OpenCode - ship under one -verb: +The recommended launcher integration is the Node SDK pending-stamp primitive: -```bash -burn run [--tag k=v ...] [-- ] -``` - -For Claude, the adapter generates a session UUID up front so metadata can be -stamped before the agent starts. It passes `--session-id` to Claude, applies -any `--tag k=v` pairs as stamps, and ingests the session into the ledger when -Claude exits. If you are building an orchestrator, the same pattern applies: -generate the UUID, stamp first, spawn with the UUID, then let burn pick up the -session log on ingest. +```js +import { writePendingStamp } from "@relayburn/sdk"; -```bash -burn run claude --tag workflow=refactor --tag persona=senior-eng -- --resume abc +await writePendingStamp({ + harness: "codex", + cwd: process.cwd(), + enrichment: { + persona: "code-reviewer", + personaTier: "senior", + agentworkforce: "1", + }, +}); ``` -Codex and OpenCode do not expose Claude-style hooks or a pre-spawn session ID. 
-Their adapters write a pending-stamp manifest under -`$RELAYBURN_HOME/pending-stamps/` before spawning, then resolve it against the -first matching session file before the first turn is appended. They also run -burn's foreground watch loop for the child process lifetime, so long sessions -become visible incrementally instead of only after exit. Abandoned pending -manifests are cleaned up after 24 hours. +Then spawn the harness normally and let `burn ingest` or `ingest()` scan the +session stores. Claude launchers can either preallocate `--session-id` and +write an exact session stamp from Rust, or use `writePendingStamp({ harness: +"claude", ... })` when the final session ID is not available before spawn. -For passive ingest without a wrapper, run: +Codex and OpenCode do not expose a pre-spawn session ID. `writePendingStamp()` +writes a pending-stamp manifest under `$RELAYBURN_HOME/pending-stamps/` before +the launcher spawns the agent. Ingest resolves the manifest against the first +matching session file before the first turn is appended. Abandoned pending +manifests are cleaned up after 24 hours. + +For passive ingest, run: ```bash burn ingest @@ -367,37 +343,6 @@ burn ingest --watch [--interval ] cursor and dedup path as the reporting commands. `burn ingest --watch` keeps that scan loop running in the foreground. -### Spawner env-var contract - -For orchestrators that spawn many agent sessions, threading `--tag` through -every wrapper invocation is awkward.
All three adapters also read a fixed set -of `RELAYBURN_*` env vars and fold them into the stamp bag: - -| Env var | Stamp key | -|---|---| -| `RELAYBURN_WORKFLOW_ID` | `workflowId` | -| `RELAYBURN_STEP_ID` | `stepId` | -| `RELAYBURN_AGENT_ID` | `agentId` | -| `RELAYBURN_PARENT_AGENT_ID` | `parentAgentId` | -| `RELAYBURN_PERSONA` | `persona` | -| `RELAYBURN_TIER` | `tier` | - -`--tag k=v` flags win on key collision. The merged values are re-exported on -the child harness environment under their canonical names, so a transitive -`burn ...` invocation inside the child session inherits the same context -without the orchestrator having to re-thread it. - -```bash -export RELAYBURN_WORKFLOW_ID=wf-refactor-auth -export RELAYBURN_AGENT_ID=ag-42 -burn run codex -burn run opencode --tag agentId=ag-43 -``` - -Other `RELAYBURN_*` variables (`RELAYBURN_HOME`, `RELAYBURN_SESSION_ID`, -`RELAYBURN_CONTENT_STORE`, `RELAYBURN_CONTENT_TTL_DAYS`) are burn internals and -are not treated as stamp tags. - ### Hook-based ingest for orchestrators If your code already controls the Claude Code spawn, you can install burn's diff --git a/crates/relayburn-cli/Cargo.toml b/crates/relayburn-cli/Cargo.toml index 0a5cfe19..74259040 100644 --- a/crates/relayburn-cli/Cargo.toml +++ b/crates/relayburn-cli/Cargo.toml @@ -11,11 +11,10 @@ description = "The `burn` CLI — published to crates.io. Crate name is relaybur name = "burn" path = "src/main.rs" -# A library target is added alongside the `burn` binary so the harness -# substrate (`HarnessAdapter` trait, registry, pending-stamp factory) -# under `src/harnesses/` can be unit-tested with `cargo test -p -# relayburn-cli` and so future integration tests under `tests/` can -# reach it without re-declaring the module tree. 
The binary path +# A library target is added alongside the `burn` binary so internal +# modules can be unit-tested with `cargo test -p relayburn-cli` and so +# future integration tests under `tests/` can reach them without +# re-declaring the module tree. The binary path # (`src/main.rs`) and the library path (`src/lib.rs`) live side-by-side # — cargo treats them as two separate compile units that share the # same package metadata. @@ -66,25 +65,17 @@ serde_json = { workspace = true, features = ["preserve_order"] } anyhow = { workspace = true } thiserror = { workspace = true } -# Harness substrate (#248-b): the `HarnessAdapter` trait under `harnesses/` -# uses `async fn` in trait — `async-trait` desugars to `Pin>` futures -# without forcing every adapter to spell that out. `phf` macros give us a -# perfect-hash registry that's evaluated at compile time, so harness lookup -# costs nothing at startup. `tokio::sync` is needed for the watch controller -# wiring exposed by `relayburn-sdk` (`WatchController` holds a `Mutex`). -# `rt` is required by Wave 2 read-path commands so they can drive -# `relayburn_sdk::ingest_all` (async) from the otherwise-sync presenter -# bodies via a current-thread runtime. +# Harness substrate unit tests still use `async-trait` and `phf`. `tokio::sync` +# is needed by MCP / ingest presenters, and `rt` is required by read-path +# commands so they can drive `relayburn_sdk::ingest_all` from otherwise-sync +# presenter bodies via a current-thread runtime. async-trait = "0.1" phf = { version = "0.11", features = ["macros"] } -# `process` is needed by `commands/run.rs` so the `burn run` driver can -# `await` the child via `tokio::process::Command::status()` rather than -# blocking the current-thread runtime with `std::process::Command::status()`. # `signal` is needed for the `burn ingest --watch` SIGINT/SIGTERM trap # (#248 D8); the watch loop blocks the foreground until a stop signal # comes in. 
`rt` drives the current-thread runtime that wraps the SDK's # async ingest verb from otherwise-sync presenter bodies. -tokio = { workspace = true, features = ["sync", "rt", "process", "signal"] } +tokio = { workspace = true, features = ["sync", "rt", "signal"] } # `IndexMap` preserves first-seen iteration order, which matters for the # Wave 2 read-path commands so their grouped output (`summary --by-model`, diff --git a/crates/relayburn-cli/src/cli.rs b/crates/relayburn-cli/src/cli.rs index 3bd8c283..0d95e8af 100644 --- a/crates/relayburn-cli/src/cli.rs +++ b/crates/relayburn-cli/src/cli.rs @@ -101,10 +101,6 @@ pub enum Command { /// Compare cost across two or more models on the same workload. Compare(CompareArgs), - /// Run an agent CLI under a harness wrapper that ingests its - /// session log on exit. - Run(RunArgs), - /// Inspect or rebuild derived state under `~/.agentworkforce/burn`. State(StateArgs), @@ -182,40 +178,6 @@ pub struct McpServerArgs { pub debug: bool, } -/// `burn run [--tag k=v ...] [-- ]` — flags + -/// trailing argv for the harness driver. Mirrors the TS surface in -/// `packages/cli/src/commands/run.ts`. -/// -/// The first positional is the harness name (`claude`, `codex`, -/// `opencode`). Everything after `--` (or any unknown flag, courtesy of -/// `trailing_var_arg`) is captured into `passthrough` and forwarded to -/// the spawned binary verbatim. `--tag k=v` may be repeated; bad shapes -/// (no `=`, empty key) are rejected at runtime by the driver with the -/// same error message as the TS sibling. -#[derive(Debug, Clone, ClapArgs)] -pub struct RunArgs { - /// Lowercase harness identifier (`claude`, `codex`, `opencode`). - /// Optional so `burn run --help` and `burn run` both succeed; the - /// driver translates a missing name to a help-or-exit-2 outcome - /// matching the TS sibling. - #[arg(value_name = "HARNESS")] - pub harness: Option, - - /// User-supplied stamp enrichment. 
Repeatable — `--tag workflow=foo - /// --tag agent=bar` produces `{"workflow":"foo","agent":"bar"}` on - /// the resulting [`relayburn_sdk::Stamp`]. - #[arg(long = "tag", value_name = "K=V")] - pub tag: Vec, - - /// Everything after the harness name (or `--`). Forwarded to the - /// spawned binary in `SpawnPlan::args` after the adapter's own - /// transport-level args. `trailing_var_arg = true` makes clap stop - /// option parsing at the first non-flag token so `burn run claude - /// --resume` works without an explicit `--`. - #[arg(trailing_var_arg = true, allow_hyphen_values = true, value_name = "ARGS")] - pub passthrough: Vec, -} - /// Per-command flag set for `burn compare`. Mirrors /// `packages/cli/src/commands/compare.ts` so the CLI surfaces match /// byte-for-byte; see that file for the canonical help text. diff --git a/crates/relayburn-cli/src/commands/ingest.rs b/crates/relayburn-cli/src/commands/ingest.rs index 2ce99582..4eef02b1 100644 --- a/crates/relayburn-cli/src/commands/ingest.rs +++ b/crates/relayburn-cli/src/commands/ingest.rs @@ -359,6 +359,7 @@ mod tests { scanned_sessions: 1, ingested_sessions: 1, appended_turns: 1, + applied_pending_stamps: 0, }); assert_eq!(one, "[burn] ingest: ingested 1 session (+1 turn)\n"); @@ -366,6 +367,7 @@ mod tests { scanned_sessions: 3, ingested_sessions: 2, appended_turns: 5, + applied_pending_stamps: 0, }); assert_eq!(many, "[burn] ingest: ingested 2 sessions (+5 turns)\n"); diff --git a/crates/relayburn-cli/src/commands/mod.rs b/crates/relayburn-cli/src/commands/mod.rs index 7027312a..8c38fcb1 100644 --- a/crates/relayburn-cli/src/commands/mod.rs +++ b/crates/relayburn-cli/src/commands/mod.rs @@ -11,7 +11,6 @@ //! - `hotspots` — wraps `relayburn_sdk::hotspots` //! - `overhead` — wraps `relayburn_sdk::overhead` (+ `overhead trim`) //! - `compare` — wraps `relayburn_sdk::compare` -//! - `run` — driver around `HarnessAdapter` (added in #248-b) //! - `state` — status / rebuild / prune / reset //! 
- `ingest` — no-flag, `--watch`, `--hook claude --quiet` //! - `mcp_server` — rmcp wrapper around the SDK query verbs @@ -24,6 +23,5 @@ pub mod hotspots; pub mod ingest; pub mod mcp_server; pub mod overhead; -pub mod run; pub mod state; pub mod summary; diff --git a/crates/relayburn-cli/src/commands/run.rs b/crates/relayburn-cli/src/commands/run.rs deleted file mode 100644 index 3d646918..00000000 --- a/crates/relayburn-cli/src/commands/run.rs +++ /dev/null @@ -1,358 +0,0 @@ -//! `burn run ` — wrapper that spawns an agent CLI under a -//! `HarnessAdapter` and ingests its session log on exit. -//! -//! Mirrors `packages/cli/src/commands/run.ts`. Lifecycle: -//! -//! 1. Resolve the named adapter from -//! [`relayburn_cli::harnesses::lookup`]. Unknown name → typed error -//! listing the known set. -//! 2. Build a [`relayburn_cli::harnesses::PlanCtx`] from `cwd`, -//! `passthrough`, and the merged `--tag` / `RELAYBURN_*` enrichment. -//! 3. `adapter.plan(&ctx).await` → [`relayburn_cli::harnesses::SpawnPlan`]. -//! 4. `adapter.before_spawn(&ctx, &plan).await` — claude stamps now; -//! pending-stamp adapters drop a manifest the post-exit pass resolves. -//! 5. Optional `adapter.start_watcher(&ctx, sink)` — claude returns -//! `None`; codex/opencode (D6) drain their session store while the -//! child runs. Reports flow into the same accumulator as `after_exit`. -//! 6. Spawn the child. `stdio: 'inherit'` mirrors the TS sibling. -//! 7. Wait for exit. The driver is **transparent** — the user-visible -//! exit code is the child's; relayburn's own ingest failures fall -//! through `report_error`. -//! 8. Stop the watcher (if any), run `adapter.after_exit(&ctx, &plan).await`, -//! fold both reports into a single -//! `[burn] ingest: N session(s) (+M turns)` line on stderr. -//! -//! The driver is async so adapter calls can stay async; we drive it on a -//! current-thread tokio runtime, the same pattern the D1 summary -//! presenter uses for `ingest_all`. 
Process spawn goes through -//! `tokio::process::Command::status().await` so the watcher can tick -//! while the child is alive — `std::process::Command::status()` would -//! synchronously block the only thread on the current-thread runtime. - -use std::collections::BTreeMap; -use std::process::{ExitStatus, Stdio}; -use std::sync::{Arc, Mutex}; - -use relayburn_cli::harnesses::{list_harness_names, lookup, HarnessAdapter, PlanCtx}; -use relayburn_cli::util::time::iso_from_system_time; -use relayburn_sdk::{Enrichment, IngestReport, ReportSink}; -use tokio::process::Command as TokioCommand; - -use crate::cli::{GlobalArgs, RunArgs}; -use crate::render::error::report_error; - -/// Spawner-owned tagging contract. Mirrors `SPAWN_ENV_TAG_KEYS` in -/// `packages/cli/src/spawn-tags.ts` byte-for-byte. Keep this in lockstep -/// with the TS sibling — orchestrators thread the same env vars across. -const SPAWN_ENV_TAG_KEYS: &[(&str, &str)] = &[ - ("RELAYBURN_WORKFLOW_ID", "workflowId"), - ("RELAYBURN_STEP_ID", "stepId"), - ("RELAYBURN_AGENT_ID", "agentId"), - ("RELAYBURN_PARENT_AGENT_ID", "parentAgentId"), - ("RELAYBURN_PERSONA", "persona"), - ("RELAYBURN_TIER", "tier"), -]; - -const RUN_HELP_PREFIX: &str = "burn run — spawn an agent harness with attribution\n\n\ -Usage:\n burn run [--tag k=v ...] [-- ]\n\n"; - -const RUN_HELP_EXAMPLES: &str = "\nExamples:\n \ -burn run claude --tag workflow=refactor -- --resume\n \ -burn run codex --tag workflow=refactor\n \ -burn run opencode --tag workflow=refactor\n"; - -pub fn run(globals: &GlobalArgs, args: RunArgs) -> i32 { - match run_inner(globals, args) { - Ok(code) => code, - Err(err) => report_error(&err, globals), - } -} - -fn run_inner(globals: &GlobalArgs, args: RunArgs) -> anyhow::Result { - // No harness positional → print help + exit 2 (TS sibling does the - // same; clap won't trigger this for `burn run --help` because clap's - // built-in help short-circuits the dispatch entirely with exit 0). 
- let harness_name = match args.harness.as_deref() { - Some(name) if !name.is_empty() => name.to_string(), - _ => { - print_run_help(); - return Ok(2); - } - }; - - let adapter = match lookup(&harness_name) { - Some(a) => a, - None => { - let known = list_harness_names().join(", "); - return Err(anyhow::anyhow!( - "unknown harness \"{harness_name}\". Known: {known}" - )); - } - }; - - let tags = build_enrichment(&args.tag)?; - - // `--ledger-path` is honored by setting RELAYBURN_HOME for the rest - // of this process. The adapter's `before_spawn`/`after_exit` open - // their own `Ledger` via env-var fallback, and the spawned child - // inherits the same value. Mirrors how summary.rs threads - // `globals.ledger_path` into `LedgerOpenOptions::with_home`, but - // works through env so adapter calls see it. - if let Some(p) = globals.ledger_path.as_deref() { - std::env::set_var("RELAYBURN_HOME", p); - } - - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - rt.block_on(drive(globals, adapter, args.passthrough, tags)) -} - -fn print_run_help() { - let mut s = String::new(); - s.push_str(RUN_HELP_PREFIX); - s.push_str("Known harnesses: "); - s.push_str(&list_harness_names().join(", ")); - s.push('\n'); - s.push_str(RUN_HELP_EXAMPLES); - print!("{s}"); -} - -/// Async core. Owns the plan → before_spawn → spawn → after_exit -/// sequence and aggregates ingest reports. -async fn drive( - globals: &GlobalArgs, - adapter: &'static dyn HarnessAdapter, - passthrough: Vec, - user_tags: Enrichment, -) -> anyhow::Result { - // Merge env-derived defaults with explicit `--tag` flags. CLI flags - // win on key collision. 
- let mut tags: Enrichment = read_spawn_env_tags(); - for (k, v) in user_tags { - tags.insert(k, v); - } - tags.insert("harness".to_string(), adapter.name().to_string()); - tags.insert("burnSpawn".to_string(), "1".to_string()); - let spawn_start_ts = std::time::SystemTime::now(); - tags.insert("burnSpawnTs".to_string(), iso_from_system_time(spawn_start_ts)); - - let cwd = std::env::current_dir()?; - let ctx = PlanCtx { - cwd, - passthrough, - tags: tags.clone(), - ledger_home: globals.ledger_path.clone(), - spawn_start_ts, - }; - - let plan = adapter.plan(&ctx).await?; - adapter.before_spawn(&ctx, &plan).await?; - - // Watcher accumulator: every tick adds to the running totals; we - // aggregate after_exit's report on top. The TS sibling does the same. - let totals = Arc::new(Mutex::new(IngestReport::default())); - let totals_for_sink = totals.clone(); - let on_report: ReportSink = - Arc::new(move |report: &IngestReport| { - if let Ok(mut t) = totals_for_sink.lock() { - t.scanned_sessions += report.scanned_sessions; - t.ingested_sessions += report.ingested_sessions; - t.appended_turns += report.appended_turns; - } - }); - - let watcher = adapter.start_watcher(&ctx, on_report); - if watcher.is_some() { - eprintln!("[burn] {}: ingest watcher ready", adapter.name()); - } - eprintln!("[burn] {}: starting {}", adapter.name(), plan.binary); - - // Spawn the child. inherits stdio so the user-facing harness UI - // stays interactive. Layer plan.env_overrides on top of the parent - // env, plus re-export the merged tag bag so transitive `burn …` - // invocations inside the child see the same context. - // - // Use `tokio::process::Command::status().await` (not - // `std::process::Command::status()`): the driver runs on a - // current-thread tokio runtime so a synchronous `status()` call - // would block the only thread, starving any watcher ticks scheduled - // on the same runtime. 
The async variant yields between tokio - // primitives so periodic watcher work can land while the child - // lives. - let mut cmd = TokioCommand::new(&plan.binary); - cmd.args(&plan.args); - cmd.stdin(Stdio::inherit()) - .stdout(Stdio::inherit()) - .stderr(Stdio::inherit()); - for (k, v) in spawn_tag_env_overrides(&tags) { - cmd.env(k, v); - } - for (k, v) in &plan.env_overrides { - cmd.env(k, v); - } - - // First tick fires immediately so a fast-finishing child has at - // least one chance to drain new sessions before exit. This mirrors - // `void watcher.tick()` in run.ts. We swallow tick errors on - // purpose — the watch loop logs internally and the after_exit pass - // is the source-of-truth fallback. - if let Some(w) = &watcher { - w.tick().await; - } - - // Capture the spawn outcome up front so cleanup ALWAYS runs: - // `before_spawn` may have written a stamp / pending manifest that - // `after_exit` needs to reconcile, and the watcher may have - // accumulated reports during its first tick. Returning early on - // spawn failure (the previous shape) skipped both. The TS sibling - // runs finalization regardless of spawn success; this matches that. - let spawn_outcome: SpawnOutcome = match cmd.status().await { - Ok(status) => SpawnOutcome::Exited(status), - Err(err) => { - eprintln!("[burn] failed to spawn {}: {err}", plan.binary); - SpawnOutcome::SpawnFailed - } - }; - - if let Some(w) = &watcher { - w.stop().await; - } - - // `after_exit` may itself fail (stamp resolve, ledger I/O); fold - // that error into the summary line rather than short-circuiting, - // so the user always gets the `[burn] ingest: …` line. 
- match adapter.after_exit(&ctx, &plan).await { - Ok(final_report) => { - let mut t = totals.lock().unwrap(); - t.scanned_sessions += final_report.scanned_sessions; - t.ingested_sessions += final_report.ingested_sessions; - t.appended_turns += final_report.appended_turns; - } - Err(err) => { - eprintln!("[burn] {} after_exit failed: {err}", adapter.name()); - } - } - let totals = totals.lock().unwrap().clone(); - let session_word = if totals.ingested_sessions == 1 { - "session" - } else { - "sessions" - }; - let turn_word = if totals.appended_turns == 1 { - "turn" - } else { - "turns" - }; - eprintln!( - "[burn] {} ingest: {} {} (+{} {})", - adapter.name(), - totals.ingested_sessions, - session_word, - totals.appended_turns, - turn_word, - ); - - // Match the TS sibling: 127 for spawn failure (POSIX "command not - // found"-ish), otherwise propagate the child's exit code (0 if it - // exited via signal, mirroring `ExitStatus::code().unwrap_or(0)`). - Ok(match spawn_outcome { - SpawnOutcome::Exited(status) => status.code().unwrap_or(0), - SpawnOutcome::SpawnFailed => 127, - }) -} - -/// Captured spawn result. The driver finalizes (stops the watcher, runs -/// `after_exit`, emits the summary line) regardless of which arm fired, -/// then maps to a process exit code at the very end. -enum SpawnOutcome { - Exited(ExitStatus), - SpawnFailed, -} - -/// Parse `--tag k=v` repetitions into an [`Enrichment`]. Mirrors the -/// TS sibling's `--tag` parser shape — bad input throws a typed error -/// rather than silently dropping the entry. 
-fn build_enrichment(tags: &[String]) -> anyhow::Result { - let mut out: Enrichment = BTreeMap::new(); - for raw in tags { - let (k, v) = raw - .split_once('=') - .ok_or_else(|| anyhow::anyhow!("--tag expects k=v, got \"{raw}\""))?; - if k.is_empty() { - return Err(anyhow::anyhow!("--tag key must be non-empty (got \"{raw}\")")); - } - out.insert(k.to_string(), v.to_string()); - } - Ok(out) -} - -/// Read `RELAYBURN_*` env vars into an enrichment bag. Mirrors -/// `readSpawnEnvTags` in `spawn-tags.ts`. -fn read_spawn_env_tags() -> Enrichment { - let mut out: Enrichment = BTreeMap::new(); - for (env, tag) in SPAWN_ENV_TAG_KEYS { - if let Ok(v) = std::env::var(env) { - if !v.is_empty() { - out.insert((*tag).to_string(), v); - } - } - } - out -} - -/// Inverse of `read_spawn_env_tags`: re-export the merged tag bag as -/// `RELAYBURN_*` env so the spawned harness's transitive `burn …` -/// invocations inherit the same context. -fn spawn_tag_env_overrides(final_tags: &Enrichment) -> Vec<(String, String)> { - let mut out = Vec::new(); - for (env, tag) in SPAWN_ENV_TAG_KEYS { - if let Some(v) = final_tags.get(*tag) { - if !v.is_empty() { - out.push(((*env).to_string(), v.clone())); - } - } - } - out -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn build_enrichment_parses_kv_pairs() { - let got = build_enrichment(&[ - "workflow=refactor".into(), - "agent=alpha".into(), - ]) - .unwrap(); - assert_eq!(got.get("workflow").map(String::as_str), Some("refactor")); - assert_eq!(got.get("agent").map(String::as_str), Some("alpha")); - } - - #[test] - fn build_enrichment_rejects_missing_eq() { - let err = build_enrichment(&["workflow".into()]).unwrap_err(); - assert!(format!("{err}").contains("--tag expects k=v")); - } - - #[test] - fn build_enrichment_rejects_empty_key() { - let err = build_enrichment(&["=missing-key".into()]).unwrap_err(); - assert!(format!("{err}").contains("--tag key must be non-empty")); - } - - #[test] - fn 
spawn_tag_env_overrides_re_exports_known_keys() { - let mut tags: Enrichment = BTreeMap::new(); - tags.insert("workflowId".into(), "wf-1".into()); - tags.insert("agentId".into(), "agent-x".into()); - tags.insert("burnSpawn".into(), "1".into()); // not in keys → dropped - let env = spawn_tag_env_overrides(&tags); - let map: BTreeMap<_, _> = env.into_iter().collect(); - assert_eq!(map.get("RELAYBURN_WORKFLOW_ID").map(String::as_str), Some("wf-1")); - assert_eq!(map.get("RELAYBURN_AGENT_ID").map(String::as_str), Some("agent-x")); - assert!(!map.contains_key("RELAYBURN_BURN_SPAWN")); - } -} diff --git a/crates/relayburn-cli/src/commands/summary.rs b/crates/relayburn-cli/src/commands/summary.rs index 8d81face..90403a04 100644 --- a/crates/relayburn-cli/src/commands/summary.rs +++ b/crates/relayburn-cli/src/commands/summary.rs @@ -23,7 +23,7 @@ //! [`relayburn_sdk::summarize_replacement_savings`]. //! 5. Render JSON or human format. -use std::collections::{BTreeSet, HashMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; use clap::Args; use indexmap::IndexMap; @@ -97,6 +97,14 @@ pub struct SummaryArgs { #[arg(long, value_name = "WORKFLOW_ID")] pub workflow: Option, + /// Filter by folded enrichment tag. Repeatable; every tag must match. + #[arg(long = "tag", value_name = "K=V")] + pub tag: Vec, + + /// Group totals by a folded enrichment tag value. + #[arg(long = "group-by-tag", value_name = "KEY")] + pub group_by_tag: Option, + /// Provider filter (CSV of provider names; case-insensitive). 
#[arg(long, value_name = "PROVIDERS")] pub provider: Option<String>, @@ -127,29 +135,43 @@ fn run_inner(globals: &GlobalArgs, args: SummaryArgs) -> anyhow::Result<i32> { && (args.by_provider || args.by_subagent_type || args.by_relationship.is_some() - || args.subagent_tree.is_some()) + || args.subagent_tree.is_some() + || args.group_by_tag.is_some()) { eprintln!( - "burn: --by-tool cannot be combined with --by-provider/--by-subagent-type/--by-relationship/--subagent-tree" + "burn: --by-tool cannot be combined with --by-provider/--by-subagent-type/--by-relationship/--subagent-tree/--group-by-tag" ); return Ok(2); } if args.by_provider - && (args.by_subagent_type || args.by_relationship.is_some() || args.subagent_tree.is_some()) + && (args.by_subagent_type + || args.by_relationship.is_some() + || args.subagent_tree.is_some() + || args.group_by_tag.is_some()) { eprintln!( - "burn: --by-provider cannot be combined with --by-subagent-type/--by-relationship/--subagent-tree" + "burn: --by-provider cannot be combined with --by-subagent-type/--by-relationship/--subagent-tree/--group-by-tag" ); return Ok(2); } - if args.by_subagent_type && (args.by_relationship.is_some() || args.subagent_tree.is_some()) { + if args.by_subagent_type + && (args.by_relationship.is_some() + || args.subagent_tree.is_some() + || args.group_by_tag.is_some()) + { eprintln!( - "burn: --by-subagent-type cannot be combined with --by-relationship/--subagent-tree" + "burn: --by-subagent-type cannot be combined with --by-relationship/--subagent-tree/--group-by-tag" ); return Ok(2); } - if args.by_relationship.is_some() && args.subagent_tree.is_some() { - eprintln!("burn: --by-relationship cannot be combined with --subagent-tree"); + if args.by_relationship.is_some() + && (args.subagent_tree.is_some() || args.group_by_tag.is_some()) + { + eprintln!("burn: --by-relationship cannot be combined with --subagent-tree/--group-by-tag"); + return Ok(2); + } + if args.subagent_tree.is_some() && args.group_by_tag.is_some() { + 
eprintln!("burn: --subagent-tree cannot be combined with --group-by-tag"); return Ok(2); } if let Some(rel) = &args.by_relationship { @@ -233,13 +255,28 @@ fn run_inner(globals: &GlobalArgs, args: SummaryArgs) -> anyhow::Result<i32> { None }; - if args.by_provider { + if let Some(tag_key) = args.group_by_tag.as_deref() { + let (rows, values) = aggregate_by_tag(&enriched, tag_key, &pricing); + emit_grouped( + globals, + SummaryGroup::Tag { + key: tag_key, + values: &values, + }, + &rows, + &turns, + &ingest_report, + &fidelity, + &savings, + quality.as_ref(), + ); + } else if args.by_provider { let rows = aggregate_by_provider(&turns, AggregateByProviderOptions::new(&pricing)); let provider_rows: Vec<UsageCostAggregateRow> = rows.into_iter().map(provider_to_aggregate_row).collect(); emit_grouped( globals, - true, + SummaryGroup::Provider, &provider_rows, &turns, &ingest_report, @@ -251,7 +288,7 @@ fn run_inner(globals: &GlobalArgs, args: SummaryArgs) -> anyhow::Result<i32> { let rows = aggregate_by_model(&turns, &pricing); emit_grouped( globals, - false, + SummaryGroup::Model, &rows, &turns, &ingest_report, @@ -416,14 +453,50 @@ fn build_query(args: &SummaryArgs) -> anyhow::Result { if let Some(since) = normalize_since(args.since.as_deref())? { q.since = Some(since); } + if let Some(tag_key) = args.group_by_tag.as_deref() { + if tag_key.is_empty() { + anyhow::bail!("burn: --group-by-tag requires a non-empty key"); + } + } + let mut enrichment = BTreeMap::new(); if let Some(workflow) = &args.workflow { - let mut enrichment = std::collections::BTreeMap::new(); enrichment.insert("workflowId".to_string(), workflow.clone()); + } + for (key, value) in parse_tag_filters(&args.tag)? 
{ + if let Some(existing) = enrichment.get(&key) { + if existing != &value { + anyhow::bail!( + "burn: conflicting filters for tag \"{key}\" ({existing:?} vs {value:?})" + ); + } + } + enrichment.insert(key, value); + } + if !enrichment.is_empty() { q.enrichment = Some(enrichment); } Ok(q) } +fn parse_tag_filters(tags: &[String]) -> anyhow::Result<BTreeMap<String, String>> { + let mut out = BTreeMap::new(); + for raw in tags { + let (key, value) = raw + .split_once('=') + .ok_or_else(|| anyhow::anyhow!("burn: --tag expects k=v, got \"{raw}\""))?; + if key.is_empty() { + anyhow::bail!("burn: --tag key must be non-empty (got \"{raw}\")"); + } + if let Some(existing) = out.get(key) { + anyhow::bail!( + "burn: duplicate --tag filter for key \"{key}\" ({existing:?} vs {value:?})" + ); + } + out.insert(key.to_string(), value.to_string()); + } + Ok(out) +} + /// Run an ingest sweep on the open handle. Builds a current-thread tokio /// runtime so the otherwise-sync presenter can drive the async verb. fn run_ingest(handle: &mut LedgerHandle) -> anyhow::Result<relayburn_sdk::IngestReport> { @@ -515,6 +588,58 @@ fn aggregate_by_model( rows } +fn aggregate_by_tag( + turns: &[EnrichedTurn], + tag_key: &str, + pricing: &relayburn_sdk::PricingTable, +) -> (Vec<UsageCostAggregateRow>, Vec<Option<String>>) { + let mut by_value: HashMap<Option<String>, UsageCostAggregateRow> = HashMap::new(); + let mut order: Vec<Option<String>> = Vec::new(); + for enriched in turns { + let value = enriched.enrichment.get(tag_key).cloned(); + let label = value.clone().unwrap_or_else(|| "(untagged)".to_string()); + let row = by_value.entry(value.clone()).or_insert_with(|| { + order.push(value.clone()); + empty_row(&label) + }); + row.turns += 1; + row.usage.input += enriched.turn.usage.input; + row.usage.output += enriched.turn.usage.output; + row.usage.reasoning += enriched.turn.usage.reasoning; + row.usage.cache_read += enriched.turn.usage.cache_read; + row.usage.cache_create_5m += enriched.turn.usage.cache_create_5m; + row.usage.cache_create_1h += enriched.turn.usage.cache_create_1h; + accumulate_coverage( + &mut 
row.coverage, + enriched.turn.fidelity.as_ref().map(|f| &f.coverage), + ); + if let Some(c) = cost_for_turn(&enriched.turn, pricing) { + row.cost.total += c.total; + row.cost.input += c.input; + row.cost.output += c.output; + row.cost.reasoning += c.reasoning; + row.cost.cache_read += c.cache_read; + row.cost.cache_create += c.cache_create; + } + } + + let mut pairs: Vec<(Option<String>, UsageCostAggregateRow)> = order + .into_iter() + .map(|value| { + let row = by_value.remove(&value).unwrap(); + (value, row) + }) + .collect(); + pairs.sort_by(|a, b| { + b.1.cost + .total + .partial_cmp(&a.1.cost.total) + .unwrap_or(std::cmp::Ordering::Equal) + }); + let (values, rows): (Vec<_>, Vec<_>) = pairs.into_iter().unzip(); + (rows, values) +} + fn provider_to_aggregate_row(p: ProviderAggregateRow) -> UsageCostAggregateRow { UsageCostAggregateRow { label: p.label, @@ -594,10 +719,45 @@ fn coverage_cell(value: u64, c: &relayburn_sdk::FieldCoverage) -> String { format_uint(value) } +enum SummaryGroup<'a> { + Model, + Provider, + Tag { + key: &'a str, + values: &'a [Option<String>], + }, +} + +impl<'a> SummaryGroup<'a> { + fn json_key(&self) -> &'static str { + match self { + SummaryGroup::Model => "byModel", + SummaryGroup::Provider => "byProvider", + SummaryGroup::Tag { .. } => "byTag", + } + } + + fn human_label(&self) -> &'static str { + match self { + SummaryGroup::Model => "model", + SummaryGroup::Provider => "provider", + SummaryGroup::Tag { .. } => "value", + } + } + + fn per_cell_group_by(&self) -> &'static str { + match self { + SummaryGroup::Model => "model", + SummaryGroup::Provider => "provider", + SummaryGroup::Tag { .. 
} => "tag", + } + } +} + #[allow(clippy::too_many_arguments)] fn emit_grouped( globals: &GlobalArgs, - by_provider: bool, + group: SummaryGroup<'_>, rows: &[UsageCostAggregateRow], turns: &[TurnRecord], ingest_report: &relayburn_sdk::IngestReport, @@ -609,7 +769,7 @@ if globals.json { emit_json( - by_provider, + group, rows, turns, ingest_report, @@ -620,7 +780,7 @@ return; } emit_human( - by_provider, + &group, rows, ingest_report, &total_cost, @@ -631,7 +791,7 @@ } fn emit_json( - by_provider: bool, + group: SummaryGroup<'_>, rows: &[UsageCostAggregateRow], turns: &[TurnRecord], ingest_report: &relayburn_sdk::IngestReport, @@ -639,29 +799,41 @@ fidelity: &FidelitySummary, savings: &relayburn_sdk::ReplacementSavingsSummary, ) { - let key = if by_provider { "byProvider" } else { "byModel" }; - let label_key = if by_provider { "provider" } else { "model" }; - let group_rows: Vec<Value> = rows .iter() - .map(|r| { - json!({ - label_key: r.label, - "turns": r.turns, - "usage": { + .enumerate() + .map(|(idx, r)| { + let mut row = match &group { + SummaryGroup::Model => json!({ + "model": r.label, + }), + SummaryGroup::Provider => json!({ + "provider": r.label, + }), + SummaryGroup::Tag { key, values } => json!({ + "tag": key, + "value": values.get(idx).cloned().flatten(), + }), + }; + let obj = row.as_object_mut().unwrap(); + obj.insert("turns".into(), json!(r.turns)); + obj.insert( + "usage".into(), + json!({ "input": r.usage.input, "output": r.usage.output, "reasoning": r.usage.reasoning, "cacheRead": r.usage.cache_read, "cacheCreate5m": r.usage.cache_create_5m, "cacheCreate1h": r.usage.cache_create_1h, - }, - "cost": cost_breakdown_to_json(&r.cost), - }) + }), + ); + obj.insert("cost".into(), cost_breakdown_to_json(&r.cost)); + row }) .collect(); - let per_cell = build_per_cell_fidelity(rows, by_provider); + let per_cell = build_per_cell_fidelity(rows, group.per_cell_group_by()); + let mut payload = Map::new(); 
payload.insert( @@ -673,7 +845,7 @@ ); payload.insert("turns".into(), json!(turns.len())); payload.insert("totalCost".into(), cost_breakdown_to_json(total_cost)); - payload.insert(key.into(), Value::Array(group_rows)); + payload.insert(group.json_key().into(), Value::Array(group_rows)); payload.insert( "fidelity".into(), json!({ @@ -1817,7 +1989,7 @@ fn fidelity_summary_to_json(s: &FidelitySummary) -> Value { Value::Object(out) } -fn build_per_cell_fidelity(rows: &[UsageCostAggregateRow], by_provider: bool) -> Value { +fn build_per_cell_fidelity(rows: &[UsageCostAggregateRow], group_by: &str) -> Value { let cells: Vec<Value> = rows .iter() .map(|r| { @@ -1851,7 +2023,7 @@ fn build_per_cell_fidelity(rows: &[UsageCostAggregateRow], by_provider: bool) -> }) .collect(); json!({ - "groupBy": if by_provider { "provider" } else { "model" }, + "groupBy": group_by, "cells": cells, }) } @@ -1889,7 +2061,7 @@ fn replacement_savings_to_json(savings: &relayburn_sdk::ReplacementSavingsSummar } fn emit_human( - by_provider: bool, + group: &SummaryGroup<'_>, rows: &[UsageCostAggregateRow], ingest_report: &relayburn_sdk::IngestReport, total_cost: &CostBreakdown, @@ -1923,9 +2095,8 @@ fn emit_human( return; } - let header_label = if by_provider { "provider" } else { "model" }; let header = vec![ - header_label.to_string(), + group.human_label().to_string(), "turns".into(), "input".into(), "output".into(), @@ -2138,6 +2309,68 @@ mod tests { ); } + #[test] + fn parse_tag_filters_requires_kv_with_non_empty_key() { + let got = parse_tag_filters(&["persona=code-reviewer".to_string()]).unwrap(); + assert_eq!( + got.get("persona").map(String::as_str), + Some("code-reviewer") + ); + + let missing_eq = parse_tag_filters(&["persona".to_string()]).unwrap_err(); + assert!(format!("{missing_eq}").contains("--tag expects k=v")); + + let empty_key = parse_tag_filters(&["=value".to_string()]).unwrap_err(); + assert!(format!("{empty_key}").contains("--tag key must be non-empty")); + + let 
duplicate = parse_tag_filters(&[ + "persona=code-reviewer".to_string(), + "persona=qa".to_string(), + ]) + .unwrap_err(); + assert!(format!("{duplicate}").contains("duplicate --tag filter")); + } + + #[test] + fn aggregate_by_tag_groups_missing_and_present_values() { + let pricing = load_pricing(None); + let mut tagged_enrichment = BTreeMap::new(); + tagged_enrichment.insert("persona".to_string(), "code-reviewer".to_string()); + let rows = vec![ + EnrichedTurn { + turn: turn( + 0, + "assistant-1", + Usage { + input: 100, + ..Usage::default() + }, + vec![], + ), + enrichment: tagged_enrichment, + }, + EnrichedTurn { + turn: turn( + 1, + "assistant-2", + Usage { + input: 50, + ..Usage::default() + }, + vec![], + ), + enrichment: BTreeMap::new(), + }, + ]; + + let (groups, values) = aggregate_by_tag(&rows, "persona", &pricing); + + assert_eq!(groups.len(), 2); + assert!(values.contains(&Some("code-reviewer".to_string()))); + assert!(values.contains(&None)); + assert_eq!(groups.iter().map(|r| r.turns).sum::<u64>(), 2); + } + #[test] fn collect_agent_session_tree_follows_nested_child_sessions_and_agent_ids() { let rels = vec![ diff --git a/crates/relayburn-cli/src/harnesses/mod.rs b/crates/relayburn-cli/src/harnesses/mod.rs index 26770fd4..45ae9e7a 100644 --- a/crates/relayburn-cli/src/harnesses/mod.rs +++ b/crates/relayburn-cli/src/harnesses/mod.rs @@ -1,10 +1,10 @@ -//! Harness substrate — Rust port of `packages/cli/src/harnesses/types.ts` +//! Legacy harness substrate — Rust port of `packages/cli/src/harnesses/types.ts` //! and friends. //! -//! `burn run ` is a wrapper that spawns a coding-agent process -//! (Claude Code, Codex, OpenCode, …), babysits its session log while it -//! runs, and feeds the resulting turns into the relayburn ledger. Every -//! adapter contributes the same five-step shape: +//! The CLI no longer exposes a command that launches agent processes, but +//! these adapters remain as unit-tested reference code for launcher +//! 
integrations built on the SDK pending-stamp primitives. Every adapter +//! contributes the same five-step shape: //! //! 1. **`plan`** — compute the spawn plan (binary + args + env). Per-harness //! transports inject session ids or hook arguments here. @@ -17,27 +17,18 @@ //! adapters that share the pending-stamp shape (codex, opencode) wire //! the watch loop through [`pending_stamp::adapter`]. //! 4. **`after_exit`** — run a final ingest pass after the child exits and -//! return an [`IngestReport`] so the driver can fold it into the unified -//! `[burn] ingest: …` line. -//! 5. The driver itself owns step zero — collecting `cwd`, passthrough +//! return an [`IngestReport`] for the launcher to report. +//! 5. The launcher itself owns step zero — collecting `cwd`, passthrough //! args, and any user-provided enrichment tags into a [`PlanCtx`] — //! and step six — joining the watcher and reporting summary stats. //! -//! ## Where this fits -//! -//! This PR (#248 part b) is the substrate. The Wave 2 PRs (#248-d/e/f) -//! plug the three concrete adapters into [`registry`] and the -//! `burn run` driver in `commands::run` consumes them. The CLI scaffold -//! (#248 part a, sibling worktree) lands the clap entrypoint independently. -//! //! ## Trait shape vs the TS sibling //! //! `HarnessAdapter` is a `Send + Sync` trait object so the registry can //! hand out `&'static dyn HarnessAdapter` references. `async fn` in trait //! is mediated by `async_trait::async_trait` to keep adapter impls -//! ergonomic; the desugared `Pin<Box<dyn Future>>` matches the -//! shape expected by the `burn run` driver, which `tokio::spawn`s the -//! result of `plan` / `after_exit` and joins them at the top level. +//! ergonomic; the desugared `Pin<Box<dyn Future>>` is easy for +//! launcher code to spawn and join at the top level. use std::path::PathBuf; @@ -64,16 +55,14 @@ pub use registry::{list_harness_names, lookup}; /// stamp record — the pending-stamp serializer canonicalizes ordering. 
#[derive(Debug, Clone)] pub struct PlanCtx { - /// Working directory the user invoked `burn run` from. Forwarded to - /// the spawned harness so it picks up project-local config. + /// Working directory for the spawned harness so it picks up + /// project-local config. pub cwd: PathBuf, - /// Argv tail after the subcommand boundary, e.g. `burn run claude -- - /// "explain this"` ⇒ `["explain this"]`. Adapters splice this into - /// their generated argv via [`SpawnPlan::args`]. + /// Argv tail the launcher wants to pass through. Adapters splice + /// this into their generated argv via [`SpawnPlan::args`]. pub passthrough: Vec<String>, /// User-supplied enrichment that will be merged onto the resulting - /// stamp. Keys are free-form (`task`, `pr`, …); the Wave 2 driver - /// translates `--tag k=v` flags into entries here. + /// stamp. Keys are free-form (`task`, `pr`, …). pub tags: Enrichment, /// Optional ledger home selected by `--ledger-path`. Pending-stamp /// adapters use this for both manifest writes and ingest passes so @@ -86,9 +75,9 @@ pub struct PlanCtx { pub spawn_start_ts: std::time::SystemTime, } -/// Spawn plan returned by [`HarnessAdapter::plan`]. The `burn run` -/// driver owns the actual `tokio::process::Command` construction; this -/// struct is the per-adapter contribution to it. +/// Spawn plan returned by [`HarnessAdapter::plan`]. Launcher code owns +/// the actual process construction; this struct is the per-adapter +/// contribution to it. /// /// `session_id` is filled in by adapters that know the session id up /// front (claude can mint one and inject it via `--session-id` so the @@ -200,7 +189,7 @@ impl WatcherController { } /// Stop the periodic loop and await any in-flight tick. Idempotent. - /// `burn run` calls this once the spawned child exits. + /// Call this once the spawned child exits. 
pub async fn stop(&self) { self.inner.stop().await; } @@ -216,8 +205,7 @@ impl WatcherController { mod tests { use super::*; - /// Smoke test: `SpawnPlan::new` produces an inherit-env plan the - /// driver can hand straight to `tokio::process::Command`. Catches + /// Smoke test: `SpawnPlan::new` produces an inherit-env plan. Catches /// accidental shape changes on the struct. #[test] fn spawn_plan_new_minimal_shape() { diff --git a/crates/relayburn-cli/src/harnesses/registry.rs b/crates/relayburn-cli/src/harnesses/registry.rs index a4a7bd39..36e33dc2 100644 --- a/crates/relayburn-cli/src/harnesses/registry.rs +++ b/crates/relayburn-cli/src/harnesses/registry.rs @@ -115,8 +115,8 @@ static RUNTIME_ADAPTER_NAMES: &[&str] = &[ ]; /// Look up an adapter by name. Returns `None` for unknown names; the -/// `burn run` driver maps `None` to a "did you mean …?" diagnostic -/// using [`list_harness_names`]. +/// caller can map `None` to a "did you mean …?" diagnostic using +/// [`list_harness_names`]. /// /// Eager adapters (single perfect-hash probe) are checked first; the /// runtime map is consulted only on a miss so common-case lookups diff --git a/crates/relayburn-cli/src/lib.rs b/crates/relayburn-cli/src/lib.rs index b06c3046..9025c816 100644 --- a/crates/relayburn-cli/src/lib.rs +++ b/crates/relayburn-cli/src/lib.rs @@ -5,11 +5,10 @@ //! test -p relayburn-cli` and so future integration tests under `tests/` //! can reach the harness substrate without re-declaring the module tree. //! -//! Today the only public surface here is [`harnesses`] — the `HarnessAdapter` -//! trait, the lazy registry, and the shared pending-stamp adapter factory -//! introduced in #248-b. Wave 2 PRs (claude / codex / opencode) will plug -//! their adapters in via [`harnesses::registry`]; the CLI binary will reach -//! them through `lookup` / `list_harness_names`. +//! Today the only public surface here is [`harnesses`] — legacy adapter +//! 
reference code plus the shared pending-stamp adapter factory introduced +//! in #248-b. Runtime launcher integrations should prefer the public +//! `relayburn-sdk` / `@relayburn/sdk` pending-stamp APIs. //! //! Keeping this surface as a library crate alongside the binary lets the //! Wave 2 fan-out PRs add per-adapter modules and unit tests without diff --git a/crates/relayburn-cli/src/main.rs b/crates/relayburn-cli/src/main.rs index 4b38b334..5ee05948 100644 --- a/crates/relayburn-cli/src/main.rs +++ b/crates/relayburn-cli/src/main.rs @@ -35,7 +35,6 @@ fn dispatch(args: Args) -> i32 { Command::Hotspots(sub) => commands::hotspots::run(&globals, sub), Command::Overhead(args) => commands::overhead::run(&globals, args), Command::Compare(args) => commands::compare::run(&globals, args), - Command::Run(args) => commands::run::run(&globals, args), Command::State(args) => commands::state::run(&globals, args), Command::Ingest(args) => commands::ingest::run(&globals, args), Command::McpServer(args) => commands::mcp_server::run(&globals, args), diff --git a/crates/relayburn-cli/src/util/time.rs b/crates/relayburn-cli/src/util/time.rs index 224bb7c2..c5b10d50 100644 --- a/crates/relayburn-cli/src/util/time.rs +++ b/crates/relayburn-cli/src/util/time.rs @@ -1,5 +1,5 @@ -//! Tiny time helpers shared by the harness adapter ([`super::super::harnesses::claude`]) -//! and the `burn run` driver ([`super::super::commands::run`]). +//! Tiny time helpers shared by the harness adapter +//! ([`super::super::harnesses::claude`]). //! //! Both call sites need the same two operations: //! diff --git a/crates/relayburn-cli/tests/smoke.rs b/crates/relayburn-cli/tests/smoke.rs index c3bcaa61..63787808 100644 --- a/crates/relayburn-cli/tests/smoke.rs +++ b/crates/relayburn-cli/tests/smoke.rs @@ -4,7 +4,7 @@ //! through `assert_cmd` to prove that: //! //! 1. `burn --help` exits 0 and emits non-empty stdout listing all -//! eight subcommands (the contract Wave 2 fan-out PRs depend on). +//! 
registered subcommands. //! 2. `burn --help` exits 0 for every subcommand we have a //! stub for. clap auto-generates the help block from the `Command` //! enum's doc comments, so a regression in the derive layer would @@ -28,7 +28,6 @@ const SUBCOMMANDS: &[&str] = &[ "hotspots", "overhead", "compare", - "run", "state", "ingest", "mcp-server", @@ -36,8 +35,8 @@ const SUBCOMMANDS: &[&str] = &[ /// Subcommands that still print "not yet implemented" when invoked /// without args. Wave 2 D1 wired up `summary` and `hotspots`, D2 wired -/// up `overhead`, D3 wired up `compare`, D4 wired up `state`, D5 wired -/// up `run`, and D8 wired up `ingest` + `mcp-server` as real +/// up `overhead`, D3 wired up `compare`, D4 wired up `state`, and D8 wired +/// up `ingest` + `mcp-server` as real /// presenters — every subcommand is now wired, so this list is empty /// and `each_stub_exits_one_with_not_yet_implemented_message` becomes /// a no-op iteration. The constant is retained so a future scaffold @@ -61,6 +60,12 @@ fn top_level_help_lists_every_subcommand() { "expected `--help` to mention subcommand `{sub}`; got:\n{stdout}", ); } + assert!( + !stdout + .lines() + .any(|line| line.trim_start().starts_with("run ")), + "`burn --help` must not advertise removed `run` command; got:\n{stdout}", + ); } #[test] @@ -156,44 +161,6 @@ fn json_mode_emits_error_envelope_on_argument_failure() { ); } -#[test] -fn run_command_lists_known_harnesses_when_invoked_without_args() { - // `burn run` (Wave 2 D5) prints help + exits 2 when no harness - // positional is supplied — the same shape as the TS sibling. 
- let output = burn().arg("run").assert().code(2).get_output().clone(); - let stdout = String::from_utf8(output.stdout).expect("stdout should be valid UTF-8"); - assert!( - stdout.contains("Known harnesses:"), - "expected `burn run` to list known harnesses; got:\n{stdout}", - ); - assert!( - stdout.contains("claude"), - "expected `burn run` help to mention claude; got:\n{stdout}", - ); -} - -#[test] -fn run_command_rejects_unknown_harness() { - // Unknown harness must exit non-zero with a typed error mentioning - // both the bogus name and the known set. Driver maps this through - // `report_error`, which lands at exit code 2 in human mode. - let output = burn() - .args(["run", "definitely-not-a-real-harness"]) - .assert() - .code(2) - .get_output() - .clone(); - let stderr = String::from_utf8(output.stderr).expect("stderr should be valid UTF-8"); - assert!( - stderr.contains("definitely-not-a-real-harness"), - "expected stderr to echo the unknown harness name; got:\n{stderr}", - ); - assert!( - stderr.contains("claude"), - "expected stderr to list claude as a known harness; got:\n{stderr}", - ); -} - #[test] fn version_flag_exits_zero() { burn() @@ -210,3 +177,8 @@ fn unknown_subcommand_exits_non_zero() { .assert() .failure(); } + +#[test] +fn run_subcommand_is_not_registered() { + burn().args(["run", "--help"]).assert().failure(); +} diff --git a/crates/relayburn-sdk-node/src/lib.rs b/crates/relayburn-sdk-node/src/lib.rs index 429034fe..9059ae50 100644 --- a/crates/relayburn-sdk-node/src/lib.rs +++ b/crates/relayburn-sdk-node/src/lib.rs @@ -85,8 +85,10 @@ #![allow(clippy::needless_pass_by_value)] +use std::collections::{BTreeMap, HashMap}; use std::path::PathBuf; use std::ptr; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use napi::bindgen_prelude::{BigInt, Error as NapiError, Result as NapiResult, ToNapiValue}; use napi::sys; @@ -157,6 +159,10 @@ fn invalid_arg(msg: impl Into<String>) -> NapiError<&'static str> { NapiError::new(INVALID_ARGUMENT_ERROR_CODE, 
msg.into()) } +fn io_err(e: std::io::Error) -> NapiError<&'static str> { + NapiError::new(IO_ERROR_CODE, e.to_string()) +} + // --------------------------------------------------------------------------- // Helpers — small repeating conversions // --------------------------------------------------------------------------- @@ -368,6 +374,227 @@ fn open_options(home: Option<String>, content_home: Option<String>) -> sdk::Ledg } } +// --------------------------------------------------------------------------- +// writePendingStamp +// --------------------------------------------------------------------------- + +#[napi(string_enum)] +pub enum PendingStampHarness { + #[napi(value = "claude")] + Claude, + #[napi(value = "codex")] + Codex, + #[napi(value = "opencode")] + Opencode, +} + +impl From<PendingStampHarness> for sdk::PendingStampHarness { + fn from(h: PendingStampHarness) -> Self { + match h { + PendingStampHarness::Claude => sdk::PendingStampHarness::Claude, + PendingStampHarness::Codex => sdk::PendingStampHarness::Codex, + PendingStampHarness::Opencode => sdk::PendingStampHarness::Opencode, + } + } +} + +fn harness_to_string(h: sdk::PendingStampHarness) -> String { + match h { + sdk::PendingStampHarness::Claude => "claude", + sdk::PendingStampHarness::Codex => "codex", + sdk::PendingStampHarness::Opencode => "opencode", + } + .to_string() +} + +#[napi(object)] +pub struct WritePendingStampOptions { + pub harness: PendingStampHarness, + pub cwd: String, + pub enrichment: HashMap<String, String>, + pub session_dir_hint: Option<String>, + pub spawn_start_ts: Option<String>, + pub spawner_pid: Option<u32>, + pub ledger_home: Option<String>, +} + +#[napi(object)] +pub struct PendingStamp { + pub v: u32, + pub harness: String, + pub spawner_pid: u32, + pub spawn_start_ts: String, + pub cwd: String, + pub enrichment: HashMap<String, String>, + pub session_dir_hint: Option<String>, +} + +#[napi(object)] +pub struct PendingStampWriteResult { + pub file: String, + pub stamp: PendingStamp, +} + +impl From<sdk::PendingStamp> for PendingStamp { + fn from(stamp: sdk::PendingStamp) -> Self { + 
PendingStamp { + v: stamp.v as u32, + harness: harness_to_string(stamp.harness), + spawner_pid: stamp.spawner_pid, + spawn_start_ts: stamp.spawn_start_ts, + cwd: stamp.cwd, + enrichment: stamp.enrichment.into_iter().collect(), + session_dir_hint: stamp.session_dir_hint, + } + } +} + +impl From<sdk::PendingStampWriteResult> for PendingStampWriteResult { + fn from(result: sdk::PendingStampWriteResult) -> Self { + PendingStampWriteResult { + file: result.file.to_string_lossy().into_owned(), + stamp: PendingStamp::from(result.stamp), + } + } +} + +#[napi] +pub fn write_pending_stamp( + opts: WritePendingStampOptions, +) -> Result<PendingStampWriteResult> { + if opts.cwd.is_empty() { + return Err(invalid_arg("cwd must be non-empty")); + } + if opts.enrichment.is_empty() { + return Err(invalid_arg("enrichment must contain at least one tag")); + } + for key in opts.enrichment.keys() { + if key.is_empty() { + return Err(invalid_arg("enrichment keys must be non-empty")); + } + } + let spawn_start_ts = opts + .spawn_start_ts + .as_deref() + .map(parse_iso_system_time) + .transpose()?; + let raw = sdk::PendingStampWriteOptions { + harness: opts.harness.into(), + ledger_home: maybe_path(opts.ledger_home), + cwd: opts.cwd, + enrichment: opts.enrichment.into_iter().collect::<BTreeMap<_, _>>(), + session_dir_hint: opts.session_dir_hint, + spawn_start_ts, + spawner_pid: opts.spawner_pid, + }; + sdk::write_pending_stamp(raw) + .map(PendingStampWriteResult::from) + .map_err(io_err) +} + +fn parse_iso_system_time(s: &str) -> std::result::Result<SystemTime, NapiError<&'static str>> { + let Some(raw) = s.strip_suffix('Z') else { + return Err(invalid_arg("spawnStartTs must be an ISO-8601 Z timestamp")); + }; + let Some((date, time)) = raw.split_once('T') else { + return Err(invalid_arg("spawnStartTs must contain a T separator")); + }; + let mut date_parts = date.split('-'); + let year: i64 = parse_i64_part(date_parts.next(), "year")?; + let month: u32 = parse_u32_part(date_parts.next(), "month")?; + let day: u32 = parse_u32_part(date_parts.next(), "day")?; + if date_parts.next().is_some() { 
+ return Err(invalid_arg("spawnStartTs date has too many fields")); + } + + let mut time_parts = time.split(':'); + let hour: u32 = parse_u32_part(time_parts.next(), "hour")?; + let minute: u32 = parse_u32_part(time_parts.next(), "minute")?; + let second_raw = time_parts + .next() + .ok_or_else(|| invalid_arg("spawnStartTs missing seconds"))?; + if time_parts.next().is_some() { + return Err(invalid_arg("spawnStartTs time has too many fields")); + } + let (second_part, frac_part) = second_raw + .split_once('.') + .map(|(sec, frac)| (sec, Some(frac))) + .unwrap_or((second_raw, None)); + let second: u32 = second_part + .parse() + .map_err(|_| invalid_arg("spawnStartTs second is invalid"))?; + let nanos = parse_fractional_nanos(frac_part)?; + + let max_day = days_in_month(year, month); + if max_day == 0 + || day == 0 + || day > max_day + || hour > 23 + || minute > 59 + || second > 60 + { + return Err(invalid_arg("spawnStartTs is outside the supported range")); + } + let days = days_from_civil(year, month, day); + if days < 0 { + return Err(invalid_arg("spawnStartTs must be at or after 1970-01-01")); + } + let secs = days as u64 * 86_400 + hour as u64 * 3_600 + minute as u64 * 60 + second as u64; + Ok(UNIX_EPOCH + Duration::from_secs(secs) + Duration::from_nanos(nanos as u64)) +} + +fn parse_i64_part(part: Option<&str>, name: &str) -> std::result::Result<i64, NapiError<&'static str>> { + part.ok_or_else(|| invalid_arg(format!("spawnStartTs missing {name}")))? + .parse() + .map_err(|_| invalid_arg(format!("spawnStartTs {name} is invalid"))) +} + +fn parse_u32_part(part: Option<&str>, name: &str) -> std::result::Result<u32, NapiError<&'static str>> { + part.ok_or_else(|| invalid_arg(format!("spawnStartTs missing {name}")))? 
+ .parse() + .map_err(|_| invalid_arg(format!("spawnStartTs {name} is invalid"))) +} + +fn parse_fractional_nanos(part: Option<&str>) -> std::result::Result<u32, NapiError<&'static str>> { + let Some(part) = part else { + return Ok(0); + }; + if part.is_empty() || !part.bytes().all(|b| b.is_ascii_digit()) { + return Err(invalid_arg("spawnStartTs fractional seconds are invalid")); + } + let mut nanos = 0u32; + let mut scale = 100_000_000u32; + for b in part.bytes().take(9) { + nanos += ((b - b'0') as u32) * scale; + scale /= 10; + } + Ok(nanos) +} + +fn days_from_civil(year: i64, month: u32, day: u32) -> i64 { + let y = year - i64::from(month <= 2); + let era = if y >= 0 { y } else { y - 399 } / 400; + let yoe = y - era * 400; + let mp = month as i64 + if month > 2 { -3 } else { 9 }; + let doy = (153 * mp + 2) / 5 + day as i64 - 1; + let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; + era * 146_097 + doe - 719_468 +} + +fn days_in_month(year: i64, month: u32) -> u32 { + match month { + 1 | 3 | 5 | 7 | 8 | 10 | 12 => 31, + 4 | 6 | 9 | 11 => 30, + 2 if is_leap_year(year) => 29, + 2 => 28, + _ => 0, + } +} + +fn is_leap_year(year: i64) -> bool { + (year % 4 == 0 && year % 100 != 0) || year % 400 == 0 +} + // --------------------------------------------------------------------------- // summary // --------------------------------------------------------------------------- @@ -379,6 +606,8 @@ pub struct SummaryOptions { /// ISO timestamp (e.g. `2026-04-01T00:00:00Z`) or relative range /// (`24h`, `7d`, `4w`, `2m`). 
pub since: Option<String>, + pub tags: Option<HashMap<String, String>>, + pub group_by_tag: Option<String>, pub ledger_home: Option<String>, } @@ -397,6 +626,15 @@ pub struct SummaryModelRow { pub cost: f64, } +#[napi(object)] +pub struct SummaryTagRow { + pub tag: String, + pub value: Option<String>, + pub tokens: BigInt, + pub cost: f64, + pub turn_count: BigInt, +} + #[napi(object)] pub struct ReplacementSavingsToolRow { pub tool: String, @@ -440,6 +678,7 @@ pub struct Summary { pub turn_count: BigInt, pub by_tool: Vec, pub by_model: Vec<SummaryModelRow>, + pub by_tag: Option<Vec<SummaryTagRow>>, pub replacement_savings: Option<ReplacementSavingsSummary>, } @@ -468,6 +707,17 @@ impl From<sdk::Summary> for Summary { cost: r.cost, }) .collect(), + by_tag: s.by_tag.map(|rows| { + rows.into_iter() + .map(|r| SummaryTagRow { + tag: r.tag, + value: r.value, + tokens: u64_to_bigint(r.tokens), + cost: r.cost, + turn_count: u64_to_bigint(r.turn_count), + }) + .collect() + }), replacement_savings: s.replacement_savings.map(ReplacementSavingsSummary::from), } } @@ -479,12 +729,18 @@ pub fn summary(opts: Option<SummaryOptions>) -> Result<Summary> { session: None, project: None, since: None, + tags: None, + group_by_tag: None, ledger_home: None, }); let raw = sdk::SummaryOptions { session: opts.session, project: opts.project, since: opts.since, + tags: opts + .tags + .map(|tags| tags.into_iter().collect::<BTreeMap<_, _>>()), + group_by_tag: opts.group_by_tag, ledger_home: maybe_path(opts.ledger_home), }; sdk::summary(raw).map(Summary::from).map_err(sdk_err) @@ -947,6 +1203,7 @@ pub struct IngestReport { pub scanned_sessions: BigInt, pub ingested_sessions: BigInt, pub appended_turns: BigInt, + pub applied_pending_stamps: BigInt, } impl From<sdk::IngestReport> for IngestReport { @@ -955,6 +1212,7 @@ IngestReport { scanned_sessions: u64_to_bigint(r.scanned_sessions as u64), ingested_sessions: u64_to_bigint(r.ingested_sessions as u64), appended_turns: u64_to_bigint(r.appended_turns as u64), + applied_pending_stamps: u64_to_bigint(r.applied_pending_stamps as u64), } } } @@ -1073,6 +1331,27 @@ mod tests { assert!(bigint_to_u64(two_words).is_err()); } + 
#[test] + fn parse_iso_system_time_accepts_pending_stamp_timestamp_shape() { + let parsed = parse_iso_system_time("2026-04-23T00:00:00.123Z").unwrap(); + let elapsed = parsed.duration_since(UNIX_EPOCH).unwrap(); + assert_eq!(elapsed.subsec_millis(), 123); + } + + #[test] + fn parse_iso_system_time_rejects_non_zulu_values() { + let err = parse_iso_system_time("2026-04-23T00:00:00").unwrap_err(); + assert!(err.reason.contains("ISO-8601 Z timestamp")); + } + + #[test] + fn parse_iso_system_time_rejects_impossible_dates() { + let err = parse_iso_system_time("2026-02-31T00:00:00Z").unwrap_err(); + assert!(err.reason.contains("outside the supported range")); + assert!(parse_iso_system_time("2024-02-29T00:00:00Z").is_ok()); + assert!(parse_iso_system_time("2025-02-29T00:00:00Z").is_err()); + } + #[test] fn maybe_path_threads_string_to_pathbuf() { assert!(maybe_path(None).is_none()); diff --git a/crates/relayburn-sdk/src/ingest/gap.rs b/crates/relayburn-sdk/src/ingest/gap.rs index 7d50dcc5..f0291528 100644 --- a/crates/relayburn-sdk/src/ingest/gap.rs +++ b/crates/relayburn-sdk/src/ingest/gap.rs @@ -32,10 +32,9 @@ //! fires `ingest_all` repeatedly and relies on this). //! //! Tying the state to a `Ledger` handle would force callers to thread -//! the same handle through every ingest call to get suppression — the -//! `burn run` path uses two distinct handles (per-session pre-spawn, -//! sweep post-spawn) and would lose suppression. Process-global keeps -//! the behaviour byte-equivalent to TS without burdening callers. +//! the same handle through every ingest call to get suppression. Launchers +//! can use distinct handles for per-session ingest and later sweeps; a +//! process-global tracker keeps suppression stable without burdening callers. 
use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Mutex, MutexGuard, OnceLock}; diff --git a/crates/relayburn-sdk/src/ingest/ingest.rs b/crates/relayburn-sdk/src/ingest/ingest.rs index ab022350..c1c4ddc7 100644 --- a/crates/relayburn-sdk/src/ingest/ingest.rs +++ b/crates/relayburn-sdk/src/ingest/ingest.rs @@ -54,6 +54,8 @@ pub struct IngestReport { pub scanned_sessions: usize, pub ingested_sessions: usize, pub appended_turns: usize, + #[serde(default)] + pub applied_pending_stamps: usize, } impl IngestReport { @@ -65,6 +67,7 @@ impl IngestReport { self.scanned_sessions += other.scanned_sessions; self.ingested_sessions += other.ingested_sessions; self.appended_turns += other.appended_turns; + self.applied_pending_stamps += other.applied_pending_stamps; } } @@ -179,7 +182,13 @@ pub async fn ingest_all(ledger: &mut Ledger, opts: &IngestOptions) -> anyhow::Re // returning Err does not swallow a gap the earlier adapter already // recorded against work that was already appended. progress(opts, "scanning Claude Code sessions"); - let r = ingest_claude_into(ledger, &mut after, &opts.roots, content_mode)?; + let r = ingest_claude_into( + ledger, + &mut after, + &opts.roots, + content_mode, + opts.ledger_home.as_deref(), + )?; report.merge(&r); emit_gap_warning(AdapterName::Claude, content_mode, on_warn); @@ -219,7 +228,13 @@ pub async fn ingest_claude_projects( let before = load_cursors(ledger).map_err(|e| anyhow::anyhow!(e))?; let mut after = before.clone(); let content_mode = resolve_content_mode(opts.ledger_home.as_deref()); - let report = ingest_claude_into(ledger, &mut after, &opts.roots, content_mode)?; + let report = ingest_claude_into( + ledger, + &mut after, + &opts.roots, + content_mode, + opts.ledger_home.as_deref(), + )?; emit_gap_warning( AdapterName::Claude, content_mode, @@ -279,10 +294,9 @@ pub async fn ingest_opencode_sessions( Ok(report) } -/// Per-session fast-path used by the claude harness adapter after a -/// `burn run` exits. 
Caller already knows the sessionId from the spawn -/// plan, so we go straight to the one JSONL file and persist a cursor at -/// EOF — a later `ingest_all` sweep then skips it. +/// Per-session fast-path used when a Claude launcher already knows the +/// sessionId from the spawn plan. We go straight to the one JSONL file and +/// persist a cursor at EOF — a later `ingest_all` sweep then skips it. pub async fn ingest_claude_session( ledger: &mut Ledger, cwd: &str, @@ -318,6 +332,7 @@ scanned_sessions: 1, ingested_sessions: 0, appended_turns: 0, + applied_pending_stamps: 0, }); } @@ -361,6 +376,7 @@ scanned_sessions: 1, ingested_sessions: 1, appended_turns, + applied_pending_stamps: 0, }) } @@ -376,6 +392,7 @@ fn ingest_claude_into( cursors: &mut Cursors, roots: &IngestRoots, content_mode: ContentStoreMode, + ledger_home: Option<&Path>, ) -> anyhow::Result<IngestReport> { let mut report = IngestReport::empty(); let projects_root = claude_projects_dir(roots); @@ -445,6 +462,17 @@ if !parsed.turns.is_empty() { + let session_id = parsed.turns[0].session_id.clone(); + let cwd = parsed.turns.first().and_then(|t| t.project.clone()); + let candidate = PendingStampSessionCandidate { + harness: PendingStampHarness::Claude, + session_id, + session_path: Some(file.clone()), + session_mtime_ms: Some(mtime), + cwd, + }; + resolve_pending_stamps_for_report(ledger, &candidate, ledger_home, &mut report); + report.appended_turns += parsed.turns.len(); report.ingested_sessions += 1; ledger.append_turns(&parsed.turns)?; @@ -618,7 +646,7 @@ fn ingest_codex_into( session_mtime_ms: Some(mtime), cwd, }; - let _ = resolve_pending_stamps_for_session_in(ledger, &candidate, ledger_home); + resolve_pending_stamps_for_report(ledger, &candidate, ledger_home, &mut report); } report.appended_turns += parsed.turns.len(); report.ingested_sessions += 1; @@ -737,7 +765,7 @@ fn ingest_opencode_into( session_mtime_ms:
Some(session_mtime_ms), cwd, }; - let _ = resolve_pending_stamps_for_session_in(ledger, &candidate, ledger_home); + resolve_pending_stamps_for_report(ledger, &candidate, ledger_home, &mut report); report.appended_turns += parsed.turns.len(); report.ingested_sessions += 1; ledger.append_turns(&parsed.turns)?; @@ -946,6 +974,25 @@ fn last_completed_turn_to_value(t: &CodexLastCompletedTurn) -> Value { Value::Object(m) } +fn resolve_pending_stamps_for_report( + ledger: &mut Ledger, + candidate: &PendingStampSessionCandidate, + ledger_home: Option<&Path>, + report: &mut IngestReport, +) { + match resolve_pending_stamps_for_session_in(ledger, candidate, ledger_home) { + Ok(resolved) => { + report.applied_pending_stamps += resolved.applied; + } + Err(err) => { + let home = ledger_home + .map(|p| p.display().to_string()) + .unwrap_or_else(|| "<default>".to_string()); + eprintln!("[burn] pending stamp resolution failed for {candidate:?} in {home}: {err}"); + } + } +} + // --- filesystem helpers -------------------------------------------------- fn list_dirs(parent: &Path) -> Vec<PathBuf> { @@ -1021,16 +1068,19 @@ mod tests { scanned_sessions: 1, ingested_sessions: 2, appended_turns: 3, + applied_pending_stamps: 4, }; let b = IngestReport { scanned_sessions: 10, ingested_sessions: 20, appended_turns: 30, + applied_pending_stamps: 40, }; a.merge(&b); assert_eq!(a.scanned_sessions, 11); assert_eq!(a.ingested_sessions, 22); assert_eq!(a.appended_turns, 33); + assert_eq!(a.applied_pending_stamps, 44); } #[test] diff --git a/crates/relayburn-sdk/src/ingest/orchestration_tests.rs index e41133f5..4b551a6b 100644 --- a/crates/relayburn-sdk/src/ingest/orchestration_tests.rs +++ b/crates/relayburn-sdk/src/ingest/orchestration_tests.rs @@ -6,7 +6,8 @@ //! //! `ingest_all_walks_each_harness_root_once` exercises the unified verb //! across all three harnesses simultaneously. `ingest_claude_session_*` -//!
covers the per-session fast-path used by the `burn run claude` adapter. +//! covers the per-session fast-path used when the caller already knows the +//! Claude session id. //! //! ## Concurrency note //! @@ -32,10 +33,11 @@ use std::path::{Path, PathBuf}; use crate::ingest::{ ingest_all, ingest_claude_projects, ingest_claude_session, ingest_codex_sessions, - ingest_opencode_sessions, IngestOptions, IngestRoots, + ingest_opencode_sessions, write_pending_stamp, IngestOptions, IngestRoots, PendingStampHarness, + WriteOptions, }; use crate::ingest::{load_cursors, ClaudeCursor, FileCursor}; -use crate::ledger::{Ledger, LedgerLayout, Query}; +use crate::ledger::{Enrichment, Ledger, LedgerLayout, Query}; use tempfile::TempDir; // Shared with gap_warning_tests / watch_loop_tests so that @@ -72,6 +74,10 @@ fn isolated_relayburn_home<'a>(tmp: &TempDir) -> std::sync::MutexGuard<'a, ()> { /// `sessionId` baked into every event so the parser doesn't depend on the /// filename to derive it. fn claude_minimal_session(session_id: &str) -> String { + claude_minimal_session_with_cwd(session_id, "/tmp/project") +} + +fn claude_minimal_session_with_cwd(session_id: &str, cwd: &str) -> String { let user = serde_json::json!({ "parentUuid": null, "isSidechain": false, @@ -79,7 +85,7 @@ fn claude_minimal_session(session_id: &str) -> String { "message": {"role": "user", "content": "hi"}, "uuid": "u-user-1", "timestamp": "2026-04-22T00:00:00.000Z", - "cwd": "/tmp/project", + "cwd": cwd, "sessionId": session_id, }); let assistant = serde_json::json!({ @@ -106,7 +112,7 @@ fn claude_minimal_session(session_id: &str) -> String { "type": "assistant", "uuid": "u-asst-1", "timestamp": "2026-04-22T00:00:01.000Z", - "cwd": "/tmp/project", + "cwd": cwd, "sessionId": session_id, }); format!("{}\n{}\n", user, assistant) @@ -161,6 +167,54 @@ async fn ingest_claude_projects_round_trips_a_fixture_session() { } } +#[tokio::test] +async fn ingest_claude_projects_resolves_pending_stamp_tags() { + let tmp = 
TempDir::new().unwrap(); + let _env = isolated_relayburn_home(&tmp); + let roots = pinned_roots(&tmp); + + let mut enrichment = Enrichment::new(); + enrichment.insert("persona".to_string(), "code-reviewer".to_string()); + let cwd = tmp.path().join("project"); + fs::create_dir_all(&cwd).unwrap(); + let cwd = cwd.to_string_lossy().into_owned(); + write_pending_stamp(WriteOptions { + harness: PendingStampHarness::Claude, + cwd: cwd.clone(), + enrichment: enrichment.clone(), + ..Default::default() + }) + .unwrap(); + + let project_dir = roots + .claude_projects_dir + .as_ref() + .unwrap() + .join("-tmp-project"); + fs::create_dir_all(&project_dir).unwrap(); + let sid = "33333333-3333-3333-3333-333333333333"; + let session_file = project_dir.join(format!("{sid}.jsonl")); + fs::write(&session_file, claude_minimal_session_with_cwd(sid, &cwd)).unwrap(); + + let mut ledger = open_ledger_in(&tmp); + let opts = IngestOptions { + roots, + ..Default::default() + }; + let report = ingest_claude_projects(&mut ledger, &opts).await.unwrap(); + + assert!(report.appended_turns >= 1, "expected >=1 turn ingested"); + assert_eq!(report.applied_pending_stamps, 1); + let turns = ledger + .query_turns(&Query { + enrichment: Some(enrichment), + ..Default::default() + }) + .unwrap(); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].turn.session_id, sid); +} + #[tokio::test] async fn ingest_codex_sessions_round_trips_a_fixture_session() { let tmp = TempDir::new().unwrap(); diff --git a/crates/relayburn-sdk/src/ingest/pending_stamps.rs b/crates/relayburn-sdk/src/ingest/pending_stamps.rs index d941ca2e..9589d0b2 100644 --- a/crates/relayburn-sdk/src/ingest/pending_stamps.rs +++ b/crates/relayburn-sdk/src/ingest/pending_stamps.rs @@ -1,7 +1,7 @@ //! Pending-stamp coordination — Rust port of `packages/ingest/src/pending-stamps.ts`. //! -//! Wrapper harnesses (`burn run codex`, `burn run opencode`) that spawn a -//! child process before the session id is known drop a JSON manifest into +//! 
Launchers that spawn a child process before the session id is known drop a +//! JSON manifest into //! `$RELAYBURN_HOME/pending-stamps/` (or an explicitly supplied ledger home). //! After the child exits, the next ingest pass tries to match each manifest //! against a freshly-discovered session and folds the manifest's enrichment @@ -9,9 +9,8 @@ //! //! ## Wire-format compatibility //! -//! The on-disk JSON shape is binary-compatible with the TS adapter so a -//! Rust-resident watch loop and a TS-resident `burn run` wrapper can coexist -//! during the migration. Specifically: +//! The on-disk JSON shape is stable so external launchers and Rust ingest can +//! coordinate through the same pending-stamp directory. Specifically: //! //! * Object keys are emitted in insertion order: `v`, `harness`, `spawnerPid`, //! `spawnStartTs`, `cwd`, `enrichment`, then optional `sessionDirHint`. @@ -29,7 +28,7 @@ use std::io::Write; use std::path::{Path, PathBuf}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use crate::ledger::{Enrichment, Ledger, Stamp, StampSelector, ledger_home}; +use crate::ledger::{ledger_home, Enrichment, Ledger, Stamp, StampSelector}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; @@ -48,6 +47,7 @@ const MTIME_SLOP_MS: i64 = 1; pub enum PendingStampHarness { #[default] Codex, + Claude, Opencode, } @@ -55,6 +55,7 @@ impl PendingStampHarness { fn as_str(self) -> &'static str { match self { PendingStampHarness::Codex => "codex", + PendingStampHarness::Claude => "claude", PendingStampHarness::Opencode => "opencode", } } @@ -410,6 +411,7 @@ pub fn parse_pending_stamp(raw: &str) -> Option { return None; } let harness = match obj.get("harness").and_then(Value::as_str)? 
{ + "claude" => PendingStampHarness::Claude, "codex" => PendingStampHarness::Codex, "opencode" => PendingStampHarness::Opencode, _ => return None, diff --git a/crates/relayburn-sdk/src/ingest/pending_stamps_compat_tests.rs b/crates/relayburn-sdk/src/ingest/pending_stamps_compat_tests.rs index d68798fd..467bc9e7 100644 --- a/crates/relayburn-sdk/src/ingest/pending_stamps_compat_tests.rs +++ b/crates/relayburn-sdk/src/ingest/pending_stamps_compat_tests.rs @@ -75,6 +75,27 @@ fn rust_written_stamp_round_trips_through_parser() { ); } +#[test] +fn claude_pending_stamp_round_trips_through_parser() { + use crate::ingest::pending_stamps::{parse_pending_stamp, serialize_stamp}; + use crate::ingest::{PendingStamp, PendingStampHarness}; + + let mut enrichment = std::collections::BTreeMap::new(); + enrichment.insert("persona".into(), "code-reviewer".into()); + let original = PendingStamp { + v: 1, + harness: PendingStampHarness::Claude, + spawner_pid: 42, + spawn_start_ts: "2026-04-23T08:09:10.011Z".into(), + cwd: "/var/tmp/work".into(), + enrichment, + session_dir_hint: None, + }; + let serialized = serialize_stamp(&original); + let reparsed = parse_pending_stamp(&serialized).unwrap(); + assert_eq!(reparsed, original); +} + #[test] fn rejects_wrong_version() { let raw = r#"{"v":2,"harness":"codex","spawnerPid":1,"spawnStartTs":"2025-05-01T00:00:00.000Z","cwd":"/x","enrichment":{}}"#; diff --git a/crates/relayburn-sdk/src/ingest/watch_loop_tests.rs b/crates/relayburn-sdk/src/ingest/watch_loop_tests.rs index a55c4156..05df0a03 100644 --- a/crates/relayburn-sdk/src/ingest/watch_loop_tests.rs +++ b/crates/relayburn-sdk/src/ingest/watch_loop_tests.rs @@ -30,6 +30,7 @@ async fn watch_loop_drains_pending_work_within_two_ticks() { scanned_sessions: p, ingested_sessions: p, appended_turns: p, + applied_pending_stamps: 0, }) }) }); diff --git a/crates/relayburn-sdk/src/query_verbs.rs b/crates/relayburn-sdk/src/query_verbs.rs index 26ed2496..8e242e87 100644 --- 
a/crates/relayburn-sdk/src/query_verbs.rs +++ b/crates/relayburn-sdk/src/query_verbs.rs @@ -32,7 +32,7 @@ use crate::analyze::{ summarize_fidelity_from_iter, summarize_replacement_savings, tool_call_pattern_to_finding, tool_output_bloat_to_finding, user_claude_settings_path, }; -use crate::ledger::Query; +use crate::ledger::{EnrichedTurn, Enrichment, Query}; use crate::reader::{ BashParse, FidelityClass, SourceKind, TurnRecord, UserTurnRecord, parse_bash_command, resolve_project, }; @@ -209,6 +209,10 @@ pub struct SummaryOptions { pub session: Option<String>, pub project: Option<String>, pub since: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub tags: Option<Enrichment>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub group_by_tag: Option<String>, pub ledger_home: Option<PathBuf>, } @@ -229,6 +233,17 @@ pub struct SummaryModelRow { pub cost: f64, } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SummaryTagRow { + pub tag: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub value: Option<String>, + pub tokens: u64, + pub cost: f64, + pub turn_count: u64, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Summary { @@ -238,19 +253,36 @@ pub struct Summary { pub by_tool: Vec<SummaryToolRow>, pub by_model: Vec<SummaryModelRow>, #[serde(default, skip_serializing_if = "Option::is_none")] + pub by_tag: Option<Vec<SummaryTagRow>>, + #[serde(default, skip_serializing_if = "Option::is_none")] pub replacement_savings: Option<ReplacementSavingsSummary>, } impl LedgerHandle { pub fn summary(&self, opts: SummaryOptions) -> Result<Summary> { - let q = build_query( + let mut q = build_query( opts.session.as_deref(), opts.project.as_deref(), opts.since.as_deref(), )?; - let turns = collect_turns(self, &q)?; + if let Some(tags) = opts.tags.clone() { + validate_tags(&tags)?; + if !tags.is_empty() { + q.enrichment = Some(tags); + } + } + let group_by_tag = opts.group_by_tag.clone(); + if let Some(tag) = group_by_tag.as_deref() { + validate_tag_key(tag,
"groupByTag")?; + } + let enriched = self.inner.query_turns(&q)?; + let turns: Vec<TurnRecord> = enriched.iter().map(|e| e.turn.clone()).collect(); let pricing = load_pricing(None); - Ok(compute_summary(&turns, &pricing)) + let mut summary = compute_summary(&turns, &pricing); + if let Some(tag) = group_by_tag { + summary.by_tag = Some(compute_summary_by_tag(&enriched, &tag, &pricing)); + } + Ok(summary) } } @@ -262,6 +294,20 @@ pub fn summary(opts: SummaryOptions) -> Result<Summary> { }) } +fn validate_tags(tags: &Enrichment) -> Result<()> { + for key in tags.keys() { + validate_tag_key(key, "tag")?; + } + Ok(()) +} + +fn validate_tag_key(key: &str, label: &str) -> Result<()> { + if key.is_empty() { + anyhow::bail!("{label} key must be non-empty"); + } + Ok(()) +} + fn compute_summary(turns: &[TurnRecord], pricing: &PricingTable) -> Summary { // First-seen iteration order matches TS `Map` semantics. let mut by_tool_order: Vec<String> = Vec::new(); @@ -310,7 +356,11 @@ fn compute_summary(turns: &[TurnRecord], pricing: &PricingTable) -> Summary { } let savings = summarize_replacement_savings(turns, None); - let replacement_savings = if savings.calls > 0 { Some(savings) } else { None }; + let replacement_savings = if savings.calls > 0 { + Some(savings) + } else { + None + }; Summary { total_tokens, @@ -324,10 +374,61 @@ fn compute_summary(turns: &[TurnRecord], pricing: &PricingTable) -> Summary { .into_iter() .map(|k| by_model.remove(&k).unwrap()) .collect(), + by_tag: None, replacement_savings, } } +fn compute_summary_by_tag( + enriched: &[EnrichedTurn], + tag: &str, + pricing: &PricingTable, +) -> Vec<SummaryTagRow> { + let mut order: Vec<Option<String>> = Vec::new(); + let mut rows: HashMap<Option<String>, SummaryTagRow> = HashMap::new(); + + for e in enriched { + let value = e.enrichment.get(tag).cloned(); + let tokens = total_tokens_for_turn(&e.turn); + let cost = cost_for_turn(&e.turn, pricing) + .map(|c| c.total) + .unwrap_or(0.0); + let row = rows.entry(value.clone()).or_insert_with(|| { + order.push(value.clone()); + SummaryTagRow
{ + tag: tag.to_string(), + value, + tokens: 0, + cost: 0.0, + turn_count: 0, + } + }); + row.tokens += tokens; + row.cost += cost; + row.turn_count += 1; + } + + let mut out: Vec<SummaryTagRow> = order + .into_iter() + .map(|k| rows.remove(&k).unwrap()) + .collect(); + out.sort_by(|a, b| { + b.cost + .partial_cmp(&a.cost) + .unwrap_or(std::cmp::Ordering::Equal) + }); + out +} + +fn total_tokens_for_turn(t: &TurnRecord) -> u64 { + t.usage.input + + t.usage.output + + t.usage.reasoning + + t.usage.cache_read + + t.usage.cache_create_5m + + t.usage.cache_create_1h +} + // --------------------------------------------------------------------------- // session_cost // --------------------------------------------------------------------------- @@ -2162,11 +2263,10 @@ mod tests { assert_eq!(r.min_sample, 5); assert!(r.models.contains(&"claude-sonnet-4-6".to_string())); assert!(r.models.contains(&"claude-haiku-4-5".to_string())); - assert!( - r.cells - .iter() - .any(|c| c.model == "claude-sonnet-4-6" && c.turns == 2) - ); + assert!(r + .cells + .iter() + .any(|c| c.model == "claude-sonnet-4-6" && c.turns == 2)); assert_eq!(r.fidelity.minimum, FidelityClass::Partial); assert_eq!(r.fidelity.excluded.total, 0); @@ -2537,7 +2637,9 @@ mod tests { let turns = vec![make_turn_with_calls(vec![tc])]; let pricing = load_pricing(None); let result = compute_summary(&turns, &pricing); - let savings = result.replacement_savings.expect("should have replacement_savings"); + let savings = result + .replacement_savings + .expect("should have replacement_savings"); assert_eq!(savings.calls, 1); assert_eq!(savings.collapsed_calls, 9); assert!(!savings.by_tool.is_empty()); diff --git a/crates/relayburn-sdk/tests/integration.rs index 4a04680e..c6720025 100644 --- a/crates/relayburn-sdk/tests/integration.rs +++ b/crates/relayburn-sdk/tests/integration.rs @@ -11,12 +11,12 @@ use std::path::Path; use tempfile::TempDir; use relayburn_sdk::{ - CompareOptions,
ContentKind, ContentRecord, ContentRole, Enrichment, ExportLedgerOptions, - ExportStampsOptions, HotspotsOptions, HotspotsResult, IngestOptions, IngestRoots, Ledger, - LedgerOpenOptions, OverheadOptions, OverheadTrimOptions, SearchQueryOptions, - SessionCostOptions, SourceKind, Stamp, StampSelector, SummaryOptions, ToolCall, TurnRecord, - Usage, compare, export_ledger, export_stamps, hotspots, ingest, overhead, overhead_trim, - search, session_cost, summary, + compare, export_ledger, export_stamps, hotspots, ingest, overhead, overhead_trim, search, + session_cost, summary, CompareOptions, ContentKind, ContentRecord, ContentRole, Enrichment, + ExportLedgerOptions, ExportStampsOptions, HotspotsOptions, HotspotsResult, IngestOptions, + IngestRoots, Ledger, LedgerOpenOptions, OverheadOptions, OverheadTrimOptions, + SearchQueryOptions, SessionCostOptions, SourceKind, Stamp, StampSelector, SummaryOptions, + ToolCall, TurnRecord, Usage, }; const SESSION_ID: &str = "ses_integration_001"; @@ -124,6 +124,22 @@ fn all_ten_verbs_round_trip_against_a_fixture_ledger() { }) .expect("free summary"); assert_eq!(s2.turn_count, 1); + let tagged = handle + .summary(SummaryOptions { + tags: Some(Enrichment::from([( + "role".to_string(), + "integration-test".to_string(), + )])), + group_by_tag: Some("role".to_string()), + ..Default::default() + }) + .expect("tagged summary"); + assert_eq!(tagged.turn_count, 1); + let by_tag = tagged.by_tag.expect("byTag rows"); + assert_eq!(by_tag.len(), 1); + assert_eq!(by_tag[0].tag, "role"); + assert_eq!(by_tag[0].value.as_deref(), Some("integration-test")); + assert_eq!(by_tag[0].turn_count, 1); // 2. session_cost — handle + free let sc = handle diff --git a/packages/relayburn/CHANGELOG.md b/packages/relayburn/CHANGELOG.md index ab7f824f..1bdf193f 100644 --- a/packages/relayburn/CHANGELOG.md +++ b/packages/relayburn/CHANGELOG.md @@ -6,6 +6,9 @@ All notable changes to `relayburn`. 
### Removed +- Removed the `burn run` launcher wrapper from the CLI surface. Launchers + should write attribution with `writePendingStamp()` and ingest through + `burn ingest` / SDK `ingest()`. - Removed the fallback to the old TypeScript `@relayburn/cli`; `relayburn` now resolves only the Rust prebuilt platform packages. diff --git a/packages/sdk-node/CHANGELOG.md b/packages/sdk-node/CHANGELOG.md index eb0ed150..f7f0edef 100644 --- a/packages/sdk-node/CHANGELOG.md +++ b/packages/sdk-node/CHANGELOG.md @@ -8,6 +8,10 @@ ### Added +- Exported `writePendingStamp()` so Node launchers can write generic + enrichment tags before spawning Claude, Codex, or OpenCode directly. +- `summary()` options now accept `tags` and `groupByTag` for generic + enrichment filtering and cost/token grouping. - Exported `computeCompareExcluded()` from the Node facade for callers that need the same fidelity-exclusion breakdown used by `compare()`. diff --git a/packages/sdk-node/README.md b/packages/sdk-node/README.md index 09cd2823..4f047ffd 100644 --- a/packages/sdk-node/README.md +++ b/packages/sdk-node/README.md @@ -24,3 +24,6 @@ Windows (`win32-x64-msvc`) is not yet shipped — see #247 follow-up. - The SDK exposes read verbs such as `summary()`, `sessionCost()`, `hotspots()`, `compare()`, `search()`, `exportLedger()`, and `exportStamps()`. +- Launchers can call `writePendingStamp({ harness, cwd, enrichment })` + before spawning Claude, Codex, or OpenCode, then run `ingest()` to fold + those generic enrichment tags onto the discovered turns. 
diff --git a/packages/sdk-node/src/binding.d.ts index 7c9b02e3..afb73482 100644 --- a/packages/sdk-node/src/binding.d.ts +++ b/packages/sdk-node/src/binding.d.ts @@ -19,3 +19,4 @@ export declare function overhead(opts?: unknown): Promise<unknown>; export declare function overheadTrim(opts?: unknown): Promise<unknown>; export declare function hotspots(opts?: unknown): Promise<unknown>; export declare function compare(opts: unknown): Promise<unknown>; +export declare function writePendingStamp(opts: unknown): unknown; diff --git a/packages/sdk-node/src/index.cjs index 261426c0..90144126 100644 --- a/packages/sdk-node/src/index.cjs +++ b/packages/sdk-node/src/index.cjs @@ -92,6 +92,7 @@ module.exports = { overheadTrim: async (opts) => coerceBigInts(await binding.overheadTrim(opts)), hotspots: async (opts) => coerceBigInts(await binding.hotspots(opts)), compare: async (opts) => coerceBigInts(await binding.compare(opts)), + writePendingStamp: async (opts) => coerceBigInts(await binding.writePendingStamp(opts)), computeCompareExcluded, search: async (opts) => coerceBigInts(await binding.search(normalizeSearchOptions(opts))), exportLedger: async (opts) => coerceBigInts(await binding.exportLedger(opts)), diff --git a/packages/sdk-node/src/index.d.ts index cc92c7ca..ec9e7126 100644 --- a/packages/sdk-node/src/index.d.ts +++ b/packages/sdk-node/src/index.d.ts @@ -26,14 +26,47 @@ export interface IngestReport { scannedSessions: number | bigint; ingestedSessions: number | bigint; appendedTurns: number | bigint; + appliedPendingStamps: number | bigint; } export declare function ingest(opts?: IngestOptions): Promise<IngestReport> +export type PendingStampHarness = 'claude' | 'codex' | 'opencode'; +export interface WritePendingStampOptions { + harness: PendingStampHarness; + cwd: string; + enrichment: Record<string, string>; + sessionDirHint?: string; + /** ISO timestamp, e.g. `2026-04-23T00:00:00.000Z`. Defaults to now.
*/ + spawnStartTs?: string; + spawnerPid?: number; + ledgerHome?: string; +} +export interface PendingStamp { + v: number; + harness: PendingStampHarness; + spawnerPid: number; + spawnStartTs: string; + cwd: string; + enrichment: Record<string, string>; + sessionDirHint?: string; +} +export interface PendingStampWriteResult { + file: string; + stamp: PendingStamp; +} +export declare function writePendingStamp( + opts: WritePendingStampOptions, +): Promise<PendingStampWriteResult> + export interface SummaryOptions { session?: string; project?: string; /** ISO timestamp (e.g. `2026-04-01T00:00:00Z`) or relative range (`24h`, `7d`, `4w`, `2m`). */ since?: string; + /** Folded enrichment tag filters; every key/value pair must match. */ + tags?: Record<string, string>; + /** Group summary costs/tokens by this folded enrichment tag key. */ + groupByTag?: string; ledgerHome?: string; } export declare function summary(opts?: SummaryOptions): Promise<{ @@ -42,6 +75,13 @@ export declare function summary(opts?: SummaryOptions): Promise<{ turnCount: number; byTool: Array<{ tool: string; tokens: number | bigint; cost: number; count: number }>; byModel: Array<{ model: string; tokens: number | bigint; cost: number }>; + byTag?: Array<{ + tag: string; + value?: string; + tokens: number | bigint; + cost: number; + turnCount: number | bigint; + }>; replacementSavings?: { calls: number | bigint; collapsedCalls: number | bigint; diff --git a/packages/sdk-node/src/index.js index 9be1a06e..8ba9d300 100644 --- a/packages/sdk-node/src/index.js +++ b/packages/sdk-node/src/index.js @@ -120,6 +120,10 @@ export async function compare(opts) { return coerceBigInts(await binding.compare(opts)); } +export async function writePendingStamp(opts) { + return coerceBigInts(await binding.writePendingStamp(opts)); +} + export function computeCompareExcluded(summary, minimum) { const out = { total: 0, aggregateOnly: 0, costOnly: 0, partial: 0, usageOnly: 0 }; if (minimum === 'partial') return out; diff --git
a/packages/sdk-node/test/conformance.test.js b/packages/sdk-node/test/conformance.test.js index aab965b1..6423922e 100644 --- a/packages/sdk-node/test/conformance.test.js +++ b/packages/sdk-node/test/conformance.test.js @@ -8,7 +8,7 @@ import { test } from 'node:test'; import assert from 'node:assert/strict'; -import { mkdtempSync, rmSync, cpSync, mkdirSync } from 'node:fs'; +import { mkdtempSync, rmSync, cpSync, mkdirSync, readdirSync, readFileSync } from 'node:fs'; import { tmpdir } from 'node:os'; import { join, resolve, dirname } from 'node:path'; import { fileURLToPath } from 'node:url'; @@ -67,6 +67,7 @@ test('sdk facade exposes the expected verb set', async (t) => { 'overheadTrim', 'hotspots', 'compare', + 'writePendingStamp', 'computeCompareExcluded', 'search', 'exportLedger', @@ -87,6 +88,15 @@ test('read verbs return stable shapes against the fixture ledger', async (t) => assert.ok(Array.isArray(summary.byModel)); assert.ok(Array.isArray(summary.byTool)); + const taggedSummary = await sdk.summary({ + ledgerHome, + tags: { workflowId: 'wf-golden' }, + groupByTag: 'workflowId', + }); + assert.equal(taggedSummary.turnCount, 3); + assert.equal(taggedSummary.byTag[0].tag, 'workflowId'); + assert.equal(taggedSummary.byTag[0].value, 'wf-golden'); + const session = await sdk.sessionCost({ ledgerHome, session: '11111111-1111-1111-1111-111111111111', @@ -165,6 +175,36 @@ test('2.x extension verbs return stable shapes against the fixture ledger', asyn } }); +test('writePendingStamp writes a launcher-safe manifest', async (t) => { + const sdk = await loadNapiSdk(t); + if (!sdk) return; + + const ledgerHome = mkdtempSync(join(tmpdir(), 'relayburn-sdk-pending-')); + try { + const result = await sdk.writePendingStamp({ + ledgerHome, + harness: 'claude', + cwd: '/tmp/project', + enrichment: { persona: 'code-reviewer', agentworkforce: '1' }, + sessionDirHint: '/tmp/project/sessions', + spawnStartTs: '2026-04-23T00:00:00.000Z', + spawnerPid: 12345, + }); + + 
assert.match(result.file, /pending-stamps[/\\]claude-12345-/); + assert.equal(result.stamp.harness, 'claude'); + assert.equal(result.stamp.enrichment.persona, 'code-reviewer'); + + const files = readdirSync(join(ledgerHome, 'pending-stamps')); + assert.equal(files.length, 1); + const manifest = JSON.parse(readFileSync(join(ledgerHome, 'pending-stamps', files[0]), 'utf8')); + assert.equal(manifest.harness, 'claude'); + assert.equal(manifest.enrichment.agentworkforce, '1'); + } finally { + rmSync(ledgerHome, { recursive: true, force: true }); + } +}); + test('ingest scans an isolated empty home', async (t) => { const sdk = await loadNapiSdk(t); if (!sdk) return; @@ -180,6 +220,7 @@ test('ingest scans an isolated empty home', async (t) => { assert.equal(typeof report.scannedSessions, 'number'); assert.equal(typeof report.ingestedSessions, 'number'); assert.equal(typeof report.appendedTurns, 'number'); + assert.equal(typeof report.appliedPendingStamps, 'number'); } finally { if (prevHome === undefined) delete process.env.HOME; else process.env.HOME = prevHome; diff --git a/packages/sdk-node/test/esbuild-smoke.test.js b/packages/sdk-node/test/esbuild-smoke.test.js index f1a3858d..12b282fa 100644 --- a/packages/sdk-node/test/esbuild-smoke.test.js +++ b/packages/sdk-node/test/esbuild-smoke.test.js @@ -31,6 +31,7 @@ import { overheadTrim, hotspots, compare, + writePendingStamp, computeCompareExcluded, search, exportLedger, @@ -48,6 +49,7 @@ export const refs = { overheadTrim, hotspots, compare, + writePendingStamp, computeCompareExcluded, search, exportLedger, diff --git a/tests/fixtures/cli-golden/README.md b/tests/fixtures/cli-golden/README.md index b924fec2..1495056f 100644 --- a/tests/fixtures/cli-golden/README.md +++ b/tests/fixtures/cli-golden/README.md @@ -24,8 +24,8 @@ tests/fixtures/cli-golden/ `invocations.json` lists the CLI surfaces the diff runner knows about. 
The set covers read-path commands (`summary`, `hotspots`, `overhead`, `overhead trim`, `compare`, `state status`) in human and JSON forms, plus help text for -action-path commands (`burn ingest --help`, `burn run --help`, -`burn mcp-server --help`) and top-level `burn --help`. +action-path commands (`burn ingest --help`, `burn mcp-server --help`) and +top-level `burn --help`. Action-path commands themselves are deliberately not snapshotted: their output depends on a real spawn lifecycle or watch loop, which cannot be reproduced diff --git a/tests/fixtures/cli-golden/invocations.json b/tests/fixtures/cli-golden/invocations.json index da73f6d0..89227783 100644 --- a/tests/fixtures/cli-golden/invocations.json +++ b/tests/fixtures/cli-golden/invocations.json @@ -77,12 +77,6 @@ "expectStatus": 0, "enabled": false }, - { - "name": "run-help", - "args": ["run", "--help"], - "expectStatus": 0, - "enabled": false - }, { "name": "mcp-server-help", "args": ["mcp-server", "--help"], diff --git a/tests/fixtures/cli-golden/snapshots/run-help.stdout.txt b/tests/fixtures/cli-golden/snapshots/run-help.stdout.txt deleted file mode 100644 index 1fef452d..00000000 --- a/tests/fixtures/cli-golden/snapshots/run-help.stdout.txt +++ /dev/null @@ -1,11 +0,0 @@ -burn run — spawn an agent harness with attribution - -Usage: - burn run <harness> [--tag k=v ...] [-- <args>] - -Known harnesses: claude, codex, opencode - -Examples: - burn run claude --tag workflow=refactor -- --resume - burn run codex --tag workflow=refactor - burn run opencode --tag workflow=refactor diff --git a/tests/fixtures/cli-golden/snapshots/top-level-help.stdout.txt b/tests/fixtures/cli-golden/snapshots/top-level-help.stdout.txt index 965d84b5..ababd0f4 100644 --- a/tests/fixtures/cli-golden/snapshots/top-level-help.stdout.txt +++ b/tests/fixtures/cli-golden/snapshots/top-level-help.stdout.txt @@ -1,15 +1,14 @@ burn — token usage & cost attribution for agent CLIs Usage: - burn summary [--since 7d] [--project <name>] [--session <id>] [--workflow <id>] [--agent <name>] [--provider <name>] [--quality] - [--by-provider | --by-tool | --by-subagent-type | --by-relationship[=subagent] | --subagent-tree <session>] [--no-archive] + burn summary [--since 7d] [--project <name>] [--session <id>] [--workflow <id>] [--tag k=v] [--agent <name>] [--provider <name>] [--quality] + [--by-provider | --by-tool | --by-subagent-type | --by-relationship[=subagent] | --subagent-tree <session> | --group-by-tag <tag>] [--no-archive] (mode flags are mutually exclusive; --by-tool emits tool | calls | attributedCost) burn hotspots [--since 7d] [--project <name>] [--workflow <id>] [--provider <name>] [--all] [--json] [--session [id]] [--explain-drift] [--patterns[=retries,failures,compaction,reverts]] [--findings] burn overhead [trim] [--project <name>] [--since 7d] [--kind <kind>] [--top <n>] [--json] burn compare <models> [--since 7d] [--project <name>] [--session <id>] [--workflow <id>] [--agent <name>] [--min-sample <n>] [--json|--csv] - burn run <harness> [--tag k=v ...] [-- <args>] burn ingest [--watch|--hook <harness>] [--interval <secs>] [--quiet] burn mcp-server [--session-id <id>] (stdio MCP server for in-session self-query) burn state [status] [--json] @@ -20,6 +19,8 @@ Usage: Examples: burn summary --since 24h burn summary --by-provider --provider synthetic + burn summary --tag persona=code-reviewer + burn summary --group-by-tag persona burn summary --subagent-tree burn summary --by-subagent-type --since 7d burn summary --by-relationship --since 7d @@ -33,9 +34,6 @@ Examples: burn overhead trim --top 3 burn overhead trim --json burn compare claude-sonnet-4-6,claude-haiku-4-5 --since 30d - burn run claude --tag workflow=refactor -- --resume - burn run codex --tag workflow=refactor - burn run opencode --tag workflow=refactor burn ingest burn ingest --watch burn ingest --watch --opencode-stream