Skip to content

Commit 56e9dac

Browse files
committed
🤖 Refactor: Make silent event drops structurally impossible
Replaced separate isStreamEvent() check and processStreamEvent() dispatch with a single event handler map. This makes it impossible to add a buffered event type without a handler: - Single source of truth: bufferedEventHandlers map defines both which events to buffer (keys) and how to handle them (values) - No synchronization bugs: Can't add to one without the other - Simpler code: ~100 LoC of if-chains replaced with map dispatch - Self-documenting: Map keys show all buffered event types at a glance Net: -30 LoC, same behavior, zero risk of silent drops Generated with `cmux`
1 parent 2c7b820 commit 56e9dac

File tree

1 file changed

+98
-142
lines changed

1 file changed

+98
-142
lines changed

src/stores/WorkspaceStore.ts

Lines changed: 98 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,7 @@ import { updatePersistedState } from "@/hooks/usePersistedState";
99
import { getRetryStateKey } from "@/constants/storage";
1010
import { CUSTOM_EVENTS } from "@/constants/events";
1111
import { useSyncExternalStore } from "react";
12-
import {
13-
isCaughtUpMessage,
14-
isStreamError,
15-
isDeleteMessage,
16-
isStreamStart,
17-
isStreamDelta,
18-
isStreamEnd,
19-
isStreamAbort,
20-
isToolCallStart,
21-
isToolCallDelta,
22-
isToolCallEnd,
23-
isReasoningDelta,
24-
isReasoningEnd,
25-
isInitStart,
26-
isInitOutput,
27-
isInitEnd,
28-
} from "@/types/ipc";
12+
import { isCaughtUpMessage, isStreamError, isDeleteMessage } from "@/types/ipc";
2913
import { MapStore } from "./MapStore";
3014
import { createDisplayUsage } from "@/utils/tokens/displayUsage";
3115
import { WorkspaceConsumerManager } from "./WorkspaceConsumerManager";
@@ -125,6 +109,93 @@ export class WorkspaceStore {
125109
private historicalMessages = new Map<string, CmuxMessage[]>();
126110
private pendingStreamEvents = new Map<string, WorkspaceChatMessage[]>();
127111

112+
/**
113+
* Map of event types to their handlers. This is the single source of truth for:
114+
* 1. Which events should be buffered during replay (the keys)
115+
* 2. How to process those events (the values)
116+
*
117+
* By keeping check and processing in one place, we make it structurally impossible
118+
* to buffer an event type without having a handler for it.
119+
*/
120+
private readonly bufferedEventHandlers: Record<
121+
string,
122+
(workspaceId: string, aggregator: StreamingMessageAggregator, data: WorkspaceChatMessage) => void
123+
> = {
124+
"stream-start": (workspaceId, aggregator, data) => {
125+
aggregator.handleStreamStart(data as never);
126+
if (this.onModelUsed) {
127+
this.onModelUsed((data as { model: string }).model);
128+
}
129+
updatePersistedState(getRetryStateKey(workspaceId), {
130+
attempt: 0,
131+
retryStartTime: Date.now(),
132+
});
133+
this.states.bump(workspaceId);
134+
},
135+
"stream-delta": (workspaceId, aggregator, data) => {
136+
aggregator.handleStreamDelta(data as never);
137+
this.states.bump(workspaceId);
138+
},
139+
"stream-end": (workspaceId, aggregator, data) => {
140+
aggregator.handleStreamEnd(data as never);
141+
aggregator.clearTokenState((data as { messageId: string }).messageId);
142+
143+
if (this.handleCompactionCompletion(workspaceId, aggregator, data)) {
144+
return;
145+
}
146+
147+
this.states.bump(workspaceId);
148+
this.checkAndBumpRecencyIfChanged();
149+
this.finalizeUsageStats(workspaceId, (data as { metadata?: never }).metadata);
150+
},
151+
"stream-abort": (workspaceId, aggregator, data) => {
152+
aggregator.clearTokenState((data as { messageId: string }).messageId);
153+
aggregator.handleStreamAbort(data as never);
154+
155+
if (this.handleCompactionAbort(workspaceId, aggregator, data)) {
156+
return;
157+
}
158+
159+
this.states.bump(workspaceId);
160+
this.dispatchResumeCheck(workspaceId);
161+
this.finalizeUsageStats(workspaceId, (data as { metadata?: never }).metadata);
162+
},
163+
"tool-call-start": (workspaceId, aggregator, data) => {
164+
aggregator.handleToolCallStart(data as never);
165+
this.states.bump(workspaceId);
166+
},
167+
"tool-call-delta": (workspaceId, aggregator, data) => {
168+
aggregator.handleToolCallDelta(data as never);
169+
this.states.bump(workspaceId);
170+
},
171+
"tool-call-end": (workspaceId, aggregator, data) => {
172+
aggregator.handleToolCallEnd(data as never);
173+
this.states.bump(workspaceId);
174+
this.consumerManager.scheduleCalculation(workspaceId, aggregator);
175+
},
176+
"reasoning-delta": (workspaceId, aggregator, data) => {
177+
aggregator.handleReasoningDelta(data as never);
178+
this.states.bump(workspaceId);
179+
},
180+
"reasoning-end": (workspaceId, aggregator, data) => {
181+
aggregator.handleReasoningEnd(data as never);
182+
this.states.bump(workspaceId);
183+
},
184+
"init-start": (workspaceId, aggregator, data) => {
185+
aggregator.handleMessage(data);
186+
this.states.bump(workspaceId);
187+
},
188+
"init-output": (workspaceId, aggregator, data) => {
189+
aggregator.handleMessage(data);
190+
this.states.bump(workspaceId);
191+
},
192+
"init-end": (workspaceId, aggregator, data) => {
193+
aggregator.handleMessage(data);
194+
this.states.bump(workspaceId);
195+
},
196+
};
197+
198+
128199
// Cache of last known recency per workspace (for change detection)
129200
private recencyCache = new Map<string, number | null>();
130201

@@ -784,29 +855,11 @@ export class WorkspaceStore {
784855
}
785856

786857
/**
787-
* Check if data is a stream event or init event that should be buffered until caught-up.
788-
*
789-
* Init events may arrive:
790-
* - BEFORE caught-up: During replay from init-status.json (historical)
791-
* - AFTER caught-up: During live workspace creation (real-time)
792-
*
793-
* Like stream events, init events are buffered during replay to avoid O(N) re-renders.
858+
* Check if data is a buffered event type by checking the handler map.
859+
* This ensures isStreamEvent() and processStreamEvent() can never fall out of sync.
794860
*/
795-
private isStreamEvent(data: WorkspaceChatMessage): boolean {
796-
return (
797-
isStreamStart(data) ||
798-
isStreamDelta(data) ||
799-
isStreamEnd(data) ||
800-
isStreamAbort(data) ||
801-
isToolCallStart(data) ||
802-
isToolCallDelta(data) ||
803-
isToolCallEnd(data) ||
804-
isReasoningDelta(data) ||
805-
isReasoningEnd(data) ||
806-
isInitStart(data) ||
807-
isInitOutput(data) ||
808-
isInitEnd(data)
809-
);
861+
private isBufferedEvent(data: WorkspaceChatMessage): boolean {
862+
return "type" in data && data.type in this.bufferedEventHandlers;
810863
}
811864

812865
private handleChatMessage(workspaceId: string, data: WorkspaceChatMessage): void {
@@ -864,7 +917,7 @@ export class WorkspaceStore {
864917
//
865918
// This is especially important for workspaces with long histories (100+ messages),
866919
// where unbuffered rendering would cause visible lag and UI stutter.
867-
if (!isCaughtUp && this.isStreamEvent(data)) {
920+
if (!isCaughtUp && this.isBufferedEvent(data)) {
868921
const pending = this.pendingStreamEvents.get(workspaceId) ?? [];
869922
pending.push(data);
870923
this.pendingStreamEvents.set(workspaceId, pending);
@@ -880,6 +933,7 @@ export class WorkspaceStore {
880933
aggregator: StreamingMessageAggregator,
881934
data: WorkspaceChatMessage
882935
): void {
936+
// Handle non-buffered special events first
883937
if (isStreamError(data)) {
884938
aggregator.handleStreamError(data);
885939
this.states.bump(workspaceId);
@@ -890,111 +944,13 @@ export class WorkspaceStore {
890944
if (isDeleteMessage(data)) {
891945
aggregator.handleDeleteMessage(data);
892946
this.states.bump(workspaceId);
893-
this.checkAndBumpRecencyIfChanged(); // Message deleted, update recency
894-
return;
895-
}
896-
897-
if (isStreamStart(data)) {
898-
aggregator.handleStreamStart(data);
899-
if (this.onModelUsed) {
900-
this.onModelUsed(data.model);
901-
}
902-
updatePersistedState(getRetryStateKey(workspaceId), {
903-
attempt: 0,
904-
retryStartTime: Date.now(),
905-
});
906-
this.states.bump(workspaceId);
907-
return;
908-
}
909-
910-
if (isStreamDelta(data)) {
911-
aggregator.handleStreamDelta(data);
912-
// Always bump for chat components to see deltas
913-
// Sidebar components won't re-render because getWorkspaceSidebarState() returns cached object
914-
this.states.bump(workspaceId);
915-
return;
916-
}
917-
918-
if (isStreamEnd(data)) {
919-
aggregator.handleStreamEnd(data);
920-
aggregator.clearTokenState(data.messageId);
921-
922-
// Early return if compaction handled (async replacement in progress)
923-
if (this.handleCompactionCompletion(workspaceId, aggregator, data)) {
924-
return;
925-
}
926-
927-
// Normal stream-end handling
928-
this.states.bump(workspaceId);
929-
this.checkAndBumpRecencyIfChanged(); // Stream ended, update recency
930-
931-
// Update usage stats and schedule consumer calculation
932-
// MUST happen after aggregator.handleStreamEnd() stores the metadata
933-
this.finalizeUsageStats(workspaceId, data.metadata);
934-
935-
return;
936-
}
937-
938-
if (isStreamAbort(data)) {
939-
aggregator.clearTokenState(data.messageId);
940-
aggregator.handleStreamAbort(data);
941-
942-
// Check if this was a compaction stream that got interrupted
943-
if (this.handleCompactionAbort(workspaceId, aggregator, data)) {
944-
// Compaction abort handled, don't do normal abort processing
945-
return;
946-
}
947-
948-
// Normal abort handling
949-
this.states.bump(workspaceId);
950-
this.dispatchResumeCheck(workspaceId);
951-
952-
// Update usage stats if available (abort may have usage if stream completed processing)
953-
// MUST happen after aggregator.handleStreamAbort() stores the metadata
954-
this.finalizeUsageStats(workspaceId, data.metadata);
955-
956-
return;
957-
}
958-
959-
if (isToolCallStart(data)) {
960-
aggregator.handleToolCallStart(data);
961-
this.states.bump(workspaceId);
962-
return;
963-
}
964-
965-
if (isToolCallDelta(data)) {
966-
aggregator.handleToolCallDelta(data);
967-
this.states.bump(workspaceId);
968-
return;
969-
}
970-
971-
if (isToolCallEnd(data)) {
972-
aggregator.handleToolCallEnd(data);
973-
this.states.bump(workspaceId);
974-
975-
// Bump consumers on tool-end for real-time updates during streaming
976-
// Tools complete before stream-end, so we want breakdown to update immediately
977-
this.consumerManager.scheduleCalculation(workspaceId, aggregator);
978-
979-
return;
980-
}
981-
982-
if (isReasoningDelta(data)) {
983-
aggregator.handleReasoningDelta(data);
984-
this.states.bump(workspaceId);
985-
return;
986-
}
987-
988-
if (isReasoningEnd(data)) {
989-
aggregator.handleReasoningEnd(data);
990-
this.states.bump(workspaceId);
947+
this.checkAndBumpRecencyIfChanged();
991948
return;
992949
}
993950

994-
// Handle init events (buffered like stream events during replay)
995-
if (isInitStart(data) || isInitOutput(data) || isInitEnd(data)) {
996-
aggregator.handleMessage(data);
997-
this.states.bump(workspaceId);
951+
// Try buffered event handlers (single source of truth)
952+
if ("type" in data && data.type in this.bufferedEventHandlers) {
953+
this.bufferedEventHandlers[data.type](workspaceId, aggregator, data);
998954
return;
999955
}
1000956

0 commit comments

Comments
 (0)