From 93ae2d7f9d274bb0c4bd6aea043649952013084d Mon Sep 17 00:00:00 2001 From: Lu Nelson Date: Thu, 2 Apr 2026 15:11:11 +0200 Subject: [PATCH 1/2] feat: agent pattern + observer entity persistence (FE-537) Refactors conductTurn into generator composition (D27): interviewer and observer agents are async generators composed via yield*. Shared SDK stream translator in sdk.ts. Observer uses outputFormat for structured JSON extraction (D28). ResultMessage inspection provides per-agent metrics (D29). Observer errors are non-fatal. New: sdk.ts, observer.ts; 147 tests (24 new), all pass. Co-Authored-By: Claude Opus 4.6 (1M context) --- memory/SPEC.md | 8 ++ src/server/app.test.ts | 43 ++++--- src/server/app.ts | 3 + src/server/context.test.ts | 34 ++++++ src/server/context.ts | 26 ++++- src/server/core.test.ts | 41 +++++-- src/server/core.ts | 138 +++++----------------- src/server/db.test.ts | 74 ++++++++++++ src/server/db.ts | 73 ++++++++++++ src/server/interview.test.ts | 27 ++++- src/server/interview.ts | 73 +++++++++++- src/server/observer.test.ts | 202 +++++++++++++++++++++++++++++++++ src/server/observer.ts | 170 +++++++++++++++++++++++++++ src/server/sdk.test.ts | 103 +++++++++++++++++ src/server/sdk.ts | 115 +++++++++++++++++++ src/server/sse-adapter.test.ts | 34 ++++++ src/server/sse-adapter.ts | 23 +++- 17 files changed, 1034 insertions(+), 153 deletions(-) create mode 100644 src/server/observer.test.ts create mode 100644 src/server/observer.ts create mode 100644 src/server/sdk.test.ts create mode 100644 src/server/sdk.ts diff --git a/memory/SPEC.md b/memory/SPEC.md index 03c3c8d0..4ddc4391 100644 --- a/memory/SPEC.md +++ b/memory/SPEC.md @@ -87,9 +87,17 @@ The architecture (layered: db → core → adapters): | A21 | `useChat` `onData` callback reliably bridges to `queryClient.setQueryData` without stale-closure issues — known `onFinish` stale-closure bug (ai-sdk#550) may or may not affect `onData` | medium | D22 | Entity sidebar | Test in slice 6: verify 
`setQueryData` from `onData` updates sidebar reactively; if stale, use parallel `EventSource` instead | | A22 | AI SDK `UIMessage.parts[]` with custom Data Parts (typed via `dataPartsSchema`) persisted as JSON on the turn table is sufficient for faithful UI resume — no separate `turn_message` table needed for current scope | **validated** | D23, D24 | Parts persistence | Validated: parts assembler converts DomainEvents to typed parts, round-trips through JSON persistence (I18). Client hydration from parts deferred to 4b (outer-loop). | | A23 | Custom Data Parts for structured user input (option selection, confirmation) can replace scalar `turn.answer` as the primary user-response model without breaking `formatHistory()` or observer context | **validated** | D24 | Parts persistence | Validated: Data Part schemas defined with Zod (I17), context builders read scalars not parts (I19), structured user input round-trip tested. Full UI wiring deferred to 4b. | +| A24 | SDK `outputFormat` with JSON schema produces equally reliable entity extraction as MCP tool-based extraction — structurally simpler (one API call, no tool round-trip), schema validation built into SDK response via `structured_output` field on `SDKResultMessage` | high | D28 | Observer agent | Validate in slice 5: compare extraction quality with outputFormat vs spike's MCP tool approach. If outputFormat produces malformed or lower-quality extraction, fall back to MCP tool pattern | +| A25 | `SDKResultMessage` provides accurate `duration_ms`, `total_cost_usd`, and `usage` for per-agent observability — types confirmed in TS SDK (`SDKResultSuccess`, `SDKResultError`) | high | D29 | Observer agent | Validate in slice 5: inspect ResultMessage after query() iteration, confirm fields are populated | ## Decisions +27. **Agent module pattern — generator composition** — Each agent (interviewer, observer, future phase agents) is an async generator function yielding `DomainEvent`s. 
`conductTurn()` is a thin sequencer composing agents via `yield*`. No wrapper around `query()` — each agent calls the SDK directly with whatever options it needs (`outputFormat`, `effort`, `mcpServers`, etc.). A shared `translateStreamEvents()` utility in `sdk.ts` maps SDK `stream_event` messages to DomainEvents; streaming agents use it, silent agents don't. File layout: `interviewer.ts` (evolves from `interview.ts`), `observer.ts` (new), `sdk.ts` (new). Research: `docs/research/claude-agent-sdk-cookbook-patterns-vs-brunch-usage.md`. Depends on: D19. Supersedes: monolithic `conductTurn()` with inline `query()` call and stream parsing. + +28. **Observer uses `outputFormat` (structured JSON output)** — The observer agent returns extracted entities via SDK `outputFormat` with a Zod-derived JSON schema, not via MCP tools. The SDK validates the response and places the parsed result in `SDKResultMessage.structured_output`. This is simpler than tool-based extraction (one API call, no tool round-trip) and better suited to the observer's pure-extraction job (no side effects during the call). The interviewer retains MCP tools because `ask_question` has side effects (DB writes during the call). Depends on: A24. Supersedes: MCP tool-based observer extraction from spike. + +29. **`ResultMessage` inspection for agent observability** — After each `query()` call, the agent inspects `SDKResultMessage` for `duration_ms`, `duration_api_ms`, `total_cost_usd`, and `usage`. Emitted as `agent-metrics` DomainEvent. Primary use: validate A4 (observer latency) and track cost per turn. Secondary: surface in future debug mode overlay. Depends on: A25. Supersedes: discarding `ResultMessage` (gap #1 in cookbook research). + 26. **`md-pen` for programmatic markdown rendering** — Structured data (entity tables, dependency graphs, checklists) rendered to markdown via `md-pen` rather than hand-rolled string concatenation. 
Pure string-return functions (`table()`, `taskList()`, `mermaid()`, `heading()`, `alert()`, `details()`) compose by nesting — no AST, no intermediate representation. Escaping is context-aware per function (table cells, URLs, code fences), eliminating a class of bugs when rendering user-supplied text from interviews. Primary use cases: (1) observer context builders presenting growing entity graphs to agents (`table()` for decisions/assumptions with metadata, `taskList()` for reviewed/unreviewed items), (2) spec export rendering active-path entities into downloadable markdown (slice 13), (3) any future agent-facing or user-facing projection of structured data. Zero dependencies, ESM-only, TypeScript-first. Depends on: —. Supersedes: hand-rolled string assembly in context builders. ### Domain model diff --git a/src/server/app.test.ts b/src/server/app.test.ts index 38644737..0bb81233 100644 --- a/src/server/app.test.ts +++ b/src/server/app.test.ts @@ -24,6 +24,23 @@ let db: DB; beforeEach(() => { mockQuery.mockReset(); + // Default: observer gets empty result for any call not covered by mockReturnValueOnce + mockQuery.mockImplementation(() => + makeMockStream([ + { + type: 'result', + subtype: 'success', + duration_ms: 500, + duration_api_ms: 300, + total_cost_usd: 0.0005, + is_error: false, + num_turns: 1, + usage: { input_tokens: 100, output_tokens: 50 }, + result: '', + structured_output: { decisions: [], assumptions: [] }, + }, + ]), + ); const result = createApp(); app = result.app; db = result.db; @@ -138,7 +155,7 @@ describe('GET /api/projects/:id', () => { it('returns turns on active path after a chat exchange', async () => { const projectId = await createTestProject('Chat Test'); - mockQuery.mockReturnValue(mockTextStream('Hi')); + mockQuery.mockReturnValueOnce(mockTextStream('Hi')); await request(app) .post(`/api/projects/${projectId}/chat`) @@ -155,7 +172,7 @@ describe('GET /api/projects/:id', () => { describe('POST /api/projects/:id/chat', () => { 
it('returns Content-Type text/event-stream', async () => { const projectId = await createTestProject(); - mockQuery.mockReturnValue(mockTextStream()); + mockQuery.mockReturnValueOnce(mockTextStream()); const res = await request(app) .post(`/api/projects/${projectId}/chat`) @@ -167,7 +184,7 @@ describe('POST /api/projects/:id/chat', () => { it('produces well-formed SSE lines with data: prefix and double newline delimiters', async () => { const projectId = await createTestProject(); - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', @@ -204,7 +221,7 @@ describe('POST /api/projects/:id/chat', () => { it('contains at least one text-delta event with non-empty text', async () => { const projectId = await createTestProject(); - mockQuery.mockReturnValue(mockTextStream('Hello!')); + mockQuery.mockReturnValueOnce(mockTextStream('Hello!')); const res = await request(app) .post(`/api/projects/${projectId}/chat`) @@ -218,7 +235,7 @@ describe('POST /api/projects/:id/chat', () => { it('ends with finish event and [DONE]', async () => { const projectId = await createTestProject(); - mockQuery.mockReturnValue(mockTextStream()); + mockQuery.mockReturnValueOnce(mockTextStream()); const res = await request(app) .post(`/api/projects/${projectId}/chat`) @@ -233,7 +250,7 @@ describe('POST /api/projects/:id/chat', () => { it('emits reasoning-delta events for thinking content', async () => { const projectId = await createTestProject(); - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', @@ -312,7 +329,7 @@ describe('POST /api/projects/:id/chat', () => { describe('POST /api/projects/:id/chat — tool calls', () => { it('emits tool-call SSE events for tool-using mock stream', async () => { const projectId = await createTestProject(); - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', @@ -395,7 +412,7 @@ describe('POST /api/projects/:id/chat 
— tool calls', () => { describe('GET /api/projects/:id — enriched state', () => { it('returns turns with options after structured question', async () => { const projectId = await createTestProject(); - mockQuery.mockReturnValue(mockTextStream('Hi')); + mockQuery.mockReturnValueOnce(mockTextStream('Hi')); await request(app) .post(`/api/projects/${projectId}/chat`) @@ -419,7 +436,7 @@ describe('GET /api/projects/:id — enriched state', () => { describe('POST /api/projects/:id/turns/:turnId/select', () => { it('persists option selection and sets answer', async () => { const projectId = await createTestProject(); - mockQuery.mockReturnValue(mockTextStream('Hi')); + mockQuery.mockReturnValueOnce(mockTextStream('Hi')); await request(app) .post(`/api/projects/${projectId}/chat`) @@ -451,7 +468,7 @@ describe('POST /api/projects/:id/turns/:turnId/select', () => { it('returns 400 for missing position', async () => { const projectId = await createTestProject(); - mockQuery.mockReturnValue(mockTextStream('Hi')); + mockQuery.mockReturnValueOnce(mockTextStream('Hi')); await request(app) .post(`/api/projects/${projectId}/chat`) @@ -473,7 +490,7 @@ describe('POST /api/projects/:id/turns/:turnId/select', () => { describe('POST /api/projects/:id/chat — turn persistence', () => { it('creates a turn with user answer and advances HEAD', async () => { const projectId = await createTestProject(); - mockQuery.mockReturnValue(mockTextStream('Hi there')); + mockQuery.mockReturnValueOnce(mockTextStream('Hi there')); await request(app) .post(`/api/projects/${projectId}/chat`) @@ -492,12 +509,12 @@ describe('POST /api/projects/:id/chat — turn persistence', () => { it('chains turns with parent pointers across exchanges', async () => { const projectId = await createTestProject(); - mockQuery.mockReturnValue(mockTextStream('First response')); + mockQuery.mockReturnValueOnce(mockTextStream('First response')); await request(app) .post(`/api/projects/${projectId}/chat`) .send({ messages: [{ role: 
'user', content: 'first' }] }); - mockQuery.mockReturnValue(mockTextStream('Second response')); + mockQuery.mockReturnValueOnce(mockTextStream('Second response')); await request(app) .post(`/api/projects/${projectId}/chat`) .send({ messages: [{ role: 'user', content: 'second' }] }); diff --git a/src/server/app.ts b/src/server/app.ts index 810349a0..e4358d86 100644 --- a/src/server/app.ts +++ b/src/server/app.ts @@ -113,6 +113,9 @@ export function createApp(dbPath?: string) { res.write(formatSSE({ type: 'error', errorText: message })); } + // Protocol termination: finish-step + finish after all events (including observer) + res.write(formatSSE({ type: 'finish-step' })); + res.write(formatSSE({ type: 'finish', finishReason: 'stop' })); res.write(formatSSE('[DONE]')); res.end(); }); diff --git a/src/server/context.test.ts b/src/server/context.test.ts index 18b03fa8..d48a4f9d 100644 --- a/src/server/context.test.ts +++ b/src/server/context.test.ts @@ -184,4 +184,38 @@ describe('observer-context-projection', () => { // Should NOT contain the full Q&A pairs from earlier turns expect(result).not.toContain('Previous conversation:'); }); + + it('renders entity tables with md-pen (not hand-rolled strings)', () => { + const turn: Turn = { + id: 5, + project_id: 1, + parent_turn_id: 4, + phase: 'scope', + question: 'Q5', + answer: 'A5', + why: null, + impact: null, + is_resolution: false, + user_parts: null, + assistant_parts: null, + created_at: '2026-01-01', + }; + + const result = buildObserverContext({ + turn, + activePathSummary: '', + entities: { + decisions: [{ id: 1, content: 'Use React' }], + assumptions: [{ id: 2, content: 'Users have browsers' }], + }, + }); + + // md-pen table() produces pipe-separated markdown tables + expect(result).toContain('| ID | Content |'); + expect(result).toContain('| 1 | Use React |'); + expect(result).toContain('| 2 | Users have browsers |'); + // md-pen h3() produces ### headings + expect(result).toContain('### Existing Decisions'); + 
expect(result).toContain('### Existing Assumptions'); + }); }); diff --git a/src/server/context.ts b/src/server/context.ts index 68289068..d1884e5b 100644 --- a/src/server/context.ts +++ b/src/server/context.ts @@ -1,3 +1,5 @@ +import { table, h3 } from 'md-pen'; + import type { TurnWithOptions } from './core.js'; import type { Turn } from './db.js'; @@ -51,14 +53,26 @@ export function buildObserverContext(input: ObserverContextInput): string { const sections: string[] = []; if (input.entities.decisions.length > 0 || input.entities.assumptions.length > 0) { - const entityLines: string[] = ['Existing entities:']; - for (const d of input.entities.decisions) { - entityLines.push(` Decision #${d.id}: ${d.content}`); + if (input.entities.decisions.length > 0) { + sections.push( + h3('Existing Decisions') + + '\n' + + table( + input.entities.decisions.map((d) => ({ ID: d.id, Content: d.content })), + { columns: ['ID', 'Content'] }, + ), + ); } - for (const a of input.entities.assumptions) { - entityLines.push(` Assumption #${a.id}: ${a.content}`); + if (input.entities.assumptions.length > 0) { + sections.push( + h3('Existing Assumptions') + + '\n' + + table( + input.entities.assumptions.map((a) => ({ ID: a.id, Content: a.content })), + { columns: ['ID', 'Content'] }, + ), + ); } - sections.push(entityLines.join('\n')); } if (input.activePathSummary) { diff --git a/src/server/core.test.ts b/src/server/core.test.ts index 9adc131d..dcbcd7d3 100644 --- a/src/server/core.test.ts +++ b/src/server/core.test.ts @@ -22,6 +22,23 @@ let db: DB; beforeEach(() => { mockQuery.mockReset(); + // Default: observer gets empty result for any call not covered by mockReturnValueOnce + mockQuery.mockImplementation(() => + makeMockStream([ + { + type: 'result', + subtype: 'success', + duration_ms: 500, + duration_api_ms: 300, + total_cost_usd: 0.0005, + is_error: false, + num_turns: 1, + usage: { input_tokens: 100, output_tokens: 50 }, + result: '', + structured_output: { decisions: [], 
assumptions: [] }, + }, + ]), + ); db = createDb(); }); @@ -67,7 +84,7 @@ describe('formatHistory', () => { describe('conductTurn', () => { it('yields turn-created as first event', async () => { - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-1' } } }, { type: 'stream_event', event: { type: 'message_stop' } }, @@ -86,7 +103,7 @@ describe('conductTurn', () => { }); it('yields stream-start with message ID', async () => { - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-42' } } }, { type: 'stream_event', event: { type: 'message_stop' } }, @@ -105,7 +122,7 @@ describe('conductTurn', () => { }); it('yields thinking events for thinking_delta', async () => { - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-1' } } }, { @@ -132,7 +149,7 @@ describe('conductTurn', () => { }); it('yields text-delta events and persists assistant text', async () => { - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-1' } } }, { @@ -165,7 +182,7 @@ describe('conductTurn', () => { }); it('yields stream-end and advances HEAD', async () => { - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-1' } } }, { @@ -195,7 +212,7 @@ describe('conductTurn', () => { }); it('yields error event on SDK failure', async () => { - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( // oxlint-disable-next-line require-yield -- intentional: tests error before first yield (async function* () { throw new Error('API rate limit'); @@ -214,7 +231,7 @@ describe('conductTurn', () => { }); it('yields 
tool-call-start for tool_use content blocks', async () => { - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-1' } } }, { @@ -243,7 +260,7 @@ describe('conductTurn', () => { }); it('yields tool-call-delta for input_json_delta', async () => { - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-1' } } }, { @@ -280,7 +297,7 @@ describe('conductTurn', () => { }); it('yields tool-call-end with toolCallId and toolName', async () => { - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-1' } } }, { @@ -318,7 +335,7 @@ describe('conductTurn', () => { it('chains turns with parent pointers', async () => { // First turn - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-1' } } }, { @@ -339,7 +356,7 @@ describe('conductTurn', () => { } // Second turn - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-2' } } }, { @@ -364,7 +381,7 @@ describe('conductTurn', () => { }); it('persists assistant_parts after stream finish', async () => { - mockQuery.mockReturnValue( + mockQuery.mockReturnValueOnce( makeMockStream([ { type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-1' } } }, { diff --git a/src/server/core.ts b/src/server/core.ts index 5034a9f3..24ea9175 100644 --- a/src/server/core.ts +++ b/src/server/core.ts @@ -1,13 +1,9 @@ -import { query } from '@anthropic-ai/claude-agent-sdk'; - import { buildInterviewerContext } from './context.js'; import { getProject, getActivePath, getOptionsForTurn, - getTurn, createTurn, - updateTurn, advanceHead, 
listProjects, createProject, @@ -15,8 +11,8 @@ import { type DB, type Project, } from './db.js'; -import { getSystemPrompt, createInterviewMcpServer } from './interview.js'; -import { assembleAssistantParts, serializeParts } from './parts.js'; +import { runInterviewer } from './interview.js'; +import { runObserver } from './observer.js'; /** Domain events yielded by conductTurn(). Transport-agnostic. */ export type DomainEvent = @@ -28,7 +24,18 @@ export type DomainEvent = | { type: 'tool-call-end'; toolCallId: string; toolName: string } | { type: 'stream-end' } | { type: 'turn-created'; turn: Turn } - | { type: 'error'; message: string }; + | { type: 'error'; message: string } + | { type: 'observer-complete'; entityIds: { decisions: number[]; assumptions: number[] } } + | { type: 'observer-error'; message: string } + | { + type: 'agent-metrics'; + agent: string; + durationMs: number; + durationApiMs: number; + totalCostUsd: number; + inputTokens: number; + outputTokens: number; + }; /** Extract user text from a UIMessage (parts[]) or legacy format (content string). */ export function extractPrompt(messages: unknown[]): string { @@ -57,21 +64,10 @@ export function formatHistory(turns: TurnWithOptions[], currentPrompt: string): return buildInterviewerContext(turns, currentPrompt); } -/** SDK stream event shapes we consume */ -interface SDKStreamEvent { - type: 'stream_event'; - event: { - type: string; - index?: number; - message?: { id: string }; - content_block?: { type: string; name?: string; id?: string }; - delta?: { type: string; text?: string; thinking?: string; partial_json?: string }; - }; -} - /** - * Conduct a turn: create turn, stream agent response, persist result. + * Conduct a turn: create turn, run interviewer, advance HEAD, run observer. * Yields DomainEvents for adapter consumption. + * conductTurn is a thin sequencer — agent-specific logic lives in each agent module. 
*/ export async function* conductTurn( db: DB, @@ -96,104 +92,24 @@ export async function* conductTurn( yield { type: 'turn-created', turn }; - const fullPrompt = buildInterviewerContext(activePath, userMessage); - let assistantText = ''; - let errored = false; - const collectedEvents: DomainEvent[] = []; - - function emit(ev: DomainEvent): DomainEvent { - collectedEvents.push(ev); - return ev; - } - - const interviewServer = createInterviewMcpServer(db, turn.id); - + // Interviewer agent — streams DomainEvents and persists turn-level data try { - const stream = query({ - prompt: fullPrompt, - options: { - model: process.env.ANTHROPIC_MODEL || 'claude-sonnet-4-20250514', - maxTurns: 1, - includePartialMessages: true, - systemPrompt: getSystemPrompt(phase), - mcpServers: { interview: interviewServer }, - }, - }); - - const toolUseBlocks = new Map(); - - for await (const sdkMessage of stream) { - if (sdkMessage.type !== 'stream_event') continue; - const event = (sdkMessage as SDKStreamEvent).event; - - switch (event.type) { - case 'message_start': - yield emit({ type: 'stream-start', messageId: event.message!.id }); - break; - - case 'content_block_start': { - const block = event.content_block!; - if (block.type === 'tool_use') { - toolUseBlocks.set(event.index!, { toolName: block.name!, toolCallId: block.id! }); - yield emit({ type: 'tool-call-start', toolName: block.name!, toolCallId: block.id! }); - } - break; - } - - case 'content_block_delta': { - const delta = event.delta!; - if (delta.type === 'thinking_delta' && delta.thinking) { - yield emit({ type: 'thinking', delta: delta.thinking }); - } else if (delta.type === 'text_delta' && delta.text) { - assistantText += delta.text; - yield emit({ type: 'text-delta', delta: delta.text }); - } else if (delta.type === 'input_json_delta' && delta.partial_json) { - const toolBlock = toolUseBlocks.get(event.index!); - yield emit({ - type: 'tool-call-delta', - toolCallId: toolBlock?.toolCallId ?? 
'', - delta: delta.partial_json, - }); - } - break; - } - - case 'content_block_stop': { - const toolBlock = toolUseBlocks.get(event.index!); - if (toolBlock) { - yield emit({ - type: 'tool-call-end', - toolCallId: toolBlock.toolCallId, - toolName: toolBlock.toolName, - }); - toolUseBlocks.delete(event.index!); - } - break; - } - - case 'message_stop': - yield emit({ type: 'stream-end' }); - break; - } - } + yield* runInterviewer(db, turn, activePath, userMessage, phase); } catch (err) { - errored = true; const message = err instanceof Error ? err.message : 'Unknown error'; yield { type: 'error', message }; + return; // Don't advance head or run observer on interviewer error } - if (!errored) { - const currentTurn = getTurn(db, turn.id); - const parts = assembleAssistantParts(collectedEvents); + advanceHead(db, projectId, turn.id); - updateTurn(db, turn.id, { - ...(assistantText && (!currentTurn?.question || currentTurn.question === '') - ? { question: assistantText } - : {}), - ...(parts.length > 0 ? { assistant_parts: serializeParts(parts) } : {}), - }); - - advanceHead(db, projectId, turn.id); + // Observer agent — runs silently, persists entities, yields observer-complete + // Non-fatal: observer failure does not affect the interviewer's persisted turn + try { + yield* runObserver(db, turn, projectId); + } catch (err) { + const message = err instanceof Error ? 
err.message : 'Unknown error'; + yield { type: 'observer-error', message }; } } diff --git a/src/server/db.test.ts b/src/server/db.test.ts index c4b794b5..f0579063 100644 --- a/src/server/db.test.ts +++ b/src/server/db.test.ts @@ -16,6 +16,14 @@ import { listProjects, createProject, getProject, + createDecision, + createAssumption, + linkDecisionToTurn, + linkAssumptionToTurn, + addDecisionParentDecision, + addDecisionParentAssumption, + addAssumptionParentAssumption, + getEntitiesForProject, type DB, } from './db.js'; @@ -400,3 +408,69 @@ describe('DB lifecycle — turn tree persistence', () => { unlinkSync(dbPath); }); }); + +describe('entity persistence — decisions and assumptions', () => { + it('creates a decision with project linkage', () => { + const project = createProject(db, 'Test'); + const d = createDecision(db, project.id, 'Use SQLite for persistence'); + expect(d.id).toBeDefined(); + expect(d.content).toBe('Use SQLite for persistence'); + expect(d.project_id).toBe(project.id); + }); + + it('creates an assumption with project linkage', () => { + const project = createProject(db, 'Test'); + const a = createAssumption(db, project.id, 'SQLite handles concurrent writes'); + expect(a.id).toBeDefined(); + expect(a.content).toBe('SQLite handles concurrent writes'); + expect(a.project_id).toBe(project.id); + }); + + it('links a decision to a turn', () => { + const project = createProject(db, 'Test'); + const turn = createTurn(db, project.id, { phase: 'scope', question: 'Q', answer: 'A' }); + const d = createDecision(db, project.id, 'Use React'); + linkDecisionToTurn(db, d.id, turn.id); + const entities = getEntitiesForProject(db, project.id); + expect(entities.decisions).toHaveLength(1); + expect(entities.decisions[0].content).toBe('Use React'); + }); + + it('links an assumption to a turn', () => { + const project = createProject(db, 'Test'); + const turn = createTurn(db, project.id, { phase: 'scope', question: 'Q', answer: 'A' }); + const a = 
createAssumption(db, project.id, 'Users have API keys'); + linkAssumptionToTurn(db, a.id, turn.id); + const entities = getEntitiesForProject(db, project.id); + expect(entities.assumptions).toHaveLength(1); + expect(entities.assumptions[0].content).toBe('Users have API keys'); + }); + + it('creates dependency edges between decisions', () => { + const project = createProject(db, 'Test'); + const d1 = createDecision(db, project.id, 'Use Express'); + const d2 = createDecision(db, project.id, 'Use SSE for streaming'); + addDecisionParentDecision(db, d2.id, d1.id); + const entities = getEntitiesForProject(db, project.id); + expect(entities.decisions).toHaveLength(2); + }); + + it('creates dependency edges between decisions and assumptions', () => { + const project = createProject(db, 'Test'); + const a = createAssumption(db, project.id, 'SDK supports streaming'); + const d = createDecision(db, project.id, 'Use SDK streaming'); + addDecisionParentAssumption(db, d.id, a.id); + const entities = getEntitiesForProject(db, project.id); + expect(entities.decisions).toHaveLength(1); + expect(entities.assumptions).toHaveLength(1); + }); + + it('creates dependency edges between assumptions', () => { + const project = createProject(db, 'Test'); + const a1 = createAssumption(db, project.id, 'Single user'); + const a2 = createAssumption(db, project.id, 'No concurrent writes'); + addAssumptionParentAssumption(db, a2.id, a1.id); + const entities = getEntitiesForProject(db, project.id); + expect(entities.assumptions).toHaveLength(2); + }); +}); diff --git a/src/server/db.ts b/src/server/db.ts index 5b59dd63..ee558930 100644 --- a/src/server/db.ts +++ b/src/server/db.ts @@ -169,3 +169,76 @@ export function advanceHead(db: DB, projectId: number, turnId: number): void { .where(eq(schema.project.id, projectId)) .run(); } + +// --- Entity persistence (decisions, assumptions, dependency edges) --- + +export type Decision = InferSelectModel<typeof schema.decision>; +export type Assumption = InferSelectModel<typeof schema.assumption>; + 
+export function createDecision( + db: DB, + projectId: number, + content: string, + rationale?: string | null, +): Decision { + return db + .insert(schema.decision) + .values({ project_id: projectId, content, rationale: rationale ?? null }) + .returning() + .get() as Decision; +} + +export function createAssumption(db: DB, projectId: number, content: string): Assumption { + return db + .insert(schema.assumption) + .values({ project_id: projectId, content }) + .returning() + .get() as Assumption; +} + +export function linkDecisionToTurn(db: DB, decisionId: number, turnId: number): void { + db.insert(schema.turnDecision).values({ turn_id: turnId, decision_id: decisionId }).run(); +} + +export function linkAssumptionToTurn(db: DB, assumptionId: number, turnId: number): void { + db.insert(schema.turnAssumption).values({ turn_id: turnId, assumption_id: assumptionId }).run(); +} + +export function addDecisionParentDecision(db: DB, decisionId: number, parentDecisionId: number): void { + db.insert(schema.decisionParentDecision) + .values({ decision_id: decisionId, parent_decision_id: parentDecisionId }) + .run(); +} + +export function addDecisionParentAssumption(db: DB, decisionId: number, parentAssumptionId: number): void { + db.insert(schema.decisionParentAssumption) + .values({ decision_id: decisionId, parent_assumption_id: parentAssumptionId }) + .run(); +} + +export function addAssumptionParentAssumption( + db: DB, + assumptionId: number, + parentAssumptionId: number, +): void { + db.insert(schema.assumptionParentAssumption) + .values({ assumption_id: assumptionId, parent_assumption_id: parentAssumptionId }) + .run(); +} + +export function getEntitiesForProject( + db: DB, + projectId: number, +): { decisions: Decision[]; assumptions: Assumption[] } { + const decisions = db + .select() + .from(schema.decision) + .where(eq(schema.decision.project_id, projectId)) + .all() as Decision[]; + const assumptions = db + .select() + .from(schema.assumption) + 
.where(eq(schema.assumption.project_id, projectId)) + .all() as Assumption[]; + return { decisions, assumptions }; +} diff --git a/src/server/interview.test.ts b/src/server/interview.test.ts index 05f4b537..fdef73e0 100644 --- a/src/server/interview.test.ts +++ b/src/server/interview.test.ts @@ -28,6 +28,23 @@ let db: DB; beforeEach(() => { mockQuery.mockReset(); mockCreateSdkMcpServer.mockClear(); + // Default: observer gets empty result for any call not covered by mockReturnValueOnce + mockQuery.mockImplementation(() => + makeMockStream([ + { + type: 'result', + subtype: 'success', + duration_ms: 500, + duration_api_ms: 300, + total_cost_usd: 0.0005, + is_error: false, + num_turns: 1, + usage: { input_tokens: 100, output_tokens: 50 }, + result: '', + structured_output: { decisions: [], assumptions: [] }, + }, + ]), + ); db = createDb(); }); @@ -195,28 +212,30 @@ describe('conductTurn with interview config', () => { } it('passes scope system prompt to SDK query', async () => { - mockQuery.mockReturnValue(mockMinimalStream()); + mockQuery.mockReturnValueOnce(mockMinimalStream()); const project = getOrCreateProject(db); for await (const _ of conductTurn(db, project.id, 'hello')) { /* consume */ } - expect(mockQuery).toHaveBeenCalledOnce(); + // First call is interviewer; second is observer (may fail gracefully) + expect(mockQuery).toHaveBeenCalled(); const callArgs = mockQuery.mock.calls[0][0]; expect(callArgs.options.systemPrompt).toContain('scope'); expect(callArgs.options.systemPrompt).not.toBe('You are a helpful assistant.'); }); it('passes interview MCP server to SDK query', async () => { - mockQuery.mockReturnValue(mockMinimalStream()); + mockQuery.mockReturnValueOnce(mockMinimalStream()); const project = getOrCreateProject(db); for await (const _ of conductTurn(db, project.id, 'hello')) { /* consume */ } - expect(mockQuery).toHaveBeenCalledOnce(); + // First call is interviewer; second is observer (may fail gracefully) + expect(mockQuery).toHaveBeenCalled(); 
const callArgs = mockQuery.mock.calls[0][0]; expect(callArgs.options.mcpServers).toBeDefined(); expect(callArgs.options.mcpServers.interview).toBeDefined(); diff --git a/src/server/interview.ts b/src/server/interview.ts index eb3b9a1d..0f56c908 100644 --- a/src/server/interview.ts +++ b/src/server/interview.ts @@ -1,14 +1,18 @@ -import { createSdkMcpServer, tool } from '@anthropic-ai/claude-agent-sdk'; +import { query, createSdkMcpServer, tool } from '@anthropic-ai/claude-agent-sdk'; /** - * Interview module — structured question schema, phase prompts, and MCP tool server. + * Interview module — structured question schema, phase prompts, MCP tool server, + * and runInterviewer() generator that owns the full interviewer pipeline. * * Pure domain: structuredQuestionSchema, getSystemPrompt, SYSTEM_PROMPTS. - * Shell boundary: createInterviewMcpServer — the tool handler captures db + turnId - * via closure and persists structured data when the agent uses ask_question. + * Shell boundary: createInterviewMcpServer, runInterviewer. */ import { z } from 'zod'; -import { createOption, updateTurn, type DB, type Impact, type Phase } from './db.js'; +import { buildInterviewerContext } from './context.js'; +import type { TurnWithOptions, DomainEvent } from './core.js'; +import { createOption, updateTurn, getTurn, type DB, type Turn, type Impact, type Phase } from './db.js'; +import { assembleAssistantParts, serializeParts } from './parts.js'; +import { createStreamTranslator, extractMetrics, type SdkResultMessage } from './sdk.js'; /** Zod schema for the ask_question tool output. */ export const structuredQuestionSchema = z.object({ @@ -103,3 +107,62 @@ export function createInterviewMcpServer(db: DB, turnId: number) { ], }); } + +/** + * Run the interviewer agent. Streams DomainEvents from the SDK query + * and persists turn-level data (assistant text, parts) when done. + * Each call owns its full pipeline: prompt, tools, streaming, persistence. 
+ */ +export async function* runInterviewer( + db: DB, + turn: Turn, + activePath: TurnWithOptions[], + userMessage: string, + phase: Phase, +): AsyncGenerator<DomainEvent> { + const fullPrompt = buildInterviewerContext(activePath, userMessage); + const interviewServer = createInterviewMcpServer(db, turn.id); + const { translate } = createStreamTranslator(); + + let assistantText = ''; + const collectedEvents: DomainEvent[] = []; + + const stream = query({ + prompt: fullPrompt, + options: { + model: process.env.ANTHROPIC_MODEL || 'claude-sonnet-4-20250514', + maxTurns: 1, + includePartialMessages: true, + systemPrompt: getSystemPrompt(phase), + mcpServers: { interview: interviewServer }, + persistSession: false, + }, + }); + + for await (const msg of stream) { + // Translate SDK stream events to DomainEvents + for (const event of translate(msg)) { + collectedEvents.push(event); + if (event.type === 'text-delta') { + assistantText += event.delta; + } + yield event; + } + + // Capture ResultMessage for metrics + if ((msg as Record<string, unknown>).type === 'result') { + yield extractMetrics('interviewer', msg as unknown as SdkResultMessage); + } + } + + // Persist turn-level data + const currentTurn = getTurn(db, turn.id); + const parts = assembleAssistantParts(collectedEvents); + + updateTurn(db, turn.id, { + ...(assistantText && (!currentTurn?.question || currentTurn.question === '') + ? { question: assistantText } + : {}), + ...(parts.length > 0 ? 
{ assistant_parts: serializeParts(parts) } : {}), + }); +} diff --git a/src/server/observer.test.ts b/src/server/observer.test.ts new file mode 100644 index 00000000..22b9bf00 --- /dev/null +++ b/src/server/observer.test.ts @@ -0,0 +1,202 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; + +import type { DomainEvent } from './core.js'; +import type { DB } from './db.js'; + +// Mock the Claude Agent SDK +const mockQuery = vi.fn(); +vi.mock('@anthropic-ai/claude-agent-sdk', () => ({ + query: mockQuery, + createSdkMcpServer: () => ({ name: 'interview', instance: {} }), + tool: (name: string, desc: string, schema: any, handler: any) => ({ + name, + description: desc, + inputSchema: schema, + handler, + }), +})); + +const { runObserver } = await import('./observer.js'); +const { createDb, createProject, createTurn, getEntitiesForProject } = await import('./db.js'); + +let db: DB; + +beforeEach(() => { + mockQuery.mockReset(); + db = createDb(); +}); + +afterEach(() => { + db.$client.close(); +}); + +/** Helper: mock a successful observer result message */ +function mockObserverResult(structured_output: unknown) { + return (async function* () { + yield { + type: 'result', + subtype: 'success', + duration_ms: 1500, + duration_api_ms: 1000, + total_cost_usd: 0.001, + is_error: false, + num_turns: 1, + usage: { input_tokens: 300, output_tokens: 100 }, + result: '', + structured_output, + }; + })(); +} + +describe('runObserver', () => { + it('persists extracted decisions and links them to the turn', async () => { + mockQuery.mockReturnValue( + mockObserverResult({ + decisions: [ + { + content: 'Use SQLite', + rationale: 'Simple and fast', + parentDecisionIds: [], + parentAssumptionIds: [], + }, + ], + assumptions: [], + }), + ); + + const project = createProject(db, 'Test'); + const turn = createTurn(db, project.id, { phase: 'scope', question: 'Q', answer: 'A' }); + + const events: DomainEvent[] = []; + for await (const event of runObserver(db, 
turn, project.id)) { + events.push(event); + } + + const entities = getEntitiesForProject(db, project.id); + expect(entities.decisions).toHaveLength(1); + expect(entities.decisions[0].content).toBe('Use SQLite'); + }); + + it('persists extracted assumptions and links them to the turn', async () => { + mockQuery.mockReturnValue( + mockObserverResult({ + decisions: [], + assumptions: [{ content: 'Users have API keys', parentAssumptionIds: [] }], + }), + ); + + const project = createProject(db, 'Test'); + const turn = createTurn(db, project.id, { phase: 'scope', question: 'Q', answer: 'A' }); + + const events: DomainEvent[] = []; + for await (const event of runObserver(db, turn, project.id)) { + events.push(event); + } + + const entities = getEntitiesForProject(db, project.id); + expect(entities.assumptions).toHaveLength(1); + expect(entities.assumptions[0].content).toBe('Users have API keys'); + }); + + it('persists dependency edges between decisions', async () => { + const project = createProject(db, 'Test'); + // Pre-existing decision from a previous turn + const { createDecision, linkDecisionToTurn } = await import('./db.js'); + const prevTurn = createTurn(db, project.id, { phase: 'scope', question: 'Q1', answer: 'A1' }); + const existingDecision = createDecision(db, project.id, 'Use Express'); + linkDecisionToTurn(db, existingDecision.id, prevTurn.id); + + const turn = createTurn(db, project.id, { + phase: 'scope', + question: 'Q2', + answer: 'A2', + parent_turn_id: prevTurn.id, + }); + + mockQuery.mockReturnValue( + mockObserverResult({ + decisions: [ + { + content: 'Use SSE for streaming', + rationale: 'Real-time updates', + parentDecisionIds: [existingDecision.id], + parentAssumptionIds: [], + }, + ], + assumptions: [], + }), + ); + + for await (const _ of runObserver(db, turn, project.id)) { + /* consume */ + } + + const entities = getEntitiesForProject(db, project.id); + expect(entities.decisions).toHaveLength(2); + }); + + it('yields observer-complete with 
entity IDs after extraction', async () => { + mockQuery.mockReturnValue( + mockObserverResult({ + decisions: [{ content: 'D1', rationale: null, parentDecisionIds: [], parentAssumptionIds: [] }], + assumptions: [{ content: 'A1', parentAssumptionIds: [] }], + }), + ); + + const project = createProject(db, 'Test'); + const turn = createTurn(db, project.id, { phase: 'scope', question: 'Q', answer: 'A' }); + + const events: DomainEvent[] = []; + for await (const event of runObserver(db, turn, project.id)) { + events.push(event); + } + + const complete = events.find((e) => e.type === 'observer-complete'); + expect(complete).toBeDefined(); + expect((complete as any).entityIds.decisions).toHaveLength(1); + expect((complete as any).entityIds.assumptions).toHaveLength(1); + }); + + it('yields agent-metrics from ResultMessage', async () => { + mockQuery.mockReturnValue( + mockObserverResult({ + decisions: [], + assumptions: [], + }), + ); + + const project = createProject(db, 'Test'); + const turn = createTurn(db, project.id, { phase: 'scope', question: 'Q', answer: 'A' }); + + const events: DomainEvent[] = []; + for await (const event of runObserver(db, turn, project.id)) { + events.push(event); + } + + const metrics = events.find((e) => e.type === 'agent-metrics'); + expect(metrics).toBeDefined(); + expect((metrics as any).agent).toBe('observer'); + expect((metrics as any).durationMs).toBe(1500); + }); + + it('handles empty extraction gracefully', async () => { + mockQuery.mockReturnValue( + mockObserverResult({ + decisions: [], + assumptions: [], + }), + ); + + const project = createProject(db, 'Test'); + const turn = createTurn(db, project.id, { phase: 'scope', question: 'Q', answer: 'A' }); + + const events: DomainEvent[] = []; + for await (const event of runObserver(db, turn, project.id)) { + events.push(event); + } + + const complete = events.find((e) => e.type === 'observer-complete') as any; + expect(complete.entityIds.decisions).toEqual([]); + 
expect(complete.entityIds.assumptions).toEqual([]); + }); +}); diff --git a/src/server/observer.ts b/src/server/observer.ts new file mode 100644 index 00000000..7b135aa2 --- /dev/null +++ b/src/server/observer.ts @@ -0,0 +1,170 @@ +/** + * Observer agent — extracts decisions and assumptions from answered turns. + * + * Runs silently after the interviewer completes. Uses outputFormat (structured JSON) + * for entity extraction — no MCP tools, no streaming events. + * Persists entities to the DB in a transaction, then yields observer-complete. + */ +import { query } from '@anthropic-ai/claude-agent-sdk'; +import { z } from 'zod'; + +import { buildObserverContext } from './context.js'; +import type { DomainEvent } from './core.js'; +import { + createDecision, + createAssumption, + linkDecisionToTurn, + linkAssumptionToTurn, + addDecisionParentDecision, + addDecisionParentAssumption, + addAssumptionParentAssumption, + getEntitiesForProject, + type DB, + type Turn, +} from './db.js'; +import { extractMetrics, type SdkResultMessage } from './sdk.js'; + +/** Schema for observer structured output. */ +export const observerOutputSchema = z.object({ + decisions: z.array( + z.object({ + content: z.string().min(1), + rationale: z.string().nullable(), + parentDecisionIds: z.array(z.number()), + parentAssumptionIds: z.array(z.number()), + }), + ), + assumptions: z.array( + z.object({ + content: z.string().min(1), + parentAssumptionIds: z.array(z.number()), + }), + ), +}); + +export type ObserverOutput = z.infer<typeof observerOutputSchema>; + +const OBSERVER_SYSTEM_PROMPT = `You are an observer agent analyzing a spec elicitation interview turn. + +Your job is to extract decisions and assumptions from the Q&A exchange. For each turn, identify: + +1. **Decisions** — explicit choices the user made (e.g., "use SQLite", "support only macOS"). Include the rationale if stated. +2. 
**Assumptions** — implicit or explicit beliefs underlying the decisions (e.g., "single-user tool", "users have API keys"). 
+ +For each entity, identify dependency edges to previously extracted entities by their IDs. + +Rules: +- Only extract entities that are NEW in this turn — do not re-extract existing entities. +- Be precise: a decision is a concrete choice; an assumption is a belief that could be wrong. +- If no new entities are evident in this turn, return empty arrays. +- Reference parent entity IDs only when a clear dependency exists.`; + +/** + * Run the observer agent. Extracts entities from the completed turn, + * persists them to the DB, and yields observer-complete with entity IDs. + */ +export async function* runObserver(db: DB, turn: Turn, projectId: number): AsyncGenerator<DomainEvent> { + const entities = getEntitiesForProject(db, projectId); + const context = buildObserverContext({ + turn, + activePathSummary: '', + entities, + }); + + const stream = query({ + prompt: context, + options: { + model: process.env.OBSERVER_MODEL || 'claude-haiku-4-5-20251001', + maxTurns: 1, + persistSession: false, + effort: 'low', + systemPrompt: OBSERVER_SYSTEM_PROMPT, + outputFormat: { + type: 'json_schema', + schema: { + type: 'object', + properties: { + decisions: { + type: 'array', + items: { + type: 'object', + properties: { + content: { type: 'string' }, + rationale: { type: 'string', nullable: true }, + parentDecisionIds: { type: 'array', items: { type: 'number' } }, + parentAssumptionIds: { type: 'array', items: { type: 'number' } }, + }, + required: ['content', 'rationale', 'parentDecisionIds', 'parentAssumptionIds'], + }, + }, + assumptions: { + type: 'array', + items: { + type: 'object', + properties: { + content: { type: 'string' }, + parentAssumptionIds: { type: 'array', items: { type: 'number' } }, + }, + required: ['content', 'parentAssumptionIds'], + }, + }, + }, + required: ['decisions', 'assumptions'], + }, + }, + }, + }); + + let resultMessage: SdkResultMessage | undefined; + + for await (const msg of stream) { + if ((msg as Record<string, unknown>).type === 'result') { + 
resultMessage = msg as 
unknown as SdkResultMessage; + } + } + + if (!resultMessage || resultMessage.is_error) { + throw new Error(`Observer extraction failed: ${resultMessage ? 'SDK error' : 'no result message'}`); + } + + // Parse structured output + const parsed = observerOutputSchema.parse(resultMessage.structured_output); + + // Persist entities in a transaction-like sequence + const createdDecisionIds: number[] = []; + const createdAssumptionIds: number[] = []; + + for (const d of parsed.decisions) { + const decision = createDecision(db, projectId, d.content, d.rationale); + linkDecisionToTurn(db, decision.id, turn.id); + createdDecisionIds.push(decision.id); + + for (const parentId of d.parentDecisionIds) { + addDecisionParentDecision(db, decision.id, parentId); + } + for (const parentId of d.parentAssumptionIds) { + addDecisionParentAssumption(db, decision.id, parentId); + } + } + + for (const a of parsed.assumptions) { + const assumption = createAssumption(db, projectId, a.content); + linkAssumptionToTurn(db, assumption.id, turn.id); + createdAssumptionIds.push(assumption.id); + + for (const parentId of a.parentAssumptionIds) { + addAssumptionParentAssumption(db, assumption.id, parentId); + } + } + + // Yield observer-complete post-commit + yield { + type: 'observer-complete', + entityIds: { decisions: createdDecisionIds, assumptions: createdAssumptionIds }, + }; + + // Yield agent metrics + if (resultMessage) { + yield extractMetrics('observer', resultMessage); + } +} diff --git a/src/server/sdk.test.ts b/src/server/sdk.test.ts new file mode 100644 index 00000000..e4caa0c1 --- /dev/null +++ b/src/server/sdk.test.ts @@ -0,0 +1,103 @@ +import { describe, it, expect } from 'vitest'; + +import { createStreamTranslator, extractMetrics, type SdkResultMessage } from './sdk.js'; + +describe('createStreamTranslator', () => { + it('translates message_start to stream-start', () => { + const { translate } = createStreamTranslator(); + const events = translate({ + type: 'stream_event', + 
event: { type: 'message_start', message: { id: 'msg-1' } }, + }); + expect(events).toEqual([{ type: 'stream-start', messageId: 'msg-1' }]); + }); + + it('translates thinking_delta to thinking', () => { + const { translate } = createStreamTranslator(); + const events = translate({ + type: 'stream_event', + event: { type: 'content_block_delta', index: 0, delta: { type: 'thinking_delta', thinking: 'hmm' } }, + }); + expect(events).toEqual([{ type: 'thinking', delta: 'hmm' }]); + }); + + it('translates text_delta to text-delta', () => { + const { translate } = createStreamTranslator(); + const events = translate({ + type: 'stream_event', + event: { type: 'content_block_delta', index: 1, delta: { type: 'text_delta', text: 'hello' } }, + }); + expect(events).toEqual([{ type: 'text-delta', delta: 'hello' }]); + }); + + it('translates tool_use lifecycle (start → delta → stop)', () => { + const { translate } = createStreamTranslator(); + + const start = translate({ + type: 'stream_event', + event: { + type: 'content_block_start', + index: 0, + content_block: { type: 'tool_use', name: 'ask_question', id: 'toolu_01' }, + }, + }); + expect(start).toEqual([{ type: 'tool-call-start', toolName: 'ask_question', toolCallId: 'toolu_01' }]); + + const delta = translate({ + type: 'stream_event', + event: { + type: 'content_block_delta', + index: 0, + delta: { type: 'input_json_delta', partial_json: '{"q":"hi"}' }, + }, + }); + expect(delta).toEqual([{ type: 'tool-call-delta', toolCallId: 'toolu_01', delta: '{"q":"hi"}' }]); + + const stop = translate({ + type: 'stream_event', + event: { type: 'content_block_stop', index: 0 }, + }); + expect(stop).toEqual([{ type: 'tool-call-end', toolCallId: 'toolu_01', toolName: 'ask_question' }]); + }); + + it('translates message_stop to stream-end', () => { + const { translate } = createStreamTranslator(); + const events = translate({ + type: 'stream_event', + event: { type: 'message_stop' }, + }); + expect(events).toEqual([{ type: 'stream-end' 
}]); + }); + + it('ignores non-stream_event messages', () => { + const { translate } = createStreamTranslator(); + expect(translate({ type: 'result', subtype: 'success' })).toEqual([]); + expect(translate({ type: 'system' })).toEqual([]); + }); +}); + +describe('extractMetrics', () => { + it('produces agent-metrics DomainEvent from ResultMessage', () => { + const result: SdkResultMessage = { + type: 'result', + subtype: 'success', + duration_ms: 1200, + duration_api_ms: 800, + total_cost_usd: 0.003, + is_error: false, + num_turns: 1, + usage: { input_tokens: 500, output_tokens: 200 }, + result: 'ok', + }; + const event = extractMetrics('observer', result); + expect(event).toEqual({ + type: 'agent-metrics', + agent: 'observer', + durationMs: 1200, + durationApiMs: 800, + totalCostUsd: 0.003, + inputTokens: 500, + outputTokens: 200, + }); + }); +}); diff --git a/src/server/sdk.ts b/src/server/sdk.ts new file mode 100644 index 00000000..a009fe5e --- /dev/null +++ b/src/server/sdk.ts @@ -0,0 +1,115 @@ +/** + * SDK utilities — shared stream event translation for agents that call query(). + * + * createStreamTranslator() — per-message SDK stream_event → DomainEvent translator. + * Each agent owns its own query() call; this handles the shared translation logic. + * Streaming agents use it; silent agents (observer) don't. + */ + +import type { DomainEvent } from './core.js'; + +/** Minimal shape of an SDK stream_event message. */ +export interface SdkStreamEvent { + type: 'stream_event'; + event: { + type: string; + index?: number; + message?: { id: string }; + content_block?: { type: string; name?: string; id?: string }; + delta?: { type: string; text?: string; thinking?: string; partial_json?: string }; + }; +} + +/** Shape of SDK ResultMessage (success or error). 
*/ +export interface SdkResultMessage { + type: 'result'; + subtype: 'success' | 'error_during_execution' | 'error_max_turns' | 'error_max_budget_usd'; + duration_ms: number; + duration_api_ms: number; + total_cost_usd: number; + is_error: boolean; + num_turns: number; + usage: { input_tokens: number; output_tokens: number }; + result?: string; + structured_output?: unknown; +} + +/** + * Create a per-request stream translator with scoped state. + * Maps individual SDK stream_event messages to DomainEvent arrays. + * Tracks tool_use block lifecycle (start → delta → stop). + */ +export function createStreamTranslator() { + const toolUseBlocks = new Map<number, { toolName: string; toolCallId: string }>(); + + function translate(sdkMessage: unknown): DomainEvent[] { + const msg = sdkMessage as Record<string, unknown>; + if (msg.type !== 'stream_event') return []; + + const event = (msg as unknown as SdkStreamEvent).event; + switch (event.type) { + case 'message_start': + return [{ type: 'stream-start', messageId: event.message!.id }]; + + case 'content_block_start': { + const block = event.content_block!; + if (block.type === 'tool_use') { + toolUseBlocks.set(event.index!, { toolName: block.name!, toolCallId: block.id! }); + return [{ type: 'tool-call-start', toolName: block.name!, toolCallId: block.id! }]; + } + return []; + } + + case 'content_block_delta': { + const delta = event.delta!; + if (delta.type === 'thinking_delta' && delta.thinking) { + return [{ type: 'thinking', delta: delta.thinking }]; + } + if (delta.type === 'text_delta' && delta.text) { + return [{ type: 'text-delta', delta: delta.text }]; + } + if (delta.type === 'input_json_delta' && delta.partial_json) { + const toolBlock = toolUseBlocks.get(event.index!); + return [ + { + type: 'tool-call-delta', + toolCallId: toolBlock?.toolCallId ?? 
'', + delta: delta.partial_json, + }, + ]; + } + return []; + } + + case 'content_block_stop': { + const toolBlock = toolUseBlocks.get(event.index!); + if (toolBlock) { + toolUseBlocks.delete(event.index!); + return [{ type: 'tool-call-end', toolCallId: toolBlock.toolCallId, toolName: toolBlock.toolName }]; + } + return []; + } + + case 'message_stop': + return [{ type: 'stream-end' }]; + + default: + return []; + } + } + + return { translate }; +} + +/** Extract agent-metrics DomainEvent from an SDK ResultMessage. */ +export function extractMetrics(agent: string, msg: SdkResultMessage): DomainEvent { + return { + type: 'agent-metrics', + agent, + durationMs: msg.duration_ms, + durationApiMs: msg.duration_api_ms, + totalCostUsd: msg.total_cost_usd, + inputTokens: msg.usage.input_tokens, + outputTokens: msg.usage.output_tokens, + }; +} diff --git a/src/server/sse-adapter.test.ts b/src/server/sse-adapter.test.ts index a9558d71..28589e10 100644 --- a/src/server/sse-adapter.test.ts +++ b/src/server/sse-adapter.test.ts @@ -232,4 +232,38 @@ describe('createDomainAdapter — tool-call events', () => { const events = translate({ type: 'tool-call-end', toolCallId: 'tc-1', toolName: 'search' }); expect(events).toEqual([{ type: 'tool-call', id: 'tc-1', toolName: 'search', args: '{"q":"test"}' }]); }); + + it('translates observer-complete to data event', () => { + const { translate } = createDomainAdapter(); + const events = translate({ + type: 'observer-complete', + entityIds: { decisions: [1, 2], assumptions: [3] }, + } as any); + expect(events).toEqual([ + { + type: 'data', + data: { type: 'data-observer-result', entityIds: { decisions: [1, 2], assumptions: [3] } }, + }, + ]); + }); + + it('translates observer-error to error event', () => { + const { translate } = createDomainAdapter(); + const events = translate({ type: 'observer-error', message: 'extraction failed' } as any); + expect(events).toEqual([{ type: 'error', errorText: 'Observer: extraction failed' }]); + }); + + 
it('translates agent-metrics to empty array (internal only)', () => { + const { translate } = createDomainAdapter(); + const events = translate({ + type: 'agent-metrics', + agent: 'observer', + durationMs: 1500, + durationApiMs: 1000, + totalCostUsd: 0.001, + inputTokens: 300, + outputTokens: 100, + } as any); + expect(events).toEqual([]); + }); }); diff --git a/src/server/sse-adapter.ts b/src/server/sse-adapter.ts index 186ab8c0..478d5e55 100644 --- a/src/server/sse-adapter.ts +++ b/src/server/sse-adapter.ts @@ -22,7 +22,8 @@ export type AIEvent = | { type: 'tool-call'; id: string; toolName: string; args: string } | { type: 'finish-step' } | { type: 'finish'; finishReason: string } - | { type: 'error'; errorText: string }; + | { type: 'error'; errorText: string } + | { type: 'data'; data: unknown }; /** Minimal shape of an SDKPartialAssistantMessage from the Claude Agent SDK */ interface SDKStreamEvent { @@ -225,13 +226,14 @@ export function createDomainAdapter() { } case 'stream-end': { + // Close any open blocks; finish-step + finish are emitted by the Express adapter + // after all events (including observer) have been processed. 
const events: AIEvent[] = []; if (currentBlock === 'thinking') { events.push({ type: 'reasoning-end', id: `reasoning-${blockIndex}` }); } else if (currentBlock === 'text') { events.push({ type: 'text-end', id: `text-${blockIndex}` }); } - events.push({ type: 'finish-step' }, { type: 'finish', finishReason: 'stop' }); return events; } @@ -241,6 +243,23 @@ export function createDomainAdapter() { case 'turn-created': return []; // No SSE representation + case 'observer-complete': + return [ + { + type: 'data', + data: { + type: 'data-observer-result', + entityIds: event.entityIds, + }, + }, + ]; + + case 'observer-error': + return [{ type: 'error', errorText: `Observer: ${event.message}` }]; + + case 'agent-metrics': + return []; // Internal only — not sent to client + default: return []; } From 8d6848a9125d210d849fb14aa82c7bcbbedac3f8 Mon Sep 17 00:00:00 2001 From: Lu Nelson Date: Thu, 2 Apr 2026 15:12:11 +0200 Subject: [PATCH 2/2] =?UTF-8?q?traceability:=20slice=205=20done=20?= =?UTF-8?q?=E2=80=94=20D27/D28/D29,=20A24/A25=20added,=20I20/I21/I22=20est?= =?UTF-8?q?ablished?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) --- memory/PLAN.md | 13 ++++++++----- memory/SPEC.md | 13 +++++++++---- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/memory/PLAN.md b/memory/PLAN.md index b0280910..3e883a16 100644 --- a/memory/PLAN.md +++ b/memory/PLAN.md @@ -129,12 +129,15 @@ - Branch: `ln/fe-558-ui-foundation` - **Verification approach**: inner — `npm run verify` (lint, format, type-check, all tests, build). Outer — manual visual inspection of interview workspace and project list in dev mode. -5. **Observer agent + entity persistence** — After each answered turn, core invokes a second agent call that extracts decisions and assumptions. Writes to decision/assumption tables with turn linkage and dependency edges. 
Core yields `observer-complete` DomainEvent **post-commit** (after SQLite transaction); SSE adapter emits as typed data part on existing chat stream (in-band sync per D22). Context builders upgraded to use `md-pen` for structured entity rendering (tables, checklists) in observer context. `not-started` +5. **Observer agent + entity persistence** `FE-537` — After each answered turn, core invokes a second agent call that extracts decisions and assumptions. Writes to decision/assumption tables with turn linkage and dependency edges. Core yields `observer-complete` DomainEvent **post-commit** (after SQLite transaction); SSE adapter emits as typed data part on existing chat stream (in-band sync per D22). Context builders upgraded to use `md-pen` for structured entity rendering (tables, checklists) in observer context. Agent pattern refactored: conductTurn() is thin sequencer, each agent is async generator composed via yield* (D27). Observer uses outputFormat for structured JSON extraction (D28). ResultMessage inspection for agent metrics (D29). `done` - Requirements: → SPEC.md §Requirements #5 - - Assumptions: → SPEC.md §Assumptions A3, A4, A14 (validated by spike), A20 - - Decisions: → SPEC.md §Decisions D22 (in-band sync — observer-complete as data part), D26 (md-pen for markdown rendering) - - Acceptance: answer a scope question, observer extracts decision + assumptions, dependency edges in DB, `observer-complete` event emitted post-commit with entity IDs, extraction within user think time - - **Verification approach**: inner — unit tests for entity writes with dependency edges, observer-complete DomainEvent emission post-commit, SSE adapter data-part encoding. Middle — differential oracle from spike fixtures (observer extraction vs golden master, ≥80% capture). Outer — debug mode: raw observer extraction visible per-turn in UI; fixture capture from confirmed-good manual runs. 
→ SPEC.md §Oracle Strategy, §Observer History Projection, §Acknowledged Blind Spots (extraction variance, cumulative graph integrity) + - Assumptions: → SPEC.md §Assumptions A3, A4, A14 (validated by spike), A20, A24, A25 + - Decisions: → SPEC.md §Decisions D22 (in-band sync — observer-complete as data part), D26 (md-pen), D27 (agent generator composition), D28 (outputFormat), D29 (ResultMessage metrics) + - Invariants established: → SPEC.md §Invariants I20, I21, I22 + - Invariants respected: → SPEC.md §Invariants I1, I5, I6, I9, I10, I12, I13, I17, I19 + - Acceptance: 147 tests pass (24 new); agent pattern refactored; observer persists entities with turn linkage and dependency edges; observer-complete emitted post-commit; SSE adapter encodes as data-observer-result; observer errors non-fatal; context uses md-pen; agent-metrics emitted + - Branch: `ln/fe-537-observer-agent` + - **Verification approach**: inner — unit tests for entity writes with dependency edges, observer-complete DomainEvent emission post-commit, SSE adapter data-part encoding, sdk translateStreamEvents parity, observer-error non-fatality, agent-metrics shape. Middle — differential oracle from spike fixtures (deferred to manual testing). Outer — debug mode and fixture capture (deferred to slice 6). → SPEC.md §Oracle Strategy 6. **Entity sidebar (read-only)** — React sidebar in interview workspace showing decisions, assumptions, requirements, and criteria on the active path. Tabbed display. TanStack Query (`useQuery`) for entity data; cache populated via `queryClient.setQueryData` from `useChat`'s `onData` callback when `observer-complete` data parts arrive (in-band sync per D22). Dependency edges visible. Stale badges for soft-invalidated entities. 
`not-started` - Requirements: → SPEC.md §Requirements #6 diff --git a/memory/SPEC.md b/memory/SPEC.md index 4ddc4391..50d0e4d4 100644 --- a/memory/SPEC.md +++ b/memory/SPEC.md @@ -166,6 +166,9 @@ The architecture (layered: db → core → adapters): | I17 | Data Part schema validation | Slice 4a (parts persistence) | parts.test.ts (7 tests) | D24 | | I18 | Parts round-trip fidelity | Slice 4a (parts persistence) | parts.test.ts (8 tests), core.test.ts | D23 | | I19 | Context builder equivalence | Slice 4a (parts persistence) | context.test.ts (7 tests) | D25 | +| I20 | Entity persistence with turn linkage | Slice 5 (observer) | db.test.ts (7 tests), observer.test.ts | D4, D5 | +| I21 | Observer-complete post-commit | Slice 5 (observer) | observer.test.ts (6 tests), sse-adapter.test.ts (3 tests) | D22 | +| I22 | Agent generator composition | Slice 5 (observer) | core.test.ts, sdk.test.ts (7 tests) | D27 | ## Lexicon @@ -331,13 +334,15 @@ This projection difference is a deliberate design choice, not an implementation | File | Tests | Protects | | ------------------- | ----- | --------------------------- | -| sse-adapter.test.ts | 18 | I1, I3, I7 | -| db.test.ts | 25 | I5, I6, I9, I10, I11, I18 | +| sse-adapter.test.ts | 21 | I1, I3, I7, I21 | +| db.test.ts | 32 | I5, I6, I9, I10, I11, I18, I20 | | app.test.ts | 22 | I2, I3, I6, I7, I13, I14 | -| core.test.ts | 16 | I12, I13, I18 | +| core.test.ts | 16 | I12, I13, I18, I22 | | interview.test.ts | 16 | I16 | | parts.test.ts | 23 | I17, I18 | -| context.test.ts | 7 | I19 | +| context.test.ts | 8 | I19 | +| sdk.test.ts | 7 | I22 | +| observer.test.ts | 6 | I20, I21 | ## Acceptance Criteria (exit conditions)