diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index f472790e5..b90ecbcc1 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -219,6 +219,42 @@ export interface WorkspaceConsumersState { isCalculating: boolean; } +interface WorkspaceChatTransientState { + caughtUp: boolean; + historicalMessages: MuxMessage[]; + pendingStreamEvents: WorkspaceChatMessage[]; + replayingHistory: boolean; + queuedMessage: QueuedMessage | null; + liveBashOutput: Map; +} + +function createInitialChatTransientState(): WorkspaceChatTransientState { + return { + caughtUp: false, + historicalMessages: [], + pendingStreamEvents: [], + replayingHistory: false, + queuedMessage: null, + liveBashOutput: new Map(), + }; +} + +const ON_CHAT_RETRY_BASE_MS = 250; +const ON_CHAT_RETRY_MAX_MS = 5000; + +type IteratorValidationFailedError = Error & { code: "EVENT_ITERATOR_VALIDATION_FAILED" }; + +function isIteratorValidationFailed(error: unknown): error is IteratorValidationFailedError { + return ( + error instanceof Error && + (error as { code?: unknown }).code === "EVENT_ITERATOR_VALIDATION_FAILED" + ); +} + +function calculateOnChatBackoffMs(attempt: number): number { + return Math.min(ON_CHAT_RETRY_BASE_MS * 2 ** attempt, ON_CHAT_RETRY_MAX_MS); +} + /** * External store for workspace aggregators and streaming state. * @@ -247,11 +283,11 @@ export class WorkspaceStore { // Supporting data structures private aggregators = new Map(); private ipcUnsubscribers = new Map void>(); - private caughtUp = new Map(); - private historicalMessages = new Map(); - private pendingStreamEvents = new Map(); + + // Per-workspace ephemeral chat state (buffering, queued message, live bash output, etc.) + private chatTransientState = new Map(); + private workspaceMetadata = new Map(); // Store metadata for name lookup - private queuedMessages = new Map(); // Cached queued messages // Workspace timing stats snapshots (from workspace.stats.subscribe) private statsEnabled = false; @@ -260,9 +296,6 @@ export class WorkspaceStore { private statsUnsubscribers = new Map void>(); // Cumulative session usage (from session-usage.json) - // UI-only incremental bash output streamed via bash-output events (not persisted). - // Keyed by toolCallId. - private liveBashOutput = new Map>(); private sessionUsage = new Map>(); // Idle compaction notification callbacks (called when backend signals idle compaction needed) @@ -390,11 +423,8 @@ export class WorkspaceStore { if (toolCallEnd.toolName === "bash") { const output = (toolCallEnd.result as { output?: unknown } | undefined)?.output; if (typeof output === "string") { - const perWorkspace = this.liveBashOutput.get(workspaceId); - perWorkspace?.delete(toolCallEnd.toolCallId); - if (perWorkspace?.size === 0) { - this.liveBashOutput.delete(workspaceId); - } + const transient = this.chatTransientState.get(workspaceId); + transient?.liveBashOutput.delete(toolCallEnd.toolCallId); } } @@ -451,7 +481,7 @@ export class WorkspaceStore { } : null; - this.queuedMessages.set(workspaceId, queuedMessage); + this.assertChatTransientState(workspaceId).queuedMessage = queuedMessage; this.states.bump(workspaceId); }, "restore-to-input": (workspaceId, _aggregator, data) => { @@ -477,9 +507,6 @@ export class WorkspaceStore { // Track previous sidebar state per workspace (to prevent unnecessary bumps) private previousSidebarValues = new Map(); - // Track workspaces currently replaying buffered history (to avoid O(N) scheduling) - private replayingHistory = new Set(); - // Track model usage (optional integration point for model bookkeeping) private readonly onModelUsed?: (model: string) => void; @@ -672,7 +699,7 @@ export class WorkspaceStore { workspaceId: string, aggregator: StreamingMessageAggregator ): void { - const perWorkspace = this.liveBashOutput.get(workspaceId); + const perWorkspace = this.chatTransientState.get(workspaceId)?.liveBashOutput; if (!perWorkspace || perWorkspace.size === 0) return; const activeToolCallIds = new Set(); @@ -687,10 +714,6 @@ export class WorkspaceStore { perWorkspace.delete(toolCallId); } } - - if (perWorkspace.size === 0) { - this.liveBashOutput.delete(workspaceId); - } } /** @@ -714,8 +737,7 @@ export class WorkspaceStore { }; getBashToolLiveOutput(workspaceId: string, toolCallId: string): LiveBashOutputView | null { - const perWorkspace = this.liveBashOutput.get(workspaceId); - const state = perWorkspace?.get(toolCallId); + const state = this.chatTransientState.get(workspaceId)?.liveBashOutput.get(toolCallId); // Important: return the stored object reference so useSyncExternalStore sees a stable snapshot. // (Returning a fresh object every call can trigger an infinite re-render loop.) @@ -732,6 +754,12 @@ export class WorkspaceStore { return aggregator; } + private assertChatTransientState(workspaceId: string): WorkspaceChatTransientState { + const state = this.chatTransientState.get(workspaceId); + assert(state, `Workspace ${workspaceId} not found - must call addWorkspace() first`); + return state; + } + /** * Get state for a specific workspace. * Lazy computation - only runs when version changes. @@ -743,7 +771,7 @@ export class WorkspaceStore { const aggregator = this.assertGet(workspaceId); const hasMessages = aggregator.hasMessages(); - const isCaughtUp = this.caughtUp.get(workspaceId) ?? false; + const transient = this.assertChatTransientState(workspaceId); const activeStreams = aggregator.getActiveStreams(); const messages = aggregator.getAllMessages(); const metadata = this.workspaceMetadata.get(workspaceId); @@ -751,11 +779,11 @@ export class WorkspaceStore { return { name: metadata?.name ?? workspaceId, // Fall back to ID if metadata missing messages: aggregator.getDisplayedMessages(), - queuedMessage: this.queuedMessages.get(workspaceId) ?? null, + queuedMessage: transient.queuedMessage, canInterrupt: activeStreams.length > 0, isCompacting: aggregator.isCompacting(), awaitingUserQuestion: aggregator.hasAwaitingUserQuestion(), - loading: !hasMessages && !isCaughtUp, + loading: !hasMessages && !transient.caughtUp, muxMessages: messages, currentModel: aggregator.getCurrentModel() ?? null, recencyTimestamp: aggregator.getRecencyTimestamp(), @@ -1028,7 +1056,7 @@ export class WorkspaceStore { */ getWorkspaceConsumers(workspaceId: string): WorkspaceConsumersState { const aggregator = this.aggregators.get(workspaceId); - const isCaughtUp = this.caughtUp.get(workspaceId) ?? false; + const isCaughtUp = this.chatTransientState.get(workspaceId)?.caughtUp ?? false; // Lazy trigger check (runs on EVERY access, not just when MapStore recomputes) const cached = this.consumerManager.getCachedState(workspaceId); @@ -1088,7 +1116,7 @@ export class WorkspaceStore { metadata?: { usage?: LanguageModelV2Usage } ): void { // During history replay: only bump usage, skip scheduling (caught-up schedules once at end) - if (this.replayingHistory.has(workspaceId)) { + if (this.chatTransientState.get(workspaceId)?.replayingHistory) { if (metadata?.usage) { this.usageStore.bump(workspaceId); } @@ -1108,6 +1136,136 @@ export class WorkspaceStore { } } + private sleepWithAbort(timeoutMs: number, signal: AbortSignal): Promise { + return new Promise((resolve) => { + if (signal.aborted) { + resolve(); + return; + } + + const timeout = setTimeout(() => { + resolve(); + }, timeoutMs); + + signal.addEventListener( + "abort", + () => { + clearTimeout(timeout); + resolve(); + }, + { once: true } + ); + }); + } + + private isWorkspaceSubscribed(workspaceId: string): boolean { + return this.ipcUnsubscribers.has(workspaceId); + } + + private async waitForClient(signal: AbortSignal): Promise | null> { + while (!signal.aborted) { + if (this.client) { + return this.client; + } + + // Wait for a client to be attached (e.g., initial connect or reconnect). + await this.sleepWithAbort(ON_CHAT_RETRY_BASE_MS, signal); + } + + return null; + } + + /** + * Reset derived UI state for a workspace so a fresh onChat replay can rebuild it. + * + * This is used when an onChat subscription ends unexpectedly (MessagePort/WebSocket hiccup). + * Without clearing, replayed history would be merged into stale state (loadHistoricalMessages + * only adds/overwrites, it doesn't delete messages that disappeared due to compaction/truncation). + */ + private resetChatStateForReplay(workspaceId: string): void { + const aggregator = this.aggregators.get(workspaceId); + if (!aggregator) { + return; + } + + // Clear any pending UI bumps from deltas - we're about to rebuild the message list. + this.cancelPendingIdleBump(workspaceId); + + aggregator.clear(); + + // Reset per-workspace transient state so the next replay rebuilds from the backend source of truth. + this.chatTransientState.set(workspaceId, createInitialChatTransientState()); + + this.states.bump(workspaceId); + this.checkAndBumpRecencyIfChanged(); + } + + /** + * Subscribe to workspace chat events (history replay + live streaming). + * Retries on unexpected iterator termination to avoid requiring a full app restart. + */ + private async runOnChatSubscription(workspaceId: string, signal: AbortSignal): Promise { + let attempt = 0; + + while (!signal.aborted) { + const client = this.client ?? (await this.waitForClient(signal)); + if (!client || signal.aborted) { + return; + } + + try { + const iterator = await client.workspace.onChat({ workspaceId }, { signal }); + + for await (const data of iterator) { + if (signal.aborted) { + return; + } + + // Connection is alive again - don't carry old backoff into the next failure. + attempt = 0; + + queueMicrotask(() => { + this.handleChatMessage(workspaceId, data); + }); + } + + // Iterator ended without an abort - treat as unexpected and retry. + if (signal.aborted) { + return; + } + + console.warn( + `[WorkspaceStore] onChat subscription ended unexpectedly for ${workspaceId}; retrying...` + ); + } catch (error) { + // Suppress errors when subscription was intentionally cleaned up + if (signal.aborted) { + return; + } + + // EVENT_ITERATOR_VALIDATION_FAILED with ErrorEvent cause happens when: + // - The workspace was removed on server side (iterator ends with error) + // - Connection dropped (WebSocket/MessagePort error) + // Only suppress if workspace no longer exists (was removed during the race) + if (isIteratorValidationFailed(error) && !this.isWorkspaceSubscribed(workspaceId)) { + return; + } + + console.error(`[WorkspaceStore] Error in onChat subscription for ${workspaceId}:`, error); + } + + const delayMs = calculateOnChatBackoffMs(attempt); + attempt++; + + await this.sleepWithAbort(delayMs, signal); + if (signal.aborted) { + return; + } + + this.resetChatStateForReplay(workspaceId); + } + } + /** * Add a workspace and subscribe to its IPC events. */ @@ -1138,12 +1296,9 @@ export class WorkspaceStore { this.derived.bump("recency"); } - // Initialize state - if (!this.caughtUp.has(workspaceId)) { - this.caughtUp.set(workspaceId, false); - } - if (!this.historicalMessages.has(workspaceId)) { - this.historicalMessages.set(workspaceId, []); + // Initialize transient chat state + if (!this.chatTransientState.has(workspaceId)) { + this.chatTransientState.set(workspaceId, createInitialChatTransientState()); } // Clear stale streaming state @@ -1151,59 +1306,32 @@ export class WorkspaceStore { // Subscribe to IPC events // Wrap in queueMicrotask to ensure IPC events don't update during React render - if (this.client) { - const controller = new AbortController(); - const { signal } = controller; - - // Fire and forget the async loop - (async () => { - try { - const iterator = await this.client!.workspace.onChat({ workspaceId }, { signal }); - - for await (const data of iterator) { - if (signal.aborted) break; - queueMicrotask(() => { - this.handleChatMessage(workspaceId, data); - }); - } - } catch (error) { - // Suppress errors when subscription was intentionally cleaned up - if (signal.aborted) return; - - // EVENT_ITERATOR_VALIDATION_FAILED with ErrorEvent cause happens when: - // - The workspace was removed on server side (iterator ends with error) - // - Connection dropped (WebSocket/MessagePort error) - // Only suppress if workspace no longer exists (was removed during the race) - const isIteratorError = - error instanceof Error && - "code" in error && - error.code === "EVENT_ITERATOR_VALIDATION_FAILED"; - const workspaceRemoved = !this.states.has(workspaceId); - if (isIteratorError && workspaceRemoved) return; - - console.error(`[WorkspaceStore] Error in onChat subscription for ${workspaceId}:`, error); - } - })(); + const controller = new AbortController(); + const { signal } = controller; - this.ipcUnsubscribers.set(workspaceId, () => controller.abort()); + this.ipcUnsubscribers.set(workspaceId, () => controller.abort()); - // Fetch persisted session usage (fire-and-forget) - this.client.workspace - .getSessionUsage({ workspaceId }) - .then((data) => { - if (data) { - this.sessionUsage.set(workspaceId, data); - this.usageStore.bump(workspaceId); - } - }) - .catch((error) => { - console.warn(`Failed to fetch session usage for ${workspaceId}:`, error); - }); + // Fire and forget the subscription loop (retries on errors) + void this.runOnChatSubscription(workspaceId, signal); - if (this.statsEnabled) { - this.subscribeToStats(workspaceId); - } - } else { + // Fetch persisted session usage (fire-and-forget) + this.client?.workspace + .getSessionUsage({ workspaceId }) + .then((data) => { + if (data) { + this.sessionUsage.set(workspaceId, data); + this.usageStore.bump(workspaceId); + } + }) + .catch((error) => { + console.warn(`Failed to fetch session usage for ${workspaceId}:`, error); + }); + + if (this.statsEnabled) { + this.subscribeToStats(workspaceId); + } + + if (!this.client) { console.warn(`[WorkspaceStore] No ORPC client available for workspace ${workspaceId}`); } } @@ -1235,9 +1363,7 @@ export class WorkspaceStore { this.usageStore.delete(workspaceId); this.consumersStore.delete(workspaceId); this.aggregators.delete(workspaceId); - this.caughtUp.delete(workspaceId); - this.historicalMessages.delete(workspaceId); - this.pendingStreamEvents.delete(workspaceId); + this.chatTransientState.delete(workspaceId); this.recencyCache.delete(workspaceId); this.previousSidebarValues.delete(workspaceId); this.sidebarStateCache.delete(workspaceId); @@ -1245,7 +1371,6 @@ export class WorkspaceStore { this.workspaceCreatedAt.delete(workspaceId); this.workspaceStats.delete(workspaceId); this.statsStore.delete(workspaceId); - this.liveBashOutput.delete(workspaceId); this.sessionUsage.delete(workspaceId); } @@ -1291,12 +1416,9 @@ export class WorkspaceStore { this.usageStore.clear(); this.consumersStore.clear(); this.aggregators.clear(); - this.caughtUp.clear(); - this.historicalMessages.clear(); - this.pendingStreamEvents.clear(); + this.chatTransientState.clear(); this.workspaceStats.clear(); this.statsStore.clear(); - this.liveBashOutput.clear(); this.sessionUsage.clear(); this.recencyCache.clear(); this.previousSidebarValues.clear(); @@ -1385,36 +1507,35 @@ export class WorkspaceStore { // Aggregator must exist - IPC subscription happens in addWorkspace() const aggregator = this.assertGet(workspaceId); - const isCaughtUp = this.caughtUp.get(workspaceId) ?? false; - const historicalMsgs = this.historicalMessages.get(workspaceId) ?? []; + const transient = this.assertChatTransientState(workspaceId); if (isCaughtUpMessage(data)) { // Check if there's an active stream in buffered events (reconnection scenario) - const pendingEvents = this.pendingStreamEvents.get(workspaceId) ?? []; + const pendingEvents = transient.pendingStreamEvents; const hasActiveStream = pendingEvents.some( (event) => "type" in event && event.type === "stream-start" ); // Load historical messages first - if (historicalMsgs.length > 0) { - aggregator.loadHistoricalMessages(historicalMsgs, hasActiveStream); - this.historicalMessages.set(workspaceId, []); + if (transient.historicalMessages.length > 0) { + aggregator.loadHistoricalMessages(transient.historicalMessages, hasActiveStream); + transient.historicalMessages.length = 0; } // Mark that we're replaying buffered history (prevents O(N) scheduling) - this.replayingHistory.add(workspaceId); + transient.replayingHistory = true; // Process buffered stream events now that history is loaded for (const event of pendingEvents) { this.processStreamEvent(workspaceId, aggregator, event); } - this.pendingStreamEvents.set(workspaceId, []); + pendingEvents.length = 0; // Done replaying buffered events - this.replayingHistory.delete(workspaceId); + transient.replayingHistory = false; // Mark as caught up - this.caughtUp.set(workspaceId, true); + transient.caughtUp = true; this.states.bump(workspaceId); this.checkAndBumpRecencyIfChanged(); // Messages loaded, update recency @@ -1449,10 +1570,8 @@ export class WorkspaceStore { // // This is especially important for workspaces with long histories (100+ messages), // where unbuffered rendering would cause visible lag and UI stutter. - if (!isCaughtUp && this.isBufferedEvent(data)) { - const pending = this.pendingStreamEvents.get(workspaceId) ?? []; - pending.push(data); - this.pendingStreamEvents.set(workspaceId, pending); + if (!transient.caughtUp && this.isBufferedEvent(data)) { + transient.pendingStreamEvents.push(data); return; } @@ -1504,18 +1623,16 @@ export class WorkspaceStore { if (isBashOutputEvent(data)) { if (data.text.length === 0) return; - const perWorkspace = - this.liveBashOutput.get(workspaceId) ?? new Map(); + const transient = this.assertChatTransientState(workspaceId); - const prev = perWorkspace.get(data.toolCallId); + const prev = transient.liveBashOutput.get(data.toolCallId); const next = appendLiveBashOutputChunk( prev, { text: data.text, isError: data.isError }, BASH_TRUNCATE_MAX_TOTAL_BYTES ); - perWorkspace.set(data.toolCallId, next); - this.liveBashOutput.set(workspaceId, perWorkspace); + transient.liveBashOutput.set(data.toolCallId, next); // High-frequency: throttle UI updates like other delta-style events. this.scheduleIdleStateBump(workspaceId); @@ -1530,12 +1647,11 @@ export class WorkspaceStore { // Regular messages (MuxMessage without type field) if (isMuxMessage(data)) { - const isCaughtUp = this.caughtUp.get(workspaceId) ?? false; - if (!isCaughtUp) { + const transient = this.assertChatTransientState(workspaceId); + + if (!transient.caughtUp) { // Buffer historical MuxMessages - const historicalMsgs = this.historicalMessages.get(workspaceId) ?? []; - historicalMsgs.push(data); - this.historicalMessages.set(workspaceId, historicalMsgs); + transient.historicalMessages.push(data); } else { // Process live events immediately (after history loaded) aggregator.handleMessage(data);