From cbe56f0078dba498de8d643f6679b4eef5502838 Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Mon, 18 May 2026 14:47:32 -0700 Subject: [PATCH 1/5] feat: timing-aware recording and replay with --replay-speed multiplier --- src/cli.ts | 9 ++++++ src/fixture-loader.ts | 24 +++++++++++++-- src/recorder.ts | 69 +++++++++++++++++++++++++++++++++++++++++++ src/sse-writer.ts | 59 +++++++++++++++++++++++++++--------- src/types.ts | 17 +++++++++++ 5 files changed, 161 insertions(+), 17 deletions(-) diff --git a/src/cli.ts b/src/cli.ts index 6592379..9834bfd 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -44,6 +44,7 @@ Options: --agui-record Enable AG-UI recording (proxy unmatched AG-UI requests) --agui-upstream Upstream AG-UI agent URL (used with --agui-record) --agui-proxy-only AG-UI proxy mode: forward without saving + --replay-speed Replay speed multiplier (default: 1.0, 2.0 = 2x faster) --chaos-drop Probability (0-1) of dropping requests with 500 --chaos-malformed Probability (0-1) of returning malformed JSON --chaos-disconnect Probability (0-1) of destroying connection @@ -78,6 +79,7 @@ const { values } = parseArgs({ "agui-record": { type: "boolean", default: false }, "agui-upstream": { type: "string" }, "agui-proxy-only": { type: "boolean", default: false }, + "replay-speed": { type: "string", default: "1.0" }, "chaos-drop": { type: "string" }, "chaos-malformed": { type: "string" }, "chaos-disconnect": { type: "string" }, @@ -124,6 +126,12 @@ if (Number.isNaN(chunkSize) || chunkSize < 1) { process.exit(1); } +const replaySpeed = Number(values["replay-speed"]); +if (Number.isNaN(replaySpeed) || replaySpeed <= 0) { + console.error("--replay-speed must be a positive number"); + process.exit(1); +} + const journalMax = Number(values["journal-max"]); if (Number.isNaN(journalMax) || !Number.isInteger(journalMax) || journalMax < 0) { console.error( @@ -370,6 +378,7 @@ async function main() { host, latency, chunkSize, + replaySpeed, logLevel, chaos, metrics: values.metrics, 
diff --git a/src/fixture-loader.ts b/src/fixture-loader.ts index fd9b368..adedd8a 100644 --- a/src/fixture-loader.ts +++ b/src/fixture-loader.ts @@ -49,8 +49,8 @@ export function normalizeResponse(raw: FixtureFileResponse): FixtureResponse { return response as unknown as FixtureResponse; } -export function entryToFixture(entry: FixtureFileEntry): Fixture { - return { +export function entryToFixture(entry: FixtureFileEntry, logger?: Logger): Fixture { + const fixture: Fixture = { match: { userMessage: entry.match.userMessage, systemMessage: entry.match.systemMessage, @@ -76,9 +76,27 @@ export function entryToFixture(entry: FixtureFileEntry): Fixture { }), ...(entry.disconnectAfterMs !== undefined && { disconnectAfterMs: entry.disconnectAfterMs }), ...(entry.streamingProfile !== undefined && { streamingProfile: entry.streamingProfile }), + ...(entry.recordedTimings !== undefined && { recordedTimings: entry.recordedTimings }), + ...(entry.replaySpeed != null && { replaySpeed: entry.replaySpeed }), ...(entry.chaos !== undefined && { chaos: entry.chaos }), ...(entry.metadata !== undefined && { metadata: entry.metadata }), }; + + // Sanitize recordedTimings to guard against NaN or negative values that + // would silently degrade replay timing calculations. + if (fixture.recordedTimings) { + const rt = fixture.recordedTimings; + if (!Number.isFinite(rt.ttftMs) || rt.ttftMs < 0) rt.ttftMs = 0; + rt.interChunkDelaysMs = rt.interChunkDelaysMs.filter((d) => Number.isFinite(d) && d >= 0); + if (!Number.isFinite(rt.totalDurationMs) || rt.totalDurationMs < 0) rt.totalDurationMs = 0; + } + + if (fixture.replaySpeed != null && fixture.replaySpeed <= 0) { + logger?.warn(`Fixture replaySpeed must be positive, got ${fixture.replaySpeed}. Ignoring.`); + delete fixture.replaySpeed; + } + + return fixture; } // Logging helper — uses logger if provided, falls back to console.warn. 
@@ -116,7 +134,7 @@ export function loadFixtureFile(filePath: string, logger?: Logger): Fixture[] { return []; } - return (parsed as FixtureFile).fixtures.map(entryToFixture); + return (parsed as FixtureFile).fixtures.map((e) => entryToFixture(e, logger)); } export function loadFixturesFromDir(dirPath: string, logger?: Logger): Fixture[] { diff --git a/src/recorder.ts b/src/recorder.ts index a9c977f..990accd 100644 --- a/src/recorder.ts +++ b/src/recorder.ts @@ -8,6 +8,7 @@ import type { Fixture, FixtureResponse, RecordConfig, + RecordedTimings, RecordProviderKey, ToolCall, } from "./types.js"; @@ -288,6 +289,8 @@ export async function proxyAndRecord( // skip the final res.writeHead/res.end relay at the bottom of this fn. let streamedToClient = false; let clientDisconnected = false; + let frameTimestamps: number[] = []; + let streamStartTime = 0; try { const result = await makeUpstreamRequest( target, @@ -304,6 +307,8 @@ export async function proxyAndRecord( rawBuffer = result.rawBuffer; streamedToClient = result.streamedToClient; clientDisconnected = result.clientDisconnected; + frameTimestamps = result.frameTimestamps; + streamStartTime = result.streamStartTime; } catch (err) { const msg = err instanceof Error ? err.message : "Unknown proxy error"; defaults.logger.error(`Proxy request failed: ${msg}`); @@ -454,11 +459,25 @@ export async function proxyAndRecord( return "relayed"; } + // Build RecordedTimings from frame timestamps captured during streaming. + // Requires at least 2 timestamps (first frame + at least one more) to + // produce meaningful timing data. + let recordedTimings: RecordedTimings | undefined; + if (frameTimestamps.length > 1) { + const ts = frameTimestamps; + recordedTimings = { + ttftMs: ts[0] - streamStartTime, + interChunkDelaysMs: ts.slice(1).map((t, i) => t - ts[i]), + totalDurationMs: ts[ts.length - 1] - streamStartTime, + }; + } + const matchRequest = defaults.requestTransform ? 
defaults.requestTransform(request) : request; const metadata = buildFixtureMetadata(request); const fixture: Fixture = { match: buildFixtureMatch(matchRequest, defaults.record), response: fixtureResponse, + ...(recordedTimings && { recordedTimings }), ...(metadata && { metadata }), }; @@ -571,6 +590,8 @@ function makeUpstreamRequest( rawBuffer: Buffer; streamedToClient: boolean; clientDisconnected: boolean; + frameTimestamps: number[]; + streamStartTime: number; }> { return new Promise((resolve, reject) => { const transport = target.protocol === "https:" ? https : http; @@ -603,6 +624,13 @@ function makeUpstreamRequest( const isNDJSON = ctLower.includes("application/x-ndjson"); const isBinaryEventStream = ctLower.includes("application/vnd.amazon.eventstream"); const isProgressiveStream = isSSE || isNDJSON || isBinaryEventStream; + // SSE/NDJSON frame timing capture — timestamps each complete frame + // so proxyAndRecord can build RecordedTimings for the fixture. + const frameTimestamps: number[] = []; + const streamStartTime = Date.now(); + let frameBuffer = ""; + let binaryFrameBuffer = Buffer.alloc(0); + let streamedToClient = false; let clientDisconnected = false; if (isProgressiveStream && clientRes && !clientRes.headersSent) { @@ -635,6 +663,38 @@ function makeUpstreamRequest( const chunks: Buffer[] = []; res.on("data", (chunk: Buffer) => { chunks.push(chunk); + + // Capture per-frame timestamps for SSE/NDJSON streams. + // TCP data events don't align with SSE frames — buffer and + // split on the protocol delimiter to timestamp each complete frame. + if (isSSE || isNDJSON) { + frameBuffer += chunk.toString(); + const delimiter = isNDJSON ? "\n" : "\n\n"; + const parts = frameBuffer.split(delimiter); + // All complete frames (everything except the last part which + // may be incomplete). 
+ for (let fi = 0; fi < parts.length - 1; fi++) { + if (parts[fi].trim().length > 0) { + frameTimestamps.push(Date.now()); + } + } + // Last part stays in buffer (may be incomplete) + frameBuffer = parts[parts.length - 1]; + } + + // Binary EventStream frame boundary detection — parse the 4-byte + // total-length prefix to detect complete frames without decoding + // frame contents (CRC validation happens in stream-collapse). + if (isBinaryEventStream) { + binaryFrameBuffer = Buffer.concat([binaryFrameBuffer, chunk]); + while (binaryFrameBuffer.length >= 4) { + const totalLen = binaryFrameBuffer.readUInt32BE(0); + if (totalLen < 12 || binaryFrameBuffer.length < totalLen) break; + frameTimestamps.push(Date.now()); + binaryFrameBuffer = binaryFrameBuffer.subarray(totalLen); + } + } + if ( streamedToClient && clientRes && @@ -655,6 +715,13 @@ function makeUpstreamRequest( res.on("error", reject); res.on("end", () => { if (res.socket) res.setTimeout(0); + // Flush remaining text frame buffer — captures the last frame if + // the stream ended without a trailing delimiter. Binary EventStream + // frames are length-prefixed so partial frames at end-of-stream are + // genuinely incomplete and should not be timestamped. 
+ if ((isSSE || isNDJSON) && frameBuffer.trim().length > 0) { + frameTimestamps.push(Date.now()); + } const rawBuffer = Buffer.concat(chunks); if ( streamedToClient && @@ -678,6 +745,8 @@ function makeUpstreamRequest( rawBuffer, streamedToClient, clientDisconnected, + frameTimestamps, + streamStartTime, }); }); }, diff --git a/src/sse-writer.ts b/src/sse-writer.ts index c401acb..072e889 100644 --- a/src/sse-writer.ts +++ b/src/sse-writer.ts @@ -1,5 +1,5 @@ import type * as http from "node:http"; -import type { SSEChunk, StreamingProfile } from "./types.js"; +import type { SSEChunk, StreamingProfile, RecordedTimings } from "./types.js"; export function delay(ms: number, signal?: AbortSignal): Promise { if (ms <= 0 || signal?.aborted) return Promise.resolve(); @@ -19,6 +19,8 @@ export function delay(ms: number, signal?: AbortSignal): Promise { export interface StreamOptions { latency?: number; streamingProfile?: StreamingProfile; + recordedTimings?: RecordedTimings; + replaySpeed?: number; signal?: AbortSignal; onChunkSent?: () => void; /** When set, emitted as the final chunk before [DONE] (OpenAI stream_options.include_usage). */ @@ -29,23 +31,51 @@ export function calculateDelay( chunkIndex: number, profile?: StreamingProfile, fallbackLatency?: number, + recordedTimings?: RecordedTimings, + replaySpeed?: number, ): number { - if (!profile) return fallbackLatency ?? 0; - + const speed = replaySpeed ?? 1.0; let delayMs: number; - if (chunkIndex === 0 && profile.ttft !== undefined) { - delayMs = profile.ttft; - } else if (profile.tps !== undefined && profile.tps > 0) { - delayMs = 1000 / profile.tps; - } else { - return fallbackLatency ?? 
0; - } - if (profile.jitter && profile.jitter > 0) { - delayMs *= 1 + (Math.random() * 2 - 1) * profile.jitter; + if (profile) { + // StreamingProfile has highest precedence + let fromProfile = true; + if (chunkIndex === 0 && profile.ttft !== undefined) { + delayMs = profile.ttft; + } else if (profile.tps !== undefined && profile.tps > 0) { + delayMs = 1000 / profile.tps; + } else { + delayMs = fallbackLatency ?? 0; + fromProfile = false; + } + // Jitter only applies when the delay came from ttft/tps, not fallback + if (fromProfile && profile.jitter && profile.jitter > 0) { + delayMs *= 1 + (Math.random() * 2 - 1) * profile.jitter; + if (delayMs < 0) delayMs = 0; + } + } else if (recordedTimings) { + // Recorded timings (second precedence) + if (chunkIndex === 0) { + delayMs = recordedTimings.ttftMs; + } else { + const idx = chunkIndex - 1; + if (idx < recordedTimings.interChunkDelaysMs.length) { + delayMs = recordedTimings.interChunkDelaysMs[idx]; + } else { + // Excess chunks: derive average from recorded inter-chunk delays + const totalInterChunk = recordedTimings.interChunkDelaysMs.reduce((a, b) => a + b, 0); + delayMs = + recordedTimings.interChunkDelaysMs.length > 0 + ? totalInterChunk / recordedTimings.interChunkDelaysMs.length + : 0; + } + } + } else { + delayMs = fallbackLatency ?? 0; } - return Math.max(0, delayMs); + delayMs = Math.max(0, delayMs); + return speed > 0 ? delayMs / speed : delayMs; } export async function writeSSEStream( @@ -57,6 +87,7 @@ export async function writeSSEStream( typeof optionsOrLatency === "number" ? { latency: optionsOrLatency } : (optionsOrLatency ?? {}); const latency = opts.latency ?? 
0; const profile = opts.streamingProfile; + const { recordedTimings, replaySpeed } = opts; const signal = opts.signal; const onChunkSent = opts.onChunkSent; @@ -67,7 +98,7 @@ export async function writeSSEStream( let chunkIndex = 0; for (const chunk of chunks) { - const chunkDelay = calculateDelay(chunkIndex, profile, latency); + const chunkDelay = calculateDelay(chunkIndex, profile, latency, recordedTimings, replaySpeed); if (chunkDelay > 0) { await delay(chunkDelay, signal); } diff --git a/src/types.ts b/src/types.ts index 35c3ef7..997d435 100644 --- a/src/types.ts +++ b/src/types.ts @@ -268,6 +268,17 @@ export interface StreamingProfile { jitter?: number; // Random variance factor (0-1), default 0 } +/** + * Per-frame arrival timestamps captured during proxy recording. + * Used during replay to reproduce real-world streaming timing instead of + * the synthetic model (StreamingProfile / flat latency). + */ +export interface RecordedTimings { + ttftMs: number; + interChunkDelaysMs: number[]; + totalDurationMs: number; +} + /** * Probabilistic chaos injection rates. * @@ -301,6 +312,8 @@ export interface Fixture { truncateAfterChunks?: number; disconnectAfterMs?: number; streamingProfile?: StreamingProfile; + recordedTimings?: RecordedTimings; + replaySpeed?: number; chaos?: ChaosConfig; metadata?: { systemHash?: string; @@ -401,6 +414,8 @@ export interface FixtureFileEntry { truncateAfterChunks?: number; disconnectAfterMs?: number; streamingProfile?: StreamingProfile; + recordedTimings?: RecordedTimings; + replaySpeed?: number; chaos?: ChaosConfig; metadata?: { systemHash?: string; @@ -558,6 +573,7 @@ export interface MockServerOptions { host?: string; latency?: number; chunkSize?: number; + replaySpeed?: number; /** Log verbosity. CLI default is "info"; programmatic default (when omitted) is "silent". 
*/ logLevel?: "silent" | "warn" | "info" | "debug"; chaos?: ChaosConfig; @@ -638,6 +654,7 @@ export interface FalQueueConfig { export interface HandlerDefaults { latency: number; chunkSize: number; + replaySpeed: number; logger: Logger; chaos?: ChaosConfig; registry?: MetricsRegistry; From 9eab754619276ffec9cc669e2ef8c4629c90e35a Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Mon, 18 May 2026 14:47:33 -0700 Subject: [PATCH 2/5] feat: thread recorded timings through all 11 streaming paths --- src/aws-event-stream.ts | 7 +++-- src/bedrock-converse.ts | 6 ++++ src/bedrock.ts | 6 ++++ src/cohere.ts | 12 +++++++- src/gemini-interactions.ts | 13 ++++++++- src/gemini.ts | 15 +++++++++- src/llmock.ts | 2 +- src/messages.ts | 12 +++++++- src/ndjson-writer.ts | 7 +++-- src/ollama.ts | 8 ++++++ src/responses.ts | 14 +++++++++- src/server.ts | 9 +++++- src/ws-gemini-live.ts | 28 +++++++++++++++---- src/ws-realtime.ts | 56 ++++++++++++++++++++++++++++++++++---- src/ws-responses.ts | 17 ++++++++++-- 15 files changed, 189 insertions(+), 23 deletions(-) diff --git a/src/aws-event-stream.ts b/src/aws-event-stream.ts index 5dc04fe..b370e7c 100644 --- a/src/aws-event-stream.ts +++ b/src/aws-event-stream.ts @@ -17,7 +17,7 @@ import { crc32 } from "node:zlib"; import type * as http from "node:http"; -import type { StreamingProfile } from "./types.js"; +import type { StreamingProfile, RecordedTimings } from "./types.js"; import { delay, calculateDelay } from "./sse-writer.js"; // ─── Header encoding ──────────────────────────────────────────────────────── @@ -121,6 +121,8 @@ export async function writeEventStream( options?: { latency?: number; streamingProfile?: StreamingProfile; + recordedTimings?: RecordedTimings; + replaySpeed?: number; signal?: AbortSignal; onChunkSent?: () => void; }, @@ -128,6 +130,7 @@ export async function writeEventStream( const opts = options ?? {}; const latency = opts.latency ?? 
0; const profile = opts.streamingProfile; + const { recordedTimings, replaySpeed } = opts; const signal = opts.signal; const onChunkSent = opts.onChunkSent; @@ -137,7 +140,7 @@ export async function writeEventStream( let chunkIndex = 0; for (const event of events) { - const chunkDelay = calculateDelay(chunkIndex, profile, latency); + const chunkDelay = calculateDelay(chunkIndex, profile, latency, recordedTimings, replaySpeed); if (chunkDelay > 0) { await delay(chunkDelay, signal); } diff --git a/src/bedrock-converse.ts b/src/bedrock-converse.ts index 61be57c..4077cd9 100644 --- a/src/bedrock-converse.ts +++ b/src/bedrock-converse.ts @@ -1022,6 +1022,8 @@ export async function handleConverseStream( const completed = await writeEventStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -1059,6 +1061,8 @@ export async function handleConverseStream( const completed = await writeEventStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -1096,6 +1100,8 @@ export async function handleConverseStream( const completed = await writeEventStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? 
defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); diff --git a/src/bedrock.ts b/src/bedrock.ts index ee72848..543a91c 100644 --- a/src/bedrock.ts +++ b/src/bedrock.ts @@ -1201,6 +1201,8 @@ export async function handleBedrockStream( const completed = await writeEventStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -1237,6 +1239,8 @@ export async function handleBedrockStream( const completed = await writeEventStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -1273,6 +1277,8 @@ export async function handleBedrockStream( const completed = await writeEventStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); diff --git a/src/cohere.ts b/src/cohere.ts index 7da2d24..3461a80 100644 --- a/src/cohere.ts +++ b/src/cohere.ts @@ -15,6 +15,7 @@ import type { ChatMessage, Fixture, HandlerDefaults, + RecordedTimings, ResponseOverrides, StreamingProfile, ToolCall, @@ -705,6 +706,8 @@ function buildCohereContentWithToolCallsStreamEvents( interface CohereStreamOptions { latency?: number; streamingProfile?: StreamingProfile; + recordedTimings?: RecordedTimings; + replaySpeed?: number; signal?: AbortSignal; onChunkSent?: () => void; } @@ -718,6 +721,7 @@ async function writeCohereSSEStream( typeof optionsOrLatency === "number" ? { latency: optionsOrLatency } : (optionsOrLatency ?? {}); const latency = opts.latency ?? 
0; const profile = opts.streamingProfile; + const { recordedTimings, replaySpeed } = opts; const signal = opts.signal; const onChunkSent = opts.onChunkSent; @@ -728,7 +732,7 @@ async function writeCohereSSEStream( let chunkIndex = 0; for (const event of events) { - const chunkDelay = calculateDelay(chunkIndex, profile, latency); + const chunkDelay = calculateDelay(chunkIndex, profile, latency, recordedTimings, replaySpeed); if (chunkDelay > 0) await delay(chunkDelay, signal); if (signal?.aborted) return false; if (res.writableEnded) return true; @@ -1001,6 +1005,8 @@ export async function handleCohere( const completed = await writeCohereSSEStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -1044,6 +1050,8 @@ export async function handleCohere( const completed = await writeCohereSSEStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -1087,6 +1095,8 @@ export async function handleCohere( const completed = await writeCohereSSEStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? 
defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); diff --git a/src/gemini-interactions.ts b/src/gemini-interactions.ts index 0a4408b..a310179 100644 --- a/src/gemini-interactions.ts +++ b/src/gemini-interactions.ts @@ -13,6 +13,7 @@ import type { ChatMessage, Fixture, HandlerDefaults, + RecordedTimings, ResponseOverrides, StreamingProfile, ToolCall, @@ -583,6 +584,8 @@ export function buildInteractionsContentWithToolCallsSSEEvents( interface InteractionsStreamOptions { latency?: number; streamingProfile?: StreamingProfile; + recordedTimings?: RecordedTimings; + replaySpeed?: number; signal?: AbortSignal; onChunkSent?: () => void; } @@ -596,6 +599,7 @@ export async function writeGeminiInteractionsSSEStream( typeof optionsOrLatency === "number" ? { latency: optionsOrLatency } : (optionsOrLatency ?? {}); const latency = opts.latency ?? 0; const profile = opts.streamingProfile; + const { recordedTimings, replaySpeed } = opts; const signal = opts.signal; const onChunkSent = opts.onChunkSent; @@ -606,7 +610,7 @@ export async function writeGeminiInteractionsSSEStream( let chunkIndex = 0; for (const event of events) { - const chunkDelay = calculateDelay(chunkIndex, profile, latency); + const chunkDelay = calculateDelay(chunkIndex, profile, latency, recordedTimings, replaySpeed); if (chunkDelay > 0) await delay(chunkDelay, signal); if (signal?.aborted) return false; if (res.writableEnded) return true; @@ -773,6 +777,7 @@ export async function handleGeminiInteractions( const response = await resolveResponse(fixture, completionReq); const latency = fixture.latency ?? defaults.latency; const chunkSize = Math.max(1, fixture.chunkSize ?? defaults.chunkSize); + const replaySpeed = fixture.replaySpeed ?? 
defaults.replaySpeed; // Error response if (isErrorResponse(response)) { @@ -836,6 +841,8 @@ export async function handleGeminiInteractions( const completed = await writeGeminiInteractionsSSEStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -879,6 +886,8 @@ export async function handleGeminiInteractions( const completed = await writeGeminiInteractionsSSEStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -923,6 +932,8 @@ export async function handleGeminiInteractions( const completed = await writeGeminiInteractionsSSEStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); diff --git a/src/gemini.ts b/src/gemini.ts index be30721..0b98f36 100644 --- a/src/gemini.ts +++ b/src/gemini.ts @@ -13,6 +13,7 @@ import type { ChatMessage, Fixture, HandlerDefaults, + RecordedTimings, RecordProviderKey, ResponseOverrides, StreamingProfile, @@ -542,6 +543,8 @@ function buildGeminiAudioStreamChunks(audio: AudioResponse): GeminiResponseChunk interface GeminiStreamOptions { latency?: number; streamingProfile?: StreamingProfile; + recordedTimings?: RecordedTimings; + replaySpeed?: number; signal?: AbortSignal; onChunkSent?: () => void; } @@ -555,6 +558,7 @@ async function writeGeminiSSEStream( typeof optionsOrLatency === "number" ? { latency: optionsOrLatency } : (optionsOrLatency ?? {}); const latency = opts.latency ?? 
0; const profile = opts.streamingProfile; + const { recordedTimings, replaySpeed } = opts; const signal = opts.signal; const onChunkSent = opts.onChunkSent; @@ -565,7 +569,7 @@ async function writeGeminiSSEStream( let chunkIndex = 0; for (const chunk of chunks) { - const chunkDelay = calculateDelay(chunkIndex, profile, latency); + const chunkDelay = calculateDelay(chunkIndex, profile, latency, recordedTimings, replaySpeed); if (chunkDelay > 0) await delay(chunkDelay, signal); if (signal?.aborted) return false; if (res.writableEnded) return true; @@ -748,6 +752,7 @@ export async function handleGemini( const response = await resolveResponse(fixture, completionReq); const latency = fixture.latency ?? defaults.latency; const chunkSize = Math.max(1, fixture.chunkSize ?? defaults.chunkSize); + const replaySpeed = fixture.replaySpeed ?? defaults.replaySpeed; // Error response if (isErrorResponse(response)) { @@ -792,6 +797,8 @@ export async function handleGemini( const completed = await writeGeminiSSEStream(res, chunks, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -841,6 +848,8 @@ export async function handleGemini( const completed = await writeGeminiSSEStream(res, chunks, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -882,6 +891,8 @@ export async function handleGemini( const completed = await writeGeminiSSEStream(res, chunks, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -918,6 +929,8 @@ export async function handleGemini( const completed = await writeGeminiSSEStream(res, chunks, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: 
fixture.recordedTimings, + replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); diff --git a/src/llmock.ts b/src/llmock.ts index d7ce2df..02e376e 100644 --- a/src/llmock.ts +++ b/src/llmock.ts @@ -81,7 +81,7 @@ export class LLMock { */ addFixturesFromJSON(input: string | FixtureFileEntry[]): this { const entries: FixtureFileEntry[] = typeof input === "string" ? JSON.parse(input) : input; - const converted = entries.map(entryToFixture); + const converted = entries.map((e) => entryToFixture(e)); const issues = validateFixtures(converted); const errors = issues.filter((i) => i.severity === "error"); if (errors.length > 0) { diff --git a/src/messages.ts b/src/messages.ts index b3bcec6..96e70c4 100644 --- a/src/messages.ts +++ b/src/messages.ts @@ -12,6 +12,7 @@ import type { ChatMessage, Fixture, HandlerDefaults, + RecordedTimings, ResponseOverrides, StreamingProfile, ToolCall, @@ -663,6 +664,8 @@ function buildClaudeContentWithToolCallsResponse( interface ClaudeStreamOptions { latency?: number; streamingProfile?: StreamingProfile; + recordedTimings?: RecordedTimings; + replaySpeed?: number; signal?: AbortSignal; onChunkSent?: () => void; } @@ -676,6 +679,7 @@ async function writeClaudeSSEStream( typeof optionsOrLatency === "number" ? { latency: optionsOrLatency } : (optionsOrLatency ?? {}); const latency = opts.latency ?? 
0; const profile = opts.streamingProfile; + const { recordedTimings, replaySpeed } = opts; const signal = opts.signal; const onChunkSent = opts.onChunkSent; @@ -686,7 +690,7 @@ async function writeClaudeSSEStream( let chunkIndex = 0; for (const event of events) { - const chunkDelay = calculateDelay(chunkIndex, profile, latency); + const chunkDelay = calculateDelay(chunkIndex, profile, latency, recordedTimings, replaySpeed); if (chunkDelay > 0) await delay(chunkDelay, signal); if (signal?.aborted) return false; if (res.writableEnded) return true; @@ -930,6 +934,8 @@ export async function handleMessages( const completed = await writeClaudeSSEStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -979,6 +985,8 @@ export async function handleMessages( const completed = await writeClaudeSSEStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -1028,6 +1036,8 @@ export async function handleMessages( const completed = await writeClaudeSSEStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? 
defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); diff --git a/src/ndjson-writer.ts b/src/ndjson-writer.ts index 1e2ab7d..ac6103a 100644 --- a/src/ndjson-writer.ts +++ b/src/ndjson-writer.ts @@ -6,12 +6,14 @@ */ import type * as http from "node:http"; -import type { StreamingProfile } from "./types.js"; +import type { StreamingProfile, RecordedTimings } from "./types.js"; import { delay, calculateDelay } from "./sse-writer.js"; export interface NDJSONStreamOptions { latency?: number; streamingProfile?: StreamingProfile; + recordedTimings?: RecordedTimings; + replaySpeed?: number; signal?: AbortSignal; onChunkSent?: () => void; } @@ -24,6 +26,7 @@ export async function writeNDJSONStream( const opts = options ?? {}; const latency = opts.latency ?? 0; const profile = opts.streamingProfile; + const { recordedTimings, replaySpeed } = opts; const signal = opts.signal; const onChunkSent = opts.onChunkSent; @@ -34,7 +37,7 @@ export async function writeNDJSONStream( let chunkIndex = 0; for (const chunk of chunks) { - const chunkDelay = calculateDelay(chunkIndex, profile, latency); + const chunkDelay = calculateDelay(chunkIndex, profile, latency, recordedTimings, replaySpeed); if (chunkDelay > 0) { await delay(chunkDelay, signal); } diff --git a/src/ollama.ts b/src/ollama.ts index 584355d..0491572 100644 --- a/src/ollama.ts +++ b/src/ollama.ts @@ -707,6 +707,8 @@ export async function handleOllama( const completed = await writeNDJSONStream(res, chunks, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -751,6 +753,8 @@ export async function handleOllama( const completed = await writeNDJSONStream(res, chunks, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? 
defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -786,6 +790,8 @@ export async function handleOllama( const completed = await writeNDJSONStream(res, chunks, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -1046,6 +1052,8 @@ export async function handleOllamaGenerate( const completed = await writeNDJSONStream(res, chunks, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); diff --git a/src/responses.ts b/src/responses.ts index 4938c6a..6560252 100644 --- a/src/responses.ts +++ b/src/responses.ts @@ -35,6 +35,7 @@ import { import { matchFixture } from "./router.js"; import { writeErrorResponse, delay, calculateDelay } from "./sse-writer.js"; import { createInterruptionSignal } from "./interruption.js"; +import type { RecordedTimings } from "./types.js"; import type { Journal } from "./journal.js"; import { applyChaos } from "./chaos.js"; import { proxyAndRecord } from "./recorder.js"; @@ -861,6 +862,8 @@ function buildContentWithToolCallsResponse( interface ResponsesStreamOptions { latency?: number; streamingProfile?: StreamingProfile; + recordedTimings?: RecordedTimings; + replaySpeed?: number; signal?: AbortSignal; onChunkSent?: () => void; } @@ -874,6 +877,7 @@ async function writeResponsesSSEStream( typeof optionsOrLatency === "number" ? { latency: optionsOrLatency } : (optionsOrLatency ?? {}); const latency = opts.latency ?? 
0; const profile = opts.streamingProfile; + const { recordedTimings, replaySpeed } = opts; const signal = opts.signal; const onChunkSent = opts.onChunkSent; @@ -884,7 +888,7 @@ async function writeResponsesSSEStream( let chunkIndex = 0; for (const event of events) { - const chunkDelay = calculateDelay(chunkIndex, profile, latency); + const chunkDelay = calculateDelay(chunkIndex, profile, latency, recordedTimings, replaySpeed); if (chunkDelay > 0) await delay(chunkDelay, signal); if (signal?.aborted) return false; if (res.writableEnded) return true; @@ -1065,6 +1069,8 @@ export async function handleResponses( const response = await resolveResponse(fixture, completionReq); const latency = fixture.latency ?? defaults.latency; const chunkSize = Math.max(1, fixture.chunkSize ?? defaults.chunkSize); + const fixtureTimings = fixture.recordedTimings; + const effectiveReplaySpeed = fixture.replaySpeed ?? defaults.replaySpeed; // Error response if (isErrorResponse(response)) { @@ -1117,6 +1123,8 @@ export async function handleResponses( const completed = await writeResponsesSSEStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixtureTimings, + replaySpeed: effectiveReplaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -1163,6 +1171,8 @@ export async function handleResponses( const completed = await writeResponsesSSEStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixtureTimings, + replaySpeed: effectiveReplaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); @@ -1207,6 +1217,8 @@ export async function handleResponses( const completed = await writeResponsesSSEStream(res, events, { latency, streamingProfile: fixture.streamingProfile, + recordedTimings: fixtureTimings, + replaySpeed: effectiveReplaySpeed, signal: interruption?.signal, onChunkSent: interruption?.tick, }); diff --git a/src/server.ts b/src/server.ts index db28801..c6e8dda 
100644 --- a/src/server.ts +++ b/src/server.ts @@ -273,7 +273,7 @@ async function handleControlAPI( return true; } - const converted = parsed.fixtures.map(entryToFixture); + const converted = parsed.fixtures.map((e) => entryToFixture(e)); const issues = validateFixtures(converted); const errors = issues.filter((i) => i.severity === "error"); if (errors.length > 0) { @@ -785,6 +785,8 @@ async function handleCompletions( signal: interruption?.signal, onChunkSent: interruption?.tick, usageChunk, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, }); if (!completed) { if (!res.writableEnded) res.destroy(); @@ -858,6 +860,8 @@ async function handleCompletions( signal: interruption?.signal, onChunkSent: interruption?.tick, usageChunk, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, }); if (!completed) { if (!res.writableEnded) res.destroy(); @@ -925,6 +929,8 @@ async function handleCompletions( signal: interruption?.signal, onChunkSent: interruption?.tick, usageChunk, + recordedTimings: fixture.recordedTimings, + replaySpeed: fixture.replaySpeed ?? defaults.replaySpeed, }); if (!completed) { if (!res.writableEnded) res.destroy(); @@ -979,6 +985,7 @@ export async function createServer( const defaults = { latency: serverOptions.latency ?? 0, chunkSize: Math.max(1, serverOptions.chunkSize ?? DEFAULT_CHUNK_SIZE), + replaySpeed: serverOptions.replaySpeed ?? 
1.0, logger, get chaos() { return serverOptions.chaos; diff --git a/src/ws-gemini-live.ts b/src/ws-gemini-live.ts index b2c089c..94ec26f 100644 --- a/src/ws-gemini-live.ts +++ b/src/ws-gemini-live.ts @@ -28,7 +28,7 @@ import { strictOverrideField, } from "./helpers.js"; import { createInterruptionSignal } from "./interruption.js"; -import { delay } from "./sse-writer.js"; +import { delay, calculateDelay } from "./sse-writer.js"; import { DEFAULT_TEST_ID, type Journal } from "./journal.js"; import type { Logger } from "./logger.js"; import type { WebSocketConnection } from "./ws-framing.js"; @@ -225,6 +225,7 @@ export function handleWebSocketGeminiLive( defaults: { latency: number; chunkSize: number; + replaySpeed?: number; model: string; logger: Logger; strict?: boolean; @@ -271,6 +272,7 @@ async function processMessage( defaults: { latency: number; chunkSize: number; + replaySpeed?: number; model: string; logger: Logger; strict?: boolean; @@ -504,6 +506,8 @@ async function processMessage( } const interruption = createInterruptionSignal(fixture); + const replaySpeed = fixture.replaySpeed ?? 
defaults.replaySpeed; + const { recordedTimings } = fixture; let interrupted = false; // Stream text content chunks (turnComplete omitted — sent as a separate message later) @@ -520,7 +524,8 @@ async function processMessage( } else { for (let i = 0; i < chunkList.length; i++) { if (ws.isClosed) break; - if (latency > 0) await delay(latency, interruption?.signal); + const chunkDelay = calculateDelay(i, undefined, latency, recordedTimings, replaySpeed); + if (chunkDelay > 0) await delay(chunkDelay, interruption?.signal); if (interruption?.signal.aborted) { interrupted = true; break; @@ -558,7 +563,14 @@ async function processMessage( // Send tool calls if (!ws.isClosed) { - if (latency > 0) await delay(latency, interruption?.signal); + const tcDelay = calculateDelay( + chunkList.length, + undefined, + latency, + recordedTimings, + replaySpeed, + ); + if (tcDelay > 0) await delay(tcDelay, interruption?.signal); if (interruption?.signal.aborted) { ws.destroy(); journalEntry.response.interrupted = true; @@ -660,12 +672,15 @@ async function processMessage( } const interruption = createInterruptionSignal(fixture); + const replaySpeed = fixture.replaySpeed ?? defaults.replaySpeed; + const { recordedTimings } = fixture; let interrupted = false; // Stream content chunks without turnComplete (sent separately after) for (let i = 0; i < chunks.length; i++) { if (ws.isClosed) break; - if (latency > 0) await delay(latency, interruption?.signal); + const chunkDelay = calculateDelay(i, undefined, latency, recordedTimings, replaySpeed); + if (chunkDelay > 0) await delay(chunkDelay, interruption?.signal); if (interruption?.signal.aborted) { interrupted = true; break; @@ -721,12 +736,15 @@ async function processMessage( }); const interruption = createInterruptionSignal(fixture); + const replaySpeed = fixture.replaySpeed ?? 
defaults.replaySpeed; + const { recordedTimings } = fixture; if (ws.isClosed) { interruption?.cleanup(); return; } - if (latency > 0) await delay(latency, interruption?.signal); + const tcDelay = calculateDelay(0, undefined, latency, recordedTimings, replaySpeed); + if (tcDelay > 0) await delay(tcDelay, interruption?.signal); if (interruption?.signal.aborted) { ws.destroy(); journalEntry.response.interrupted = true; diff --git a/src/ws-realtime.ts b/src/ws-realtime.ts index 63f9d5b..46496c9 100644 --- a/src/ws-realtime.ts +++ b/src/ws-realtime.ts @@ -21,7 +21,7 @@ import { strictOverrideField, } from "./helpers.js"; import { createInterruptionSignal } from "./interruption.js"; -import { delay } from "./sse-writer.js"; +import { delay, calculateDelay } from "./sse-writer.js"; import { DEFAULT_TEST_ID, type Journal } from "./journal.js"; import type { Logger } from "./logger.js"; import type { WebSocketConnection } from "./ws-framing.js"; @@ -293,6 +293,7 @@ export function handleWebSocketRealtime( defaults: { latency: number; chunkSize: number; + replaySpeed?: number; model: string; logger: Logger; strict?: boolean; @@ -393,6 +394,7 @@ async function processMessage( defaults: { latency: number; chunkSize: number; + replaySpeed?: number; model: string; logger: Logger; strict?: boolean; @@ -655,6 +657,7 @@ async function handleResponseCreate( defaults: { latency: number; chunkSize: number; + replaySpeed?: number; model: string; logger: Logger; strict?: boolean; @@ -902,9 +905,19 @@ async function handleResponseCreate( // response.output_text.delta (chunked) — GA name const content = response.content; + const replaySpeed = fixture.replaySpeed ?? 
defaults.replaySpeed; + const { recordedTimings } = fixture; + let eventIndex = 0; for (let i = 0; i < content.length; i += chunkSize) { if (ws.isClosed) break; - if (latency > 0) await delay(latency, interruption?.signal); + const chunkDelay = calculateDelay( + eventIndex, + undefined, + latency, + recordedTimings, + replaySpeed, + ); + if (chunkDelay > 0) await delay(chunkDelay, interruption?.signal); if (interruption?.signal.aborted) { interrupted = true; break; @@ -923,6 +936,7 @@ async function handleResponseCreate( }, isBeta, ); + eventIndex++; interruption?.tick(); if (interruption?.signal.aborted) { interrupted = true; @@ -1054,10 +1068,18 @@ async function handleResponseCreate( ); // response.function_call_arguments.delta (chunked) + // Continue eventIndex from content chunks to avoid re-triggering TTFT const args = tc.arguments; for (let i = 0; i < args.length; i += chunkSize) { if (ws.isClosed) break; - if (latency > 0) await delay(latency, interruption?.signal); + const chunkDelay = calculateDelay( + eventIndex, + undefined, + latency, + recordedTimings, + replaySpeed, + ); + if (chunkDelay > 0) await delay(chunkDelay, interruption?.signal); if (interruption?.signal.aborted) { interrupted = true; break; @@ -1076,6 +1098,7 @@ async function handleResponseCreate( }, isBeta, ); + eventIndex++; interruption?.tick(); if (interruption?.signal.aborted) { interrupted = true; @@ -1253,12 +1276,22 @@ async function handleResponseCreate( // response.output_text.delta (chunked) — GA name const content = response.content; + const replaySpeed = fixture.replaySpeed ?? 
defaults.replaySpeed; + const { recordedTimings } = fixture; const interruption = createInterruptionSignal(fixture); let interrupted = false; + let eventIndex = 0; for (let i = 0; i < content.length; i += chunkSize) { if (ws.isClosed) break; - if (latency > 0) await delay(latency, interruption?.signal); + const chunkDelay = calculateDelay( + eventIndex, + undefined, + latency, + recordedTimings, + replaySpeed, + ); + if (chunkDelay > 0) await delay(chunkDelay, interruption?.signal); if (interruption?.signal.aborted) { interrupted = true; break; @@ -1277,6 +1310,7 @@ async function handleResponseCreate( }, isBeta, ); + eventIndex++; interruption?.tick(); if (interruption?.signal.aborted) { interrupted = true; @@ -1408,7 +1442,10 @@ async function handleResponseCreate( const outputItems: unknown[] = []; const interruption = createInterruptionSignal(fixture); + const replaySpeed = fixture.replaySpeed ?? defaults.replaySpeed; + const { recordedTimings } = fixture; let interrupted = false; + let eventIndex = 0; for (let tcIdx = 0; tcIdx < response.toolCalls.length; tcIdx++) { const tc = response.toolCalls[tcIdx]; @@ -1445,10 +1482,18 @@ async function handleResponseCreate( ); // response.function_call_arguments.delta (chunked) + // eventIndex is continuous across all tool calls to avoid re-triggering TTFT const args = tc.arguments; for (let i = 0; i < args.length; i += chunkSize) { if (ws.isClosed) break; - if (latency > 0) await delay(latency, interruption?.signal); + const chunkDelay = calculateDelay( + eventIndex, + undefined, + latency, + recordedTimings, + replaySpeed, + ); + if (chunkDelay > 0) await delay(chunkDelay, interruption?.signal); if (interruption?.signal.aborted) { interrupted = true; break; @@ -1467,6 +1512,7 @@ async function handleResponseCreate( }, isBeta, ); + eventIndex++; interruption?.tick(); if (interruption?.signal.aborted) { interrupted = true; diff --git a/src/ws-responses.ts b/src/ws-responses.ts index 224cc48..b8c5f06 100644 --- 
a/src/ws-responses.ts +++ b/src/ws-responses.ts @@ -27,7 +27,7 @@ import { flattenHeaders, } from "./helpers.js"; import { createInterruptionSignal } from "./interruption.js"; -import { delay } from "./sse-writer.js"; +import { delay, calculateDelay } from "./sse-writer.js"; import { DEFAULT_TEST_ID, type Journal } from "./journal.js"; import type { Logger } from "./logger.js"; import type { WebSocketConnection } from "./ws-framing.js"; @@ -71,6 +71,7 @@ export function handleWebSocketResponses( defaults: { latency: number; chunkSize: number; + replaySpeed?: number; model: string; logger: Logger; strict?: boolean; @@ -107,6 +108,7 @@ async function processMessage( defaults: { latency: number; chunkSize: number; + replaySpeed?: number; model: string; logger: Logger; strict?: boolean; @@ -268,6 +270,8 @@ async function processMessage( latency, interruption?.signal, interruption?.tick, + fixture.recordedTimings, + fixture.replaySpeed ?? defaults.replaySpeed, ); if (!completed) { ws.destroy(); @@ -303,6 +307,8 @@ async function processMessage( latency, interruption?.signal, interruption?.tick, + fixture.recordedTimings, + fixture.replaySpeed ?? defaults.replaySpeed, ); if (!completed) { ws.destroy(); @@ -336,6 +342,8 @@ async function processMessage( latency, interruption?.signal, interruption?.tick, + fixture.recordedTimings, + fixture.replaySpeed ?? 
defaults.replaySpeed, ); if (!completed) { ws.destroy(); @@ -367,13 +375,18 @@ async function sendEvents( latency: number, signal?: AbortSignal, onChunkSent?: () => void, + recordedTimings?: import("./types.js").RecordedTimings, + replaySpeed?: number, ): Promise { + let eventIndex = 0; for (const event of events) { if (ws.isClosed) return false; - if (latency > 0) await delay(latency, signal); + const chunkDelay = calculateDelay(eventIndex, undefined, latency, recordedTimings, replaySpeed); + if (chunkDelay > 0) await delay(chunkDelay, signal); if (signal?.aborted) return false; if (ws.isClosed) return false; ws.send(JSON.stringify(event)); + eventIndex++; onChunkSent?.(); if (signal?.aborted) return false; } From 3953eb995e6043302a546f20416829bd92fcccfb Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Mon, 18 May 2026 14:47:34 -0700 Subject: [PATCH 3/5] test: timing recording, replay, and Bedrock EventStream coverage --- src/__tests__/timing-recording.test.ts | 514 +++++++++++++++++++++++++ src/__tests__/timing-replay.test.ts | 249 ++++++++++++ 2 files changed, 763 insertions(+) create mode 100644 src/__tests__/timing-recording.test.ts create mode 100644 src/__tests__/timing-replay.test.ts diff --git a/src/__tests__/timing-recording.test.ts b/src/__tests__/timing-recording.test.ts new file mode 100644 index 0000000..d39b280 --- /dev/null +++ b/src/__tests__/timing-recording.test.ts @@ -0,0 +1,514 @@ +import { describe, it, expect, afterEach } from "vitest"; +import * as http from "node:http"; +import * as fs from "node:fs"; +import * as os from "node:os"; +import * as path from "node:path"; +import type { FixtureFile } from "../types.js"; +import { createServer, type ServerInstance } from "../server.js"; +import { encodeEventStreamMessage } from "../aws-event-stream.js"; + +// --------------------------------------------------------------------------- +// Test state +// --------------------------------------------------------------------------- + +let upstream: 
http.Server | undefined; +let recorder: ServerInstance | undefined; +let tmpDir: string | undefined; + +afterEach(async () => { + if (recorder) { + await new Promise((resolve) => recorder!.server.close(() => resolve())); + recorder = undefined; + } + if (upstream) { + await new Promise((resolve) => upstream!.close(() => resolve())); + upstream = undefined; + } + if (tmpDir) { + fs.rmSync(tmpDir, { recursive: true, force: true }); + tmpDir = undefined; + } +}); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function post( + url: string, + body: unknown, +): Promise<{ status: number; headers: http.IncomingHttpHeaders; body: string }> { + return new Promise((resolve, reject) => { + const data = JSON.stringify(body); + const parsed = new URL(url); + const req = http.request( + { + hostname: parsed.hostname, + port: parsed.port, + path: parsed.pathname, + method: "POST", + headers: { + "Content-Type": "application/json", + "Content-Length": Buffer.byteLength(data), + }, + }, + (res) => { + const chunks: Buffer[] = []; + res.on("data", (c: Buffer) => chunks.push(c)); + res.on("end", () => { + resolve({ + status: res.statusCode ?? 0, + headers: res.headers, + body: Buffer.concat(chunks).toString(), + }); + }); + }, + ); + req.on("error", reject); + req.write(data); + req.end(); + }); +} + +/** Consume a streaming response fully, returning the concatenated body. 
*/ +function consumeStream(url: string, body: unknown): Promise<{ status: number; body: string }> { + return new Promise((resolve, reject) => { + const data = JSON.stringify(body); + const parsed = new URL(url); + const req = http.request( + { + hostname: parsed.hostname, + port: parsed.port, + path: parsed.pathname, + method: "POST", + headers: { + "Content-Type": "application/json", + "Content-Length": Buffer.byteLength(data), + }, + }, + (res) => { + const chunks: Buffer[] = []; + res.on("data", (c: Buffer) => chunks.push(c)); + res.on("end", () => { + resolve({ + status: res.statusCode ?? 0, + body: Buffer.concat(chunks).toString(), + }); + }); + }, + ); + req.on("error", reject); + req.write(data); + req.end(); + }); +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("timing-aware recording", () => { + it("recorded fixture includes recordedTimings for SSE streaming response", async () => { + // Create a fake upstream that streams SSE with known delays + const TTFT_MS = 80; + const INTER_CHUNK_MS = 40; + upstream = http.createServer((_req, res) => { + res.writeHead(200, { "Content-Type": "text/event-stream" }); + const chunks = [ + `data: ${JSON.stringify({ id: "c1", choices: [{ delta: { content: "Hello" } }] })}\n\n`, + `data: ${JSON.stringify({ id: "c1", choices: [{ delta: { content: " world" } }] })}\n\n`, + "data: [DONE]\n\n", + ]; + let i = 0; + const send = () => { + if (i < chunks.length) { + res.write(chunks[i]); + i++; + setTimeout(send, INTER_CHUNK_MS); + } else { + res.end(); + } + }; + setTimeout(send, TTFT_MS); + }); + await new Promise((resolve) => upstream!.listen(0, "127.0.0.1", resolve)); + const upstreamPort = (upstream!.address() as { port: number }).port; + + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-timing-rec-")); + recorder = await createServer([], { + port: 0, + record: { + providers: { 
openai: `http://127.0.0.1:${upstreamPort}` }, + fixturePath: tmpDir, + }, + }); + + // Make a streaming request through the recorder + await consumeStream(`${recorder.url}/v1/chat/completions`, { + model: "gpt-4", + messages: [{ role: "user", content: "hi" }], + stream: true, + }); + + // Check the fixture file on disk for recordedTimings + const files = fs.readdirSync(tmpDir); + const fixtureFiles = files.filter((f) => f.endsWith(".json")); + expect(fixtureFiles).toHaveLength(1); + + const fixtureContent = JSON.parse( + fs.readFileSync(path.join(tmpDir, fixtureFiles[0]), "utf-8"), + ) as FixtureFile; + expect(fixtureContent.fixtures).toHaveLength(1); + + const fixture = fixtureContent.fixtures[0]; + expect(fixture.recordedTimings).toBeDefined(); + // Upstream sends first chunk after 80ms, but the recorder measures TTFT + // from response-headers-received to first-frame-boundary-detected, so on + // localhost the first data event often contains the first frame, yielding + // TTFT near 0. Assert it's a finite, non-negative number (not undefined/NaN).
+ expect(Number.isFinite(fixture.recordedTimings!.ttftMs)).toBe(true); + expect(fixture.recordedTimings!.ttftMs).toBeGreaterThanOrEqual(0); + expect(fixture.recordedTimings!.ttftMs).toBeLessThan(TTFT_MS * 3); + expect(fixture.recordedTimings!.interChunkDelaysMs.length).toBeGreaterThan(0); + expect(fixture.recordedTimings!.totalDurationMs).toBeGreaterThan(0); + // Total duration should reflect the upstream frame spacing + // (TTFT + 3 chunks * INTER_CHUNK_MS ~ 200ms; allow slack for CI) + expect(fixture.recordedTimings!.totalDurationMs).toBeGreaterThanOrEqual(INTER_CHUNK_MS * 1.5); + }); + + it("recordedTimings inter-chunk delays roughly match upstream spacing", async () => { + const INTER_CHUNK_MS = 60; + const NUM_DATA_CHUNKS = 4; + upstream = http.createServer((_req, res) => { + res.writeHead(200, { "Content-Type": "text/event-stream" }); + let i = 0; + const send = () => { + if (i < NUM_DATA_CHUNKS) { + res.write( + `data: ${JSON.stringify({ id: "c1", choices: [{ delta: { content: `chunk${i}` } }] })}\n\n`, + ); + i++; + setTimeout(send, INTER_CHUNK_MS); + } else { + res.write("data: [DONE]\n\n"); + res.end(); + } + }; + // First chunk after a short TTFT + setTimeout(send, 30); + }); + await new Promise((resolve) => upstream!.listen(0, "127.0.0.1", resolve)); + const upstreamPort = (upstream!.address() as { port: number }).port; + + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-timing-rec-delays-")); + recorder = await createServer([], { + port: 0, + record: { + providers: { openai: `http://127.0.0.1:${upstreamPort}` }, + fixturePath: tmpDir, + }, + }); + + await consumeStream(`${recorder.url}/v1/chat/completions`, { + model: "gpt-4", + messages: [{ role: "user", content: "spacing test" }], + stream: true, + }); + + const files = fs.readdirSync(tmpDir); + const fixtureFiles = files.filter((f) => f.endsWith(".json")); + const fixtureContent = JSON.parse( + fs.readFileSync(path.join(tmpDir, fixtureFiles[0]), "utf-8"), + ) as FixtureFile; + const timings = 
fixtureContent.fixtures[0].recordedTimings; + expect(timings).toBeDefined(); + + // We sent NUM_DATA_CHUNKS data frames + 1 DONE frame, i.e. NUM_DATA_CHUNKS + // gaps between consecutive frames. Require at least NUM_DATA_CHUNKS - 1 + // recorded delays (one fewer than the ideal count is tolerated), and check + // that each inter-chunk delay is within a plausible range of the target. + expect(timings!.interChunkDelaysMs.length).toBeGreaterThanOrEqual(NUM_DATA_CHUNKS - 1); + for (const delay of timings!.interChunkDelaysMs) { + // Each delay should be roughly INTER_CHUNK_MS; allow anything in + // [0, 4x) to absorb scheduler jitter on CI + expect(delay).toBeGreaterThanOrEqual(0); + expect(delay).toBeLessThan(INTER_CHUNK_MS * 4); + } + }); + + it("non-streaming response does NOT get recordedTimings", async () => { + // Upstream returns a plain JSON (non-streaming) response + upstream = http.createServer((_req, res) => { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end( + JSON.stringify({ + choices: [{ message: { content: "Paris", role: "assistant" }, finish_reason: "stop" }], + }), + ); + }); + await new Promise((resolve) => upstream!.listen(0, "127.0.0.1", resolve)); + const upstreamPort = (upstream!.address() as { port: number }).port; + + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-timing-rec-nons-")); + recorder = await createServer([], { + port: 0, + record: { + providers: { openai: `http://127.0.0.1:${upstreamPort}` }, + fixturePath: tmpDir, + }, + }); + + await post(`${recorder.url}/v1/chat/completions`, { + model: "gpt-4", + messages: [{ role: "user", content: "capital of France" }], + }); + + const files = fs.readdirSync(tmpDir); + const fixtureFiles = files.filter((f) => f.endsWith(".json")); + const fixtureContent = JSON.parse( + fs.readFileSync(path.join(tmpDir, fixtureFiles[0]), "utf-8"), + ) as FixtureFile; + // Non-streaming should NOT have recordedTimings + expect(fixtureContent.fixtures[0].recordedTimings).toBeUndefined(); + }); + + it("NDJSON streaming response gets
recordedTimings", async () => { + // Simulate an Ollama-style NDJSON upstream + upstream = http.createServer((_req, res) => { + res.writeHead(200, { "Content-Type": "application/x-ndjson" }); + const chunks = [ + JSON.stringify({ message: { content: "Hello" }, done: false }) + "\n", + JSON.stringify({ message: { content: " world" }, done: false }) + "\n", + JSON.stringify({ message: { content: "" }, done: true }) + "\n", + ]; + let i = 0; + const send = () => { + if (i < chunks.length) { + res.write(chunks[i]); + i++; + setTimeout(send, 40); + } else { + res.end(); + } + }; + setTimeout(send, 50); + }); + await new Promise((resolve) => upstream!.listen(0, "127.0.0.1", resolve)); + const upstreamPort = (upstream!.address() as { port: number }).port; + + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-timing-rec-ndjson-")); + recorder = await createServer([], { + port: 0, + record: { + providers: { ollama: `http://127.0.0.1:${upstreamPort}` }, + fixturePath: tmpDir, + }, + }); + + await consumeStream(`${recorder.url}/api/chat`, { + model: "llama3", + messages: [{ role: "user", content: "hi" }], + stream: true, + }); + + const files = fs.readdirSync(tmpDir); + const fixtureFiles = files.filter((f) => f.endsWith(".json")); + expect(fixtureFiles.length).toBeGreaterThan(0); + + const fixtureContent = JSON.parse( + fs.readFileSync(path.join(tmpDir, fixtureFiles[0]), "utf-8"), + ) as FixtureFile; + const timings = fixtureContent.fixtures[0].recordedTimings; + expect(timings).toBeDefined(); + // TTFT can be 0ms on localhost (same event-loop tick as stream start) + expect(timings!.ttftMs).toBeGreaterThanOrEqual(0); + expect(timings!.interChunkDelaysMs.length).toBeGreaterThan(0); + expect(timings!.totalDurationMs).toBeGreaterThan(0); + }); +}); + +describe("Bedrock binary EventStream timing recording", () => { + it("captures frame timings for binary EventStream response", async () => { + const INTER_FRAME_MS = 50; + const NUM_FRAMES = 4; + + // Create a fake upstream 
that streams Bedrock binary EventStream frames + upstream = http.createServer((_req, res) => { + res.writeHead(200, { + "Content-Type": "application/vnd.amazon.eventstream", + "Transfer-Encoding": "chunked", + }); + + const frames = [ + encodeEventStreamMessage("message_start", { + type: "message_start", + message: { + id: "msg_test", + type: "message", + role: "assistant", + content: [], + model: "claude-v3", + stop_reason: null, + usage: { input_tokens: 10, output_tokens: 0 }, + }, + }), + encodeEventStreamMessage("content_block_start", { + type: "content_block_start", + index: 0, + content_block: { type: "text", text: "" }, + }), + encodeEventStreamMessage("content_block_delta", { + type: "content_block_delta", + index: 0, + delta: { type: "text_delta", text: "Hello" }, + }), + encodeEventStreamMessage("content_block_delta", { + type: "content_block_delta", + index: 0, + delta: { type: "text_delta", text: " world" }, + }), + ]; + + let i = 0; + const send = () => { + if (i < frames.length) { + res.write(frames[i]); + i++; + setTimeout(send, INTER_FRAME_MS); + } else { + res.end(); + } + }; + // Start sending immediately (no initial TTFT delay) + send(); + }); + await new Promise((resolve) => upstream!.listen(0, "127.0.0.1", resolve)); + const upstreamPort = (upstream!.address() as { port: number }).port; + + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-timing-bedrock-")); + recorder = await createServer([], { + port: 0, + record: { + providers: { bedrock: `http://127.0.0.1:${upstreamPort}` }, + fixturePath: tmpDir, + }, + }); + + // Make a request through the recorder proxy to the Bedrock streaming endpoint + await consumeStream(`${recorder.url}/model/anthropic.claude-v3/invoke-with-response-stream`, { + anthropic_version: "bedrock-2023-05-31", + max_tokens: 512, + messages: [{ role: "user", content: "hello" }], + }); + + // Read the recorded fixture and verify timings + const files = fs.readdirSync(tmpDir); + const fixtureFiles = files.filter((f) => 
f.endsWith(".json")); + expect(fixtureFiles).toHaveLength(1); + + const fixtureContent = JSON.parse( + fs.readFileSync(path.join(tmpDir, fixtureFiles[0]), "utf-8"), + ) as FixtureFile; + expect(fixtureContent.fixtures).toHaveLength(1); + + const fixture = fixtureContent.fixtures[0]; + expect(fixture.recordedTimings).toBeDefined(); + // We sent NUM_FRAMES frames with INTER_FRAME_MS between them. + // interChunkDelaysMs should have NUM_FRAMES - 1 entries (gaps between frames). + expect(fixture.recordedTimings!.interChunkDelaysMs.length).toBe(NUM_FRAMES - 1); + for (const d of fixture.recordedTimings!.interChunkDelaysMs) { + expect(d).toBeGreaterThanOrEqual(0); + expect(d).toBeLessThan(INTER_FRAME_MS * 4); + } + expect(fixture.recordedTimings!.totalDurationMs).toBeGreaterThanOrEqual( + INTER_FRAME_MS * (NUM_FRAMES - 1) * 0.5, + ); + }); + + it("captures TTFT for Bedrock binary EventStream with initial delay", async () => { + const TTFT_MS = 80; + const INTER_FRAME_MS = 30; + + upstream = http.createServer((_req, res) => { + res.writeHead(200, { + "Content-Type": "application/vnd.amazon.eventstream", + "Transfer-Encoding": "chunked", + }); + + const frames = [ + encodeEventStreamMessage("message_start", { + type: "message_start", + message: { + id: "msg_ttft", + type: "message", + role: "assistant", + content: [], + model: "claude-v3", + stop_reason: null, + usage: { input_tokens: 5, output_tokens: 0 }, + }, + }), + encodeEventStreamMessage("content_block_start", { + type: "content_block_start", + index: 0, + content_block: { type: "text", text: "" }, + }), + encodeEventStreamMessage("content_block_delta", { + type: "content_block_delta", + index: 0, + delta: { type: "text_delta", text: "Hi" }, + }), + ]; + + let i = 0; + const send = () => { + if (i < frames.length) { + res.write(frames[i]); + i++; + setTimeout(send, INTER_FRAME_MS); + } else { + res.end(); + } + }; + // Delay before the first frame to simulate TTFT + setTimeout(send, TTFT_MS); + }); + await new 
Promise((resolve) => upstream!.listen(0, "127.0.0.1", resolve)); + const upstreamPort = (upstream!.address() as { port: number }).port; + + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-timing-bedrock-ttft-")); + recorder = await createServer([], { + port: 0, + record: { + providers: { bedrock: `http://127.0.0.1:${upstreamPort}` }, + fixturePath: tmpDir, + }, + }); + + await consumeStream(`${recorder.url}/model/anthropic.claude-v3/invoke-with-response-stream`, { + anthropic_version: "bedrock-2023-05-31", + max_tokens: 512, + messages: [{ role: "user", content: "ttft test" }], + }); + + const files = fs.readdirSync(tmpDir); + const fixtureFiles = files.filter((f) => f.endsWith(".json")); + expect(fixtureFiles).toHaveLength(1); + + const fixtureContent = JSON.parse( + fs.readFileSync(path.join(tmpDir, fixtureFiles[0]), "utf-8"), + ) as FixtureFile; + const timings = fixtureContent.fixtures[0].recordedTimings; + expect(timings).toBeDefined(); + + // TTFT measures time from stream start (response-headers-received) to + // first binary frame arrival. On localhost the first data event can + // occasionally arrive in the same millisecond as the response callback, + // yielding ttftMs = 0. Assert non-negative and bounded. 
+ expect(Number.isFinite(timings!.ttftMs)).toBe(true); + expect(timings!.ttftMs).toBeGreaterThanOrEqual(0); + expect(timings!.ttftMs).toBeLessThan(TTFT_MS * 4); + expect(timings!.totalDurationMs).toBeGreaterThan(0); + expect(timings!.interChunkDelaysMs.length).toBeGreaterThan(0); + }); +}); diff --git a/src/__tests__/timing-replay.test.ts b/src/__tests__/timing-replay.test.ts new file mode 100644 index 0000000..ca034dd --- /dev/null +++ b/src/__tests__/timing-replay.test.ts @@ -0,0 +1,249 @@ +import { describe, it, expect, afterEach } from "vitest"; +import http from "node:http"; +import { createServer, type ServerInstance } from "../server.js"; +import type { Fixture, SSEChunk, ChatCompletionRequest, RecordedTimings } from "../types.js"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function parseSSEResponse(body: string): SSEChunk[] { + return body + .split("\n\n") + .filter((line) => line.startsWith("data: ") && !line.includes("[DONE]")) + .map((line) => JSON.parse(line.slice(6))); +} + +async function httpPost(url: string, body: object): Promise<{ status: number; body: string }> { + return new Promise((resolve, reject) => { + const req = http.request( + url, + { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + }, + (res) => { + const chunks: Buffer[] = []; + res.on("data", (c) => chunks.push(c)); + res.on("end", () => + resolve({ + status: res.statusCode!, + body: Buffer.concat(chunks).toString(), + }), + ); + }, + ); + req.on("error", reject); + req.write(JSON.stringify(body)); + req.end(); + }); +} + +function chatRequest( + userContent: string, + extra: Partial = {}, +): ChatCompletionRequest { + return { + model: "gpt-4", + stream: true, + messages: [{ role: "user", content: userContent }], + ...extra, + }; +} + +// --------------------------------------------------------------------------- +// Tests 
+// ---------------------------------------------------------------------------
+
+let instance: ServerInstance | null = null;
+
+afterEach(async () => {
+  if (instance) {
+    await new Promise<void>((resolve) => instance!.server.close(() => resolve()));
+    instance = null;
+  }
+});
+
+describe("timing-aware replay through handleCompletions", () => {
+  it("fixture with recordedTimings replays with delays (total duration is roughly proportional)", async () => {
+    // 50ms TTFT + 4 inter-chunk delays of 30ms each = ~170ms total
+    const timings: RecordedTimings = {
+      ttftMs: 50,
+      interChunkDelaysMs: [30, 30, 30, 30],
+      totalDurationMs: 170,
+    };
+
+    const fixtures: Fixture[] = [
+      {
+        match: { userMessage: "timing-test" },
+        response: { content: "Hello world from timing test" },
+        recordedTimings: timings,
+      },
+    ];
+
+    instance = await createServer(fixtures, {
+      port: 0,
+      chunkSize: 5, // small chunks to generate multiple SSE frames
+    });
+
+    const start = Date.now();
+    const res = await httpPost(`${instance.url}/v1/chat/completions`, chatRequest("timing-test"));
+    const elapsed = Date.now() - start;
+
+    expect(res.status).toBe(200);
+    const chunks = parseSSEResponse(res.body);
+    expect(chunks.length).toBeGreaterThan(1);
+
+    // With recordedTimings, the total elapsed time should be at least the TTFT
+    // plus some inter-chunk delays. Allow generous tolerance for CI jitter,
+    // but it should be meaningfully > 0 (i.e. delays are being applied).
+    expect(elapsed).toBeGreaterThanOrEqual(40); // at least ~TTFT minus jitter
+  });
+
+  it("replaySpeed 2.0 halves the replay duration", async () => {
+    // 80ms TTFT + 4 x 40ms inter-chunk = ~240ms at 1x speed
+    const timings: RecordedTimings = {
+      ttftMs: 80,
+      interChunkDelaysMs: [40, 40, 40, 40],
+      totalDurationMs: 240,
+    };
+
+    const fixtures: Fixture[] = [
+      {
+        match: { userMessage: "speed-test" },
+        response: { content: "Hello world from speed test" },
+        recordedTimings: timings,
+        replaySpeed: 2.0,
+      },
+    ];
+
+    instance = await createServer(fixtures, {
+      port: 0,
+      chunkSize: 5,
+    });
+
+    const start = Date.now();
+    const res = await httpPost(`${instance.url}/v1/chat/completions`, chatRequest("speed-test"));
+    const elapsed = Date.now() - start;
+
+    expect(res.status).toBe(200);
+    const chunks = parseSSEResponse(res.body);
+    expect(chunks.length).toBeGreaterThan(1);
+
+    // At 2x speed, effective delays are halved. The full 1x duration would be
+    // ~240ms, so at 2x it should be ~120ms. We verify it's well below 1x
+    // but still non-trivial (delays are being applied, just faster).
+    expect(elapsed).toBeGreaterThanOrEqual(50); // still has meaningful delay
+    expect(elapsed).toBeLessThan(200); // well below 1x baseline of ~240ms
+  });
+
+  it("recordedTimings alone impose real delays (positive control)", async () => {
+    // 200ms TTFT + 3 inter-chunk delays of 100ms = ~500ms total.
+    // Without any streamingProfile override, elapsed should be >= 300ms.
+    const timings: RecordedTimings = {
+      ttftMs: 200,
+      interChunkDelaysMs: [100, 100, 100],
+      totalDurationMs: 500,
+    };
+
+    const fixtures: Fixture[] = [
+      {
+        match: { userMessage: "positive-control" },
+        response: { content: "Hello world" },
+        recordedTimings: timings,
+      },
+    ];
+
+    instance = await createServer(fixtures, {
+      port: 0,
+      chunkSize: 5,
+    });
+
+    const start = Date.now();
+    const res = await httpPost(
+      `${instance.url}/v1/chat/completions`,
+      chatRequest("positive-control"),
+    );
+    const elapsed = Date.now() - start;
+
+    expect(res.status).toBe(200);
+    const chunks = parseSSEResponse(res.body);
+    expect(chunks.length).toBeGreaterThan(1);
+
+    // recordedTimings should impose real delays: TTFT(200) + delays(3*100) = ~500ms.
+    // Even with CI jitter, elapsed must be >= 300ms to prove timing is applied.
+    expect(elapsed).toBeGreaterThanOrEqual(300);
+  });
+
+  it("streamingProfile overrides recordedTimings (precedence)", async () => {
+    // Same recordedTimings as positive control above (~500ms total),
+    // but streamingProfile at very high TPS should override and be near-instant.
+    const timings: RecordedTimings = {
+      ttftMs: 200,
+      interChunkDelaysMs: [100, 100, 100],
+      totalDurationMs: 500,
+    };
+
+    const fixtures: Fixture[] = [
+      {
+        match: { userMessage: "precedence-test" },
+        response: { content: "Hello world" },
+        recordedTimings: timings,
+        streamingProfile: { ttft: 0, tps: 10000 }, // near-instant
+      },
+    ];
+
+    instance = await createServer(fixtures, {
+      port: 0,
+      chunkSize: 5,
+    });
+
+    const start = Date.now();
+    const res = await httpPost(
+      `${instance.url}/v1/chat/completions`,
+      chatRequest("precedence-test"),
+    );
+    const elapsed = Date.now() - start;
+
+    expect(res.status).toBe(200);
+    const chunks = parseSSEResponse(res.body);
+    expect(chunks.length).toBeGreaterThan(0);
+
+    // With streamingProfile overriding, total time should be well under
+    // what recordedTimings would have imposed (~500ms). Near-instant.
+    expect(elapsed).toBeLessThan(200);
+  });
+
+  it("fixture without recordedTimings uses existing latency model (backward compat)", async () => {
+    const fixtures: Fixture[] = [
+      {
+        match: { userMessage: "compat-test" },
+        response: { content: "Hello backward compat" },
+        // No recordedTimings, no streamingProfile — uses global latency
+      },
+    ];
+
+    instance = await createServer(fixtures, {
+      port: 0,
+      chunkSize: 10,
+      latency: 0, // zero latency for fast test
+    });
+
+    const start = Date.now();
+    const res = await httpPost(`${instance.url}/v1/chat/completions`, chatRequest("compat-test"));
+    const elapsed = Date.now() - start;
+
+    expect(res.status).toBe(200);
+    const chunks = parseSSEResponse(res.body);
+    expect(chunks.length).toBeGreaterThan(0);
+
+    // With zero latency and no timing features, should complete near-instantly
+    expect(elapsed).toBeLessThan(100);
+
+    // Verify content is correct
+    const content = chunks.map((c) => c.choices?.[0]?.delta?.content ?? "").join("");
+    expect(content).toContain("Hello backward compat");
+  });
+});
From f2c5df8118cc0c2bea33380a950b6b80d071ebee Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Mon, 18 May 2026 14:47:35 -0700 Subject: [PATCH 4/5] docs: timing-aware recording, replay, and --replay-speed flag --- CHANGELOG.md | 9 +++++++++ README.md | 4 ++++ 2 files changed, 13 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6563dc4..ceaad09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,15 @@ ### Added +- **Timing-aware recording and replay** — proxy recording captures per-frame + arrival timestamps as `recordedTimings` on fixtures. Replay uses recorded + timings for approximate timing reproduction based on recorded TTFT and + inter-frame cadence instead of the synthetic model. Replay chunk count may + differ from recording chunk count — TTFT and average pace are preserved, + not per-token fidelity. 
`--replay-speed N` multiplier applies to all delay + sources (recorded timings, streaming profiles, global latency). Per-fixture + `replaySpeed` override. Covers SSE, NDJSON, Bedrock EventStream, and + WebSocket protocols. - **Gemini `embedContent` endpoint** — `POST /v1beta/models/{model}:embedContent` with deterministic fallback embeddings and fixture matching - **`/v1/images/edit` and `/v1/images/variations` endpoints** — multipart diff --git a/README.md b/README.md index 2e88bc7..4850bb5 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,7 @@ Run them all on one port with `npx @copilotkit/aimock --config aimock.json`, or ## Features - **[Record & Replay](https://aimock.copilotkit.dev/record-replay)** — Proxy real APIs, save as fixtures, replay deterministically forever +- **Timing-aware recording and replay** — Recorded fixtures capture per-frame arrival timestamps; replay uses recorded timings for approximate timing reproduction based on recorded TTFT and inter-frame cadence (replay chunk count may differ from recording — TTFT and average pace are preserved, not per-token fidelity) with configurable `--replay-speed` multiplier - **[Multi-turn Conversations](https://aimock.copilotkit.dev/multi-turn)** — Record and replay multi-turn traces with tool rounds; match distinct turns via `turnIndex`, `hasToolResult`, `toolCallId`, `sequenceIndex`, `systemMessage` (gate on host-supplied agent context), or custom predicates - **[14 LLM Providers](https://aimock.copilotkit.dev/docs)** — OpenAI Chat, OpenAI Responses, OpenAI Realtime (GA + Beta shim), Claude, Gemini (REST + embedContent), Gemini Live, Gemini Interactions, Azure, Bedrock, Vertex AI, Ollama (chat + embeddings), Cohere (chat + embed), ElevenLabs TTS — full streaming support - **Multimedia APIs** — [image generation](https://aimock.copilotkit.dev/images) (DALL-E, Imagen), [image editing](https://aimock.copilotkit.dev/images) (/v1/images/edit), [text-to-speech](https://aimock.copilotkit.dev/speech) (OpenAI + 
ElevenLabs), [audio transcription](https://aimock.copilotkit.dev/transcription), [audio translation](https://aimock.copilotkit.dev/transcription) (/v1/audio/translations), [video generation](https://aimock.copilotkit.dev/video), [fal.ai](https://aimock.copilotkit.dev/fal-ai) (image / video / audio with queue lifecycle) @@ -101,6 +102,9 @@ npx -p @copilotkit/aimock llmock --record --provider-openai https://api.openai.c npx -p @copilotkit/aimock llmock --record --provider-openai https://api.openai.com \ --body-timeout-ms 180000 +# Replay recorded fixtures at 2× speed +npx -p @copilotkit/aimock llmock -p 4010 -f ./fixtures --replay-speed 2 + # Convert fixtures from other tools npx @copilotkit/aimock convert vidaimock ./templates/ ./fixtures/ npx @copilotkit/aimock convert mockllm ./config.yaml ./fixtures/ From 3c0a6f3aae0362db59d2b3c4832428b2456d4512 Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Mon, 18 May 2026 15:00:23 -0700 Subject: [PATCH 5/5] chore: bump version to 1.26.0 --- .claude-plugin/marketplace.json | 2 +- .claude-plugin/plugin.json | 2 +- charts/aimock/Chart.yaml | 2 +- package.json | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.claude-plugin/marketplace.json b/.claude-plugin/marketplace.json index 28827f4..46a5f5b 100644 --- a/.claude-plugin/marketplace.json +++ b/.claude-plugin/marketplace.json @@ -9,7 +9,7 @@ "source": { "source": "npm", "package": "@copilotkit/aimock", - "version": "^1.25.0" + "version": "^1.26.0" }, "description": "Fixture authoring skill for @copilotkit/aimock — LLM, multimedia (image/TTS/transcription/video), MCP, A2A, AG-UI, vector, embeddings, structured output, sequential responses, streaming physics, record/replay, agent loop patterns, and debugging" } diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json index a2c8102..5f40028 100644 --- a/.claude-plugin/plugin.json +++ b/.claude-plugin/plugin.json @@ -1,6 +1,6 @@ { "name": "aimock", - "version": "1.25.0", + "version": "1.26.0", 
"description": "Fixture authoring guidance for @copilotkit/aimock — LLM, multimedia, MCP, A2A, AG-UI, vector, and service mocking", "author": { "name": "CopilotKit" diff --git a/charts/aimock/Chart.yaml b/charts/aimock/Chart.yaml index b9fe27a..63fd9c4 100644 --- a/charts/aimock/Chart.yaml +++ b/charts/aimock/Chart.yaml @@ -3,4 +3,4 @@ name: aimock description: Mock infrastructure for AI application testing (OpenAI, Anthropic, Gemini, MCP, A2A, vector) type: application version: 0.1.0 -appVersion: "1.25.0" +appVersion: "1.26.0" diff --git a/package.json b/package.json index 134568d..38d7bb4 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@copilotkit/aimock", - "version": "1.25.0", + "version": "1.26.0", "description": "Mock infrastructure for AI application testing — LLM APIs, image generation, text-to-speech, transcription, audio generation, video generation, MCP tools, A2A agents, AG-UI event streams, vector databases, search, rerank, and moderation. One package, one port, zero dependencies.", "license": "MIT", "keywords": [