Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ All notable changes to `@relayburn/cli`.

- `burn --version` (and `-v` / `burn version`) prints the installed CLI version.

### Fixed

- `burn hotspots` no longer hangs silently after `read N turns`. The attribution path was issuing one `queryUserTurns({sessionId})` call per session, each of which streamed the entire ledger.jsonl from disk; on a 7-day / 190MB / 169-session ledger this stalled for minutes with no spinner. Replaced with a single ledger pass + in-memory bucket (narrowed by the active `--since` / `--source` so peak memory stays bounded on long historical ledgers) and added a progress task. The `--patterns` user-turn loader had the same shape and is fixed too.

## [1.2.2] - 2026-04-30

### Removed
Expand Down
106 changes: 106 additions & 0 deletions packages/cli/src/commands/hotspots-bulk-load.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import { strict as assert } from 'node:assert';
import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import * as path from 'node:path';
import { after, beforeEach, describe, it } from 'node:test';

import {
__resetIndexCacheForTesting,
appendUserTurns,
} from '@relayburn/ledger';
import type { UserTurnRecord } from '@relayburn/reader';

import { bulkUserTurnsBySession } from './hotspots.js';

// Builds a minimal UserTurnRecord fixture; callers override only the fields
// a given test cares about (typically sessionId / userUuid / ts).
function fakeUserTurn(overrides: Partial<UserTurnRecord> = {}): UserTurnRecord {
  const base: UserTurnRecord = {
    v: 1,
    source: 'claude-code',
    sessionId: 's-x',
    userUuid: 'uu-1',
    ts: '2026-04-20T00:00:00.000Z',
    blocks: [],
  };
  return { ...base, ...overrides };
}

describe('bulkUserTurnsBySession — query narrowing (#214 follow-up)', () => {
  let tmpHome: string;
  let tmpRelay: string;
  // Every beforeEach mints a fresh pair of temp dirs. Track all of them so the
  // final cleanup removes every pair — previously only the LAST tmpHome/tmpRelay
  // pair was removed, leaking one pair per test that ran before it.
  const createdDirs: string[] = [];
  const originalHome = process.env['HOME'];
  const originalRelay = process.env['RELAYBURN_HOME'];

  beforeEach(async () => {
    tmpHome = await mkdtemp(path.join(tmpdir(), 'burn-bulk-home-'));
    tmpRelay = await mkdtemp(path.join(tmpdir(), 'burn-bulk-relay-'));
    createdDirs.push(tmpHome, tmpRelay);
    process.env['HOME'] = tmpHome;
    process.env['RELAYBURN_HOME'] = tmpRelay;
    // The ledger caches its index per-process; reset so each test sees only
    // the records it appended under its own RELAYBURN_HOME.
    __resetIndexCacheForTesting();
  });

  after(async () => {
    // Restore the environment exactly as found (the variables may have been
    // unset originally, in which case they must be deleted, not set to '').
    if (originalHome !== undefined) process.env['HOME'] = originalHome;
    else delete process.env['HOME'];
    if (originalRelay !== undefined) process.env['RELAYBURN_HOME'] = originalRelay;
    else delete process.env['RELAYBURN_HOME'];
    // Remove every tmp dir created across all tests, not just the last pair.
    await Promise.all(
      createdDirs.map((dir) => rm(dir, { recursive: true, force: true })),
    );
  });

  it('forwards q.since to queryUserTurns so historical user turns are not buffered', async () => {
    // Three sessions, each with one ancient user turn (2024) and one recent
    // user turn (2026-04-20). With `since: 2026-04-01`, only the recent turns
    // should appear in the result map — the ancient ones must be filtered
    // during streaming, not after, otherwise long historical ledgers blow
    // memory on a small recent window.
    const sessions = ['s-1', 's-2', 's-3'];
    const records: UserTurnRecord[] = [];
    for (const sessionId of sessions) {
      records.push(
        fakeUserTurn({
          sessionId,
          userUuid: `${sessionId}-old`,
          ts: '2024-01-01T00:00:00.000Z',
        }),
        fakeUserTurn({
          sessionId,
          userUuid: `${sessionId}-new`,
          ts: '2026-04-20T00:00:00.000Z',
        }),
      );
    }
    await appendUserTurns(records);

    const out = await bulkUserTurnsBySession(new Set(sessions), {
      since: '2026-04-01T00:00:00.000Z',
    });

    assert.equal(out.size, 3);
    for (const sessionId of sessions) {
      const got = out.get(sessionId);
      assert.ok(got, `expected results for ${sessionId}`);
      assert.equal(got.length, 1, `expected only the recent turn for ${sessionId}`);
      assert.equal(got[0]!.userUuid, `${sessionId}-new`);
    }
  });

  it('drops user turns whose sessionId is outside the requested set', async () => {
    await appendUserTurns([
      fakeUserTurn({ sessionId: 's-keep', userUuid: 'k1' }),
      fakeUserTurn({ sessionId: 's-drop', userUuid: 'd1' }),
    ]);

    const out = await bulkUserTurnsBySession(new Set(['s-keep']));

    assert.equal(out.size, 1);
    assert.ok(out.has('s-keep'));
    assert.equal(out.has('s-drop'), false);
  });

  it('returns an empty map without touching the ledger when sessionIds is empty', async () => {
    // No appendUserTurns call — the ledger file may not exist. Helper must
    // short-circuit before any I/O.
    const out = await bulkUserTurnsBySession(new Set());
    assert.equal(out.size, 0);
  });
});
41 changes: 39 additions & 2 deletions packages/cli/src/commands/hotspots.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ async function captureStdio<T>(
}

// Deps stub whose bulk loaders resolve to empty maps — for tests that only
// exercise eligibility / refusal paths and never consume loader output.
// The stale per-session members (loadContentForSession / loadUserTurnsForSession)
// were removed along with their interface slots; only the bulk shape remains.
const EMPTY_DEPS: HotspotsAttributionDeps = {
  loadContentBySession: async () => new Map(),
  loadUserTurnsBySession: async () => new Map(),
};

describe('turnPassesCoverage (#100)', () => {
Expand Down Expand Up @@ -630,6 +630,43 @@ describe('runHotspotsAttribution — partial exclusion (#100)', () => {
});
});

describe('runHotspotsAttribution — bulk session loaders', () => {
  it('invokes loadUserTurnsBySession and loadContentBySession once with the eligible session ids, not once per session', async () => {
    const pricing = await loadBuiltinPricing();
    const okFidelity = fidelityWith('full', 'per-turn');
    const lowFidelity = fidelityWith('partial', 'per-turn', { hasToolResultEvents: false });

    // Three eligible claude-code turns, one per session.
    const eligibleFixtures: Array<[string, string]> = [
      ['s-1', 'a'],
      ['s-2', 'b'],
      ['s-3', 'c'],
    ];
    const turns: EnrichedTurn[] = eligibleFixtures.map(([sessionId, messageId]) =>
      makeTurn({ sessionId, messageId, turnIndex: 0, source: 'claude-code', fidelity: okFidelity }),
    );
    // Excluded turn — its sessionId must not appear in the eligible-only loader call.
    turns.push(
      makeTurn({ sessionId: 's-skip', messageId: 'x', turnIndex: 0, source: 'codex', fidelity: lowFidelity }),
    );

    // Record every invocation of each loader so call count and argument
    // contents can both be asserted afterwards.
    const userLoaderCalls: Array<Set<string>> = [];
    const contentLoaderCalls: Array<Set<string>> = [];
    const deps: HotspotsAttributionDeps = {
      loadUserTurnsBySession: async (ids) => {
        userLoaderCalls.push(new Set(ids));
        return new Map();
      },
      loadContentBySession: async (ids) => {
        contentLoaderCalls.push(new Set(ids));
        return new Map();
      },
    };

    const { result } = await captureStdio(() =>
      runHotspotsAttribution(args(), turns, pricing, deps),
    );
    assert.equal(result, 0);
    assert.equal(userLoaderCalls.length, 1, 'user-turn loader must be called exactly once');
    assert.equal(contentLoaderCalls.length, 1, 'content loader must be called exactly once');
    assert.deepEqual([...userLoaderCalls[0]!].sort(), ['s-1', 's-2', 's-3']);
    assert.deepEqual([...contentLoaderCalls[0]!].sort(), ['s-1', 's-2', 's-3']);
  });
});

describe('runPatternsMode — fidelity refusal (#100)', () => {
it('refuses with exit 2 when every turn is below every selected detector\'s prereq', async () => {
const pricing = await loadBuiltinPricing();
Expand Down
133 changes: 106 additions & 27 deletions packages/cli/src/commands/hotspots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,36 @@ export async function runHotspots(args: ParsedArgs): Promise<number> {
return runPatternsMode(args, turns, pricing, compactions, selected, { query: q });
}

return runHotspotsAttribution(args, turns, pricing);
return runHotspotsAttribution(args, turns, pricing, {
// Bind `q` so the bulk user-turn pass narrows by `since`/`source` during
// streaming rather than buffering the entire historical ledger first.
loadUserTurnsBySession: (ids) =>
withProgress('reading user turns for attribution', async (task) => {
const out = await bulkUserTurnsBySession(ids, q);
task.succeed(
`read user turns for ${formatInt(out.size)} session${out.size === 1 ? '' : 's'}`,
);
return out;
}),
});
}

// Exposed for tests so they can drive the orchestration with fixture turns
// and a mocked content/userTurns loader. Production callers go through
// `runHotspots`, which fetches both via the ledger.
//
// Bulk-shaped (Set → Map) rather than per-session to keep the production
// `queryUserTurns` path to a single ledger pass: the per-session form
// `queryUserTurns({sessionId})` streams the entire ledger.jsonl on every
// call, which on a 7-day slice with hundreds of sessions adds minutes of
// silent disk I/O after `read N turns`.
export interface HotspotsAttributionDeps {
  /** Bulk content loader: one call covering every eligible session. */
  loadContentBySession?: (
    sessionIds: Set<string>,
  ) => Promise<Map<string, ContentRecord[]>>;
  /** Bulk user-turn loader: one call covering every eligible session. */
  loadUserTurnsBySession?: (
    sessionIds: Set<string>,
  ) => Promise<Map<string, UserTurnRecord[]>>;
}

export async function runHotspotsAttribution(
Expand Down Expand Up @@ -304,22 +325,12 @@ export async function runHotspotsAttribution(
return 2;
}

const loadContent =
deps.loadContentForSession ??
((sessionId: string) => readContent({ sessionId }));
const loadUserTurns =
deps.loadUserTurnsForSession ??
((sessionId: string) => queryUserTurns({ sessionId }));
const loadContent = deps.loadContentBySession ?? defaultLoadContentBySession;
const loadUserTurns = deps.loadUserTurnsBySession ?? defaultLoadUserTurnsBySession;

const sessionIds = new Set(eligible.map((t) => t.sessionId));
const contentBySession = new Map<string, ContentRecord[]>();
const userTurnsBySession = new Map<string, UserTurnRecord[]>();
for (const sessionId of sessionIds) {
const records = await loadContent(sessionId);
if (records.length > 0) contentBySession.set(sessionId, records);
const userTurns = await loadUserTurns(sessionId);
if (userTurns.length > 0) userTurnsBySession.set(sessionId, userTurns);
}
const userTurnsBySession = await loadUserTurns(sessionIds);
const contentBySession = await loadContent(sessionIds);

const result = attributeHotspots(eligible, {
pricing,
Expand Down Expand Up @@ -763,7 +774,7 @@ export async function runPatternsMode(
selected.has('opencode-system-prompt') || selected.has('tool-output-bloat');
const userTurnsBySession = needUserTurns
? await withProgress('reading user turns for pattern detectors', async (task) => {
const rows = await loadUserTurnsBySession(perDetector);
const rows = await userTurnsForPatternDetectors(perDetector, deps.query);
task.succeed(`read user turns for ${formatInt(rows.size)} session${rows.size === 1 ? '' : 's'}`);
return rows;
})
Expand All @@ -778,7 +789,7 @@ export async function runPatternsMode(
const needContent = enrichableDetectors.some((d) => selected.has(d));
const contentBySession = needContent
? await withProgress('reading content for pattern detectors', async (task) => {
const rows = await loadContentBySession(perDetector, enrichableDetectors);
const rows = await contentForPatternDetectors(perDetector, enrichableDetectors);
task.succeed(`read content for ${formatInt(rows.size)} session${rows.size === 1 ? '' : 's'}`);
return rows;
})
Expand Down Expand Up @@ -1436,28 +1447,96 @@ function renderEditHeavyTable(
return table(rows);
}

async function loadUserTurnsBySession(
perDetector: Map<PatternKind, EnrichedTurn[]>,
// One ledger pass + in-memory bucket. The per-session form
// `queryUserTurns({sessionId})` re-streams the entire ledger.jsonl on every
// call, so issuing it once per session costs O(sessions × ledger-size). Used
// by both the attribution loader and the patterns helper below.
//
// `q.since` / `q.source` are forwarded so the streaming filter narrows the
// in-memory buffer to the same window the eligible turns live in. This is
// safe because the user turn that carries tool_results for an eligible
// assistant turn arrives immediately after it: `userTurn.ts >= turn.ts`,
// so any user turn whose blocks join an eligible turn also passes the same
// `since` cutoff. We deliberately do NOT pass `q.until` — a user turn may
// lag a few seconds past a hard until cutoff while still carrying the
// tool_results for the last eligible turn — and we do NOT pass `sessionId`
// (defeats the bulk call) or `project` (per `userTurnPasses`, it does not
// filter user turns).
export async function bulkUserTurnsBySession(
sessionIds: Set<string>,
q: Query = {},
): Promise<Map<string, UserTurnRecord[]>> {
const sessionIds = new Set<string>();
for (const turns of perDetector.values()) {
for (const t of turns) sessionIds.add(t.sessionId);
const out = new Map<string, UserTurnRecord[]>();
if (sessionIds.size === 0) return out;
const filter: Query = {};
if (q.since !== undefined) filter.since = q.since;
if (q.source !== undefined) filter.source = q.source;
const all = await queryUserTurns(filter);
for (const ut of all) {
if (!sessionIds.has(ut.sessionId)) continue;
const list = out.get(ut.sessionId);
if (list) list.push(ut);
else out.set(ut.sessionId, [ut]);
}
return out;
}

// Fallback for callers that don't supply a loader. Per-session form so peak
// memory stays bounded on long historical ledgers; the production path
// (`runHotspots`) overrides this with a q-bound bulk loader that issues a
// single ledger pass narrowed to the current `since`/`source` window.
async function defaultLoadUserTurnsBySession(
  sessionIds: Set<string>,
): Promise<Map<string, UserTurnRecord[]>> {
  const out = new Map<string, UserTurnRecord[]>();
  for (const sessionId of sessionIds) {
    // Exactly one query per session. (The diff-interleaved version issued
    // `queryUserTurns({ sessionId })` twice per iteration and set the map
    // entry twice — same result, double the ledger I/O.)
    const rows = await queryUserTurns({ sessionId });
    // Sessions with no user turns are omitted, matching the bulk loader.
    if (rows.length > 0) out.set(sessionId, rows);
  }
  return out;
}

// Default loader for content sidecars. Each sidecar is its own file under
// ~/.relayburn/content/, so this stays per-session — it is only wrapped in a
// progress task whose counter ticks per session so a long loop never looks
// frozen.
async function defaultLoadContentBySession(
  sessionIds: Set<string>,
): Promise<Map<string, ContentRecord[]>> {
  return withProgress('reading content sidecars', async (task) => {
    const bySession = new Map<string, ContentRecord[]>();
    const total = sessionIds.size;
    let done = 0;
    for (const sessionId of sessionIds) {
      done += 1;
      task.update(`reading content sidecars (${formatInt(done)}/${formatInt(total)})`);
      const sidecar = await readContent({ sessionId });
      // Empty sidecars are omitted so downstream enrichment keys off presence.
      if (sidecar.length > 0) bySession.set(sessionId, sidecar);
    }
    const plural = bySession.size === 1 ? '' : 's';
    task.succeed(`read content for ${formatInt(bySession.size)} session${plural}`);
    return bySession;
  });
}

// Collects every sessionId appearing in any detector slice, then issues a
// single bulk user-turn load (narrowed by `q`) for the whole set.
async function userTurnsForPatternDetectors(
  perDetector: Map<PatternKind, EnrichedTurn[]>,
  q?: Query,
): Promise<Map<string, UserTurnRecord[]>> {
  const allTurns = [...perDetector.values()].flat();
  const sessionIds = new Set(allTurns.map((turn) => turn.sessionId));
  return bulkUserTurnsBySession(sessionIds, q);
}

// Reads the per-session content sidecar for every session that lands in any
// of the requested detector slices. Sessions whose sidecar is empty (content
// store is hash-only / off, or content was pruned) are silently omitted —
// `detectPatterns` keys enrichment off the map being non-empty per session,
// so the absent entry yields the graceful-degradation behavior the
// enrichment layer promises.
async function loadContentBySession(
async function contentForPatternDetectors(
perDetector: Map<PatternKind, EnrichedTurn[]>,
detectors: PatternKind[],
): Promise<Map<string, ContentRecord[]>> {
Expand Down