Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion .storybook/mocks/orpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,22 @@ export function createMockORPCClient(options: MockORPCClientOptions = {}): APICl
data: { success: true, output: "", exitCode: 0, wall_duration_ms: 0 },
};
},
onChat: async function* (input: { workspaceId: string }) {
onChat: async function* (
input: { workspaceId: string },
options?: { signal?: AbortSignal }
) {
if (!onChat) {
// Default mock behavior: subscriptions should remain open.
// If this ends, WorkspaceStore will retry and reset state, which flakes stories.
yield { type: "caught-up" } as WorkspaceChatMessage;

await new Promise<void>((resolve) => {
if (options?.signal?.aborted) {
resolve();
return;
}
options?.signal?.addEventListener("abort", () => resolve(), { once: true });
});
return;
}

Expand Down
2 changes: 1 addition & 1 deletion src/browser/components/RightSidebar/StatsTab.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const VIEW_MODE_OPTIONS: Array<ToggleOption<ViewMode>> = [
interface ModelBreakdownEntry {
key: string;
model: string;
mode?: "plan" | "exec";
mode?: string;
totalDurationMs: number;
totalToolExecutionMs: number;
totalStreamingMs: number;
Expand Down
8 changes: 4 additions & 4 deletions src/browser/components/RightSidebar/statsTabCalculations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export interface ModelBreakdownEntry {
tokensPerSec: number | null;
avgTokensPerMsg: number | null;
avgReasoningPerMsg: number | null;
mode?: "plan" | "exec";
mode?: string;
}

export interface ModelBreakdownData {
Expand Down Expand Up @@ -139,7 +139,7 @@ function getModelDisplayName(model: string): string {
const MODE_SUFFIX_PLAN = ":plan" as const;
const MODE_SUFFIX_EXEC = ":exec" as const;

function parseStatsKey(key: string): { model: string; mode?: "plan" | "exec" } {
function parseStatsKey(key: string): { model: string; mode?: string } {
if (key.endsWith(MODE_SUFFIX_PLAN)) {
return { model: key.slice(0, -MODE_SUFFIX_PLAN.length), mode: "plan" };
}
Expand Down Expand Up @@ -220,7 +220,7 @@ export function computeModelBreakdownData(params: {
ttftCount: number;
liveTPS: number | null;
liveTokenCount: number;
mode?: "plan" | "exec";
mode?: string;
}

const breakdown: Record<string, BreakdownEntry> = {};
Expand Down Expand Up @@ -287,7 +287,7 @@ export function computeModelBreakdownData(params: {
const toModelBreakdownEntry = (
model: string,
stats: BreakdownEntry,
mode?: "plan" | "exec"
mode?: string
): ModelBreakdownEntry => {
const modelTime = Math.max(0, stats.totalDuration - stats.toolExecutionMs);
const avgTtft = stats.ttftCount > 0 ? stats.ttftSum / stats.ttftCount : null;
Expand Down
4 changes: 2 additions & 2 deletions src/browser/stores/WorkspaceStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export interface StreamTimingStats {
/** Live tokens-per-second during streaming - only available for active streams */
liveTPS?: number;
/** Mode (plan/exec) in which this stream occurred */
mode?: "plan" | "exec";
mode?: string;
}

/** Per-model timing statistics */
Expand All @@ -104,7 +104,7 @@ export interface ModelTimingStats {
/** Total reasoning/thinking tokens generated by this model */
totalReasoningTokens: number;
/** Mode extracted from composite key (undefined for old data without mode) */
mode?: "plan" | "exec";
mode?: string;
}

/**
Expand Down
16 changes: 8 additions & 8 deletions src/browser/utils/messages/StreamingMessageAggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ interface StreamingContext {
pendingToolStarts: Map<string, number>;

/** Mode (plan/exec) */
mode?: "plan" | "exec";
mode?: string;
}

/**
Expand Down Expand Up @@ -222,7 +222,7 @@ export class StreamingMessageAggregator {
outputTokens: number;
reasoningTokens: number;
streamingMs: number; // Time from first token to end (for accurate tok/s)
mode?: "plan" | "exec"; // Mode in which this response occurred
mode?: string; // Mode in which this response occurred
} | null = null;

// Session-level timing stats: model -> stats (totals computed on-the-fly)
Expand Down Expand Up @@ -468,7 +468,7 @@ export class StreamingMessageAggregator {
// Streaming duration excludes TTFT and tool execution - used for avg tok/s
const streamingMs = Math.max(0, durationMs - (ttftMs ?? 0) - totalToolExecutionMs);

const mode = (message?.metadata?.mode ?? context.mode) as "plan" | "exec" | undefined;
const mode = message?.metadata?.mode ?? context.mode;

// Store last completed stream stats (include durations anchored in the renderer clock)
const startTime = endTime - durationMs;
Expand Down Expand Up @@ -648,7 +648,7 @@ export class StreamingMessageAggregator {
/** Live tokens-per-second (trailing window) */
liveTPS: number;
/** Mode (plan/exec) for this stream */
mode?: "plan" | "exec";
mode?: string;
} | null {
// Get the first (and typically only) active stream
const entries = Array.from(this.activeStreams.entries());
Expand Down Expand Up @@ -695,7 +695,7 @@ export class StreamingMessageAggregator {
outputTokens: number;
reasoningTokens: number;
streamingMs: number;
mode?: "plan" | "exec";
mode?: string;
} | null {
return this.lastCompletedStreamStats;
}
Expand Down Expand Up @@ -728,7 +728,7 @@ export class StreamingMessageAggregator {
totalOutputTokens: number;
totalReasoningTokens: number;
/** Mode extracted from composite key, undefined for old data */
mode?: "plan" | "exec";
mode?: string;
}
>;
} | null {
Expand All @@ -755,15 +755,15 @@ export class StreamingMessageAggregator {
responseCount: number;
totalOutputTokens: number;
totalReasoningTokens: number;
mode?: "plan" | "exec";
mode?: string;
}
> = {};

for (const [key, stats] of modelEntries) {
// Parse composite key: "model" or "model:mode"
// Model names can contain colons (e.g., "mux-gateway:provider/model")
// so we look for ":plan" or ":exec" suffix specifically
let mode: "plan" | "exec" | undefined;
let mode: string | undefined;
if (key.endsWith(":plan")) {
mode = "plan";
} else if (key.endsWith(":exec")) {
Expand Down
4 changes: 2 additions & 2 deletions src/browser/utils/messages/compactionOptions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ describe("applyCompactionOverrides", () => {
const result = applyCompactionOverrides(baseOptions, compactData);

expect(result.mode).toBe("compact");
expect(result.toolPolicy).toEqual([]);
expect(result.toolPolicy).toEqual([{ regex_match: ".*", action: "disable" }]);
});

it("disables all tools even when base options has tool policy", () => {
Expand All @@ -74,7 +74,7 @@ describe("applyCompactionOverrides", () => {
const result = applyCompactionOverrides(baseWithTools, compactData);

expect(result.mode).toBe("compact");
expect(result.toolPolicy).toEqual([]); // Tools always disabled for compaction
expect(result.toolPolicy).toEqual([{ regex_match: ".*", action: "disable" }]); // Tools always disabled for compaction
});

it("applies all overrides together", () => {
Expand Down
3 changes: 2 additions & 1 deletion src/browser/utils/messages/compactionOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export function applyCompactionOverrides(
thinkingLevel: baseOptions.thinkingLevel,
maxOutputTokens: compactData.maxOutputTokens,
mode: "compact" as const,
toolPolicy: [], // Disable all tools during compaction
// Disable all tools during compaction - regex .* matches all tool names
toolPolicy: [{ regex_match: ".*", action: "disable" }],
};
}
4 changes: 2 additions & 2 deletions src/common/orpc/schemas/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ export const StreamStartEventSchema = z.object({
startTime: z.number().meta({
description: "Backend timestamp when stream started (Date.now())",
}),
mode: z.enum(["plan", "exec"]).optional().meta({
description: "Agent mode (plan/exec) for this stream",
mode: z.string().optional().meta({
description: "Agent mode for this stream",
}),
});

Expand Down
3 changes: 2 additions & 1 deletion src/common/orpc/schemas/workspaceStats.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { z } from "zod";

const ModeSchema = z.enum(["plan", "exec"]);
// Mode is a string to support any mode value (plan, exec, compact, etc.)
const ModeSchema = z.string();

export const TimingAnomalySchema = z.enum([
"negative_duration",
Expand Down
9 changes: 8 additions & 1 deletion src/node/services/initStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,14 @@ export class InitStateManager extends EventEmitter {
});

// Emit init-output for each accumulated line with original timestamps
for (const timedLine of state.lines) {
// Defensive: state.lines could be undefined from old persisted data
const lines = state.lines ?? [];
for (const timedLine of lines) {
// Skip malformed entries (missing required fields)
if (typeof timedLine.line !== "string" || typeof timedLine.timestamp !== "number") {
log.warn(`[InitStateManager] Skipping malformed init-output:`, timedLine);
continue;
}
events.push({
type: "init-output",
workspaceId,
Expand Down
4 changes: 2 additions & 2 deletions src/node/services/sessionTimingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ interface ActiveStreamState {
workspaceId: string;
messageId: string;
model: string;
mode?: "plan" | "exec";
mode?: string;

startTimeMs: number;
firstTokenTimeMs: number | null;
Expand All @@ -64,7 +64,7 @@ interface ActiveStreamState {
lastEventTimestampMs: number;
}

function getModelKey(model: string, mode: "plan" | "exec" | undefined): string {
function getModelKey(model: string, mode: string | undefined): string {
return mode ? `${model}:${mode}` : model;
}

Expand Down
67 changes: 67 additions & 0 deletions src/node/services/streamManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,73 @@ describe("StreamManager - previousResponseId recovery", () => {
});
});

describe("StreamManager - replayStream", () => {
test("replayStream snapshots parts so reconnect doesn't block until stream ends", async () => {
const mockHistoryService = createMockHistoryService();
const mockPartialService = createMockPartialService();
const streamManager = new StreamManager(mockHistoryService, mockPartialService);

// Suppress error events from bubbling up as uncaught exceptions during tests
streamManager.on("error", () => undefined);

const workspaceId = "ws-replay-snapshot";

const deltas: string[] = [];
streamManager.on("stream-delta", (event: { delta: string }) => {
deltas.push(event.delta);
});

// Inject an active stream into the private workspaceStreams map.
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const workspaceStreamsValue = Reflect.get(streamManager, "workspaceStreams");
if (!(workspaceStreamsValue instanceof Map)) {
throw new Error("StreamManager.workspaceStreams is not a Map");
}
const workspaceStreams = workspaceStreamsValue as Map<string, unknown>;

const streamInfo = {
state: "streaming",
messageId: "msg-1",
model: "claude-sonnet-4",
historySequence: 1,
startTime: 123,
initialMetadata: {},
parts: [{ type: "text", text: "a", timestamp: 10 }],
};

workspaceStreams.set(workspaceId, streamInfo);

// Patch the private tokenTracker to (a) avoid worker setup and (b) mutate parts during replay.
const tokenTracker = Reflect.get(streamManager, "tokenTracker") as {
setModel: (model: string) => Promise<void>;
countTokens: (text: string) => Promise<number>;
};

tokenTracker.setModel = () => Promise.resolve();

let pushed = false;
tokenTracker.countTokens = async () => {
if (!pushed) {
pushed = true;
// While replay is mid-await, simulate the running stream appending more parts.
(streamInfo.parts as Array<{ type: string; text?: string; timestamp?: number }>).push({
type: "text",
text: "b",
timestamp: 20,
});
}
// Force an await boundary so the mutation happens during replay.
await new Promise((resolve) => setTimeout(resolve, 0));
return 1;
};

await streamManager.replayStream(workspaceId);

// If replayStream iterates the live array, it would also emit "b".
expect(deltas).toEqual(["a"]);
});
});

describe("StreamManager - ask_user_question Partial Persistence", () => {
// Note: The ask_user_question tool blocks waiting for user input.
// If the app restarts during that wait, the partial must be persisted.
Expand Down
17 changes: 13 additions & 4 deletions src/node/services/streamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1787,10 +1787,19 @@ export class StreamManager extends EventEmitter {
...(replayMode && { mode: replayMode }),
});

// Replay accumulated parts as events using shared emission logic
// This guarantees replay produces identical events to the original stream
for (const part of streamInfo.parts) {
await this.emitPartAsEvent(typedWorkspaceId, streamInfo.messageId, part);
// Replay accumulated parts as events using shared emission logic.
// IMPORTANT: Snapshot the parts array up-front.
//
// streamInfo.parts is mutated while the stream is running. Because emitPartAsEvent() is async
// (tokenization happens in worker threads), iterating the live array would keep consuming newly
// appended parts and can effectively block until the stream ends.
//
// That blocks AgentSession.emitHistoricalEvents() from sending "caught-up" on reconnect,
// leaving the renderer stuck in "Loading workspace" and suppressing the streaming indicator.
const replayParts = streamInfo.parts.slice();
const replayMessageId = streamInfo.messageId;
for (const part of replayParts) {
await this.emitPartAsEvent(typedWorkspaceId, replayMessageId, part);
}
}

Expand Down