Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions src/node/services/streamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -566,9 +566,11 @@ export class StreamManager extends EventEmitter {
}

/**
* Complete a tool call by updating its part and emitting tool-call-end event
* Complete a tool call by updating its part and emitting tool-call-end event.
* CRITICAL: Flushes partial to disk BEFORE emitting event to prevent race conditions
* where listeners (e.g., sendQueuedMessages) read stale partial data.
*/
private completeToolCall(
private async completeToolCall(
workspaceId: WorkspaceId,
streamInfo: WorkspaceStreamInfo,
toolCalls: Map<
Expand All @@ -578,7 +580,7 @@ export class StreamManager extends EventEmitter {
toolCallId: string,
toolName: string,
output: unknown
): void {
): Promise<void> {
// Find and update the existing tool part
const existingPartIndex = streamInfo.parts.findIndex(
(p) => p.type === "dynamic-tool" && p.toolCallId === toolCallId
Expand Down Expand Up @@ -609,7 +611,13 @@ export class StreamManager extends EventEmitter {
}
}

// Emit tool-call-end event
// CRITICAL: Flush partial to disk BEFORE emitting event
// This ensures listeners (like sendQueuedMessages) see the tool result when they
// read partial.json via commitToHistory. Without this await, there's a race condition
// where the partial is read before the tool result is written, causing "amnesia".
await this.flushPartialWrite(workspaceId, streamInfo);

// Emit tool-call-end event (listeners can now safely read partial)
this.emit("tool-call-end", {
type: "tool-call-end",
workspaceId: workspaceId as string,
Expand All @@ -618,9 +626,6 @@ export class StreamManager extends EventEmitter {
toolName,
result: output,
} as ToolCallEndEvent);

// Schedule partial write
void this.schedulePartialWrite(workspaceId, streamInfo);
}

/**
Expand Down Expand Up @@ -762,8 +767,8 @@ export class StreamManager extends EventEmitter {
const strippedOutput = stripEncryptedContent(part.output);
toolCall.output = strippedOutput;

// Use shared completion logic
this.completeToolCall(
// Use shared completion logic (await to ensure partial is flushed before event)
await this.completeToolCall(
workspaceId,
streamInfo,
toolCalls,
Expand Down Expand Up @@ -799,8 +804,8 @@ export class StreamManager extends EventEmitter {
: JSON.stringify(toolErrorPart.error),
};

// Use shared completion logic
this.completeToolCall(
// Use shared completion logic (await to ensure partial is flushed before event)
await this.completeToolCall(
workspaceId,
streamInfo,
toolCalls,
Expand Down