diff --git a/packages/core/src/evaluation/providers/copilot-cli.ts b/packages/core/src/evaluation/providers/copilot-cli.ts index 31684af05..bc70b704d 100644 --- a/packages/core/src/evaluation/providers/copilot-cli.ts +++ b/packages/core/src/evaluation/providers/copilot-cli.ts @@ -423,6 +423,7 @@ export class CopilotCliProvider implements Provider { attempt: request.attempt, format: this.config.logFormat ?? 'summary', headerLabel: 'Copilot CLI (ACP)', + chunkExtractor: extractAcpChunk, }, summarizeAcpEvent, ); @@ -501,6 +502,31 @@ Fix options: - In .agentv/targets.yaml: executable: \${{ COPILOT_EXE }}`; } +/** + * Extracts bufferable text from ACP streaming events. + * + * Return values control CopilotStreamLogger buffering: + * string — accumulate this text into the pending buffer + * null — reset (discard) the pending buffer without emitting it + * undefined — not a chunk event; process normally + * + * Copilot ACP sends agent_message_chunk events in two passes: + * 1. A streaming preview batch (before extended thinking) + * 2. agent_thought_chunk events (extended reasoning) + * 3. A final response batch (after extended thinking) + * + * Returning null for agent_thought_chunk discards the preview batch so that + * only the final post-thinking response is emitted as [assistant_message]. + */ +function extractAcpChunk(eventType: string, data: unknown): string | null | undefined { + if (eventType === 'agent_thought_chunk') return null; + if (eventType !== 'agent_message_chunk') return undefined; + if (!data || typeof data !== 'object') return undefined; + const d = data as Record; + const content = d.content as Record | undefined; + return content?.type === 'text' && typeof content.text === 'string' ? content.text : undefined; +} + function summarizeAcpEvent(eventType: string, data: unknown): string | undefined { if (!data || typeof data !== 'object') { return eventType; diff --git a/packages/core/src/evaluation/providers/copilot-sdk.ts b/packages/core/src/evaluation/providers/copilot-sdk.ts index b18a10a65..ac903f8e1 100644 --- a/packages/core/src/evaluation/providers/copilot-sdk.ts +++ b/packages/core/src/evaluation/providers/copilot-sdk.ts @@ -378,6 +378,7 @@ export class CopilotSdkProvider implements Provider { attempt: request.attempt, format: this.config.logFormat ?? 'summary', headerLabel: 'Copilot SDK', + chunkExtractor: extractSdkChunk, }, summarizeSdkEvent, ); @@ -426,6 +427,19 @@ function normalizeByokBaseUrl(baseUrl: string, type: string): string { return trimmed; } +/** + * Extracts bufferable text from SDK assistant.message_delta events. + * Returning a string causes the logger to accumulate the text rather than + * emit a line per delta. A single [assistant_message] line is written once + * all deltas for a turn have arrived (on the next non-chunk event or close). + */ +function extractSdkChunk(eventType: string, data: unknown): string | undefined { + if (eventType !== 'assistant.message_delta') return undefined; + if (!data || typeof data !== 'object') return undefined; + const d = data as Record; + return typeof d.deltaContent === 'string' ? d.deltaContent : undefined; +} + function summarizeSdkEvent(eventType: string, data: unknown): string | undefined { if (!data || typeof data !== 'object') { return eventType; diff --git a/packages/core/src/evaluation/providers/copilot-utils.ts b/packages/core/src/evaluation/providers/copilot-utils.ts index 6a184de79..43bb8ae55 100644 --- a/packages/core/src/evaluation/providers/copilot-utils.ts +++ b/packages/core/src/evaluation/providers/copilot-utils.ts @@ -253,6 +253,27 @@ export interface StreamLoggerOptions { readonly attempt?: number; readonly format: 'summary' | 'json'; readonly headerLabel: string; + /** + * Optional extractor for streaming text chunk events. + * + * When provided, the return value controls how each event is handled: + * - `string` — buffer this text; flush as `[assistant_message]` on the next + * non-chunk event or `close()`. + * - `null` — discard (reset) the accumulated buffer without emitting it. + * Use this for events that signal a new streaming pass is starting, + * e.g. `agent_thought_chunk` in Copilot ACP, which arrives between + * a streaming preview batch and the final response batch. + * - `undefined` — not a chunk event; process normally (flush buffer first, then + * call `summarize` and write the line). + * + * Example (Copilot CLI ACP): + * chunkExtractor: (type, data) => { + * if (type === 'agent_thought_chunk') return null; // reset pre-thinking buffer + * if (type !== 'agent_message_chunk') return undefined; + * return (data as any)?.content?.text ?? undefined; + * } + */ + readonly chunkExtractor?: (eventType: string, data: unknown) => string | null | undefined; } export class CopilotStreamLogger { @@ -261,15 +282,19 @@ export class CopilotStreamLogger { private readonly startedAt = Date.now(); private readonly format: 'summary' | 'json'; private readonly summarize: (eventType: string, data: unknown) => string | undefined; + private readonly chunkExtractor?: (eventType: string, data: unknown) => string | null | undefined; + private pendingText = ''; private constructor( filePath: string, format: 'summary' | 'json', summarize: (eventType: string, data: unknown) => string | undefined, + chunkExtractor?: (eventType: string, data: unknown) => string | null | undefined, ) { this.filePath = filePath; this.format = format; this.summarize = summarize; + this.chunkExtractor = chunkExtractor; this.stream = createWriteStream(filePath, { flags: 'a' }); } @@ -277,7 +302,12 @@ export class CopilotStreamLogger { options: StreamLoggerOptions, summarize: (eventType: string, data: unknown) => string | undefined, ): Promise { - const logger = new CopilotStreamLogger(options.filePath, options.format, summarize); + const logger = new CopilotStreamLogger( + options.filePath, + options.format, + summarize, + options.chunkExtractor, + ); const header = [ `# ${options.headerLabel} stream log`, `# target: ${options.targetName}`, @@ -293,18 +323,49 @@ export class CopilotStreamLogger { } handleEvent(eventType: string, data: unknown): void { - const elapsed = formatElapsed(this.startedAt); if (this.format === 'json') { + const elapsed = formatElapsed(this.startedAt); this.stream.write(`${JSON.stringify({ time: elapsed, event: eventType, data })}\n`); - } else { - const summary = this.summarize(eventType, data); - if (summary) { - this.stream.write(`[+${elapsed}] [${eventType}] ${summary}\n`); + return; + } + + // In summary mode, buffer chunk events and emit a single consolidated line. + if (this.chunkExtractor) { + const chunkText = this.chunkExtractor(eventType, data); + if (chunkText === null) { + // Reset signal: discard the accumulated buffer without emitting. + // Used for events like agent_thought_chunk that arrive between a + // streaming preview batch and the final response batch in Copilot ACP — + // the preview text is stale; the real message follows after thinking. + this.pendingText = ''; + return; + } + if (chunkText !== undefined) { + this.pendingText += chunkText; + return; } + // Non-chunk event: flush any accumulated text first. + this.flushPendingText(); + } + + const elapsed = formatElapsed(this.startedAt); + const summary = this.summarize(eventType, data); + if (summary) { + this.stream.write(`[+${elapsed}] [${eventType}] ${summary}\n`); } } + private flushPendingText(): void { + if (!this.pendingText) return; + const elapsed = formatElapsed(this.startedAt); + this.stream.write(`[+${elapsed}] [assistant_message] ${this.pendingText}\n`); + this.pendingText = ''; + } + async close(): Promise { + if (this.format !== 'json') { + this.flushPendingText(); + } await new Promise((resolve, reject) => { this.stream.once('error', reject); this.stream.end(() => resolve()); diff --git a/packages/core/test/evaluation/providers/copilot-stream-logger.test.ts b/packages/core/test/evaluation/providers/copilot-stream-logger.test.ts new file mode 100644 index 000000000..926e57d61 --- /dev/null +++ b/packages/core/test/evaluation/providers/copilot-stream-logger.test.ts @@ -0,0 +1,185 @@ +import { mkdtemp, readFile, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import path from 'node:path'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; + +import { CopilotStreamLogger } from '../../../src/evaluation/providers/copilot-utils.js'; + +const noopSummarize = (_type: string, _data: unknown): string | undefined => undefined; + +describe('CopilotStreamLogger', () => { + let tempDir: string; + + beforeEach(async () => { + tempDir = await mkdtemp(path.join(tmpdir(), 'copilot-stream-logger-')); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + it('writes summary events as separate lines', async () => { + const filePath = path.join(tempDir, 'test.log'); + const summarize = (type: string, _data: unknown) => + type === 'tool_call' ? 'read_file' : undefined; + + const logger = await CopilotStreamLogger.create( + { filePath, targetName: 'test', format: 'summary', headerLabel: 'Test' }, + summarize, + ); + logger.handleEvent('tool_call', {}); + logger.handleEvent('tool_call', {}); + await logger.close(); + + const content = await readFile(filePath, 'utf8'); + const lines = content.split('\n').filter((l) => l.includes('[tool_call]')); + expect(lines).toHaveLength(2); + expect(lines[0]).toMatch(/\[tool_call\] read_file/); + }); + + it('buffers chunk events and flushes as single [assistant_message] line on non-chunk event', async () => { + const filePath = path.join(tempDir, 'test.log'); + const summarize = (type: string, _data: unknown) => + type === 'tool_call' ? 'read_file' : undefined; + const chunkExtractor = (type: string, data: unknown): string | null | undefined => { + if (type !== 'agent_message_chunk') return undefined; + const d = data as Record; + const content = d?.content as Record | undefined; + return content?.type === 'text' && typeof content.text === 'string' + ? content.text + : undefined; + }; + + const logger = await CopilotStreamLogger.create( + { filePath, targetName: 'test', format: 'summary', headerLabel: 'Test', chunkExtractor }, + summarize, + ); + + // Three chunks — should NOT produce three log lines + logger.handleEvent('agent_message_chunk', { content: { type: 'text', text: 'Hello' } }); + logger.handleEvent('agent_message_chunk', { content: { type: 'text', text: ' world' } }); + logger.handleEvent('agent_message_chunk', { content: { type: 'text', text: '!' } }); + // Non-chunk event triggers flush + logger.handleEvent('tool_call', {}); + await logger.close(); + + const content = await readFile(filePath, 'utf8'); + const lines = content.split('\n').filter((l) => l.trim()); + + // No raw chunk lines + expect(lines.some((l) => l.includes('[agent_message_chunk]'))).toBe(false); + // One consolidated assistant_message line with full text + const msgLine = lines.find((l) => l.includes('[assistant_message]')); + expect(msgLine).toBeDefined(); + expect(msgLine).toMatch(/Hello world!/); + // tool_call still emitted + expect(lines.some((l) => l.includes('[tool_call]'))).toBe(true); + }); + + it('flushes remaining buffered text on close', async () => { + const filePath = path.join(tempDir, 'test.log'); + const chunkExtractor = (type: string, data: unknown): string | null | undefined => { + if (type !== 'agent_message_chunk') return undefined; + const d = data as Record; + const content = d?.content as Record | undefined; + return content?.type === 'text' && typeof content.text === 'string' + ? content.text + : undefined; + }; + + const logger = await CopilotStreamLogger.create( + { filePath, targetName: 'test', format: 'summary', headerLabel: 'Test', chunkExtractor }, + noopSummarize, + ); + + logger.handleEvent('agent_message_chunk', { content: { type: 'text', text: 'Final answer' } }); + // close() without any subsequent non-chunk event + await logger.close(); + + const content = await readFile(filePath, 'utf8'); + expect(content).toMatch(/\[assistant_message\] Final answer/); + }); + + it('does not buffer in json format (keeps per-event for full fidelity)', async () => { + const filePath = path.join(tempDir, 'test.log'); + const chunkExtractor = (type: string, data: unknown): string | null | undefined => { + if (type !== 'agent_message_chunk') return undefined; + const d = data as Record; + const content = d?.content as Record | undefined; + return content?.type === 'text' && typeof content.text === 'string' + ? content.text + : undefined; + }; + + const logger = await CopilotStreamLogger.create( + { filePath, targetName: 'test', format: 'json', headerLabel: 'Test', chunkExtractor }, + noopSummarize, + ); + + logger.handleEvent('agent_message_chunk', { content: { type: 'text', text: 'chunk1' } }); + logger.handleEvent('agent_message_chunk', { content: { type: 'text', text: 'chunk2' } }); + await logger.close(); + + const content = await readFile(filePath, 'utf8'); + const jsonLines = content + .split('\n') + .filter((l) => l.trim().startsWith('{')) + .map((l) => JSON.parse(l)); + // Both chunks emitted individually as JSON + expect(jsonLines.filter((e) => e.event === 'agent_message_chunk')).toHaveLength(2); + }); + + it('handles chunk events with no extractable text gracefully', async () => { + const filePath = path.join(tempDir, 'test.log'); + const chunkExtractor = (type: string, _data: unknown): string | null | undefined => + type === 'agent_message_chunk' ? undefined : undefined; + + const logger = await CopilotStreamLogger.create( + { filePath, targetName: 'test', format: 'summary', headerLabel: 'Test', chunkExtractor }, + noopSummarize, + ); + + // Chunks with no extractable text are silently skipped (chunkExtractor returns undefined + // meaning "not a chunk" — treated as non-chunk events, summarize returns undefined, no output) + logger.handleEvent('agent_message_chunk', { content: { type: 'image' } }); + await logger.close(); + + const content = await readFile(filePath, 'utf8'); + expect(content).not.toMatch(/\[assistant_message\]/); + }); + + it('null return from chunkExtractor resets buffer without emitting (handles pre-thinking streaming)', async () => { + const filePath = path.join(tempDir, 'test.log'); + // Simulates Copilot ACP: chunks → thought_chunks (reset) → chunks (final) + const chunkExtractor = (type: string, data: unknown): string | null | undefined => { + if (type === 'agent_thought_chunk') return null; + if (type !== 'agent_message_chunk') return undefined; + const d = data as Record; + const content = d?.content as Record | undefined; + return content?.type === 'text' && typeof content.text === 'string' + ? content.text + : undefined; + }; + + const logger = await CopilotStreamLogger.create( + { filePath, targetName: 'test', format: 'summary', headerLabel: 'Test', chunkExtractor }, + noopSummarize, + ); + + // First pass: streaming preview (should be discarded) + logger.handleEvent('agent_message_chunk', { content: { type: 'text', text: 'Hi' } }); + logger.handleEvent('agent_message_chunk', { content: { type: 'text', text: ' there.' } }); + // Extended thinking: resets the buffer + logger.handleEvent('agent_thought_chunk', {}); + logger.handleEvent('agent_thought_chunk', {}); + // Second pass: final response + logger.handleEvent('agent_message_chunk', { content: { type: 'text', text: 'Hi there.' } }); + await logger.close(); + + const content = await readFile(filePath, 'utf8'); + const msgLines = content.split('\n').filter((l) => l.includes('[assistant_message]')); + // Only one consolidated line — the final response, not the preview + expect(msgLines).toHaveLength(1); + expect(msgLines[0]).toMatch(/\[assistant_message\] Hi there\.$/); + }); +});