diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index 16080ad..892552d 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `burn archive status` (text + `--json`) now reports `tool_result_events` row counts alongside sessions / turns / tool_calls / compactions, and `burn archive build` / `rebuild` summary lines include the count of tool-result events materialized this run (#101). + ## [0.24.0] - 2026-04-26 ### Added diff --git a/packages/cli/src/commands/archive.test.ts b/packages/cli/src/commands/archive.test.ts index f808ada..845b3f6 100644 --- a/packages/cli/src/commands/archive.test.ts +++ b/packages/cli/src/commands/archive.test.ts @@ -4,8 +4,8 @@ import { tmpdir } from 'node:os'; import * as path from 'node:path'; import { after, beforeEach, describe, it } from 'node:test'; -import { appendTurns, stamp } from '@relayburn/ledger'; -import type { TurnRecord } from '@relayburn/reader'; +import { appendToolResultEvents, appendTurns, stamp } from '@relayburn/ledger'; +import type { ToolResultEventRecord, TurnRecord } from '@relayburn/reader'; import { runArchive } from './archive.js'; @@ -134,6 +134,38 @@ describe('burn archive CLI', () => { assert.equal(afterStatus.rowCounts.turns, beforeStatus.rowCounts.turns); }); + it('archive status --json surfaces tool_result_events row count', async () => { + await appendTurns([fakeTurn({ sessionId: 's-tre-cli', messageId: 'tre-cli-1' })]); + const events: ToolResultEventRecord[] = [ + { + v: 1, + source: 'claude-code', + sessionId: 's-tre-cli', + messageId: 'tre-cli-1', + toolUseId: 'tu-cli-1', + callIndex: 0, + eventIndex: 0, + ts: '2026-04-20T00:00:01.000Z', + status: 'completed', + eventSource: 'tool_result', + contentLength: 7, + contentHash: 'cli-h1', + }, + ]; + await appendToolResultEvents(events); + await captureRun({}, ['build']); + + const statusOut = await captureRun({ json: true }, ['status']); + assert.equal(statusOut.code, 0); + const parsed = JSON.parse(statusOut.stdout) as { + rowCounts: { toolResultEvents: number }; + }; + assert.equal(parsed.rowCounts.toolResultEvents, 1); + + const human = await captureRun({}, ['status']); + assert.match(human.stdout, /tool_result_events:\s+1/); + }); + it('archive status --json surfaces a fidelity histogram on turns (#110)', async () => { // Use distinctive ts/usage so the writer's content-fingerprint dedup // doesn't collide with prior CLI tests in the same module (the in-memory diff --git a/packages/cli/src/commands/archive.ts b/packages/cli/src/commands/archive.ts index d58cd2a..18c4ede 100644 --- a/packages/cli/src/commands/archive.ts +++ b/packages/cli/src/commands/archive.ts @@ -76,7 +76,8 @@ function printBuildResult( ` ${formatInt(result.turnsApplied)} turn${result.turnsApplied === 1 ? '' : 's'} applied,` + ` ${formatInt(result.sessionsTouched)} session${result.sessionsTouched === 1 ? '' : 's'} touched,` + ` ${formatInt(result.stampsApplied)} stamp${result.stampsApplied === 1 ? '' : 's'},` + - ` ${formatInt(result.compactionsApplied)} compaction${result.compactionsApplied === 1 ? '' : 's'}`, + ` ${formatInt(result.compactionsApplied)} compaction${result.compactionsApplied === 1 ? '' : 's'},` + + ` ${formatInt(result.toolResultEventsApplied)} tool-result event${result.toolResultEventsApplied === 1 ? '' : 's'}`, ); lines.push(` ${formatInt(result.scannedBytes)} bytes scanned from ledger tail`); process.stdout.write(lines.join('\n') + '\n'); @@ -104,10 +105,11 @@ async function runStatus(args: ParsedArgs): Promise { if (status.lastBuiltAt) lines.push(` last build: ${status.lastBuiltAt}`); if (status.lastRebuildAt) lines.push(` last rebuild: ${status.lastRebuildAt}`); lines.push(' rows:'); - lines.push(` sessions: ${formatInt(status.rowCounts.sessions)}`); - lines.push(` turns: ${formatInt(status.rowCounts.turns)}`); - lines.push(` tool_calls: ${formatInt(status.rowCounts.toolCalls)}`); - lines.push(` compactions: ${formatInt(status.rowCounts.compactions)}`); + lines.push(` sessions: ${formatInt(status.rowCounts.sessions)}`); + lines.push(` turns: ${formatInt(status.rowCounts.turns)}`); + lines.push(` tool_calls: ${formatInt(status.rowCounts.toolCalls)}`); + lines.push(` tool_result_events: ${formatInt(status.rowCounts.toolResultEvents)}`); + lines.push(` compactions: ${formatInt(status.rowCounts.compactions)}`); process.stdout.write(lines.join('\n') + '\n'); return 0; } diff --git a/packages/ledger/CHANGELOG.md b/packages/ledger/CHANGELOG.md index 896bdf4..c79ae47 100644 --- a/packages/ledger/CHANGELOG.md +++ b/packages/ledger/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **`tool_result_events` archive table is now populated** (#101). `buildArchive()` materializes `ToolResultEventLine` ledger lines into the previously-empty `tool_result_events` table during incremental builds, keyed on (`source`, `session_id`, `message_id`, `tool_use_id`, `event_index`). Columns mirror the canonical `ToolResultEventRecord`: `status`, `content_length`, `content_hash`, `subagent_session_id`, `agent_id`, `event_source`, `ts`, `call_index`. Cursor uses the same `archive_state.ledger_offset_bytes` as turns / stamps / compactions, so no parallel cursor and no extra disk read. `rebuildArchive()` replays tool-result events alongside the other line kinds and yields the same row count deterministically. New `idx_tool_result_events_use_id` / `_session` / `_subagent` indexes for the obvious join paths. `BuildResult.toolResultEventsApplied` and `ArchiveStatus.rowCounts.toolResultEvents` expose the new counts. Closes #101, refs #40, #42, #77. + +### Changed + +- **Bumped `ARCHIVE_VERSION` to 2** to pick up the new `tool_result_events` indexes on existing archives. The next `buildArchive()` call detects the version mismatch and rebuilds from scratch — safe because the archive is derived state. + ## [0.24.0] - 2026-04-26 ### Added diff --git a/packages/ledger/src/archive.test.ts b/packages/ledger/src/archive.test.ts index 24220d7..2830655 100644 --- a/packages/ledger/src/archive.test.ts +++ b/packages/ledger/src/archive.test.ts @@ -5,10 +5,15 @@ import * as path from 'node:path'; import { DatabaseSync } from 'node:sqlite'; import { after, before, beforeEach, describe, it } from 'node:test'; -import type { TurnRecord } from '@relayburn/reader'; +import type { ToolResultEventRecord, TurnRecord } from '@relayburn/reader'; import { __resetIndexCacheForTesting } from './index-sidecar.js'; -import { appendCompactions, appendTurns, stamp } from './writer.js'; +import { + appendCompactions, + appendToolResultEvents, + appendTurns, + stamp, +} from './writer.js'; import { archivePath, ledgerPath } from './paths.js'; import { ARCHIVE_VERSION, @@ -269,6 +274,269 @@ describe('archive', () => { } }); + it('round-trips tool_result_event lines from ledger to archive', async () => { + await appendTurns([fakeTurn({ sessionId: 's-tre', messageId: 'mtre-1' })]); + const events: ToolResultEventRecord[] = [ + { + v: 1, + source: 'claude-code', + sessionId: 's-tre', + messageId: 'mtre-1', + toolUseId: 'tu-A', + callIndex: 0, + eventIndex: 0, + ts: '2026-04-20T00:00:01.000Z', + status: 'completed', + eventSource: 'tool_result', + contentLength: 1234, + contentHash: 'abc123', + agentId: 'agent-X', + }, + { + v: 1, + source: 'claude-code', + sessionId: 's-tre', + messageId: 'mtre-1', + toolUseId: 'tu-B', + callIndex: 1, + eventIndex: 1, + ts: '2026-04-20T00:00:02.000Z', + status: 'errored', + eventSource: 'tool_result', + contentLength: 0, + contentHash: 'def456', + isError: true, + }, + ]; + await appendToolResultEvents(events); + const result = await buildArchive(); + assert.equal(result.toolResultEventsApplied, 2); + + const status = await getArchiveStatus(); + assert.equal(status.rowCounts.toolResultEvents, 2); + + const db = await openArchive(); + try { + const rows = db + .prepare( + `SELECT source, session_id, message_id, tool_use_id, call_index, + event_index, status, content_length, content_hash, is_error, + agent_id, event_source, ts + FROM tool_result_events ORDER BY event_index`, + ) + .all() as Array<{ + source: string; + session_id: string; + message_id: string; + tool_use_id: string; + call_index: number | bigint; + event_index: number | bigint; + status: string; + content_length: number | bigint | null; + content_hash: string | null; + is_error: number | bigint | null; + agent_id: string | null; + event_source: string; + ts: string | null; + }>; + assert.equal(rows.length, 2); + const r0 = rows[0]!; + assert.equal(r0.source, 'claude-code'); + assert.equal(r0.session_id, 's-tre'); + assert.equal(r0.message_id, 'mtre-1'); + assert.equal(r0.tool_use_id, 'tu-A'); + assert.equal(Number(r0.call_index), 0); + assert.equal(Number(r0.event_index), 0); + assert.equal(r0.status, 'completed'); + assert.equal(Number(r0.content_length), 1234); + assert.equal(r0.content_hash, 'abc123'); + assert.equal(r0.agent_id, 'agent-X'); + assert.equal(r0.event_source, 'tool_result'); + assert.equal(r0.ts, '2026-04-20T00:00:01.000Z'); + assert.equal(r0.is_error, null); + const r1 = rows[1]!; + assert.equal(r1.tool_use_id, 'tu-B'); + assert.equal(r1.status, 'errored'); + assert.equal(Number(r1.call_index), 1); + assert.equal(Number(r1.is_error), 1); + } finally { + db.close(); + } + }); + + it('rebuilds tool_result_events from zero deterministically (idempotent)', async () => { + await appendTurns([fakeTurn({ sessionId: 's-rb-tre', messageId: 'mrb-tre-1' })]); + const events: ToolResultEventRecord[] = Array.from({ length: 5 }, (_, i) => ({ + v: 1, + source: 'claude-code', + sessionId: 's-rb-tre', + messageId: 'mrb-tre-1', + toolUseId: `tu-${i}`, + callIndex: i, + eventIndex: i, + ts: `2026-04-20T00:00:0${i}.000Z`, + status: 'completed', + eventSource: 'tool_result', + contentLength: 100 * i, + contentHash: `h${i}`, + })); + await appendToolResultEvents(events); + await buildArchive(); + const before = await getArchiveStatus(); + assert.equal(before.rowCounts.toolResultEvents, 5); + + // Rebuild from zero replays the same lines and yields the same row count. + const rb = await rebuildArchive(); + assert.equal(rb.rebuiltFromZero, true); + assert.equal(rb.toolResultEventsApplied, 5); + const after = await getArchiveStatus(); + assert.equal(after.rowCounts.toolResultEvents, 5); + + // A second build with no new ledger writes is a clean no-op. + const noop = await buildArchive(); + assert.equal(noop.toolResultEventsApplied, 0); + const noopStatus = await getArchiveStatus(); + assert.equal(noopStatus.rowCounts.toolResultEvents, 5); + }); + + it('materializes mixed-source tool_result events (claude-code, codex, opencode)', async () => { + // Synthesize events from each supported source so the archive table + // doesn't accidentally lock to a single source's column shape. + const events: ToolResultEventRecord[] = [ + { + v: 1, + source: 'claude-code', + sessionId: 's-cc', + messageId: 'mcc-1', + toolUseId: 'tu-cc', + callIndex: 0, + eventIndex: 0, + ts: '2026-04-20T00:00:00.000Z', + status: 'completed', + eventSource: 'tool_result', + contentLength: 10, + contentHash: 'cc', + }, + { + v: 1, + source: 'codex', + sessionId: 's-cx', + messageId: 'mcx-1', + toolUseId: 'call-cx', + callIndex: 0, + eventIndex: 0, + ts: '2026-04-20T00:00:01.000Z', + status: 'completed', + eventSource: 'function_call_output', + contentLength: 20, + contentHash: 'cx', + }, + { + v: 1, + source: 'opencode', + sessionId: 's-oc', + messageId: 'moc-1', + toolUseId: 'callid-oc', + callIndex: 0, + eventIndex: 0, + ts: '2026-04-20T00:00:02.000Z', + status: 'completed', + eventSource: 'tool_result', + contentLength: 30, + contentHash: 'oc', + }, + ]; + await appendToolResultEvents(events); + await buildArchive(); + const db = await openArchive(); + try { + const rows = db + .prepare( + 'SELECT DISTINCT source FROM tool_result_events ORDER BY source', + ) + .all() as Array<{ source: string }>; + assert.deepEqual( + rows.map((r) => r.source), + ['claude-code', 'codex', 'opencode'], + ); + const total = (db + .prepare('SELECT COUNT(*) AS n FROM tool_result_events') + .get() as { n: number | bigint }).n; + assert.equal(Number(total), 3); + } finally { + db.close(); + } + }); + + it('tool_result_events: incremental tail picks up newly-appended events', async () => { + await appendToolResultEvents([ + { + v: 1, + source: 'claude-code', + sessionId: 's-tail-tre', + messageId: 'mtt-1', + toolUseId: 'tu-1', + callIndex: 0, + eventIndex: 0, + ts: '2026-04-20T00:00:00.000Z', + status: 'completed', + eventSource: 'tool_result', + contentLength: 1, + contentHash: 'h1', + }, + ]); + await buildArchive(); + const status1 = await getArchiveStatus(); + assert.equal(status1.rowCounts.toolResultEvents, 1); + + await appendToolResultEvents([ + { + v: 1, + source: 'claude-code', + sessionId: 's-tail-tre', + messageId: 'mtt-1', + toolUseId: 'tu-2', + callIndex: 1, + eventIndex: 1, + ts: '2026-04-20T00:00:01.000Z', + status: 'completed', + eventSource: 'tool_result', + contentLength: 2, + contentHash: 'h2', + }, + ]); + const tail = await buildArchive(); + assert.equal(tail.toolResultEventsApplied, 1); + const status2 = await getArchiveStatus(); + assert.equal(status2.rowCounts.toolResultEvents, 2); + assert.equal(status2.upToDate, true); + }); + + it('tool_result_events: PK is dedup-safe — replaying the same event is a no-op upsert', async () => { + const ev: ToolResultEventRecord = { + v: 1, + source: 'claude-code', + sessionId: 's-dup', + messageId: 'mdup-1', + toolUseId: 'tu-dup', + callIndex: 0, + eventIndex: 0, + ts: '2026-04-20T00:00:00.000Z', + status: 'completed', + eventSource: 'tool_result', + contentLength: 99, + contentHash: 'h-dup', + }; + await appendToolResultEvents([ev]); + await buildArchive(); + + // rebuildArchive replays the entire ledger — including the same tool + // result event line — and must land exactly one row. + await rebuildArchive(); + const status = await getArchiveStatus(); + assert.equal(status.rowCounts.toolResultEvents, 1); + }); + it('archive lives at RELAYBURN_HOME/archive.sqlite', async () => { await buildArchive(); const expected = path.join(tmpDir, 'archive.sqlite'); diff --git a/packages/ledger/src/archive.ts b/packages/ledger/src/archive.ts index b7fd963..ca1eef4 100644 --- a/packages/ledger/src/archive.ts +++ b/packages/ledger/src/archive.ts @@ -43,11 +43,13 @@ import { archivePath, ledgerPath } from './paths.js'; import { isCompactionLine, isStampLine, + isToolResultEventLine, isTurnLine, stampMatches, type CompactionLine, type Enrichment, type StampLine, + type ToolResultEventLine, type TurnLine, } from './schema.js'; @@ -56,7 +58,7 @@ import { * statement below changes shape; the next `buildArchive()` call will detect * the mismatch in `archive_state.archive_version` and rebuild from scratch. */ -export const ARCHIVE_VERSION = 1; +export const ARCHIVE_VERSION = 2; /** * SQL statements that materialize the read model. These are intentionally @@ -67,7 +69,11 @@ export const ARCHIVE_VERSION = 1; * - sessions: one row per (source, sessionId) * - turns: one row per ingested TurnRecord, with stamps folded in * - tool_calls: one row per ToolCall attached to a turn - * - tool_result_events: reserved for future content-sidecar bridging (#33) + * - tool_result_events: chronological tool-output / terminal-status events + * materialized from `ToolResultEventLine` ledger lines (#101 / #42 / #77). + * `content_length` / `content_hash` come straight from the canonical + * record; the content sidecar (#33) carries the raw bytes when callers + * need them. * - archive_state: incremental build cursor + schema version */ const SCHEMA_SQL = ` @@ -174,8 +180,12 @@ CREATE TABLE IF NOT EXISTS tool_calls ( CREATE INDEX IF NOT EXISTS idx_tool_calls_name ON tool_calls(tool_name); CREATE INDEX IF NOT EXISTS idx_tool_calls_use_id ON tool_calls(tool_use_id); --- Reserved for the content-sidecar bridge (#33) and the execution-graph work. --- Created now so consumers can stably reference the table, populated later. +-- Materialized from ToolResultEventLine (execution-graph #42 / #77) -- one +-- row per chronological tool_result / terminal-status / progress event keyed +-- on (source, session_id, message_id, tool_use_id, event_index). The content +-- sidecar (#33) holds the raw bytes; this table carries metadata only +-- (content_length, content_hash) so analyses can group / dedupe without +-- loading sidecar JSONL. CREATE TABLE IF NOT EXISTS tool_result_events ( source TEXT NOT NULL, session_id TEXT NOT NULL, @@ -186,6 +196,7 @@ CREATE TABLE IF NOT EXISTS tool_result_events ( status TEXT, content_length INTEGER, content_hash TEXT, + is_error INTEGER, subagent_session_id TEXT, agent_id TEXT, event_source TEXT, @@ -193,6 +204,10 @@ CREATE TABLE IF NOT EXISTS tool_result_events ( PRIMARY KEY (source, session_id, message_id, tool_use_id, event_index) ); +CREATE INDEX IF NOT EXISTS idx_tool_result_events_use_id ON tool_result_events(tool_use_id); +CREATE INDEX IF NOT EXISTS idx_tool_result_events_session ON tool_result_events(source, session_id); +CREATE INDEX IF NOT EXISTS idx_tool_result_events_subagent ON tool_result_events(subagent_session_id); + CREATE TABLE IF NOT EXISTS compactions ( source TEXT NOT NULL, session_id TEXT NOT NULL, @@ -227,6 +242,7 @@ export interface ArchiveStatus { sessions: number; turns: number; toolCalls: number; + toolResultEvents: number; compactions: number; }; /** @@ -249,6 +265,8 @@ export interface BuildResult { stampsApplied: number; /** Compaction events ingested. */ compactionsApplied: number; + /** Tool-result-event lines materialized into `tool_result_events`. */ + toolResultEventsApplied: number; /** True iff the archive was rebuilt from zero before this build. */ rebuiltFromZero: boolean; } @@ -413,6 +431,7 @@ async function buildArchiveLocked(opts: { isRebuild?: boolean } = {}): Promise 0) { + // INSERT OR REPLACE keyed on the table's PK (source, session_id, + // message_id, tool_use_id, event_index). The execution-graph record + // already carries `contentLength` and `contentHash`; if a future + // record lacks them we still write null and a follow-up can enrich + // from the content sidecar (#33). The table allows NULL for both. + const insertToolResultEvent = db.prepare(` + INSERT OR REPLACE INTO tool_result_events ( + source, session_id, message_id, tool_use_id, call_index, + event_index, status, content_length, content_hash, is_error, + subagent_session_id, agent_id, event_source, ts + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `); + for (const tre of toolResultEventLines) { + const r = tre.record; + insertToolResultEvent.run( + r.source, + r.sessionId, + // PK requires NOT NULL message_id; some event sources (queue events, + // subagent notifications) don't carry one. Substitute the empty + // string so the row still lands and so two NULL-message_id events + // for the same (source, sessionId, toolUseId, eventIndex) collide + // on the PK (which is the desired idempotent behavior). + r.messageId ?? '', + r.toolUseId, + r.callIndex ?? 0, + r.eventIndex, + r.status, + r.contentLength ?? null, + r.contentHash ?? null, + r.isError === undefined ? null : r.isError ? 1 : 0, + r.subagentSessionId ?? null, + r.agentId ?? null, + r.eventSource, + r.ts ?? null, + ); + } + } + rebuildSessions(db, sessionsTouched); // NOTE: the ledger cursor is intentionally NOT written here. The caller @@ -737,6 +806,7 @@ async function applyLedgerRange( sessionsTouched: distinctSessionIds.size, stampsApplied: newStamps.length, compactionsApplied: compactionLines.length, + toolResultEventsApplied: toolResultEventLines.length, safeOffset, }; } @@ -1009,7 +1079,13 @@ export async function getArchiveStatus(): Promise { upToDate: false, lastBuiltAt: null, lastRebuildAt: null, - rowCounts: { sessions: 0, turns: 0, toolCalls: 0, compactions: 0 }, + rowCounts: { + sessions: 0, + turns: 0, + toolCalls: 0, + toolResultEvents: 0, + compactions: 0, + }, fidelityHistogram: {}, }; } @@ -1039,6 +1115,9 @@ export async function getArchiveStatus(): Promise { const toolCalls = db.prepare('SELECT COUNT(*) AS n FROM tool_calls').get() as { n: number | bigint; }; + const toolResultEvents = db + .prepare('SELECT COUNT(*) AS n FROM tool_result_events') + .get() as { n: number | bigint }; const compactions = db.prepare('SELECT COUNT(*) AS n FROM compactions').get() as { n: number | bigint; }; @@ -1073,6 +1152,7 @@ export async function getArchiveStatus(): Promise { sessions: Number(sessions.n), turns: Number(turns.n), toolCalls: Number(toolCalls.n), + toolResultEvents: Number(toolResultEvents.n), compactions: Number(compactions.n), }, fidelityHistogram,