diff --git a/bun.lock b/bun.lock index 5ae86faa6..da15ce93c 100644 --- a/bun.lock +++ b/bun.lock @@ -1,5 +1,6 @@ { "lockfileVersion": 1, + "configVersion": 0, "workspaces": { "": { "name": "mux", diff --git a/src/common/orpc/schemas/api.ts b/src/common/orpc/schemas/api.ts index 51a395500..d08f1258f 100644 --- a/src/common/orpc/schemas/api.ts +++ b/src/common/orpc/schemas/api.ts @@ -185,7 +185,12 @@ export const workspace = { interruptStream: { input: z.object({ workspaceId: z.string(), - options: z.object({ abandonPartial: z.boolean().optional() }).optional(), + options: z + .object({ + soft: z.boolean().optional(), + abandonPartial: z.boolean().optional(), + }) + .optional(), }), output: ResultSchema(z.void(), z.string()), }, diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index d66a5ca9a..6c1c9a187 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -293,7 +293,7 @@ export class AgentSession { if (this.aiService.isStreaming(this.workspaceId)) { // MUST use abandonPartial=true to prevent handleAbort from performing partial compaction // with mismatched history (since we're about to truncate it) - const stopResult = await this.interruptStream(/* abandonPartial */ true); + const stopResult = await this.interruptStream({ abandonPartial: true }); if (!stopResult.success) { return Err(createUnknownSendMessageError(stopResult.error)); } @@ -405,24 +405,27 @@ export class AgentSession { return this.streamWithHistory(model, options); } - async interruptStream(abandonPartial?: boolean): Promise> { + async interruptStream(options?: { + soft?: boolean; + abandonPartial?: boolean; + }): Promise> { this.assertNotDisposed("interruptStream"); if (!this.aiService.isStreaming(this.workspaceId)) { return Ok(undefined); } - // Delete partial BEFORE stopping to prevent abort handler from committing it - // The abort handler in aiService.ts runs immediately when stopStream is called, - // so we must delete first to ensure it finds no partial to commit - if (abandonPartial) { + // For hard interrupts, delete partial BEFORE stopping to prevent abort handler + // from committing it. For soft interrupts, defer to stream-abort handler since + // the stream continues running and would recreate the partial. + if (options?.abandonPartial && !options?.soft) { const deleteResult = await this.partialService.deletePartial(this.workspaceId); if (!deleteResult.success) { return Err(deleteResult.error); } } - const stopResult = await this.aiService.stopStream(this.workspaceId, abandonPartial); + const stopResult = await this.aiService.stopStream(this.workspaceId, options); if (!stopResult.success) { return Err(stopResult.error); } diff --git a/src/node/services/aiService.ts b/src/node/services/aiService.ts index cbd0bab78..309c6a7e3 100644 --- a/src/node/services/aiService.ts +++ b/src/node/services/aiService.ts @@ -197,17 +197,20 @@ export class AIService extends EventEmitter { this.streamManager.on("stream-delta", (data) => this.emit("stream-delta", data)); this.streamManager.on("stream-end", (data) => this.emit("stream-end", data)); - // Handle stream-abort: commit partial to history before forwarding - // Note: If abandonPartial option was used, partial is already deleted by IPC handler + // Handle stream-abort: dispose of partial based on abandonPartial flag this.streamManager.on("stream-abort", (data: StreamAbortEvent) => { void (async () => { - // Check if partial still exists (not abandoned) - const partial = await this.partialService.readPartial(data.workspaceId); - if (partial) { + if (data.abandonPartial) { + // Caller requested discarding partial - delete without committing + await this.partialService.deletePartial(data.workspaceId); + } else { // Commit interrupted message to history with partial:true metadata // This ensures /clear and /truncate can clean up interrupted messages - await this.partialService.commitToHistory(data.workspaceId); - await this.partialService.deletePartial(data.workspaceId); + const partial = await this.partialService.readPartial(data.workspaceId); + if (partial) { + await this.partialService.commitToHistory(data.workspaceId); + await this.partialService.deletePartial(data.workspaceId); + } } // Forward abort event to consumers @@ -1084,12 +1087,15 @@ export class AIService extends EventEmitter { } } - async stopStream(workspaceId: string, abandonPartial?: boolean): Promise> { + async stopStream( + workspaceId: string, + options?: { soft?: boolean; abandonPartial?: boolean } + ): Promise> { if (this.mockModeEnabled && this.mockScenarioPlayer) { this.mockScenarioPlayer.stop(workspaceId); return Ok(undefined); } - return this.streamManager.stopStream(workspaceId, abandonPartial); + return this.streamManager.stopStream(workspaceId, options); } /** diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index 4e665407a..7d0294e0a 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -112,6 +112,8 @@ interface WorkspaceStreamInfo { partialWritePromise?: Promise; // Track background processing promise for guaranteed cleanup processingPromise: Promise; + // Soft-interrupt state: when pending, stream will end at next block boundary + softInterrupt: { pending: false } | { pending: true; abandonPartial: boolean }; // Temporary directory for tool outputs (auto-cleaned when stream ends) runtimeTempDir: string; // Runtime for temp directory cleanup @@ -418,31 +420,40 @@ export class StreamManager extends EventEmitter { ): Promise { try { streamInfo.state = StreamState.STOPPING; - // Flush any pending partial write immediately (preserves work on interruption) await this.flushPartialWrite(workspaceId, streamInfo); streamInfo.abortController.abort(); - // CRITICAL: Wait for processing to fully complete before cleanup - // This prevents race conditions where the old stream is still running - // while a new stream starts (e.g., old stream writing to partial.json) - await streamInfo.processingPromise; + await this.cleanupStream(workspaceId, streamInfo, abandonPartial); + } catch (error) { + console.error("Error during stream cancellation:", error); + // Force cleanup even if cancellation fails + this.workspaceStreams.delete(workspaceId); + } + } - // Get usage and duration metadata (usage may be undefined if aborted early) - const { usage, duration } = await this.getStreamMetadata(streamInfo); + // Checks if a soft interrupt is necessary, and performs one if so + // Similar to cancelStreamSafely but performs cleanup without blocking + private async checkSoftCancelStream( + workspaceId: WorkspaceId, + streamInfo: WorkspaceStreamInfo + ): Promise { + if (!streamInfo.softInterrupt.pending) return; + try { + streamInfo.state = StreamState.STOPPING; - // Emit abort event with usage if available - this.emit("stream-abort", { - type: "stream-abort", - workspaceId: workspaceId as string, - messageId: streamInfo.messageId, - metadata: { usage, duration }, - abandonPartial, - }); + // Flush any pending partial write immediately (preserves work on interruption) + await this.flushPartialWrite(workspaceId, streamInfo); - // Clean up immediately - this.workspaceStreams.delete(workspaceId); + streamInfo.abortController.abort(); + + // Return back to the stream loop so we can wait for it to finish before + // sending the stream abort event. + const abandonPartial = streamInfo.softInterrupt.pending + ? streamInfo.softInterrupt.abandonPartial + : false; + void this.cleanupStream(workspaceId, streamInfo, abandonPartial); } catch (error) { console.error("Error during stream cancellation:", error); // Force cleanup even if cancellation fails @@ -450,6 +461,32 @@ export class StreamManager extends EventEmitter { } } + private async cleanupStream( + workspaceId: WorkspaceId, + streamInfo: WorkspaceStreamInfo, + abandonPartial?: boolean + ): Promise { + // CRITICAL: Wait for processing to fully complete before cleanup + // This prevents race conditions where the old stream is still running + // while a new stream starts (e.g., old stream writing to partial.json) + await streamInfo.processingPromise; + + // Get usage and duration metadata (usage may be undefined if aborted early) + const { usage, duration } = await this.getStreamMetadata(streamInfo); + + // Emit abort event with usage if available + this.emit("stream-abort", { + type: "stream-abort", + workspaceId: workspaceId as string, + messageId: streamInfo.messageId, + metadata: { usage, duration }, + abandonPartial, + }); + + // Clean up immediately + this.workspaceStreams.delete(workspaceId); + } + /** * Atomically creates a new stream with all necessary setup */ @@ -555,6 +592,7 @@ export class StreamManager extends EventEmitter { lastPartialWriteTime: 0, // Initialize to 0 to allow immediate first write partialWritePromise: undefined, // No write in flight initially processingPromise: Promise.resolve(), // Placeholder, overwritten in startStream + softInterrupt: { pending: false }, runtimeTempDir, // Stream-scoped temp directory for tool outputs runtime, // Runtime for temp directory cleanup }; @@ -718,6 +756,7 @@ export class StreamManager extends EventEmitter { workspaceId: workspaceId as string, messageId: streamInfo.messageId, }); + await this.checkSoftCancelStream(workspaceId, streamInfo); break; } @@ -772,6 +811,7 @@ export class StreamManager extends EventEmitter { strippedOutput ); } + await this.checkSoftCancelStream(workspaceId, streamInfo); break; } @@ -808,6 +848,7 @@ export class StreamManager extends EventEmitter { toolErrorPart.toolName, errorOutput ); + await this.checkSoftCancelStream(workspaceId, streamInfo); break; } @@ -852,6 +893,7 @@ export class StreamManager extends EventEmitter { case "start": case "start-step": case "text-start": + case "finish": // These events can be logged or handled if needed break; @@ -869,13 +911,14 @@ export class StreamManager extends EventEmitter { usage: finishStepPart.usage, }; this.emit("usage-delta", usageEvent); + await this.checkSoftCancelStream(workspaceId, streamInfo); break; } - case "finish": - // No usage-delta here - totalUsage sums all steps, not current context. - // Last finish-step already has correct context window usage. + case "text-end": { + await this.checkSoftCancelStream(workspaceId, streamInfo); break; + } } } @@ -1363,14 +1406,32 @@ export class StreamManager extends EventEmitter { /** * Stops an active stream for a workspace + * First call: Sets soft interrupt and emits delta event → frontend shows "Interrupting..." + * Second call: Hard aborts the stream immediately */ - async stopStream(workspaceId: string, abandonPartial?: boolean): Promise> { + async stopStream( + workspaceId: string, + options?: { soft?: boolean; abandonPartial?: boolean } + ): Promise> { const typedWorkspaceId = workspaceId as WorkspaceId; try { const streamInfo = this.workspaceStreams.get(typedWorkspaceId); - if (streamInfo) { - await this.cancelStreamSafely(typedWorkspaceId, streamInfo, abandonPartial); + if (!streamInfo) { + return Ok(undefined); // No active stream + } + + const soft = options?.soft ?? false; + + if (soft) { + // Soft interrupt: set flag, will cancel at next block boundary + streamInfo.softInterrupt = { + pending: true, + abandonPartial: options?.abandonPartial ?? false, + }; + } else { + // Hard interrupt: cancel immediately + await this.cancelStreamSafely(typedWorkspaceId, streamInfo, options?.abandonPartial); } return Ok(undefined); } catch (error) { diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index 6839855cc..24c8c7bf3 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -875,17 +875,19 @@ export class WorkspaceService extends EventEmitter { async interruptStream( workspaceId: string, - options?: { abandonPartial?: boolean } + options?: { soft?: boolean; abandonPartial?: boolean } ): Promise> { try { const session = this.getOrCreateSession(workspaceId); - const stopResult = await session.interruptStream(options?.abandonPartial); + const stopResult = await session.interruptStream(options); if (!stopResult.success) { log.error("Failed to stop stream:", stopResult.error); return Err(stopResult.error); } - if (options?.abandonPartial) { + // For hard interrupts, delete partial immediately. For soft interrupts, + // defer to stream-abort handler (stream is still running and may recreate partial). + if (options?.abandonPartial && !options?.soft) { log.debug("Abandoning partial for workspace:", workspaceId); await this.partialService.deletePartial(workspaceId); }