Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions packages/core/src/evaluation/providers/copilot-cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ export class CopilotCliProvider implements Provider {
attempt: request.attempt,
format: this.config.logFormat ?? 'summary',
headerLabel: 'Copilot CLI (ACP)',
chunkExtractor: extractAcpChunk,
},
summarizeAcpEvent,
);
Expand Down Expand Up @@ -501,6 +502,31 @@ Fix options:
- In .agentv/targets.yaml: executable: \${{ COPILOT_EXE }}`;
}

/**
 * Extracts bufferable text from ACP streaming events.
 *
 * The return value controls CopilotStreamLogger buffering:
 *   string    — accumulate this text into the pending buffer
 *   null      — reset (discard) the pending buffer without emitting it
 *   undefined — not a chunk event; process normally
 *
 * Copilot ACP streams the assistant message in three phases:
 *   1. a streaming preview batch of agent_message_chunk events,
 *   2. agent_thought_chunk events (extended reasoning),
 *   3. a final agent_message_chunk batch carrying the real response.
 *
 * Returning null for agent_thought_chunk discards the preview batch so that
 * only the final post-thinking response is emitted as [assistant_message].
 */
function extractAcpChunk(eventType: string, data: unknown): string | null | undefined {
  switch (eventType) {
    case 'agent_thought_chunk':
      // Thinking has started: whatever was buffered is a stale preview.
      return null;
    case 'agent_message_chunk':
      break;
    default:
      return undefined;
  }
  if (typeof data !== 'object' || data === null) return undefined;
  const { content } = data as { content?: Record<string, unknown> };
  if (content?.type !== 'text') return undefined;
  const text = content.text;
  return typeof text === 'string' ? text : undefined;
}

function summarizeAcpEvent(eventType: string, data: unknown): string | undefined {
if (!data || typeof data !== 'object') {
return eventType;
Expand Down
14 changes: 14 additions & 0 deletions packages/core/src/evaluation/providers/copilot-sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ export class CopilotSdkProvider implements Provider {
attempt: request.attempt,
format: this.config.logFormat ?? 'summary',
headerLabel: 'Copilot SDK',
chunkExtractor: extractSdkChunk,
},
summarizeSdkEvent,
);
Expand Down Expand Up @@ -426,6 +427,19 @@ function normalizeByokBaseUrl(baseUrl: string, type: string): string {
return trimmed;
}

/**
 * Extracts bufferable text from SDK assistant.message_delta events.
 *
 * A string return value makes the logger accumulate the text instead of
 * emitting one line per delta; a single consolidated [assistant_message]
 * line is written once all deltas for a turn have arrived (on the next
 * non-chunk event or on close).
 */
function extractSdkChunk(eventType: string, data: unknown): string | undefined {
  const isDeltaEvent =
    eventType === 'assistant.message_delta' && typeof data === 'object' && data !== null;
  if (!isDeltaEvent) return undefined;
  const { deltaContent } = data as { deltaContent?: unknown };
  return typeof deltaContent === 'string' ? deltaContent : undefined;
}

function summarizeSdkEvent(eventType: string, data: unknown): string | undefined {
if (!data || typeof data !== 'object') {
return eventType;
Expand Down
73 changes: 67 additions & 6 deletions packages/core/src/evaluation/providers/copilot-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,27 @@ export interface StreamLoggerOptions {
readonly attempt?: number;
readonly format: 'summary' | 'json';
readonly headerLabel: string;
/**
* Optional extractor for streaming text chunk events.
*
* When provided, the return value controls how each event is handled:
* - `string` — buffer this text; flush as `[assistant_message]` on the next
* non-chunk event or `close()`.
* - `null` — discard (reset) the accumulated buffer without emitting it.
* Use this for events that signal a new streaming pass is starting,
* e.g. `agent_thought_chunk` in Copilot ACP, which arrives between
* a streaming preview batch and the final response batch.
* - `undefined` — not a chunk event; process normally (flush buffer first, then
* call `summarize` and write the line).
*
* Example (Copilot CLI ACP):
* chunkExtractor: (type, data) => {
* if (type === 'agent_thought_chunk') return null; // reset pre-thinking buffer
* if (type !== 'agent_message_chunk') return undefined;
* return (data as any)?.content?.text ?? undefined;
* }
*/
readonly chunkExtractor?: (eventType: string, data: unknown) => string | null | undefined;
}

export class CopilotStreamLogger {
Expand All @@ -261,23 +282,32 @@ export class CopilotStreamLogger {
private readonly startedAt = Date.now();
private readonly format: 'summary' | 'json';
private readonly summarize: (eventType: string, data: unknown) => string | undefined;
private readonly chunkExtractor?: (eventType: string, data: unknown) => string | null | undefined;
private pendingText = '';

private constructor(
  filePath: string,
  format: 'summary' | 'json',
  summarize: (eventType: string, data: unknown) => string | undefined,
  chunkExtractor?: (eventType: string, data: unknown) => string | null | undefined,
) {
  // Capture configuration first, then open the log file. Append mode ('a')
  // lets create() add its header and subsequent events accumulate in order.
  this.filePath = filePath;
  this.summarize = summarize;
  this.chunkExtractor = chunkExtractor;
  this.format = format;
  this.stream = createWriteStream(filePath, { flags: 'a' });
}

static async create(
options: StreamLoggerOptions,
summarize: (eventType: string, data: unknown) => string | undefined,
): Promise<CopilotStreamLogger> {
const logger = new CopilotStreamLogger(options.filePath, options.format, summarize);
const logger = new CopilotStreamLogger(
options.filePath,
options.format,
summarize,
options.chunkExtractor,
);
const header = [
`# ${options.headerLabel} stream log`,
`# target: ${options.targetName}`,
Expand All @@ -293,18 +323,49 @@ export class CopilotStreamLogger {
}

// NOTE(review): this span appears to be a rendered diff with the +/- markers
// stripped — the OLD and NEW versions of handleEvent look interleaved
// (duplicate `const elapsed` declarations at the top and inside the json
// branch, and an `else` branch whose brace never closes within this span).
// Reconcile against the actual checked-in file before relying on this text.
handleEvent(eventType: string, data: unknown): void {
const elapsed = formatElapsed(this.startedAt);
// json format: one JSON line per event, full fidelity, no buffering.
if (this.format === 'json') {
const elapsed = formatElapsed(this.startedAt);
this.stream.write(`${JSON.stringify({ time: elapsed, event: eventType, data })}\n`);
} else {
const summary = this.summarize(eventType, data);
if (summary) {
this.stream.write(`[+${elapsed}] [${eventType}] ${summary}\n`);
return;
}

// In summary mode, buffer chunk events and emit a single consolidated line.
if (this.chunkExtractor) {
const chunkText = this.chunkExtractor(eventType, data);
if (chunkText === null) {
// Reset signal: discard the accumulated buffer without emitting.
// Used for events like agent_thought_chunk that arrive between a
// streaming preview batch and the final response batch in Copilot ACP —
// the preview text is stale; the real message follows after thinking.
this.pendingText = '';
return;
}
if (chunkText !== undefined) {
// Chunk event: accumulate and defer output until flush.
this.pendingText += chunkText;
return;
}
// Non-chunk event: flush any accumulated text first.
this.flushPendingText();
}

// Non-chunk path: emit a one-line summary if the summarizer produced one.
const elapsed = formatElapsed(this.startedAt);
const summary = this.summarize(eventType, data);
if (summary) {
this.stream.write(`[+${elapsed}] [${eventType}] ${summary}\n`);
}
}

// Emits the accumulated chunk text as one consolidated [assistant_message]
// line, then clears the buffer. No-op when nothing has been buffered.
private flushPendingText(): void {
  const text = this.pendingText;
  if (text === '') return;
  this.pendingText = '';
  this.stream.write(`[+${formatElapsed(this.startedAt)}] [assistant_message] ${text}\n`);
}

async close(): Promise<void> {
if (this.format !== 'json') {
this.flushPendingText();
}
await new Promise<void>((resolve, reject) => {
this.stream.once('error', reject);
this.stream.end(() => resolve());
Expand Down
185 changes: 185 additions & 0 deletions packages/core/test/evaluation/providers/copilot-stream-logger.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import { mkdtemp, readFile, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import path from 'node:path';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';

import { CopilotStreamLogger } from '../../../src/evaluation/providers/copilot-utils.js';

const noopSummarize = (_type: string, _data: unknown): string | undefined => undefined;

// Extractor mirroring the providers' ACP text extraction: buffers text from
// agent_message_chunk events and ignores everything else. Hoisted here so
// each test does not re-declare an identical copy (previously duplicated
// four times across the suite).
const textChunkExtractor = (type: string, data: unknown): string | null | undefined => {
  if (type !== 'agent_message_chunk') return undefined;
  const d = data as Record<string, unknown>;
  const content = d?.content as Record<string, unknown> | undefined;
  return content?.type === 'text' && typeof content.text === 'string' ? content.text : undefined;
};

// Variant that also treats agent_thought_chunk as a buffer-reset signal,
// matching the Copilot CLI (ACP) provider behavior.
const resetOnThoughtExtractor = (type: string, data: unknown): string | null | undefined =>
  type === 'agent_thought_chunk' ? null : textChunkExtractor(type, data);

// Builds the ACP text-chunk event payload shape used throughout the suite.
const textChunk = (text: string) => ({ content: { type: 'text', text } });

describe('CopilotStreamLogger', () => {
  let tempDir: string;

  beforeEach(async () => {
    tempDir = await mkdtemp(path.join(tmpdir(), 'copilot-stream-logger-'));
  });

  afterEach(async () => {
    await rm(tempDir, { recursive: true, force: true });
  });

  it('writes summary events as separate lines', async () => {
    const filePath = path.join(tempDir, 'test.log');
    const summarize = (type: string, _data: unknown) =>
      type === 'tool_call' ? 'read_file' : undefined;

    const logger = await CopilotStreamLogger.create(
      { filePath, targetName: 'test', format: 'summary', headerLabel: 'Test' },
      summarize,
    );
    logger.handleEvent('tool_call', {});
    logger.handleEvent('tool_call', {});
    await logger.close();

    const content = await readFile(filePath, 'utf8');
    const lines = content.split('\n').filter((l) => l.includes('[tool_call]'));
    expect(lines).toHaveLength(2);
    expect(lines[0]).toMatch(/\[tool_call\] read_file/);
  });

  it('buffers chunk events and flushes as single [assistant_message] line on non-chunk event', async () => {
    const filePath = path.join(tempDir, 'test.log');
    const summarize = (type: string, _data: unknown) =>
      type === 'tool_call' ? 'read_file' : undefined;

    const logger = await CopilotStreamLogger.create(
      {
        filePath,
        targetName: 'test',
        format: 'summary',
        headerLabel: 'Test',
        chunkExtractor: textChunkExtractor,
      },
      summarize,
    );

    // Three chunks — should NOT produce three log lines
    logger.handleEvent('agent_message_chunk', textChunk('Hello'));
    logger.handleEvent('agent_message_chunk', textChunk(' world'));
    logger.handleEvent('agent_message_chunk', textChunk('!'));
    // Non-chunk event triggers flush
    logger.handleEvent('tool_call', {});
    await logger.close();

    const content = await readFile(filePath, 'utf8');
    const lines = content.split('\n').filter((l) => l.trim());

    // No raw chunk lines
    expect(lines.some((l) => l.includes('[agent_message_chunk]'))).toBe(false);
    // One consolidated assistant_message line with full text
    const msgLine = lines.find((l) => l.includes('[assistant_message]'));
    expect(msgLine).toBeDefined();
    expect(msgLine).toMatch(/Hello world!/);
    // tool_call still emitted
    expect(lines.some((l) => l.includes('[tool_call]'))).toBe(true);
  });

  it('flushes remaining buffered text on close', async () => {
    const filePath = path.join(tempDir, 'test.log');

    const logger = await CopilotStreamLogger.create(
      {
        filePath,
        targetName: 'test',
        format: 'summary',
        headerLabel: 'Test',
        chunkExtractor: textChunkExtractor,
      },
      noopSummarize,
    );

    logger.handleEvent('agent_message_chunk', textChunk('Final answer'));
    // close() without any subsequent non-chunk event
    await logger.close();

    const content = await readFile(filePath, 'utf8');
    expect(content).toMatch(/\[assistant_message\] Final answer/);
  });

  it('does not buffer in json format (keeps per-event for full fidelity)', async () => {
    const filePath = path.join(tempDir, 'test.log');

    const logger = await CopilotStreamLogger.create(
      {
        filePath,
        targetName: 'test',
        format: 'json',
        headerLabel: 'Test',
        chunkExtractor: textChunkExtractor,
      },
      noopSummarize,
    );

    logger.handleEvent('agent_message_chunk', textChunk('chunk1'));
    logger.handleEvent('agent_message_chunk', textChunk('chunk2'));
    await logger.close();

    const content = await readFile(filePath, 'utf8');
    const jsonLines = content
      .split('\n')
      .filter((l) => l.trim().startsWith('{'))
      .map((l) => JSON.parse(l));
    // Both chunks emitted individually as JSON
    expect(jsonLines.filter((e) => e.event === 'agent_message_chunk')).toHaveLength(2);
  });

  it('handles chunk events with no extractable text gracefully', async () => {
    const filePath = path.join(tempDir, 'test.log');
    // Always returns undefined ("not a chunk") — previously written as the
    // pointless ternary `type === 'agent_message_chunk' ? undefined : undefined`.
    const neverAChunk = (_type: string, _data: unknown): string | null | undefined => undefined;

    const logger = await CopilotStreamLogger.create(
      {
        filePath,
        targetName: 'test',
        format: 'summary',
        headerLabel: 'Test',
        chunkExtractor: neverAChunk,
      },
      noopSummarize,
    );

    // Chunks with no extractable text are silently skipped (chunkExtractor returns undefined
    // meaning "not a chunk" — treated as non-chunk events, summarize returns undefined, no output)
    logger.handleEvent('agent_message_chunk', { content: { type: 'image' } });
    await logger.close();

    const content = await readFile(filePath, 'utf8');
    expect(content).not.toMatch(/\[assistant_message\]/);
  });

  it('null return from chunkExtractor resets buffer without emitting (handles pre-thinking streaming)', async () => {
    const filePath = path.join(tempDir, 'test.log');

    const logger = await CopilotStreamLogger.create(
      {
        filePath,
        targetName: 'test',
        format: 'summary',
        headerLabel: 'Test',
        // Simulates Copilot ACP: chunks → thought_chunks (reset) → chunks (final)
        chunkExtractor: resetOnThoughtExtractor,
      },
      noopSummarize,
    );

    // First pass: streaming preview (should be discarded)
    logger.handleEvent('agent_message_chunk', textChunk('Hi'));
    logger.handleEvent('agent_message_chunk', textChunk(' there.'));
    // Extended thinking: resets the buffer
    logger.handleEvent('agent_thought_chunk', {});
    logger.handleEvent('agent_thought_chunk', {});
    // Second pass: final response
    logger.handleEvent('agent_message_chunk', textChunk('Hi there.'));
    await logger.close();

    const content = await readFile(filePath, 'utf8');
    const msgLines = content.split('\n').filter((l) => l.includes('[assistant_message]'));
    // Only one consolidated line — the final response, not the preview
    expect(msgLines).toHaveLength(1);
    expect(msgLines[0]).toMatch(/\[assistant_message\] Hi there\.$/);
  });
});
Loading