diff --git a/.storybook/mocks/orpc.ts b/.storybook/mocks/orpc.ts index 10328b44b..7318d3d30 100644 --- a/.storybook/mocks/orpc.ts +++ b/.storybook/mocks/orpc.ts @@ -287,9 +287,22 @@ export function createMockORPCClient(options: MockORPCClientOptions = {}): APICl data: { success: true, output: "", exitCode: 0, wall_duration_ms: 0 }, }; }, - onChat: async function* (input: { workspaceId: string }) { + onChat: async function* ( + input: { workspaceId: string }, + options?: { signal?: AbortSignal } + ) { if (!onChat) { + // Default mock behavior: subscriptions should remain open. + // If this ends, WorkspaceStore will retry and reset state, which flakes stories. yield { type: "caught-up" } as WorkspaceChatMessage; + + await new Promise((resolve) => { + if (options?.signal?.aborted) { + resolve(); + return; + } + options?.signal?.addEventListener("abort", () => resolve(), { once: true }); + }); return; } diff --git a/src/browser/components/RightSidebar/StatsTab.tsx b/src/browser/components/RightSidebar/StatsTab.tsx index 30d0f9e65..fccee25aa 100644 --- a/src/browser/components/RightSidebar/StatsTab.tsx +++ b/src/browser/components/RightSidebar/StatsTab.tsx @@ -37,7 +37,7 @@ const VIEW_MODE_OPTIONS: Array> = [ interface ModelBreakdownEntry { key: string; model: string; - mode?: "plan" | "exec"; + mode?: string; totalDurationMs: number; totalToolExecutionMs: number; totalStreamingMs: number; diff --git a/src/browser/components/RightSidebar/statsTabCalculations.ts b/src/browser/components/RightSidebar/statsTabCalculations.ts index f3e31aea2..e5bab0c24 100644 --- a/src/browser/components/RightSidebar/statsTabCalculations.ts +++ b/src/browser/components/RightSidebar/statsTabCalculations.ts @@ -28,7 +28,7 @@ export interface ModelBreakdownEntry { tokensPerSec: number | null; avgTokensPerMsg: number | null; avgReasoningPerMsg: number | null; - mode?: "plan" | "exec"; + mode?: string; } export interface ModelBreakdownData { @@ -139,7 +139,7 @@ function getModelDisplayName(model: string): string { const MODE_SUFFIX_PLAN = ":plan" as const; const MODE_SUFFIX_EXEC = ":exec" as const; -function parseStatsKey(key: string): { model: string; mode?: "plan" | "exec" } { +function parseStatsKey(key: string): { model: string; mode?: string } { if (key.endsWith(MODE_SUFFIX_PLAN)) { return { model: key.slice(0, -MODE_SUFFIX_PLAN.length), mode: "plan" }; } @@ -220,7 +220,7 @@ export function computeModelBreakdownData(params: { ttftCount: number; liveTPS: number | null; liveTokenCount: number; - mode?: "plan" | "exec"; + mode?: string; } const breakdown: Record = {}; @@ -287,7 +287,7 @@ export function computeModelBreakdownData(params: { const toModelBreakdownEntry = ( model: string, stats: BreakdownEntry, - mode?: "plan" | "exec" + mode?: string ): ModelBreakdownEntry => { const modelTime = Math.max(0, stats.totalDuration - stats.toolExecutionMs); const avgTtft = stats.ttftCount > 0 ? stats.ttftSum / stats.ttftCount : null; diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index b90ecbcc1..87c5f1249 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -84,7 +84,7 @@ export interface StreamTimingStats { /** Live tokens-per-second during streaming - only available for active streams */ liveTPS?: number; /** Mode (plan/exec) in which this stream occurred */ - mode?: "plan" | "exec"; + mode?: string; } /** Per-model timing statistics */ @@ -104,7 +104,7 @@ export interface ModelTimingStats { /** Total reasoning/thinking tokens generated by this model */ totalReasoningTokens: number; /** Mode extracted from composite key (undefined for old data without mode) */ - mode?: "plan" | "exec"; + mode?: string; } /** diff --git a/src/browser/utils/messages/StreamingMessageAggregator.ts b/src/browser/utils/messages/StreamingMessageAggregator.ts index 3b4ea9cb7..7e4c66303 100644 --- a/src/browser/utils/messages/StreamingMessageAggregator.ts +++ b/src/browser/utils/messages/StreamingMessageAggregator.ts @@ -70,7 +70,7 @@ interface StreamingContext { pendingToolStarts: Map; /** Mode (plan/exec) */ - mode?: "plan" | "exec"; + mode?: string; } /** @@ -222,7 +222,7 @@ export class StreamingMessageAggregator { outputTokens: number; reasoningTokens: number; streamingMs: number; // Time from first token to end (for accurate tok/s) - mode?: "plan" | "exec"; // Mode in which this response occurred + mode?: string; // Mode in which this response occurred } | null = null; // Session-level timing stats: model -> stats (totals computed on-the-fly) @@ -468,7 +468,7 @@ export class StreamingMessageAggregator { // Streaming duration excludes TTFT and tool execution - used for avg tok/s const streamingMs = Math.max(0, durationMs - (ttftMs ?? 0) - totalToolExecutionMs); - const mode = (message?.metadata?.mode ?? context.mode) as "plan" | "exec" | undefined; + const mode = message?.metadata?.mode ?? context.mode; // Store last completed stream stats (include durations anchored in the renderer clock) const startTime = endTime - durationMs; @@ -648,7 +648,7 @@ export class StreamingMessageAggregator { /** Live tokens-per-second (trailing window) */ liveTPS: number; /** Mode (plan/exec) for this stream */ - mode?: "plan" | "exec"; + mode?: string; } | null { // Get the first (and typically only) active stream const entries = Array.from(this.activeStreams.entries()); @@ -695,7 +695,7 @@ export class StreamingMessageAggregator { outputTokens: number; reasoningTokens: number; streamingMs: number; - mode?: "plan" | "exec"; + mode?: string; } | null { return this.lastCompletedStreamStats; } @@ -728,7 +728,7 @@ export class StreamingMessageAggregator { totalOutputTokens: number; totalReasoningTokens: number; /** Mode extracted from composite key, undefined for old data */ - mode?: "plan" | "exec"; + mode?: string; } >; } | null { @@ -755,7 +755,7 @@ export class StreamingMessageAggregator { responseCount: number; totalOutputTokens: number; totalReasoningTokens: number; - mode?: "plan" | "exec"; + mode?: string; } > = {}; @@ -763,7 +763,7 @@ export class StreamingMessageAggregator { // Parse composite key: "model" or "model:mode" // Model names can contain colons (e.g., "mux-gateway:provider/model") // so we look for ":plan" or ":exec" suffix specifically - let mode: "plan" | "exec" | undefined; + let mode: string | undefined; if (key.endsWith(":plan")) { mode = "plan"; } else if (key.endsWith(":exec")) { diff --git a/src/browser/utils/messages/compactionOptions.test.ts b/src/browser/utils/messages/compactionOptions.test.ts index 0033373eb..a6a257ec3 100644 --- a/src/browser/utils/messages/compactionOptions.test.ts +++ b/src/browser/utils/messages/compactionOptions.test.ts @@ -62,7 +62,7 @@ describe("applyCompactionOverrides", () => { const result = applyCompactionOverrides(baseOptions, compactData); expect(result.mode).toBe("compact"); - expect(result.toolPolicy).toEqual([]); + expect(result.toolPolicy).toEqual([{ regex_match: ".*", action: "disable" }]); }); it("disables all tools even when base options has tool policy", () => { @@ -74,7 +74,7 @@ describe("applyCompactionOverrides", () => { const result = applyCompactionOverrides(baseWithTools, compactData); expect(result.mode).toBe("compact"); - expect(result.toolPolicy).toEqual([]); // Tools always disabled for compaction + expect(result.toolPolicy).toEqual([{ regex_match: ".*", action: "disable" }]); // Tools always disabled for compaction }); it("applies all overrides together", () => { diff --git a/src/browser/utils/messages/compactionOptions.ts b/src/browser/utils/messages/compactionOptions.ts index e388f91f3..b7efdbf77 100644 --- a/src/browser/utils/messages/compactionOptions.ts +++ b/src/browser/utils/messages/compactionOptions.ts @@ -35,6 +35,7 @@ export function applyCompactionOverrides( thinkingLevel: baseOptions.thinkingLevel, maxOutputTokens: compactData.maxOutputTokens, mode: "compact" as const, - toolPolicy: [], // Disable all tools during compaction + // Disable all tools during compaction - regex .* matches all tool names + toolPolicy: [{ regex_match: ".*", action: "disable" }], }; } diff --git a/src/common/orpc/schemas/stream.ts b/src/common/orpc/schemas/stream.ts index d66748f68..84e6ee852 100644 --- a/src/common/orpc/schemas/stream.ts +++ b/src/common/orpc/schemas/stream.ts @@ -43,8 +43,8 @@ export const StreamStartEventSchema = z.object({ startTime: z.number().meta({ description: "Backend timestamp when stream started (Date.now())", }), - mode: z.enum(["plan", "exec"]).optional().meta({ - description: "Agent mode (plan/exec) for this stream", + mode: z.string().optional().meta({ + description: "Agent mode for this stream", }), }); diff --git a/src/common/orpc/schemas/workspaceStats.ts b/src/common/orpc/schemas/workspaceStats.ts index 56039da63..37e7efb36 100644 --- a/src/common/orpc/schemas/workspaceStats.ts +++ b/src/common/orpc/schemas/workspaceStats.ts @@ -1,6 +1,7 @@ import { z } from "zod"; -const ModeSchema = z.enum(["plan", "exec"]); +// Mode is a string to support any mode value (plan, exec, compact, etc.) +const ModeSchema = z.string(); export const TimingAnomalySchema = z.enum([ "negative_duration", diff --git a/src/node/services/initStateManager.ts b/src/node/services/initStateManager.ts index f415ee5ab..a6b5c38ac 100644 --- a/src/node/services/initStateManager.ts +++ b/src/node/services/initStateManager.ts @@ -100,7 +100,14 @@ export class InitStateManager extends EventEmitter { }); // Emit init-output for each accumulated line with original timestamps - for (const timedLine of state.lines) { + // Defensive: state.lines could be undefined from old persisted data + const lines = state.lines ?? []; + for (const timedLine of lines) { + // Skip malformed entries (missing required fields) + if (typeof timedLine.line !== "string" || typeof timedLine.timestamp !== "number") { + log.warn(`[InitStateManager] Skipping malformed init-output:`, timedLine); + continue; + } events.push({ type: "init-output", workspaceId, diff --git a/src/node/services/sessionTimingService.ts b/src/node/services/sessionTimingService.ts index 3193c24e1..a456f2e83 100644 --- a/src/node/services/sessionTimingService.ts +++ b/src/node/services/sessionTimingService.ts @@ -48,7 +48,7 @@ interface ActiveStreamState { workspaceId: string; messageId: string; model: string; - mode?: "plan" | "exec"; + mode?: string; startTimeMs: number; firstTokenTimeMs: number | null; @@ -64,7 +64,7 @@ interface ActiveStreamState { lastEventTimestampMs: number; } -function getModelKey(model: string, mode: "plan" | "exec" | undefined): string { +function getModelKey(model: string, mode: string | undefined): string { return mode ? `${model}:${mode}` : model; } diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index 550e49483..1705c8da3 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -511,6 +511,73 @@ describe("StreamManager - previousResponseId recovery", () => { }); }); +describe("StreamManager - replayStream", () => { + test("replayStream snapshots parts so reconnect doesn't block until stream ends", async () => { + const mockHistoryService = createMockHistoryService(); + const mockPartialService = createMockPartialService(); + const streamManager = new StreamManager(mockHistoryService, mockPartialService); + + // Suppress error events from bubbling up as uncaught exceptions during tests + streamManager.on("error", () => undefined); + + const workspaceId = "ws-replay-snapshot"; + + const deltas: string[] = []; + streamManager.on("stream-delta", (event: { delta: string }) => { + deltas.push(event.delta); + }); + + // Inject an active stream into the private workspaceStreams map. + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const workspaceStreamsValue = Reflect.get(streamManager, "workspaceStreams"); + if (!(workspaceStreamsValue instanceof Map)) { + throw new Error("StreamManager.workspaceStreams is not a Map"); + } + const workspaceStreams = workspaceStreamsValue as Map; + + const streamInfo = { + state: "streaming", + messageId: "msg-1", + model: "claude-sonnet-4", + historySequence: 1, + startTime: 123, + initialMetadata: {}, + parts: [{ type: "text", text: "a", timestamp: 10 }], + }; + + workspaceStreams.set(workspaceId, streamInfo); + + // Patch the private tokenTracker to (a) avoid worker setup and (b) mutate parts during replay. + const tokenTracker = Reflect.get(streamManager, "tokenTracker") as { + setModel: (model: string) => Promise; + countTokens: (text: string) => Promise; + }; + + tokenTracker.setModel = () => Promise.resolve(); + + let pushed = false; + tokenTracker.countTokens = async () => { + if (!pushed) { + pushed = true; + // While replay is mid-await, simulate the running stream appending more parts. + (streamInfo.parts as Array<{ type: string; text?: string; timestamp?: number }>).push({ + type: "text", + text: "b", + timestamp: 20, + }); + } + // Force an await boundary so the mutation happens during replay. + await new Promise((resolve) => setTimeout(resolve, 0)); + return 1; + }; + + await streamManager.replayStream(workspaceId); + + // If replayStream iterates the live array, it would also emit "b". + expect(deltas).toEqual(["a"]); + }); +}); + describe("StreamManager - ask_user_question Partial Persistence", () => { // Note: The ask_user_question tool blocks waiting for user input. // If the app restarts during that wait, the partial must be persisted. diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index e639f001f..fbe1966da 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -1787,10 +1787,19 @@ export class StreamManager extends EventEmitter { ...(replayMode && { mode: replayMode }), }); - // Replay accumulated parts as events using shared emission logic - // This guarantees replay produces identical events to the original stream - for (const part of streamInfo.parts) { - await this.emitPartAsEvent(typedWorkspaceId, streamInfo.messageId, part); + // Replay accumulated parts as events using shared emission logic. + // IMPORTANT: Snapshot the parts array up-front. + // + // streamInfo.parts is mutated while the stream is running. Because emitPartAsEvent() is async + // (tokenization happens in worker threads), iterating the live array would keep consuming newly + // appended parts and can effectively block until the stream ends. + // + // That blocks AgentSession.emitHistoricalEvents() from sending "caught-up" on reconnect, + // leaving the renderer stuck in "Loading workspace" and suppressing the streaming indicator. + const replayParts = streamInfo.parts.slice(); + const replayMessageId = streamInfo.messageId; + for (const part of replayParts) { + await this.emitPartAsEvent(typedWorkspaceId, replayMessageId, part); } }