Skip to content

Commit cd26335

Browse files
committed
fix: serialize mock scenario event processing
Mock events were dispatched concurrently via setTimeout, causing race conditions when modifying shared state. Added event queue to ActiveStream that processes handlers sequentially while preserving delay timing. Clear queue on cleanup to prevent orphaned events.
1 parent f997ff8 commit cd26335

File tree

1 file changed

+41
-1
lines changed

1 file changed

+41
-1
lines changed

src/services/mock/mockScenarioPlayer.ts

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ interface MockPlayerDeps {
112112
interface ActiveStream {
113113
timers: NodeJS.Timeout[];
114114
messageId: string;
115+
eventQueue: Array<() => Promise<void>>;
116+
isProcessing: boolean;
115117
}
116118

117119
export class MockScenarioPlayer {
@@ -216,16 +218,48 @@ export class MockScenarioPlayer {
216218
this.activeStreams.set(workspaceId, {
217219
timers,
218220
messageId: turn.assistant.messageId,
221+
eventQueue: [],
222+
isProcessing: false,
219223
});
220224

221225
for (const event of turn.assistant.events) {
222226
const timer = setTimeout(() => {
223-
void this.dispatchEvent(workspaceId, event, turn.assistant.messageId, historySequence);
227+
this.enqueueEvent(workspaceId, () =>
228+
this.dispatchEvent(workspaceId, event, turn.assistant.messageId, historySequence)
229+
);
224230
}, event.delay);
225231
timers.push(timer);
226232
}
227233
}
228234

235+
private enqueueEvent(workspaceId: string, handler: () => Promise<void>): void {
236+
const active = this.activeStreams.get(workspaceId);
237+
if (!active) return;
238+
239+
active.eventQueue.push(handler);
240+
void this.processQueue(workspaceId);
241+
}
242+
243+
private async processQueue(workspaceId: string): Promise<void> {
244+
const active = this.activeStreams.get(workspaceId);
245+
if (!active || active.isProcessing) return;
246+
247+
active.isProcessing = true;
248+
249+
while (active.eventQueue.length > 0) {
250+
const handler = active.eventQueue.shift();
251+
if (!handler) break;
252+
253+
try {
254+
await handler();
255+
} catch (error) {
256+
console.error(`[MockScenarioPlayer] Event handler error for ${workspaceId}:`, error);
257+
}
258+
}
259+
260+
active.isProcessing = false;
261+
}
262+
229263
private async dispatchEvent(
230264
workspaceId: string,
231265
event: MockAssistantEvent,
@@ -368,9 +402,15 @@ export class MockScenarioPlayer {
368402
private cleanup(workspaceId: string): void {
369403
const active = this.activeStreams.get(workspaceId);
370404
if (!active) return;
405+
406+
// Clear all pending timers
371407
for (const timer of active.timers) {
372408
clearTimeout(timer);
373409
}
410+
411+
// Clear event queue to prevent any pending events from processing
412+
active.eventQueue = [];
413+
374414
this.activeStreams.delete(workspaceId);
375415
}
376416

0 commit comments

Comments
 (0)