From a1b4660e401abb547a938b806140c06214557f1b Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Wed, 6 May 2026 19:45:43 -0400 Subject: [PATCH 1/2] relayburn-cli: burn ingest + burn mcp-server (#248 D8, closes #210) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire `burn ingest` as a thin presenter over `relayburn_sdk::ingest_all` plus the SDK's `start_watch_loop` controller. Three modes mirror the TS sibling at `packages/cli/src/commands/ingest.ts`: - No flags: one full sweep, then exit. Logs `[burn] ingest: ingested N session(s) (+M turn(s))` on stderr. - `--watch [--interval MS]`: foreground poll loop. Skips empty-tick log lines (TS shape); SIGINT / SIGTERM stop drains in-flight ticks before returning. - `--hook claude [--quiet]`: stdin-driven Claude hook entrypoint. Validates the `session_id` / `transcript_path` payload shape, then drives a full sweep (cursor short-circuit means cost is bounded by new turns, not the hook payload). Failure paths log + exit 0 to avoid blocking the parent Claude tool call. Wire `burn mcp-server` as a hand-rolled JSON-RPC 2.0 stdio server, mirroring `packages/mcp/src/server.ts` rather than depending on `rmcp`. The on-wire surface (`initialize`, `ping`, `tools/list`, `tools/call`) is small enough that a tight in-tree implementation beats freezing a heavy SDK version. Single tool ships in this PR: `burn__sessionCost`, a 1:1 port of `packages/mcp/src/tools/session-cost.ts` delegating to `LedgerHandle::session_cost`. Future tools (summary, hotspots, …) land separately. Closes #210. Smoke test drops `ingest` and `mcp-server` from `UNIMPLEMENTED_SUBCOMMANDS`; `--help` for both subcommands still passes the existing help-shape assertion. Manual stdio handshake verified: `initialize` echoes the client's `protocolVersion`, `tools/list` returns `burn__sessionCost`, and `tools/call burn__sessionCost` returns the SDK payload as both stringified `text` content and a `structuredContent` mirror. --- CHANGELOG.md | 1 + Cargo.lock | 31 ++ crates/relayburn-cli/Cargo.toml | 6 +- crates/relayburn-cli/src/cli.rs | 68 ++- crates/relayburn-cli/src/commands/ingest.rs | 365 +++++++++++++++- .../relayburn-cli/src/commands/mcp_server.rs | 408 +++++++++++++++++- crates/relayburn-cli/src/main.rs | 4 +- crates/relayburn-cli/tests/smoke.rs | 12 +- 8 files changed, 869 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3eaaa258..aa8d8406 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Cross-package release notes for relayburn. Package changelogs contain package-le ## [Unreleased] +- `relayburn-cli` (Rust): wire `burn ingest` (no-flag scan, `--watch` poll loop, `--hook claude --quiet`) and `burn mcp-server` stdio subcommand exposing `burn__sessionCost`; closes #210. (#248 D8) - `relayburn-cli` (Rust): wire `burn compare` as a presenter over `relayburn_sdk::analyze::compare` building blocks (`build_compare_table` + the per-turn fidelity gate), matching the TS CLI flag set (positional comma-separated model list, `--include-partial` / `--fidelity` / `--since` / `--project` / `--session` / `--min-sample` / `--csv` / `--no-archive`) and producing byte-equivalent stdout for the cli-golden `compare` / `compare-json` invocations. (#248 D3) - `relayburn-cli` (Rust): port `burn overhead` and `burn overhead trim` as thin presenters over `relayburn_sdk::overhead` / `::overhead_trim`. Output (human + `--json`) is byte-equivalent with the TS CLI. (#248 D2) - `relayburn-cli` (Rust): wire `burn state` as a typed clap subcommand with `status` (default), `rebuild`, `prune`, and `reset` verbs over `relayburn-sdk`. `state status` reports per-table row counts in `burn.sqlite`, the row count in `content.sqlite`, the `archive_state` schema/last-built/last-rebuild fields, and the resolved retention config; `--json` emits the structured `StateStatus` payload. `state rebuild {index,content,archive,all}` drives `Ledger::rebuild_derivable`; `state prune` drives `Ledger::prune_content_older_than`. `state reset` and standalone `state rebuild classify` are stubbed pending a follow-up. (#248 D4) diff --git a/Cargo.lock b/Cargo.lock index 922b59a6..1ab10661 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -521,6 +521,17 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "mio" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + [[package]] name = "napi" version = "2.16.17" @@ -954,6 +965,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "siphasher" version = "1.0.3" @@ -1029,8 +1050,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "110a78583f19d5cdb2c5ccf321d1290344e71313c6c37d43520d386027d18386" dependencies = [ "bytes", + "libc", + "mio", "pin-project-lite", + "signal-hook-registry", "tokio-macros", + "windows-sys", ] [[package]] @@ -1101,6 +1126,12 @@ dependencies = [ "libc", ] +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + [[package]] name = "wasip2" version = "1.0.3+wasi-0.2.9" diff --git a/crates/relayburn-cli/Cargo.toml b/crates/relayburn-cli/Cargo.toml index 669a8cd8..a3635c94 100644 --- a/crates/relayburn-cli/Cargo.toml +++ b/crates/relayburn-cli/Cargo.toml @@ -77,7 +77,11 @@ thiserror = { workspace = true } # bodies via a current-thread runtime. async-trait = "0.1" phf = { version = "0.11", features = ["macros"] } -tokio = { workspace = true, features = ["sync", "rt"] } +# `signal` is needed for the `burn ingest --watch` SIGINT/SIGTERM trap +# (#248 D8); the watch loop blocks the foreground until a stop signal +# comes in. `rt` drives the current-thread runtime that wraps the SDK's +# async ingest verb from otherwise-sync presenter bodies. +tokio = { workspace = true, features = ["sync", "rt", "signal"] } # `IndexMap` preserves first-seen iteration order, which matters for the # Wave 2 read-path commands so their grouped output (`summary --by-model`, diff --git a/crates/relayburn-cli/src/cli.rs b/crates/relayburn-cli/src/cli.rs index 7a123ea6..b73c3786 100644 --- a/crates/relayburn-cli/src/cli.rs +++ b/crates/relayburn-cli/src/cli.rs @@ -115,12 +115,76 @@ pub enum Command { State(StateArgs), /// Scan harness session stores and append new turns to the ledger. - Ingest, + Ingest(IngestArgs), /// Stdio MCP server exposing read-only ledger queries for /// in-session self-query. #[command(name = "mcp-server")] - McpServer, + McpServer(McpServerArgs), +} + +/// Per-command flags for `burn ingest`. Mirrors the TS surface in +/// `packages/cli/src/commands/ingest.ts` so flag muscle memory carries +/// across. +/// +/// Three modes, exactly one applies per invocation: +/// +/// - No flags: scan all known session stores once and exit. +/// - `--watch` (optionally with `--interval `): foreground poll loop +/// driven by [`relayburn_sdk::start_watch_loop`]. +/// - `--hook [--quiet]`: stdin-driven hook entrypoint. Today +/// only `--hook claude` is supported; the `--quiet` flag suppresses +/// non-error stderr breadcrumbs so it is safe to call from every +/// Claude Code hook. +/// +/// `--watch` and `--hook` are mutually exclusive; the presenter rejects +/// the combination at runtime with exit 2 (matching TS). +#[derive(Debug, Clone, ClapArgs)] +pub struct IngestArgs { + /// Stay running and poll session stores at `--interval` ms. + /// Mutually exclusive with `--hook`. + #[arg(long)] + pub watch: bool, + + /// Poll interval for `--watch`, in milliseconds. Defaults to 1000. + /// Ignored without `--watch`. + #[arg(long, value_name = "MS")] + pub interval: Option, + + /// Read a harness-specific hook payload from stdin and ingest the + /// transcript it references. Today only `claude` is supported. + /// Mutually exclusive with `--watch`. + #[arg(long, value_name = "HARNESS")] + pub hook: Option, + + /// Suppress non-error stderr breadcrumbs. Used by hook callers so + /// the surrounding tool invocation isn't blocked by a noisy + /// pipeline. + #[arg(long)] + pub quiet: bool, +} + +/// Per-command flags for `burn mcp-server`. The stdio MCP server speaks +/// JSON-RPC 2.0 line-delimited frames over stdin/stdout and exposes the +/// `burn__sessionCost` read-only tool. Closes #210. +/// +/// Global `--ledger-path` (on [`Args`]) is consulted as the SDK ledger +/// home. `--session-id` registers a default session id so MCP clients +/// that omit `sessionId` in `tools/call` get a useful answer (the +/// running agent's own session). +#[derive(Debug, Clone, ClapArgs)] +pub struct McpServerArgs { + /// Default sessionId to use when `tools/call burn__sessionCost` + /// omits the argument. Lets the host wrap the server with the + /// running agent's own session id so the agent can self-query + /// without knowing it. + #[arg(long = "session-id", value_name = "ID")] + pub session_id: Option, + + /// Emit protocol-level diagnostics to stderr. Off by default so a + /// well-behaved client doesn't see unexpected noise on the channel. + #[arg(long)] + pub debug: bool, } /// Per-command flag set for `burn compare`. Mirrors diff --git a/crates/relayburn-cli/src/commands/ingest.rs b/crates/relayburn-cli/src/commands/ingest.rs index 0b49eac2..8c76f1a1 100644 --- a/crates/relayburn-cli/src/commands/ingest.rs +++ b/crates/relayburn-cli/src/commands/ingest.rs @@ -2,13 +2,364 @@ //! known session store once; `--watch` keeps polling; `--hook claude //! --quiet` is the stdin-driven Claude hook path. //! -//! Stub. Wave 2 D8 wires this up over the `relayburn_sdk::ingest_all` -//! verb plus the `relayburn_sdk` watch-loop primitives. TS source of -//! truth: `packages/cli/src/commands/ingest.ts` plus -//! `packages/ingest/src/watch-loop.ts`. +//! Thin presenter over the SDK ingest verb plus the SDK's watch-loop +//! controller. TS source of truth: `packages/cli/src/commands/ingest.ts` +//! plus `packages/ingest/src/watch-loop.ts`. +//! +//! The Rust port keeps the three modes as a single subcommand so +//! `burn ingest` retains its TS muscle memory: +//! +//! - No flags = `runIngestOnce` — one full sweep, then exit. +//! - `--watch` = `runIngestWatch` — foreground poll loop until SIGINT +//! / SIGTERM. +//! - `--hook claude` = `runIngestHook` — stdin-driven hook payload. +//! Today only `--hook claude` is wired here (Codex / OpenCode hooks +//! were never part of the TS surface either). The hook path +//! currently ingests via a full `ingest_all` sweep, since the SDK +//! does not yet expose a single-transcript verb. Practically this +//! is no slower than the TS hook because Claude hooks fire at +//! session-end and the sweep short-circuits on unchanged cursors; +//! the cost is bounded by the number of new sessions, not by the +//! hook payload. +//! +//! Output shape: every successful run writes a single +//! `[burn] ingest: ingested N session(s) (+M turn(s))` line on stderr +//! (matching TS) so callers wrapping the binary can grep it without +//! parsing JSON. `--quiet` suppresses the line when the report is +//! empty; non-empty runs still log to stderr for parity with TS. + +use std::io::{self, Read}; +use std::sync::Arc; +use std::time::Duration; + +use relayburn_sdk::{ + ingest_all, start_watch_loop, IngestReport, Ledger, LedgerHandle, LedgerOpenOptions, + RawIngestOptions, StartWatchLoopOptions, +}; + +use crate::cli::{GlobalArgs, IngestArgs}; +use crate::render::error::report_error; + +/// Exit codes mirror the TS CLI: +/// - `0` happy path (including hook-mode empty-payload no-op). +/// - `1` typed/unknown errors during a non-watch run (parse, IO). +/// - `2` flag misuse (`--watch` + `--hook`, unsupported `--hook`, +/// `--hook` without value, `--interval` not a positive integer). +const EXIT_FLAG_MISUSE: i32 = 2; + +/// Entrypoint for the `burn ingest` subcommand. Dispatches on the flag +/// triple (`watch`, `hook`, default) and lets the SDK do the heavy +/// lifting. +pub fn run(globals: &GlobalArgs, args: IngestArgs) -> i32 { + // Mutually-exclusive guard: TS rejects `--watch --hook` with exit 2 + // before doing any IO. Mirror that here so flag misuse gets a stable + // shell-script-friendly contract. + if args.watch && args.hook.is_some() { + eprintln!("burn: ingest --watch and --hook are mutually exclusive"); + return EXIT_FLAG_MISUSE; + } + + if let Some(hook) = args.hook.as_deref() { + return run_hook(globals, hook, args.quiet); + } + if args.watch { + return run_watch(globals, &args); + } + run_once(globals, args.quiet) +} + +/// One-shot scan: open the ledger, run a single `ingest_all`, log the +/// summary, exit. Drives a current-thread tokio runtime so the otherwise +/// sync presenter can drive the async SDK verb. +fn run_once(globals: &GlobalArgs, quiet: bool) -> i32 { + let mut handle = match open_handle(globals) { + Ok(h) => h, + Err(err) => return report_error(&err, globals), + }; + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(err) => return report_error(&err, globals), + }; + let opts = RawIngestOptions::default(); + match rt.block_on(ingest_all(handle.raw_mut(), &opts)) { + Ok(report) => { + log_report(&report, quiet); + 0 + } + Err(err) => report_error(&err, globals), + } +} + +/// `--watch` mode: spin up [`start_watch_loop`] over a persistent ledger +/// handle and a tokio runtime, then park on SIGINT / SIGTERM. +/// +/// We share the ledger handle across ticks via an `Arc` so the +/// poll loop reuses one open SQLite connection per process — same shape +/// as the TS adapter, which keeps a single `withLock('ledger', …)` +/// guarded handle alive for the duration of the watch. `RawIngestOptions` +/// is `Default` per tick because none of the per-tick state (progress +/// callbacks, etc.) needs to survive across ticks. +fn run_watch(globals: &GlobalArgs, args: &IngestArgs) -> i32 { + let interval_ms = match args.interval { + Some(n) if n == 0 => { + eprintln!("burn: ingest --interval must be a positive integer in milliseconds"); + return EXIT_FLAG_MISUSE; + } + Some(n) => n, + None => 1000, + }; + + let handle = match open_handle(globals) { + Ok(h) => h, + Err(err) => return report_error(&err, globals), + }; + + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(err) => return report_error(&err, globals), + }; + + let quiet = args.quiet; + if !quiet { + eprintln!( + "[burn] ingest: foreground ingest every {interval_ms}ms; Ctrl-C to stop", + ); + } + + rt.block_on(async move { + let handle_arc: Arc> = + Arc::new(tokio::sync::Mutex::new(handle)); + let handle_for_ingest = handle_arc.clone(); + let ingest_fn: relayburn_sdk::IngestFn = Arc::new(move || { + let h = handle_for_ingest.clone(); + Box::pin(async move { + let mut guard = h.lock().await; + ingest_all(guard.raw_mut(), &RawIngestOptions::default()).await + }) + }); + + let on_report: relayburn_sdk::ReportSink = Arc::new(move |report: &IngestReport| { + // Match TS: only log a summary when the tick actually + // appended turns. Empty ticks would otherwise drown the + // user with zero-progress lines. + if !quiet && report.appended_turns > 0 { + eprint!("{}", render_ingest_line(report)); + } + }); + + let on_error: relayburn_sdk::ErrorSink = Arc::new(|err: &anyhow::Error| { + eprintln!("[burn] ingest: {err}"); + }); + + let opts = StartWatchLoopOptions::new(ingest_fn) + .with_interval(Duration::from_millis(interval_ms)) + .with_immediate(true) + .with_on_report(on_report) + .with_on_error(on_error); + let controller = start_watch_loop(opts); + + wait_for_stop_signal().await; + controller.stop().await; + }); + + 0 +} + +/// `--hook `: read a JSON payload from stdin and ingest the +/// transcript it references. Today only `--hook claude` is supported. +/// +/// The TS implementation tries hard not to fail Claude Code hooks (a +/// non-zero exit can block the surrounding tool call); the Rust port +/// keeps that policy — every error is logged to stderr but the exit +/// code is `0` so the calling Claude Code session continues. +fn run_hook(globals: &GlobalArgs, hook: &str, quiet: bool) -> i32 { + if hook != "claude" { + eprintln!("burn: unsupported hook harness: {hook}"); + return EXIT_FLAG_MISUSE; + } + let raw = match read_stdin() { + Ok(s) => s, + Err(err) => { + // Hook callers expect us not to break the parent. Log + 0. + eprintln!("[burn] ingest: failed to read stdin: {err}"); + return 0; + } + }; + if raw.trim().is_empty() { + if !quiet { + eprintln!("[burn] ingest: empty stdin payload, nothing to do"); + } + return 0; + } + + // Validate the payload shape so we don't trigger a full sweep on + // unrelated stdin content. The TS hook ignores payloads missing + // `session_id` / `transcript_path`; mirror that. + match serde_json::from_str::(&raw) { + Ok(v) => { + let has_session = v.get("session_id").and_then(|x| x.as_str()).is_some(); + let has_transcript = v.get("transcript_path").and_then(|x| x.as_str()).is_some(); + if !has_session || !has_transcript { + if !quiet { + eprintln!( + "[burn] ingest: payload missing session_id or transcript_path; ignoring", + ); + } + return 0; + } + } + Err(err) => { + eprintln!("[burn] ingest: invalid JSON payload: {err}"); + return 0; + } + } + + // Drive a full sweep. The SDK does not (yet) expose a + // single-transcript verb; `ingest_all` short-circuits unchanged + // cursors so the practical cost is bounded by the new turns this + // hook fires for. Matches the TS hook's "ingest the matching + // session" intent — the Claude transcript that just changed will + // be picked up by `ingest_claude_into` on the same sweep. + let mut handle = match open_handle(globals) { + Ok(h) => h, + Err(err) => { + // Hook policy: never fail the parent. + eprintln!("[burn] ingest: {err}"); + return 0; + } + }; + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(err) => { + eprintln!("[burn] ingest: {err}"); + return 0; + } + }; + let opts = RawIngestOptions::default(); + match rt.block_on(ingest_all(handle.raw_mut(), &opts)) { + Ok(report) => { + // In hook mode we keep stderr quiet by default; only log + // when work was actually done so a per-tool-call hook + // doesn't spam the user. + if !quiet && report.appended_turns > 0 { + eprint!("{}", render_ingest_line(&report)); + } + } + Err(err) => { + eprintln!("[burn] ingest: {err}"); + } + } + 0 +} + +/// Open a ledger honoring the global `--ledger-path` override. +fn open_handle(globals: &GlobalArgs) -> anyhow::Result { + let opts = match globals.ledger_path.as_deref() { + Some(h) => LedgerOpenOptions::with_home(h), + None => LedgerOpenOptions::default(), + }; + Ok(Ledger::open(opts)?) +} + +/// Format an `IngestReport` as the canonical TS log line. Kept as a +/// pure helper so the watch loop and one-shot mode share output shape. +fn render_ingest_line(report: &IngestReport) -> String { + let session_word = if report.ingested_sessions == 1 { + "session" + } else { + "sessions" + }; + let turn_word = if report.appended_turns == 1 { + "turn" + } else { + "turns" + }; + format!( + "[burn] ingest: ingested {} {session_word} (+{} {turn_word})\n", + report.ingested_sessions, report.appended_turns, + ) +} + +/// Log the canonical `[burn] ingest: ...` line on stderr. Suppressed in +/// `--quiet` mode unless work was done — empty zero-progress reports +/// would otherwise be noise on every invocation. +fn log_report(report: &IngestReport, quiet: bool) { + if quiet && report.appended_turns == 0 { + return; + } + eprint!("{}", render_ingest_line(report)); +} + +/// Read all of stdin into a String. Returns empty string when stdin is +/// a TTY (no payload) — TS uses the same `process.stdin.isTTY` guard. +fn read_stdin() -> io::Result { + use std::io::IsTerminal; + let stdin = io::stdin(); + if stdin.is_terminal() { + return Ok(String::new()); + } + let mut buf = String::new(); + stdin.lock().read_to_string(&mut buf)?; + Ok(buf) +} + +/// Park until SIGINT or SIGTERM. Cross-platform via tokio's `ctrl_c` for +/// SIGINT; SIGTERM is wired only on Unix because Windows lacks the +/// signal. The watch loop's controller will drain in-flight ticks before +/// returning so callers see all observable side effects. +async fn wait_for_stop_signal() { + #[cfg(unix)] + { + use tokio::signal::unix::{signal, SignalKind}; + let mut sigterm = match signal(SignalKind::terminate()) { + Ok(s) => s, + Err(_) => { + // If we can't install SIGTERM, fall back to ctrl_c only. + let _ = tokio::signal::ctrl_c().await; + return; + } + }; + tokio::select! { + _ = tokio::signal::ctrl_c() => {} + _ = sigterm.recv() => {} + } + } + #[cfg(not(unix))] + { + let _ = tokio::signal::ctrl_c().await; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn render_ingest_line_pluralizes_consistently() { + let one = render_ingest_line(&IngestReport { + scanned_sessions: 1, + ingested_sessions: 1, + appended_turns: 1, + }); + assert_eq!(one, "[burn] ingest: ingested 1 session (+1 turn)\n"); -use crate::cli::GlobalArgs; + let many = render_ingest_line(&IngestReport { + scanned_sessions: 3, + ingested_sessions: 2, + appended_turns: 5, + }); + assert_eq!(many, "[burn] ingest: ingested 2 sessions (+5 turns)\n"); -pub fn run(globals: &GlobalArgs) -> i32 { - super::not_yet_implemented("ingest", globals) + let zero = render_ingest_line(&IngestReport::default()); + assert_eq!(zero, "[burn] ingest: ingested 0 sessions (+0 turns)\n"); + } } diff --git a/crates/relayburn-cli/src/commands/mcp_server.rs b/crates/relayburn-cli/src/commands/mcp_server.rs index 29e439ab..ff152ce8 100644 --- a/crates/relayburn-cli/src/commands/mcp_server.rs +++ b/crates/relayburn-cli/src/commands/mcp_server.rs @@ -1,12 +1,408 @@ //! `burn mcp-server` — stdio MCP server exposing read-only ledger //! queries for in-session self-query (closes #210). //! -//! Stub. Wave 2 D8 wires this up via `rmcp` around the SDK's read-only -//! query verbs (`session_cost`, `summary`, `hotspots`, …). TS source -//! of truth: `packages/cli/src/commands/mcp-server.ts`. +//! The TS sibling (`packages/mcp/src/server.ts`) hand-rolls a minimal +//! JSON-RPC 2.0 line-delimited server rather than depending on a heavy +//! SDK; the Rust port mirrors that decision. The on-wire shape is tiny +//! (`initialize`, `ping`, `tools/list`, `tools/call`, plus +//! notifications), and freezing a specific `rmcp` version buys us +//! nothing for the read-only surface this command exposes. If the +//! protocol evolves, this module is localized enough to update in one +//! place — same trade-off the TS sibling makes. +//! +//! Tool surface for D8 is intentionally minimal: only `burn__sessionCost` +//! ships in this PR (it is the canonical thin-SDK-wrapper pattern, +//! mirroring `packages/mcp/src/tools/session-cost.ts` 1:1). Other tools +//! (`summary`, `hotspots`, …) are tracked as follow-ups so the scope of +//! D8 stays tight. + +use std::io::{BufRead, Write}; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +use relayburn_sdk::{ + Ledger, LedgerHandle, LedgerOpenOptions, SessionCostOptions, SessionCostResult, +}; + +use crate::cli::{GlobalArgs, McpServerArgs}; +use crate::render::error::report_error; + +/// Latest MCP protocol revision we know how to speak. Clients negotiate; +/// we echo the client's declared version when present (treating it as a +/// superset declaration), else fall back to this baseline. Mirrors +/// `packages/mcp/src/server.ts`. +const PROTOCOL_VERSION: &str = "2025-03-26"; +/// Server name surfaced in the `initialize` reply. Kept distinct from +/// the binary name so MCP clients can disambiguate the Rust port from +/// the TS one in their server inventories. +const SERVER_NAME: &str = "relayburn-mcp"; +/// Server version surfaced in the `initialize` reply. Bumped manually +/// when the tool surface changes; `cargo` doesn't let us read the +/// package version at runtime without `env!`. +const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION"); + +pub fn run(globals: &GlobalArgs, args: McpServerArgs) -> i32 { + // Open the ledger up front so a config error fails loud before any + // MCP traffic flows. The handle is held by the tool dispatcher for + // the life of the server — one connection per process matches the + // TS server. + let handle = match open_handle(globals) { + Ok(h) => h, + Err(err) => return report_error(&err, globals), + }; + + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(err) => return report_error(&err, globals), + }; + + let server = Server { + handle: Arc::new(tokio::sync::Mutex::new(handle)), + default_session_id: args.session_id.clone(), + debug: args.debug, + }; + + rt.block_on(server.run()); + 0 +} + +fn open_handle(globals: &GlobalArgs) -> anyhow::Result { + let opts = match globals.ledger_path.as_deref() { + Some(h) => LedgerOpenOptions::with_home(h), + None => LedgerOpenOptions::default(), + }; + Ok(Ledger::open(opts)?) +} + +// --------------------------------------------------------------------------- +// JSON-RPC envelopes +// --------------------------------------------------------------------------- + +#[derive(Debug, Deserialize)] +struct JsonRpcRequest { + /// Request id is optional in JSON-RPC: when absent the message is a + /// notification and we must not reply. We deserialize it as a `Value` + /// to preserve numeric / string id types verbatim on the way back — + /// MCP clients use both shapes. + #[serde(default)] + id: Option, + method: String, + #[serde(default)] + params: Value, +} + +#[derive(Debug, Serialize)] +struct JsonRpcSuccess<'a> { + jsonrpc: &'static str, + id: &'a Value, + result: Value, +} + +#[derive(Debug, Serialize)] +struct JsonRpcError<'a> { + jsonrpc: &'static str, + id: &'a Value, + error: JsonRpcErrorBody, +} + +#[derive(Debug, Serialize)] +struct JsonRpcErrorBody { + code: i32, + message: String, + #[serde(skip_serializing_if = "Option::is_none")] + data: Option, +} + +// --------------------------------------------------------------------------- +// Server +// --------------------------------------------------------------------------- + +struct Server { + handle: Arc>, + default_session_id: Option, + debug: bool, +} + +impl Server { + async fn run(self) { + // Read line-delimited JSON-RPC frames off stdin. Tokio doesn't + // give us a stable cross-platform stdin AsyncBufRead without + // pulling more deps, and the MCP spec is one frame per line, so + // a blocking BufRead loop on a dedicated thread is the cleanest + // shape. We marshal each frame back into the runtime via a + // bounded channel so tool handlers can use the SDK's async + // surface. + let (tx, mut rx) = tokio::sync::mpsc::channel::(64); + let stdin_thread = std::thread::spawn(move || { + let stdin = std::io::stdin(); + let lock = stdin.lock(); + for line in lock.lines() { + let line = match line { + Ok(l) => l, + Err(_) => break, + }; + if line.trim().is_empty() { + continue; + } + if tx.blocking_send(line).is_err() { + break; + } + } + }); + + while let Some(frame) = rx.recv().await { + self.handle_frame(&frame).await; + } + + let _ = stdin_thread.join(); + } + + async fn handle_frame(&self, frame: &str) { + let parsed: serde_json::Result = serde_json::from_str(frame); + let value = match parsed { + Ok(v) => v, + Err(err) => { + if self.debug { + eprintln!("[burn mcp] parse error: {err}"); + } + write_response(&error_envelope(&Value::Null, -32700, "parse error", None)); + return; + } + }; + if !value.is_object() { + write_response(&error_envelope(&Value::Null, -32600, "invalid request", None)); + return; + } + + // Notifications carry no `id` field. Per JSON-RPC 2.0 we must + // not reply to them. The MCP spec uses `notifications/initialized` + // and `notifications/cancelled`; both are safe to ignore for a + // tools-only server. + let has_id = value.get("id").is_some(); + if !has_id { + return; + } + + let req: JsonRpcRequest = match serde_json::from_value(value.clone()) { + Ok(r) => r, + Err(err) => { + if self.debug { + eprintln!("[burn mcp] bad request shape: {err}"); + } + let id = value.get("id").cloned().unwrap_or(Value::Null); + write_response(&error_envelope(&id, -32600, "invalid request", None)); + return; + } + }; + // Unwrap is safe: we already confirmed the field is present + // above. Default to `null` defensively so a misbehaving client + // can't crash the server by sending `id: null`. + let id = req.id.unwrap_or(Value::Null); + + match req.method.as_str() { + "initialize" => self.handle_initialize(&id, &req.params), + "ping" => write_success(&id, json!({})), + "tools/list" => self.handle_tools_list(&id), + "tools/call" => self.handle_tools_call(&id, &req.params).await, + other => { + write_response(&error_envelope( + &id, + -32601, + &format!("method not found: {other}"), + None, + )); + } + } + } + + fn handle_initialize(&self, id: &Value, params: &Value) { + let client_version = params + .get("protocolVersion") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + let protocol_version = client_version.unwrap_or_else(|| PROTOCOL_VERSION.to_string()); + let result = json!({ + "protocolVersion": protocol_version, + "capabilities": { "tools": { "listChanged": false } }, + "serverInfo": { "name": SERVER_NAME, "version": SERVER_VERSION }, + }); + write_success(id, result); + } + + fn handle_tools_list(&self, id: &Value) { + let tools = json!([ + { + "name": "burn__sessionCost", + "description": + "Return the total cost (USD), token count, and turn count for a session. \ + Defaults to the server's registered sessionId (the running agent's own \ + session). Read-only.", + "inputSchema": { + "type": "object", + "properties": { + "sessionId": { + "type": "string", + "description": + "Override the registered session id. Omit to query the running \ + agent's own session.", + }, + }, + "required": [], + "additionalProperties": false, + }, + } + ]); + write_success(id, json!({ "tools": tools })); + } + + async fn handle_tools_call(&self, id: &Value, params: &Value) { + let name = params.get("name").and_then(|v| v.as_str()); + let Some(name) = name else { + write_response(&error_envelope( + id, + -32602, + "tools/call requires a name", + None, + )); + return; + }; + let args = params.get("arguments").cloned().unwrap_or(json!({})); + match name { + "burn__sessionCost" => self.tool_session_cost(id, &args).await, + other => { + write_response(&error_envelope( + id, + -32601, + &format!("unknown tool: {other}"), + None, + )); + } + } + } + + async fn tool_session_cost(&self, id: &Value, args: &Value) { + let override_id = args + .get("sessionId") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + let session = override_id + .clone() + .or_else(|| self.default_session_id.clone()); + + // Run the SDK call. `session_cost` is sync but we already hold a + // ledger handle — call directly on it via + // `LedgerHandle::session_cost` so we don't re-open the ledger + // every call. The free `relayburn_sdk::session_cost` would + // open + close per call, which is wasteful for a long-lived + // server. + // + // This branch is intentionally cheap: the entire body is CPU / + // SQLite work, so the `await` below only yields if the global + // ledger lock is contended (it isn't — we're the only user). + let opts = SessionCostOptions { + session: session.clone(), + ledger_home: None, + }; + let handle_guard = self.handle.lock().await; + let result = handle_guard.session_cost(opts); + drop(handle_guard); + + let mut payload: SessionCostResult = match result { + Ok(r) => r, + Err(err) => { + let msg = err.to_string(); + // Per MCP convention: tool errors are non-throwing + // results with `isError: true`. Reserve JSON-RPC errors + // for protocol problems (parse / method-not-found). + write_success( + id, + json!({ + "content": [{ "type": "text", "text": msg }], + "isError": true, + }), + ); + return; + } + }; + + // Mirror TS: when no override and no registered default, surface + // a more descriptive note than the SDK's generic one. + if payload.session_id.is_none() && override_id.is_none() && self.default_session_id.is_none() + { + payload.note = Some( + "no session id provided and server was not registered with one".to_string(), + ); + } + + let value = serde_json::to_value(&payload).unwrap_or(Value::Null); + let text = serde_json::to_string(&value).unwrap_or_else(|_| "{}".to_string()); + write_success( + id, + json!({ + "content": [{ "type": "text", "text": text }], + "structuredContent": value, + }), + ); + } +} + +// --------------------------------------------------------------------------- +// Wire I/O +// --------------------------------------------------------------------------- + +fn write_success(id: &Value, result: Value) { + let env = JsonRpcSuccess { + jsonrpc: "2.0", + id, + result, + }; + write_response(&serde_json::to_value(&env).unwrap_or(Value::Null)); +} + +fn error_envelope(id: &Value, code: i32, message: &str, data: Option) -> Value { + let env = JsonRpcError { + jsonrpc: "2.0", + id, + error: JsonRpcErrorBody { + code, + message: message.to_string(), + data, + }, + }; + serde_json::to_value(&env).unwrap_or(Value::Null) +} + +fn write_response(value: &Value) { + let stdout = std::io::stdout(); + let mut out = stdout.lock(); + if let Ok(mut s) = serde_json::to_string(value) { + s.push('\n'); + let _ = out.write_all(s.as_bytes()); + let _ = out.flush(); + } +} -use crate::cli::GlobalArgs; +#[cfg(test)] +mod tests { + use super::*; -pub fn run(globals: &GlobalArgs) -> i32 { - super::not_yet_implemented("mcp-server", globals) + /// The wire protocol is small enough to unit-test the framing + /// helpers without spinning up a full server. + #[test] + fn error_envelope_carries_code_and_message() { + let v = error_envelope(&json!(7), -32601, "method not found: foo", None); + assert_eq!(v.get("jsonrpc"), Some(&Value::String("2.0".into()))); + assert_eq!(v.get("id"), Some(&json!(7))); + let err = v.get("error").unwrap(); + assert_eq!(err.get("code"), Some(&json!(-32601))); + assert_eq!( + err.get("message"), + Some(&Value::String("method not found: foo".into())), + ); + } } diff --git a/crates/relayburn-cli/src/main.rs b/crates/relayburn-cli/src/main.rs index abfcecc4..a6385420 100644 --- a/crates/relayburn-cli/src/main.rs +++ b/crates/relayburn-cli/src/main.rs @@ -37,7 +37,7 @@ fn dispatch(args: Args) -> i32 { Command::Compare(args) => commands::compare::run(&globals, args), Command::Run => commands::run::run(&globals), Command::State(args) => commands::state::run(&globals, args), - Command::Ingest => commands::ingest::run(&globals), - Command::McpServer => commands::mcp_server::run(&globals), + Command::Ingest(args) => commands::ingest::run(&globals, args), + Command::McpServer(args) => commands::mcp_server::run(&globals, args), } } diff --git a/crates/relayburn-cli/tests/smoke.rs b/crates/relayburn-cli/tests/smoke.rs index 338af0cf..90dd191a 100644 --- a/crates/relayburn-cli/tests/smoke.rs +++ b/crates/relayburn-cli/tests/smoke.rs @@ -36,16 +36,12 @@ const SUBCOMMANDS: &[&str] = &[ /// Subcommands that still print "not yet implemented" when invoked /// without args. Wave 2 D1 wired up `summary` and `hotspots`, D2 wired -/// up `overhead`, D3 wired up `compare`, and D4 wired up `state` as -/// real presenters, so they're excluded from the stub-mode tripwire -/// below. The remaining entries are owned by sibling Wave 2 PRs. As -/// each Wave 2 D1–D8 PR wires its presenter, drop the command from this -/// list — the missing entries fall under a more targeted assertion (see -/// `compare_command_rejects_missing_models` below for an example). +/// up `overhead`, D3 wired up `compare`, D4 wired up `state`, and D8 +/// wired up `ingest` + `mcp-server` as real presenters, so they're +/// excluded from the stub-mode tripwire below. The remaining entries +/// are owned by sibling Wave 2 PRs (D5 owns `run`). const UNIMPLEMENTED_SUBCOMMANDS: &[&str] = &[ "run", - "ingest", - "mcp-server", ]; /// Helper: build a `Command` driving the locally-built `burn` binary. From 3ed58fa8be91f2605fc3a65d77b84ab1f0c7904a Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Wed, 6 May 2026 20:34:18 -0400 Subject: [PATCH 2/2] relayburn-cli: address PR #319 review (rebase + stdout report + --quiet hook scope) - Merge origin/main (D6 codex landed); CHANGELOG additive, registry/mod auto-merged with codex slot from main. - Codex P2: route burn ingest one-shot summary to stdout (matches TS runIngestOnce at packages/cli/src/commands/ingest.ts:121-126); --watch banner + --hook breadcrumbs stay on stderr. - CodeRabbit Minor: scope --quiet to --hook via clap requires = "hook" (mirrors the --reingest requires = "force" pattern from #313). Standalone --quiet and --watch --quiet are now rejected at parse time. --- crates/relayburn-cli/src/cli.rs | 5 +-- crates/relayburn-cli/src/commands/ingest.rs | 36 +++++++++++++-------- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/crates/relayburn-cli/src/cli.rs b/crates/relayburn-cli/src/cli.rs index b73c3786..e8b2d364 100644 --- a/crates/relayburn-cli/src/cli.rs +++ b/crates/relayburn-cli/src/cli.rs @@ -159,8 +159,9 @@ pub struct IngestArgs { /// Suppress non-error stderr breadcrumbs. Used by hook callers so /// the surrounding tool invocation isn't blocked by a noisy - /// pipeline. - #[arg(long)] + /// pipeline. Only meaningful with `--hook`; clap rejects `--quiet` + /// on its own (or with `--watch`) so a typo can't silently no-op. + #[arg(long, requires = "hook")] pub quiet: bool, } diff --git a/crates/relayburn-cli/src/commands/ingest.rs b/crates/relayburn-cli/src/commands/ingest.rs index 8c76f1a1..7b42060f 100644 --- a/crates/relayburn-cli/src/commands/ingest.rs +++ b/crates/relayburn-cli/src/commands/ingest.rs @@ -23,10 +23,14 @@ //! hook payload. //! //! Output shape: every successful run writes a single -//! `[burn] ingest: ingested N session(s) (+M turn(s))` line on stderr -//! (matching TS) so callers wrapping the binary can grep it without -//! parsing JSON. `--quiet` suppresses the line when the report is -//! empty; non-empty runs still log to stderr for parity with TS. +//! `[burn] ingest: ingested N session(s) (+M turn(s))` line. The +//! one-shot path emits it on **stdout** so pipelines can capture the +//! summary (matching the TS `runIngestOnce` source-of-truth at +//! `packages/cli/src/commands/ingest.ts:121-126`); `--watch` and +//! `--hook` modes log on **stderr** so the foreground banner / hook +//! breadcrumbs don't pollute downstream stdout consumers. `--quiet` +//! (only valid with `--hook`) suppresses the hook breadcrumb when the +//! report is empty. use std::io::{self, Read}; use std::sync::Arc; @@ -71,7 +75,13 @@ pub fn run(globals: &GlobalArgs, args: IngestArgs) -> i32 { /// One-shot scan: open the ledger, run a single `ingest_all`, log the /// summary, exit. Drives a current-thread tokio runtime so the otherwise /// sync presenter can drive the async SDK verb. +/// +/// Summary line is emitted on **stdout** (matching TS `runIngestOnce` +/// at `packages/cli/src/commands/ingest.ts:121-126`) so callers can +/// capture pipeline output without redirecting stderr. fn run_once(globals: &GlobalArgs, quiet: bool) -> i32 { + let _ = quiet; // `--quiet` is hook-only (clap `requires = "hook"`); kept in + // the dispatch signature for symmetry with run_watch / run_hook. let mut handle = match open_handle(globals) { Ok(h) => h, Err(err) => return report_error(&err, globals), @@ -86,7 +96,7 @@ fn run_once(globals: &GlobalArgs, quiet: bool) -> i32 { let opts = RawIngestOptions::default(); match rt.block_on(ingest_all(handle.raw_mut(), &opts)) { Ok(report) => { - log_report(&report, quiet); + log_report_oneshot(&report); 0 } Err(err) => report_error(&err, globals), @@ -289,14 +299,14 @@ fn render_ingest_line(report: &IngestReport) -> String { ) } -/// Log the canonical `[burn] ingest: ...` line on stderr. Suppressed in -/// `--quiet` mode unless work was done — empty zero-progress reports -/// would otherwise be noise on every invocation. -fn log_report(report: &IngestReport, quiet: bool) { - if quiet && report.appended_turns == 0 { - return; - } - eprint!("{}", render_ingest_line(report)); +/// Log the canonical `[burn] ingest: ...` line on **stdout** for the +/// one-shot path. TS source of truth: `runIngestOnce` at +/// `packages/cli/src/commands/ingest.ts:121-126` writes the rendered +/// report via `process.stdout.write`, so pipelines that capture stdout +/// see the summary. `--watch` and `--hook` keep their own stderr +/// emitters (`render_ingest_line` is the shared formatter). +fn log_report_oneshot(report: &IngestReport) { + print!("{}", render_ingest_line(report)); } /// Read all of stdin into a String. Returns empty string when stdin is