Skip to content

Commit 5057c6a

Browse files
committed
🤖 refactor: populate softInterruptPending via stream-delta events
Change soft-interrupt communication from IPC request/response to stream-delta event propagation. When user presses Escape the first time, stopStream() emits an empty stream-delta with softInterruptPending=true, which the frontend aggregator receives and updates StreamingContext. Benefits: - Semantically correct: stream-delta events signal state transitions - Consistent with existing patterns: uses event flow like other updates - Simpler frontend: no manual state setting needed - Type-safe: compiler enforces optional field handling Changes: - streamManager.stopStream(): Emit stream-delta with flag on first Escape - StreamDeltaEvent: Added optional softInterruptPending field - StreamingMessageAggregator: Handle flag from stream-delta, skip empty deltas - Removed unused setSoftInterruptPending() method - Updated all canInterrupt references to use new interruptible field Generated with `mux`
1 parent eb385d7 commit 5057c6a

File tree

12 files changed

+161
-61
lines changed

12 files changed

+161
-61
lines changed

src/App.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import { matchesKeybind, KEYBINDS } from "./utils/ui/keybinds";
1313
import { useResumeManager } from "./hooks/useResumeManager";
1414
import { useUnreadTracking } from "./hooks/useUnreadTracking";
1515
import { useAutoCompactContinue } from "./hooks/useAutoCompactContinue";
16-
import { useWorkspaceStoreRaw, useWorkspaceRecency } from "./stores/WorkspaceStore";
16+
import { useWorkspaceStoreRaw, useWorkspaceRecency, canInterrupt } from "./stores/WorkspaceStore";
1717
import { ChatInput } from "./components/ChatInput/index";
1818

1919
import { useStableReference, compareMaps } from "./hooks/useStableReference";
@@ -490,7 +490,7 @@ function AppInner() {
490490
const allStates = workspaceStore.getAllStates();
491491
const streamingModels = new Map<string, string>();
492492
for (const [workspaceId, state] of allStates) {
493-
if (state.canInterrupt && state.currentModel) {
493+
if (canInterrupt(state.interruptType) && state.currentModel) {
494494
streamingModels.set(workspaceId, state.currentModel);
495495
}
496496
}

src/components/AIView.tsx

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { formatKeybind, KEYBINDS } from "@/utils/ui/keybinds";
2121
import { useAutoScroll } from "@/hooks/useAutoScroll";
2222
import { usePersistedState } from "@/hooks/usePersistedState";
2323
import { useThinking } from "@/contexts/ThinkingContext";
24-
import { useWorkspaceState, useWorkspaceAggregator } from "@/stores/WorkspaceStore";
24+
import { useWorkspaceState, useWorkspaceAggregator, canInterrupt } from "@/stores/WorkspaceStore";
2525
import { WorkspaceHeader } from "./WorkspaceHeader";
2626
import { getModelName } from "@/utils/ai/models";
2727
import type { DisplayedMessage } from "@/types/message";
@@ -220,15 +220,15 @@ const AIViewInner: React.FC<AIViewProps> = ({
220220
// Track if last message was interrupted or errored (for RetryBarrier)
221221
// Uses same logic as useResumeManager for DRY
222222
const showRetryBarrier = workspaceState
223-
? !workspaceState.canInterrupt &&
223+
? !canInterrupt(workspaceState.interruptType) &&
224224
hasInterruptedStream(workspaceState.messages, workspaceState.pendingStreamStartTime)
225225
: false;
226226

227227
// Handle keyboard shortcuts (using optional refs that are safe even if not initialized)
228228
useAIViewKeybinds({
229229
workspaceId,
230230
currentModel: workspaceState?.currentModel ?? null,
231-
canInterrupt: workspaceState?.canInterrupt ?? false,
231+
canInterrupt: canInterrupt(workspaceState.interruptType),
232232
showRetryBarrier,
233233
currentWorkspaceThinking,
234234
setThinkingLevel,
@@ -278,7 +278,13 @@ const AIViewInner: React.FC<AIViewProps> = ({
278278
}
279279

280280
// Extract state from workspace state
281-
const { messages, canInterrupt, isCompacting, loading, currentModel } = workspaceState;
281+
const {
282+
messages,
283+
interruptType: interruptible,
284+
isCompacting,
285+
loading,
286+
currentModel,
287+
} = workspaceState;
282288

283289
// Get active stream message ID for token counting
284290
const activeStreamMessageId = aggregator.getActiveStreamMessageId();
@@ -290,6 +296,14 @@ const AIViewInner: React.FC<AIViewProps> = ({
290296
// Merge consecutive identical stream errors
291297
const mergedMessages = mergeConsecutiveStreamErrors(messages);
292298

299+
const model = currentModel ? getModelName(currentModel) : "";
300+
const interrupting = interruptible === "hard";
301+
302+
const prefix = interrupting ? "⏸️ Interrupting " : "";
303+
const action = interrupting ? "" : isCompacting ? "compacting..." : "streaming...";
304+
305+
const statusText = `${prefix}${model} ${action}`.trim();
306+
293307
// When editing, find the cutoff point
294308
const editCutoffHistoryId = editingMessage
295309
? mergedMessages.find(
@@ -362,8 +376,8 @@ const AIViewInner: React.FC<AIViewProps> = ({
362376
onTouchMove={markUserInteraction}
363377
onScroll={handleScroll}
364378
role="log"
365-
aria-live={canInterrupt ? "polite" : "off"}
366-
aria-busy={canInterrupt}
379+
aria-live={canInterrupt(interruptible) ? "polite" : "off"}
380+
aria-busy={canInterrupt(interruptible)}
367381
aria-label="Conversation transcript"
368382
tabIndex={0}
369383
className="h-full overflow-y-auto p-[15px] leading-[1.5] break-words whitespace-pre-wrap"
@@ -428,21 +442,13 @@ const AIViewInner: React.FC<AIViewProps> = ({
428442
</>
429443
)}
430444
<PinnedTodoList workspaceId={workspaceId} />
431-
{canInterrupt && (
445+
{canInterrupt(interruptible) && (
432446
<StreamingBarrier
433-
statusText={
434-
isCompacting
435-
? currentModel
436-
? `${getModelName(currentModel)} compacting...`
437-
: "compacting..."
438-
: currentModel
439-
? `${getModelName(currentModel)} streaming...`
440-
: "streaming..."
441-
}
447+
statusText={statusText}
442448
cancelText={
443449
isCompacting
444450
? `${formatKeybind(vimEnabled ? KEYBINDS.INTERRUPT_STREAM_VIM : KEYBINDS.INTERRUPT_STREAM_NORMAL)} cancel | ${formatKeybind(KEYBINDS.ACCEPT_EARLY_COMPACTION)} accept early`
445-
: `hit ${formatKeybind(vimEnabled ? KEYBINDS.INTERRUPT_STREAM_VIM : KEYBINDS.INTERRUPT_STREAM_NORMAL)} to cancel`
451+
: `hit ${formatKeybind(vimEnabled ? KEYBINDS.INTERRUPT_STREAM_VIM : KEYBINDS.INTERRUPT_STREAM_NORMAL)} to ${interruptible === "hard" ? "force" : ""} cancel`
446452
}
447453
tokenCount={
448454
activeStreamMessageId
@@ -499,7 +505,7 @@ const AIViewInner: React.FC<AIViewProps> = ({
499505
editingMessage={editingMessage}
500506
onCancelEdit={handleCancelEdit}
501507
onEditLastUserMessage={() => void handleEditLastUserMessage()}
502-
canInterrupt={canInterrupt}
508+
canInterrupt={canInterrupt(interruptible)}
503509
onReady={handleChatInputReady}
504510
/>
505511
</div>

src/components/AgentStatusIndicator.tsx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import React, { useCallback, useMemo } from "react";
22
import { cn } from "@/lib/utils";
33
import { TooltipWrapper, Tooltip } from "./Tooltip";
4-
import { useWorkspaceSidebarState } from "@/stores/WorkspaceStore";
4+
import { canInterrupt, useWorkspaceSidebarState } from "@/stores/WorkspaceStore";
55
import { getStatusTooltip } from "@/utils/ui/statusTooltip";
66

77
interface AgentStatusIndicatorProps {
@@ -22,10 +22,10 @@ export const AgentStatusIndicator: React.FC<AgentStatusIndicatorProps> = ({
2222
className,
2323
}) => {
2424
// Get workspace state
25-
const { canInterrupt, currentModel, agentStatus, recencyTimestamp } =
25+
const { interruptType, currentModel, agentStatus, recencyTimestamp } =
2626
useWorkspaceSidebarState(workspaceId);
2727

28-
const streaming = canInterrupt;
28+
const streaming = canInterrupt(interruptType);
2929

3030
// Compute unread status if lastReadTimestamp provided (sidebar only)
3131
const unread = useMemo(() => {

src/components/ChatInput/index.tsx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,11 @@ export const ChatInput: React.FC<ChatInputProps> = (props) => {
315315
// Allow external components (e.g., CommandPalette, Queued message edits) to insert text
316316
useEffect(() => {
317317
const handler = (e: Event) => {
318-
const customEvent = e as CustomEvent<{ text: string; mode?: "append" | "replace"; imageParts?: ImagePart[] }>;
318+
const customEvent = e as CustomEvent<{
319+
text: string;
320+
mode?: "append" | "replace";
321+
imageParts?: ImagePart[];
322+
}>;
319323

320324
const { text, mode = "append", imageParts } = customEvent.detail;
321325

src/components/Messages/MessageRenderer.tsx

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import { ReasoningMessage } from "./ReasoningMessage";
77
import { StreamErrorMessage } from "./StreamErrorMessage";
88
import { HistoryHiddenMessage } from "./HistoryHiddenMessage";
99
import { InitMessage } from "./InitMessage";
10-
// Note: QueuedMessage is NOT imported here - it's rendered directly in AIView after StreamingBarrier
1110

1211
interface MessageRendererProps {
1312
message: DisplayedMessage;

src/hooks/useResumeManager.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { useEffect, useRef } from "react";
2-
import { useWorkspaceStoreRaw, type WorkspaceState } from "@/stores/WorkspaceStore";
2+
import { canInterrupt, useWorkspaceStoreRaw, type WorkspaceState } from "@/stores/WorkspaceStore";
33
import { CUSTOM_EVENTS, type CustomEventType } from "@/constants/events";
44
import { getAutoRetryKey, getRetryStateKey } from "@/constants/storage";
55
import { getSendOptionsFromStorage } from "@/utils/messages/sendOptions";
@@ -97,7 +97,7 @@ export function useResumeManager() {
9797
}
9898

9999
// 1. Must have interrupted stream that's eligible for auto-retry (not currently streaming)
100-
if (state.canInterrupt) return false; // Currently streaming
100+
if (canInterrupt(state.interruptType)) return false; // Currently streaming
101101

102102
if (!isEligibleForAutoRetry(state.messages, state.pendingStreamStartTime)) {
103103
return false;

src/services/streamManager.ts

Lines changed: 78 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ interface WorkspaceStreamInfo {
106106
partialWritePromise?: Promise<void>;
107107
// Track background processing promise for guaranteed cleanup
108108
processingPromise: Promise<void>;
109+
// Flag for soft-interrupt: when true, stream will end at next block boundary
110+
softInterruptPending: boolean;
109111
// Temporary directory for tool outputs (auto-cleaned when stream ends)
110112
runtimeTempDir: string;
111113
// Runtime for temp directory cleanup
@@ -414,31 +416,63 @@ export class StreamManager extends EventEmitter {
414416

415417
streamInfo.abortController.abort();
416418

417-
// CRITICAL: Wait for processing to fully complete before cleanup
418-
// This prevents race conditions where the old stream is still running
419-
// while a new stream starts (e.g., old stream writing to partial.json)
420-
await streamInfo.processingPromise;
419+
await this.cleanupStream(workspaceId, streamInfo);
420+
} catch (error) {
421+
console.error("Error during stream cancellation:", error);
422+
// Force cleanup even if cancellation fails
423+
this.workspaceStreams.delete(workspaceId);
424+
}
425+
}
426+
427+
// Checks if a soft interrupt is necessary, and performs one if so
428+
// Similar to cancelStreamSafely but performs cleanup without blocking
429+
private async checkSoftCancelStream(
430+
workspaceId: WorkspaceId,
431+
streamInfo: WorkspaceStreamInfo
432+
): Promise<void> {
433+
if (!streamInfo.softInterruptPending) return;
434+
try {
435+
streamInfo.state = StreamState.STOPPING;
421436

422-
// Get usage and duration metadata (usage may be undefined if aborted early)
423-
const { usage, duration } = await this.getStreamMetadata(streamInfo);
437+
// Flush any pending partial write immediately (preserves work on interruption)
438+
await this.flushPartialWrite(workspaceId, streamInfo);
424439

425-
// Emit abort event with usage if available
426-
this.emit("stream-abort", {
427-
type: "stream-abort",
428-
workspaceId: workspaceId as string,
429-
messageId: streamInfo.messageId,
430-
metadata: { usage, duration },
431-
});
440+
streamInfo.abortController.abort();
432441

433-
// Clean up immediately
434-
this.workspaceStreams.delete(workspaceId);
442+
// Return back to the stream loop so we can wait for it to finish before
443+
// sending the stream abort event.
444+
void this.cleanupStream(workspaceId, streamInfo);
435445
} catch (error) {
436446
console.error("Error during stream cancellation:", error);
437447
// Force cleanup even if cancellation fails
438448
this.workspaceStreams.delete(workspaceId);
439449
}
440450
}
441451

452+
private async cleanupStream(
453+
workspaceId: WorkspaceId,
454+
streamInfo: WorkspaceStreamInfo
455+
): Promise<void> {
456+
// CRITICAL: Wait for processing to fully complete before cleanup
457+
// This prevents race conditions where the old stream is still running
458+
// while a new stream starts (e.g., old stream writing to partial.json)
459+
await streamInfo.processingPromise;
460+
461+
// Get usage and duration metadata (usage may be undefined if aborted early)
462+
const { usage, duration } = await this.getStreamMetadata(streamInfo);
463+
464+
// Emit abort event with usage if available
465+
this.emit("stream-abort", {
466+
type: "stream-abort",
467+
workspaceId: workspaceId as string,
468+
messageId: streamInfo.messageId,
469+
metadata: { usage, duration },
470+
});
471+
472+
// Clean up immediately
473+
this.workspaceStreams.delete(workspaceId);
474+
}
475+
442476
/**
443477
* Atomically creates a new stream with all necessary setup
444478
*/
@@ -525,6 +559,7 @@ export class StreamManager extends EventEmitter {
525559
lastPartialWriteTime: 0, // Initialize to 0 to allow immediate first write
526560
partialWritePromise: undefined, // No write in flight initially
527561
processingPromise: Promise.resolve(), // Placeholder, overwritten in startStream
562+
softInterruptPending: false, // Initialize to false
528563
runtimeTempDir, // Stream-scoped temp directory for tool outputs
529564
runtime, // Runtime for temp directory cleanup
530565
};
@@ -688,6 +723,7 @@ export class StreamManager extends EventEmitter {
688723
workspaceId: workspaceId as string,
689724
messageId: streamInfo.messageId,
690725
});
726+
await this.checkSoftCancelStream(workspaceId, streamInfo);
691727
break;
692728
}
693729

@@ -742,6 +778,7 @@ export class StreamManager extends EventEmitter {
742778
strippedOutput
743779
);
744780
}
781+
await this.checkSoftCancelStream(workspaceId, streamInfo);
745782
break;
746783
}
747784

@@ -778,6 +815,7 @@ export class StreamManager extends EventEmitter {
778815
toolErrorPart.toolName,
779816
errorOutput
780817
);
818+
await this.checkSoftCancelStream(workspaceId, streamInfo);
781819
break;
782820
}
783821

@@ -823,9 +861,14 @@ export class StreamManager extends EventEmitter {
823861
case "start-step":
824862
case "text-start":
825863
case "finish":
826-
case "finish-step":
827864
// These events can be logged or handled if needed
828865
break;
866+
867+
case "finish-step":
868+
case "text-end":
869+
case "tool-input-end":
870+
await this.checkSoftCancelStream(workspaceId, streamInfo);
871+
break;
829872
}
830873
}
831874

@@ -1196,14 +1239,32 @@ export class StreamManager extends EventEmitter {
11961239

11971240
/**
11981241
* Stops an active stream for a workspace
1242+
* First call: Sets soft interrupt and emits delta event → frontend shows "Interrupting..."
1243+
* Second call: Hard aborts the stream immediately
11991244
*/
12001245
async stopStream(workspaceId: string): Promise<Result<void>> {
12011246
const typedWorkspaceId = workspaceId as WorkspaceId;
12021247

12031248
try {
12041249
const streamInfo = this.workspaceStreams.get(typedWorkspaceId);
1205-
if (streamInfo) {
1250+
if (!streamInfo) {
1251+
return Ok(undefined); // No active stream
1252+
}
1253+
1254+
if (streamInfo.softInterruptPending) {
12061255
await this.cancelStreamSafely(typedWorkspaceId, streamInfo);
1256+
} else {
1257+
// First Escape: Soft interrupt - emit delta to notify frontend
1258+
streamInfo.softInterruptPending = true;
1259+
this.emit("stream-delta", {
1260+
type: "stream-delta",
1261+
workspaceId: workspaceId,
1262+
messageId: streamInfo.messageId,
1263+
delta: "",
1264+
tokens: 0,
1265+
timestamp: Date.now(),
1266+
softInterruptPending: true, // Signal to frontend
1267+
});
12071268
}
12081269
return Ok(undefined);
12091270
} catch (error) {

src/stores/WorkspaceStore.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ describe("WorkspaceStore", () => {
273273
// Object.is() comparison and skip re-renders for primitive values.
274274
// TODO: Optimize aggregator caching in Phase 2
275275
expect(state1).toEqual(state2);
276-
expect(state1.canInterrupt).toBe(state2.canInterrupt);
276+
expect(state1.interruptType).toBe(state2.interruptType);
277277
expect(state1.loading).toBe(state2.loading);
278278
});
279279
});
@@ -428,7 +428,7 @@ describe("WorkspaceStore", () => {
428428

429429
const state2 = store.getWorkspaceState("test-workspace");
430430
expect(state1).not.toBe(state2); // Cache should be invalidated
431-
expect(state2.canInterrupt).toBe(true); // Stream started, so can interrupt
431+
expect(state2.interruptType).toBeTruthy(); // Stream started, so can interrupt
432432
});
433433

434434
it("invalidates getAllStates() cache when workspace changes", async () => {

0 commit comments

Comments
 (0)