diff --git a/apps/code/src/renderer/features/sessions/service/service.recovery.integration.test.ts b/apps/code/src/renderer/features/sessions/service/service.recovery.integration.test.ts new file mode 100644 index 000000000..617ad07f7 --- /dev/null +++ b/apps/code/src/renderer/features/sessions/service/service.recovery.integration.test.ts @@ -0,0 +1,925 @@ +/** + * End-to-end recovery test against the REAL Zustand session store. + * + * This test exercises the actual store so the + * `updateSession -> getSessionByTaskId -> drain` chain is real — the precise + * interaction the unit tests stub out. It deterministically reproduces a + * resumed cloud run that goes idle, an SSE transport drop flips the session to `disconnected`, a + * user message is queued, and nothing ever drains it. + * + * Only the tRPC network boundary is faked, that boundary is the thing we simulate dropping. + */ +import type { ContentBlock } from "@agentclientprotocol/sdk"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const mockTrpcAgent = vi.hoisted(() => ({ + start: { mutate: vi.fn() }, + reconnect: { mutate: vi.fn() }, + cancel: { mutate: vi.fn() }, + prompt: { mutate: vi.fn() }, + cancelPrompt: { mutate: vi.fn() }, + setConfigOption: { mutate: vi.fn() }, + respondToPermission: { mutate: vi.fn() }, + cancelPermission: { mutate: vi.fn() }, + onSessionEvent: { subscribe: vi.fn() }, + onPermissionRequest: { subscribe: vi.fn() }, + onSessionIdleKilled: { subscribe: vi.fn(() => ({ unsubscribe: vi.fn() })) }, + resetAll: { mutate: vi.fn().mockResolvedValue(undefined) }, + getPreviewConfigOptions: { query: vi.fn().mockResolvedValue([]) }, +})); + +const mockTrpcWorkspace = vi.hoisted(() => ({ + verify: { query: vi.fn() }, +})); + +const mockTrpcLogs = vi.hoisted(() => ({ + fetchS3Logs: { query: vi.fn() }, + readLocalLogs: { query: vi.fn() }, + writeLocalLogs: { mutate: vi.fn() }, +})); + +const mockTrpcCloudTask = vi.hoisted(() => ({ + sendCommand: { mutate: vi.fn() }, + watch: { mutate: vi.fn().mockResolvedValue(undefined) }, + retry: { mutate: vi.fn().mockResolvedValue(undefined) }, + unwatch: { mutate: vi.fn().mockResolvedValue(undefined) }, + onUpdate: { subscribe: vi.fn() }, +})); + +const mockTrpcFs = vi.hoisted(() => ({ + readFileAsBase64: { query: vi.fn() }, +})); + +const mockTrpcHandoff = vi.hoisted(() => ({ + preflightToCloud: { query: vi.fn() }, + executeToCloud: { mutate: vi.fn() }, +})); + +const mockTrpcOs = vi.hoisted(() => ({ + openExternal: { mutate: vi.fn() }, +})); + +vi.mock("@renderer/trpc/client", () => ({ + trpcClient: { + agent: mockTrpcAgent, + workspace: mockTrpcWorkspace, + logs: mockTrpcLogs, + cloudTask: mockTrpcCloudTask, + fs: mockTrpcFs, + handoff: mockTrpcHandoff, + os: mockTrpcOs, + }, +})); + +const mockAuthenticatedClient = vi.hoisted(() => ({ + createTaskRun: vi.fn(), + appendTaskRunLog: vi.fn(), + getTaskRun: vi.fn(), + getTask: vi.fn(), + runTaskInCloud: vi.fn(), + prepareTaskRunArtifactUploads: vi.fn(), + finalizeTaskRunArtifactUploads: vi.fn(), + prepareTaskStagedArtifactUploads: vi.fn(), + finalizeTaskStagedArtifactUploads: vi.fn(), + startGithubUserIntegrationConnect: vi.fn(), +})); + +type MockAuthenticatedClient = typeof mockAuthenticatedClient; + +const mockBuildAuthenticatedClient = vi.hoisted(() => + vi.fn<() => MockAuthenticatedClient | null>(() => mockAuthenticatedClient), +); + +const mockAuth = vi.hoisted(() => ({ + fetchAuthState: vi.fn<() => Promise>>(async () => ({ + status: "authenticated", + bootstrapComplete: true, + cloudRegion: "us", + projectId: 123, + availableProjectIds: [123], + availableOrgIds: [], + hasCodeAccess: true, + needsScopeReauth: false, + })), + getAuthenticatedClient: vi.fn<() => Promise | null>>( + async () => mockBuildAuthenticatedClient(), + ), + createAuthenticatedClient: vi.fn((authState: Record) => { + return authState.status === "authenticated" + ? mockBuildAuthenticatedClient() + : null; + }), +})); + +vi.mock("@features/auth/hooks/authQueries", () => ({ + AUTH_SCOPED_QUERY_META: { authScoped: true }, + clearAuthScopedQueries: vi.fn(), + getAuthIdentity: vi.fn(), + fetchAuthState: mockAuth.fetchAuthState, +})); +vi.mock("@features/auth/hooks/authClient", () => ({ + getAuthenticatedClient: mockAuth.getAuthenticatedClient, + createAuthenticatedClient: mockAuth.createAuthenticatedClient, +})); + +vi.mock("@features/sessions/stores/modelsStore", () => ({ + useModelsStore: { + getState: () => ({ + getEffectiveModel: () => "claude-3-opus", + }), + }, +})); + +const mockSessionConfigStore = vi.hoisted(() => ({ + getPersistedConfigOptions: vi.fn(() => undefined), + setPersistedConfigOptions: vi.fn(), + removePersistedConfigOptions: vi.fn(), + updatePersistedConfigOptionValue: vi.fn(), +})); + +vi.mock( + "@features/sessions/stores/sessionConfigStore", + () => mockSessionConfigStore, +); + +const mockAdapterFns = vi.hoisted(() => ({ + setAdapter: vi.fn(), + getAdapter: vi.fn(), + removeAdapter: vi.fn(), +})); + +const mockSessionAdapterStore = vi.hoisted(() => ({ + useSessionAdapterStore: { + getState: vi.fn(() => ({ + adaptersByRunId: {}, + ...mockAdapterFns, + })), + }, +})); + +vi.mock( + "@features/sessions/stores/sessionAdapterStore", + () => mockSessionAdapterStore, +); + +const mockGetIsOnline = vi.hoisted(() => vi.fn(() => true)); + +vi.mock("@renderer/stores/connectivityStore", () => ({ + getIsOnline: () => mockGetIsOnline(), +})); + +const mockSettingsState = vi.hoisted(() => ({ + customInstructions: "", +})); + +vi.mock("@features/settings/stores/settingsStore", () => ({ + useSettingsStore: { + getState: () => mockSettingsState, + }, +})); + +vi.mock("@features/sidebar/hooks/useTaskViewed", () => ({ + taskViewedApi: { + markActivity: vi.fn(), + markAsViewed: vi.fn(), + }, +})); + +vi.mock("@utils/analytics", () => ({ + track: vi.fn(), + buildPermissionToolMetadata: vi.fn(() => ({})), +})); +vi.mock("@utils/logger", () => ({ + logger: { + scope: () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }), + }, +})); +vi.mock("@utils/notifications", () => ({ + notifyPermissionRequest: vi.fn(), + notifyPromptComplete: vi.fn(), +})); +vi.mock("@renderer/utils/toast", () => ({ + toast: { error: vi.fn(), info: vi.fn() }, +})); +vi.mock("@utils/queryClient", () => ({ + queryClient: { + invalidateQueries: vi.fn(), + refetchQueries: vi.fn(), + setQueriesData: vi.fn(), + }, +})); +vi.mock("@shared/utils/urls", () => ({ + getCloudUrlFromRegion: () => "https://api.anthropic.com", +})); + +const mockConvertStoredEntriesToEvents = vi.hoisted(() => + vi.fn<(entries: unknown[]) => unknown[]>(() => []), +); + +vi.mock("@utils/session", async () => { + const actual = + await vi.importActual("@utils/session"); + return { + convertStoredEntriesToEvents: mockConvertStoredEntriesToEvents, + createUserPromptEvent: vi.fn((prompt, ts) => ({ + type: "acp_message", + ts, + message: { + jsonrpc: "2.0", + id: ts, + method: "session/prompt", + params: { prompt }, + }, + })), + createUserMessageEvent: vi.fn((message, ts) => ({ + type: "user", + ts, + message, + })), + createUserShellExecuteEvent: vi.fn(() => ({ + type: "acp_message", + ts: Date.now(), + message: {}, + })), + extractPromptText: vi.fn((p) => (typeof p === "string" ? p : "text")), + getUserShellExecutesSinceLastPrompt: vi.fn(() => []), + isFatalSessionError: actual.isFatalSessionError, + isRateLimitError: actual.isRateLimitError, + normalizePromptToBlocks: vi.fn((p) => + typeof p === "string" ? [{ type: "text", text: p }] : p, + ), + shellExecutesToContextBlocks: vi.fn(() => []), + }; +}); + +// NOTE: deliberately NOT mocking "@features/sessions/stores/sessionStore" — +// the real Zustand store is the whole point of this test. +import type { AgentSession } from "@features/sessions/stores/sessionStore"; +import { + sessionStoreSetters, + useSessionStore, +} from "@features/sessions/stores/sessionStore"; +import { getSessionService, resetSessionService } from "./service"; + +const TASK_ID = "task-299bc88e"; +const RUN_ID = "run-6f83616d"; + +type CloudUpdateOnData = (update: Record) => void; + +function latestOnData(): CloudUpdateOnData { + const calls = mockTrpcCloudTask.onUpdate.subscribe.mock.calls; + const last = calls.at(-1); + if (!last) throw new Error("watchCloudTask did not subscribe to onUpdate"); + return (last[1] as { onData: CloudUpdateOnData }).onData; +} + +function makeBaseSession(overrides: Partial): AgentSession { + return { + taskRunId: RUN_ID, + taskId: TASK_ID, + taskTitle: "Idle queued-up messages", + channel: `agent-event:${RUN_ID}`, + events: [], + startedAt: Date.now(), + status: "connecting", + isPromptPending: false, + isCompacting: false, + promptStartedAt: null, + pendingPermissions: new Map(), + pausedDurationMs: 0, + messageQueue: [], + optimisticItems: [], + isCloud: true, + cloudStatus: "in_progress", + processedLineCount: 0, + ...overrides, + }; +} + +describe("SessionService cloud queue recovery (real store, e2e)", () => { + beforeEach(() => { + vi.clearAllMocks(); + useSessionStore.setState({ sessions: {}, taskIdIndex: {} }); + mockConvertStoredEntriesToEvents.mockImplementation(() => []); + resetSessionService(); + mockSettingsState.customInstructions = ""; + mockGetIsOnline.mockReturnValue(true); + mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); + mockAuth.fetchAuthState.mockResolvedValue({ + status: "authenticated", + bootstrapComplete: true, + cloudRegion: "us", + projectId: 123, + availableProjectIds: [123], + availableOrgIds: [], + hasCodeAccess: true, + needsScopeReauth: false, + }); + mockTrpcAgent.onSessionEvent.subscribe.mockReturnValue({ + unsubscribe: vi.fn(), + }); + mockTrpcAgent.onPermissionRequest.subscribe.mockReturnValue({ + unsubscribe: vi.fn(), + }); + mockTrpcCloudTask.onUpdate.subscribe.mockReturnValue({ + unsubscribe: vi.fn(), + }); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + mockTrpcFs.readFileAsBase64.query.mockResolvedValue(null); + mockAuthenticatedClient.prepareTaskRunArtifactUploads.mockResolvedValue([]); + mockAuthenticatedClient.finalizeTaskRunArtifactUploads.mockResolvedValue( + [], + ); + mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ + success: true, + result: { stopReason: "end_turn" }, + }); + }); + + it("recovers a stranded queue after an idle resumed run drops to disconnected", async () => { + const service = getSessionService(); + + // Subscribe (captures the onUpdate.onData channel) without letting the + // async hydrate clobber the state we control below. + service.watchCloudTask( + TASK_ID, + RUN_ID, + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run", + ); + const onData = latestOnData(); + + // Start: agent booting, not yet ready (mirrors a snapshot-resume run + // before its run_started/turn_complete reaches the renderer). + sessionStoreSetters.setSession(makeBaseSession({ status: "disconnected" })); + + // --- Phase A: the agent's resume turn completes ------------------------- + // The real _posthog/turn_complete handler must flip the session to + // "connected" AND record agentIdleForRunId for this exact run. + const turnCompleteEvent = { + type: "acp_message" as const, + ts: Date.now(), + message: { + jsonrpc: "2.0" as const, + method: "_posthog/turn_complete", + params: { sessionId: "acp-session", stopReason: "end_turn" }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([turnCompleteEvent]); + onData({ + kind: "snapshot", + taskId: TASK_ID, + runId: RUN_ID, + status: "in_progress", + newEntries: [{ notification: { method: "_posthog/turn_complete" } }], + totalEntryCount: 1, + }); + + await vi.waitFor(() => { + const s = useSessionStore.getState().sessions[RUN_ID]; + expect(s?.status).toBe("connected"); + expect(s?.agentIdleForRunId).toBe(RUN_ID); + }); + + // --- Phase B: SSE transport drop, then the user sends a message -------- + // retryCloudTaskWatch() flips the session to "disconnected" (api.py-side + // run is still alive/in_progress). The user's message gets queued + // because status !== "connected" — exactly the production deadlock. + // + // Keep the queue-gate's retry in-flight (never resolves) so the + // post-retry recovery (trigger #2) cannot pre-empt this case. This test + // isolates the status-update-driven recovery (trigger #1) below. + mockTrpcCloudTask.retry.mutate.mockReturnValueOnce( + new Promise(() => {}), + ); + sessionStoreSetters.updateSession(RUN_ID, { status: "disconnected" }); + + const sendResult = await service.sendPrompt(TASK_ID, "lol"); + expect(sendResult.stopReason).toBe("queued"); + + const afterQueue = useSessionStore.getState().sessions[RUN_ID]; + expect(afterQueue?.status).toBe("disconnected"); + expect(afterQueue?.messageQueue).toHaveLength(1); + expect(afterQueue?.messageQueue[0]?.content).toBe("lol"); + // Pre-fix: nothing below would ever drain this. It would stay "Queued" + // forever (no fresh run_started/turn_complete arrives for an idle run). + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + + // --- Phase C: watcher reconnects, refetches run state = in_progress ---- + // The status-driven recovery path must observe the agent already booted + // for THIS run, flip disconnected -> connected, and drain the queue. + onData({ + kind: "status", + taskId: TASK_ID, + runId: RUN_ID, + status: "in_progress", + }); + + await vi.waitFor(() => { + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: TASK_ID, + runId: RUN_ID, + method: "user_message", + params: expect.objectContaining({ content: "lol" }), + }), + ); + }); + + const recovered = useSessionStore.getState().sessions[RUN_ID]; + expect(recovered?.status).toBe("connected"); + expect(recovered?.messageQueue).toHaveLength(0); + }); + + it("drains a queue stranded on an idle disconnected run via the real retry path (no injected status update)", async () => { + const service = getSessionService(); + service.watchCloudTask( + TASK_ID, + RUN_ID, + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run", + ); + + // An idle, already-bootstrapped run that completed its turn for THIS run + // (live idle flag set) then dropped to disconnected on an SSE blip. The + // api.py-side run is still alive, so cloudStatus stays in_progress. + sessionStoreSetters.setSession( + makeBaseSession({ + status: "disconnected", + cloudStatus: "in_progress", + agentIdleForRunId: RUN_ID, + }), + ); + + // User sends a message while disconnected. sendCloudPrompt's queue gate + // enqueues it and fires retryCloudTaskWatch() (status is disconnected). + // The main-process retry of an already-bootstrapped watcher only + // reconnects SSE with start=latest and, for an idle run, delivers NO + // fresh status/snapshot — so NOTHING is injected via onData here. This + // is the exact production shape of the original deadlock. + const sendResult = await service.sendPrompt(TASK_ID, "lol"); + expect(sendResult.stopReason).toBe("queued"); + expect( + useSessionStore.getState().sessions[RUN_ID]?.messageQueue, + ).toHaveLength(1); + + // No onData(...) is ever called. The queue must still drain, purely from + // the post-retry recovery inside retryCloudTaskWatch(). + await vi.waitFor(() => { + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: TASK_ID, + runId: RUN_ID, + method: "user_message", + params: expect.objectContaining({ content: "lol" }), + }), + ); + }); + const drained = useSessionStore.getState().sessions[RUN_ID]; + expect(drained?.status).toBe("connected"); + expect(drained?.messageQueue).toHaveLength(0); + }); + + it("does not drain while the agent is still booting (boot race protected)", async () => { + const service = getSessionService(); + service.watchCloudTask( + TASK_ID, + RUN_ID, + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run", + ); + const onData = latestOnData(); + + // Disconnected, queued message, but the agent has NEVER booted for this + // run (no run_started/turn_complete, no agentIdleForRunId). Draining now + // would race sendInitialTaskMessage/sendResumeMessage. + const queued: ContentBlock = { type: "text", text: "lol" }; + sessionStoreSetters.setSession( + makeBaseSession({ + status: "disconnected", + messageQueue: [ + { id: "q-1", content: "lol", rawPrompt: [queued], queuedAt: 1 }, + ], + }), + ); + + onData({ + kind: "status", + taskId: TASK_ID, + runId: RUN_ID, + status: "in_progress", + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + expect(useSessionStore.getState().sessions[RUN_ID]?.status).toBe( + "disconnected", + ); + expect( + useSessionStore.getState().sessions[RUN_ID]?.messageQueue, + ).toHaveLength(1); + }); + + it("does not drain on a current-run run_started snapshot until turn_complete (initial/resume turn race)", async () => { + const service = getSessionService(); + service.watchCloudTask( + TASK_ID, + RUN_ID, + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run", + ); + const onData = latestOnData(); + + // Disconnected, queued message. The agent has NOT completed a turn for + // this run (no agentIdleForRunId, no turn_complete). + const queued: ContentBlock = { type: "text", text: "lol" }; + sessionStoreSetters.setSession( + makeBaseSession({ + status: "disconnected", + messageQueue: [ + { id: "q-1", content: "lol", rawPrompt: [queued], queuedAt: 1 }, + ], + }), + ); + + // A snapshot delivers THIS run's _posthog/run_started AND status + // in_progress. The run_started handler flips status -> "connected", and + // the same in_progress snapshot then calls the recovery helper. Status + // is now "connected" but the agent is mid-boot: the initial/resume turn + // starts right after run_started. Draining here races + // sendInitialTaskMessage/sendResumeMessage — it must NOT drain. + const runStartedEvent = { + type: "acp_message" as const, + ts: 1, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/run_started", + params: { + sessionId: "acp-session", + runId: RUN_ID, + taskId: TASK_ID, + agentVersion: "1.2.3", + }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([runStartedEvent]); + onData({ + kind: "snapshot", + taskId: TASK_ID, + runId: RUN_ID, + status: "in_progress", + newEntries: [{ notification: { method: "_posthog/run_started" } }], + totalEntryCount: 1, + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + // run_started brought the session to "connected", but the queue must + // stay put: run_started alone is not idle. + expect(useSessionStore.getState().sessions[RUN_ID]?.status).toBe( + "connected", + ); + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + expect( + useSessionStore.getState().sessions[RUN_ID]?.messageQueue, + ).toHaveLength(1); + + // The initial/resume turn finally completes -> NOW it is safe to drain. + const turnCompleteEvent = { + type: "acp_message" as const, + ts: 2, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/turn_complete", + params: { sessionId: "acp-session", stopReason: "end_turn" }, + }, + }; + const processedBeforeTurnComplete = + useSessionStore.getState().sessions[RUN_ID]?.processedLineCount ?? 0; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([turnCompleteEvent]); + onData({ + kind: "snapshot", + taskId: TASK_ID, + runId: RUN_ID, + status: "in_progress", + newEntries: [{ notification: { method: "_posthog/turn_complete" } }], + totalEntryCount: processedBeforeTurnComplete + 1, + }); + + await vi.waitFor(() => { + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: TASK_ID, + runId: RUN_ID, + method: "user_message", + params: expect.objectContaining({ content: "lol" }), + }), + ); + }); + const drained = useSessionStore.getState().sessions[RUN_ID]; + expect(drained?.status).toBe("connected"); + expect(drained?.messageQueue).toHaveLength(0); + }); + + it("does not dispatch a queued follow-up mid-turn after retryCloudTaskWatch clears isPromptPending", async () => { + const service = getSessionService(); + service.watchCloudTask( + TASK_ID, + RUN_ID, + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run", + ); + const onData = latestOnData(); + + // Agent booted and idle from a prior turn. + sessionStoreSetters.setSession( + makeBaseSession({ status: "connected", agentIdleForRunId: RUN_ID }), + ); + + // A new turn starts: the agent receives a session/prompt. The real + // handler must clear the idle marker (a turn is now in flight) — even + // though no turn_complete has arrived yet. + const promptEvent = { + type: "acp_message" as const, + ts: Date.now(), + message: { + jsonrpc: "2.0" as const, + id: 1, + method: "session/prompt", + params: { prompt: [{ type: "text", text: "do the work" }] }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([promptEvent]); + onData({ + kind: "snapshot", + taskId: TASK_ID, + runId: RUN_ID, + status: "in_progress", + newEntries: [{ notification: { method: "session/prompt" } }], + totalEntryCount: 1, + }); + + await vi.waitFor(() => { + const s = useSessionStore.getState().sessions[RUN_ID]; + expect(s?.isPromptPending).toBe(true); + expect(s?.agentIdleForRunId).not.toBe(RUN_ID); + }); + + // SSE drops; retryCloudTaskWatch forcibly clears isPromptPending even + // though the remote turn is still running. The idle marker stays + // cleared — that is the signal recovery must trust, not isPromptPending. + await service.retryCloudTaskWatch(TASK_ID); + const afterRetry = useSessionStore.getState().sessions[RUN_ID]; + expect(afterRetry?.status).toBe("disconnected"); + expect(afterRetry?.isPromptPending).toBe(false); + expect(afterRetry?.agentIdleForRunId).not.toBe(RUN_ID); + + // User sends a follow-up while disconnected -> it queues. + const onDataAfterRetry = latestOnData(); + const sendResult = await service.sendPrompt(TASK_ID, "follow up"); + expect(sendResult.stopReason).toBe("queued"); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect( + useSessionStore.getState().sessions[RUN_ID]?.messageQueue, + ).toHaveLength(1); + + // Watcher reconnects, refetches run state = in_progress. Recovery must + // NOT fire: the agent is mid-turn (idle marker cleared, turn_complete + // not yet seen). Dispatching now is the race this guards against. + onDataAfterRetry({ + kind: "status", + taskId: TASK_ID, + runId: RUN_ID, + status: "in_progress", + }); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + expect(useSessionStore.getState().sessions[RUN_ID]?.status).toBe( + "disconnected", + ); + expect( + useSessionStore.getState().sessions[RUN_ID]?.messageQueue, + ).toHaveLength(1); + + // The in-flight turn finally completes -> NOW it is safe to drain. + const turnCompleteEvent = { + type: "acp_message" as const, + ts: Date.now(), + message: { + jsonrpc: "2.0" as const, + method: "_posthog/turn_complete", + params: { sessionId: "acp-session", stopReason: "end_turn" }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([turnCompleteEvent]); + onDataAfterRetry({ + kind: "snapshot", + taskId: TASK_ID, + runId: RUN_ID, + status: "in_progress", + newEntries: [ + { notification: { method: "session/prompt" } }, + { notification: { method: "_posthog/turn_complete" } }, + ], + totalEntryCount: 2, + }); + + await vi.waitFor(() => { + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: TASK_ID, + method: "user_message", + params: expect.objectContaining({ content: "follow up" }), + }), + ); + }); + const drained = useSessionStore.getState().sessions[RUN_ID]; + expect(drained?.status).toBe("connected"); + expect(drained?.messageQueue).toHaveLength(0); + }); + + it("clears the idle marker when sendCloudPrompt starts a turn even if the session/prompt log never arrives", async () => { + const service = getSessionService(); + service.watchCloudTask( + TASK_ID, + RUN_ID, + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run", + ); + + // Agent booted and idle from a prior turn. + sessionStoreSetters.setSession( + makeBaseSession({ status: "connected", agentIdleForRunId: RUN_ID }), + ); + + // User sends a prompt while connected -> sendCloudPrompt starts a turn. + // The cloud accepts it into the running sandbox. Crucially NO polled + // session/prompt echo is ever delivered (the SSE drops first), so the + // only thing that can clear the now-stale idle marker is the + // sendCloudPrompt turn-start update itself. + mockTrpcCloudTask.sendCommand.mutate.mockResolvedValueOnce({ + success: true, + result: { queued: true }, + }); + await service.sendPrompt(TASK_ID, "do the work"); + + const afterSend = useSessionStore.getState().sessions[RUN_ID]; + expect(afterSend?.isPromptPending).toBe(true); + expect(afterSend?.agentIdleForRunId).not.toBe(RUN_ID); + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1); + + // SSE drops; retryCloudTaskWatch forcibly clears isPromptPending even + // though the remote turn is still running. The idle marker stays + // cleared — that is the signal recovery must trust, not isPromptPending. + await service.retryCloudTaskWatch(TASK_ID); + const afterRetry = useSessionStore.getState().sessions[RUN_ID]; + expect(afterRetry?.status).toBe("disconnected"); + expect(afterRetry?.isPromptPending).toBe(false); + expect(afterRetry?.agentIdleForRunId).not.toBe(RUN_ID); + + // User queues a follow-up while disconnected. + const onDataAfterRetry = latestOnData(); + const sendResult = await service.sendPrompt(TASK_ID, "follow up"); + expect(sendResult.stopReason).toBe("queued"); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect( + useSessionStore.getState().sessions[RUN_ID]?.messageQueue, + ).toHaveLength(1); + + // Reconnect -> in_progress. Recovery must NOT fire mid-turn: the idle + // marker is cleared and no turn_complete has arrived for the in-flight + // turn. Without the sendCloudPrompt clear, a stale idle marker would + // make this drain the follow-up while the first turn is still running. + onDataAfterRetry({ + kind: "status", + taskId: TASK_ID, + runId: RUN_ID, + status: "in_progress", + }); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1); + expect(useSessionStore.getState().sessions[RUN_ID]?.status).toBe( + "disconnected", + ); + expect( + useSessionStore.getState().sessions[RUN_ID]?.messageQueue, + ).toHaveLength(1); + + // The in-flight turn finally completes -> NOW it is safe to drain. + // Anchor totalEntryCount to the live processedLineCount so the delta is + // deterministically positive regardless of what the reconnect hydrate + // set it to (a fixed count can hit the no-delta dedup guard). + const turnCompleteEvent = { + type: "acp_message" as const, + ts: Date.now(), + message: { + jsonrpc: "2.0" as const, + method: "_posthog/turn_complete", + params: { sessionId: "acp-session", stopReason: "end_turn" }, + }, + }; + const processedBeforeTurnComplete = + useSessionStore.getState().sessions[RUN_ID]?.processedLineCount ?? 0; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([turnCompleteEvent]); + onDataAfterRetry({ + kind: "snapshot", + taskId: TASK_ID, + runId: RUN_ID, + status: "in_progress", + newEntries: [{ notification: { method: "_posthog/turn_complete" } }], + totalEntryCount: processedBeforeTurnComplete + 1, + }); + + await vi.waitFor(() => { + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: TASK_ID, + method: "user_message", + params: expect.objectContaining({ content: "follow up" }), + }), + ); + }); + const drained = useSessionStore.getState().sessions[RUN_ID]; + expect(drained?.status).toBe("connected"); + expect(drained?.messageQueue).toHaveLength(0); + }); + + it("does not recover from a prior run's turn_complete carried into the resumed session", async () => { + const service = getSessionService(); + service.watchCloudTask( + TASK_ID, + RUN_ID, + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run", + ); + const onData = latestOnData(); + + // resumeCloudRun copies the PREVIOUS run's history into the new run's + // session. The prior run's run_started + turn_complete must NOT make the + // new run look idle before its own resume turn completes. No live flag + // (recreated from logs); no current-run run_started in events yet. + const priorRunStarted = { + type: "acp_message" as const, + ts: 1, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/run_started", + params: { sessionId: "old", runId: "old-run", taskId: TASK_ID }, + }, + }; + const priorTurnComplete = { + type: "acp_message" as const, + ts: 2, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/turn_complete", + params: { sessionId: "old", stopReason: "end_turn" }, + }, + }; + sessionStoreSetters.setSession( + makeBaseSession({ + status: "disconnected", + events: [priorRunStarted, priorTurnComplete], + messageQueue: [{ id: "q-1", content: "follow up", queuedAt: 1 }], + }), + ); + + onData({ + kind: "status", + taskId: TASK_ID, + runId: RUN_ID, + status: "in_progress", + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + // With the real store, a missing current-run boundary would let the + // prior turn_complete recover -> connected -> drain. Assert it stays put. + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + expect(useSessionStore.getState().sessions[RUN_ID]?.status).toBe( + "disconnected", + ); + expect( + useSessionStore.getState().sessions[RUN_ID]?.messageQueue, + ).toHaveLength(1); + }); +}); diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 1baaf9689..22da3ac81 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -1163,7 +1163,7 @@ describe("SessionService", () => { }); }); - it("flushes queued cloud messages when cloudStatus flips to in_progress on a connected session", async () => { + it("flushes queued cloud messages when cloudStatus flips to in_progress on a connected, idle session", async () => { const service = getSessionService(); mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); const queuedMessage = { @@ -1171,6 +1171,9 @@ describe("SessionService", () => { content: "follow up", queuedAt: 1700000000, }; + // `agentIdleForRunId` proves a turn_complete fired for THIS run. + // Without it, a connected-but-mid-boot session would race the + // initial/resume turn — the recovery helper must not drain. const sessionWithQueue = createMockSession({ taskRunId: "run-123", taskId: "task-123", @@ -1178,6 +1181,7 @@ describe("SessionService", () => { isCloud: true, cloudStatus: "in_progress", events: [], + agentIdleForRunId: "run-123", messageQueue: [queuedMessage], }); mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( @@ -1405,9 +1409,200 @@ describe("SessionService", () => { await vi.waitFor(() => { expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( "run-123", - { status: "connected" }, + { status: "connected", agentIdleForRunId: "run-123" }, + ); + }); + + await vi.waitFor(() => { + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: "task-123", + method: "user_message", + params: expect.objectContaining({ content: "follow up" }), + }), + ); + }); + }); + + it("recovers a disconnected idle resumed run and drains the queue on an in_progress status update", async () => { + const service = getSessionService(); + mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + // The agent already booted/turn-completed for this exact run, then an + // SSE transport drop (or the retry it triggers) flipped the session to + // "disconnected". No fresh run_started/turn_complete will ever arrive + // for the idle run. + const disconnectedSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + cloudStatus: "in_progress", + isPromptPending: false, + agentIdleForRunId: "run-123", + events: [], + messageQueue: [queuedMessage], + }); + const connectedSession = createMockSession({ + ...disconnectedSession, + status: "connected", + }); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": disconnectedSession, + }); + // The recovery path reads via getSessions (disconnected); the queue + // dispatcher then reads via getSessionByTaskId after status is flipped. + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + connectedSession, + ); + mockSessionStoreSetters.dequeueMessages.mockReturnValue([queuedMessage]); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ + success: true, + result: { stopReason: "end_turn" }, + }); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock + .calls[0][1] as { + onData: (update: { + kind: "status"; + taskId: string; + runId: string; + status: "in_progress"; + }) => void; + }; + subscribeOptions.onData({ + kind: "status", + taskId: "task-123", + runId: "run-123", + status: "in_progress", + }); + + await vi.waitFor(() => { + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + { + status: "connected", + errorTitle: undefined, + errorMessage: undefined, + }, + ); + }); + await vi.waitFor(() => { + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: "task-123", + method: "user_message", + params: expect.objectContaining({ content: "follow up" }), + }), ); }); + }); + + it("recovers a disconnected run from a current-run run_started + turn_complete when the live flag was lost", async () => { + const service = getSessionService(); + mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + // No live `agentIdleForRunId` (session recreated from logs and the + // no-delta dedup guard skipped reprocessing), but THIS run's + // run_started followed by a turn_complete is still in events — a + // completed turn for the current run, so the agent is idle. + const runStartedEvent = { + type: "acp_message" as const, + ts: 1700000000, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/run_started", + params: { + sessionId: "acp-session", + runId: "run-123", + taskId: "task-123", + agentVersion: "2.3.556", + }, + }, + }; + const turnCompleteEvent = { + type: "acp_message" as const, + ts: 1700000001, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/turn_complete", + params: { sessionId: "acp-session", stopReason: "end_turn" }, + }, + }; + const disconnectedSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + cloudStatus: "in_progress", + isPromptPending: false, + agentIdleForRunId: undefined, + events: [runStartedEvent, turnCompleteEvent], + messageQueue: [queuedMessage], + }); + const connectedSession = createMockSession({ + ...disconnectedSession, + status: "connected", + }); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": disconnectedSession, + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + connectedSession, + ); + mockSessionStoreSetters.dequeueMessages.mockReturnValue([queuedMessage]); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ + success: true, + result: { stopReason: "end_turn" }, + }); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock + .calls[0][1] as { + onData: (update: { + kind: "status"; + taskId: string; + runId: string; + status: "in_progress"; + }) => void; + }; + subscribeOptions.onData({ + kind: "status", + taskId: "task-123", + runId: "run-123", + status: "in_progress", + }); await vi.waitFor(() => { expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( @@ -1420,6 +1615,359 @@ describe("SessionService", () => { }); }); + it("does not recover a disconnected run when boot evidence is from a different run id", async () => { + const service = getSessionService(); + mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + // run_started belongs to a PREVIOUS run — must not be mistaken for the + // new run's boot after a resume. + const staleRunStartedEvent = { + type: "acp_message" as const, + ts: 1700000000, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/run_started", + params: { + sessionId: "acp-session", + runId: "old-run", + taskId: "task-123", + agentVersion: "2.3.556", + }, + }, + }; + const disconnectedSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + cloudStatus: "in_progress", + isPromptPending: false, + agentIdleForRunId: undefined, + events: [staleRunStartedEvent], + messageQueue: [queuedMessage], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + disconnectedSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": disconnectedSession, + }); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock + .calls[0][1] as { + onData: (update: { + kind: "status"; + taskId: string; + runId: string; + status: "in_progress"; + }) => void; + }; + subscribeOptions.onData({ + kind: "status", + taskId: "task-123", + runId: "run-123", + status: "in_progress", + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + }); + + it("does not recover from a carried-over prior-run turn_complete", async () => { + const service = getSessionService(); + mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + // Resume copies the PREVIOUS run's history into the new run's + // session. The prior run's run_started + turn_complete must not make + // the new run look idle before its own resume turn completes. + const priorRunStarted = { + type: "acp_message" as const, + ts: 1700000000, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/run_started", + params: { sessionId: "old", runId: "old-run", taskId: "task-123" }, + }, + }; + const priorTurnComplete = { + type: "acp_message" as const, + ts: 1700000001, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/turn_complete", + params: { sessionId: "old", stopReason: "end_turn" }, + }, + }; + const disconnectedSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + cloudStatus: "in_progress", + isPromptPending: false, + agentIdleForRunId: undefined, + events: [priorRunStarted, priorTurnComplete], + messageQueue: [queuedMessage], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + disconnectedSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": disconnectedSession, + }); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock + .calls[0][1] as { + onData: (update: { + kind: "status"; + taskId: string; + runId: string; + status: "in_progress"; + }) => void; + }; + subscribeOptions.onData({ + kind: "status", + taskId: "task-123", + runId: "run-123", + status: "in_progress", + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + // The recovery branch flips status -> connected; assert it never fired + // (sendCommand alone is insufficient — the mocked store would bail the + // drain on the stale disconnected status regardless). + expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ status: "connected" }), + ); + }); + + it("does not recover when the current run started but its turn has not completed", async () => { + const service = getSessionService(); + mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + // Prior-run history + the current run's run_started, but no + // turn_complete for the current run yet (resume turn still running). + const priorTurnComplete = { + type: "acp_message" as const, + ts: 1700000000, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/turn_complete", + params: { sessionId: "old", stopReason: "end_turn" }, + }, + }; + const currentRunStarted = { + type: "acp_message" as const, + ts: 1700000001, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/run_started", + params: { sessionId: "new", runId: "run-123", taskId: "task-123" }, + }, + }; + const disconnectedSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + cloudStatus: "in_progress", + isPromptPending: false, + agentIdleForRunId: undefined, + events: [priorTurnComplete, currentRunStarted], + messageQueue: [queuedMessage], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + disconnectedSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": disconnectedSession, + }); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock + .calls[0][1] as { + onData: (update: { + kind: "status"; + taskId: string; + runId: string; + status: "in_progress"; + }) => void; + }; + subscribeOptions.onData({ + kind: "status", + taskId: "task-123", + runId: "run-123", + status: "in_progress", + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ status: "connected" }), + ); + }); + + it("does not recover a disconnected run while a prompt is in flight", async () => { + const service = getSessionService(); + mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + const disconnectedSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + cloudStatus: "in_progress", + isPromptPending: true, + agentIdleForRunId: "run-123", + events: [], + messageQueue: [queuedMessage], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + disconnectedSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": disconnectedSession, + }); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock + .calls[0][1] as { + onData: (update: { + kind: "status"; + taskId: string; + runId: string; + status: "in_progress"; + }) => void; + }; + subscribeOptions.onData({ + kind: "status", + taskId: "task-123", + runId: "run-123", + status: "in_progress", + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ status: "connected" }), + ); + }); + + it("does not recover a still-booting disconnected run with no boot evidence", async () => { + const service = getSessionService(); + mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + // Fresh boot: no run_started for this run yet, no live flag. Draining + // now would race sendInitialTaskMessage/sendResumeMessage. + const bootingSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + cloudStatus: "in_progress", + isPromptPending: false, + agentIdleForRunId: undefined, + events: [], + messageQueue: [queuedMessage], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + bootingSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": bootingSession, + }); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock + .calls[0][1] as { + onData: (update: { + kind: "status"; + taskId: string; + runId: string; + status: "in_progress"; + }) => void; + }; + subscribeOptions.onData({ + kind: "status", + taskId: "task-123", + runId: "run-123", + status: "in_progress", + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ status: "connected" }), + ); + }); + it("clears isPromptPending from structured turn completion logs on hydration", async () => { const service = getSessionService(); const hydratedSession = createMockSession({ @@ -1809,6 +2357,12 @@ describe("SessionService", () => { { status: "connected" }, ); }); + // run_started must NOT mark the agent idle — the resume/initial turn + // starts right after it. + expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ agentIdleForRunId: "run-123" }), + ); }); it("captures agentVersion from run_started params onto the session", async () => { diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 06339a021..2c6f8f8b4 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -1090,6 +1090,12 @@ export class SessionService { pausedDurationMs: 0, currentPromptId: msg.id, }); + const promptSession = sessionStoreSetters.getSessions()[taskRunId]; + if (promptSession?.isCloud && promptSession.agentIdleForRunId) { + sessionStoreSetters.updateSession(taskRunId, { + agentIdleForRunId: undefined, + }); + } } if ( "id" in msg && @@ -1164,22 +1170,21 @@ export class SessionService { const session = sessionStoreSetters.getSessions()[taskRunId]; if (session?.isCloud) { // Backward compat: treat turn_complete as an implicit run_started - // for agents that predate the run_started notification. + // for agents that predate the run_started notification. The turn + // finished, so the agent is idle for this run, lets a later + // transport drop recover readiness. + const updates: Partial = {}; if (session.status !== "connected") { - sessionStoreSetters.updateSession(taskRunId, { - status: "connected", - }); + updates.status = "connected"; + } + if (session.agentIdleForRunId !== taskRunId) { + updates.agentIdleForRunId = taskRunId; + } + if (Object.keys(updates).length > 0) { + sessionStoreSetters.updateSession(taskRunId, updates); } if (session.messageQueue.length > 0) { - const taskId = session.taskId; - setTimeout(() => { - this.sendQueuedCloudMessages(taskId).catch((err) => - log.error("turn_complete-driven cloud queue flush failed", { - taskId, - error: err, - }), - ); - }, 0); + this.scheduleCloudQueueFlush(session.taskId, "turn_complete"); } } } @@ -1783,6 +1788,7 @@ export class SessionService { isPromptPending: true, promptStartedAt: Date.now(), pausedDurationMs: 0, + agentIdleForRunId: undefined, }); sessionStoreSetters.appendOptimisticItem(session.taskRunId, { type: "user_message", @@ -3164,6 +3170,15 @@ export class SessionService { }); throw error; } + + // The main-process retry of an already-bootstrapped + // watcher only reconnects SSE (`start=latest`) and emits no fresh + // status/snapshot for an idle run, so the update-driven trigger in + // `handleCloudTaskUpdate` would never fire, the queued message would + // stay stuck. Attempt the same guarded recovery here once the reconnect + // request has been accepted. No-ops unless a queue is stranded on an + // idle, provably-alive run. + this.tryRecoverIdleCloudQueue(session.taskRunId); } /** @@ -3199,6 +3214,125 @@ export class SessionService { sessionStoreSetters.updateSession(session.taskRunId, { taskTitle }); } + /** + * Drain the cloud queue, the deferral breaks out of + * the synchronous store-update frame so the dispatcher reads committed + * state; `sendQueuedCloudMessages` is reentrancy-guarded so stacked + * schedules from multiple triggers collapse to one. + */ + private scheduleCloudQueueFlush(taskId: string, reason: string): void { + setTimeout(() => { + this.sendQueuedCloudMessages(taskId).catch((err) => + log.error("cloud queue flush failed", { taskId, reason, error: err }), + ); + }, 0); + } + + /** + * True when the agent for this exact run is idle: it has completed at + * least one turn for this run and is not mid-turn. Tracked live via + * `agentIdleForRunId` (set only on `_posthog/turn_complete`), with a + * fallback that replays events for the case where a session was recreated + * from logs and the live flag was never set (no-delta dedup guard skipped + * reprocessing). + * + * Deliberately independent of `isPromptPending`: `retryCloudTaskWatch()` + * forcibly clears it on reconnect, so trusting it would let recovery + * dispatch a queued follow-up while a remote turn is still running. + */ + private isAgentIdleForRun(session: AgentSession): boolean { + if (session.agentIdleForRunId === session.taskRunId) { + return true; + } + let seenCurrentRunStart = false; + let idle = false; + for (const acpMsg of session.events) { + const msg = acpMsg.message; + if ( + "method" in msg && + isNotification(msg.method, POSTHOG_NOTIFICATIONS.RUN_STARTED) + ) { + const params = (msg as { params?: { runId?: unknown } }).params; + if (params?.runId === session.taskRunId) { + seenCurrentRunStart = true; + idle = false; + } + continue; + } + if (!seenCurrentRunStart) { + continue; + } + if (isJsonRpcRequest(msg) && msg.method === "session/prompt") { + idle = false; + continue; + } + if (isTurnCompleteEvent(acpMsg)) { + idle = true; + } + } + return idle; + } + + /** + * Guarded recovery for a queued cloud message stranded by a transport + * drop on an idle, already-bootstrapped run. + * + * `run_started` is normally the canonical "agent is ready" trigger and + * would race with `sendInitialTaskMessage` while still booting, so the + * safe default remains "drain only once status is connected". But an + * idle run stays `in_progress` on the server while emitting NO fresh + * `run_started`/`turn_complete` (those only fire on boot or a new turn). + * If an SSE transport drop or the `retryCloudTaskWatch` it triggers + * flipped the session to disconnected/error AFTER the agent already + * booted for this exact run, nothing flips it back to "connected" and + * the queued message is stranded forever. When the run is provably + * alive (`cloudStatus === "in_progress"`) and the agent provably idle + * for THIS run (`isAgentIdleForRun`), recover readiness and drain. + */ + private tryRecoverIdleCloudQueue(taskRunId: string): void { + const session = sessionStoreSetters.getSessions()[taskRunId]; + if (!session?.isCloud || session.messageQueue.length === 0) { + return; + } + if (session.cloudStatus !== "in_progress") { + return; + } + + // The agent must be provably idle for this run, the + // connected path included. `status: "connected"` alone is NOT proof of + // idleness: the `_posthog/run_started` handler flips status to + // "connected" before the initial/resume turn even starts, so a + // connected-but-not-idle session is mid-boot. Draining now would race + // with `sendInitialTaskMessage`/`sendResumeMessage` and one prompt + // would be cancelled. Only `_posthog/turn_complete` makes the agent + // idle for the run (`isAgentIdleForRun`). + if (!this.isAgentIdleForRun(session)) { + return; + } + + const recoverableAfterTransportDrop = + (session.status === "disconnected" || session.status === "error") && + !session.isPromptPending; + + if (session.status !== "connected" && !recoverableAfterTransportDrop) { + return; + } + + if (recoverableAfterTransportDrop) { + sessionStoreSetters.updateSession(taskRunId, { + status: "connected", + errorTitle: undefined, + errorMessage: undefined, + }); + log.info("Recovered cloud session readiness after transport drop", { + taskId: session.taskId, + previousStatus: session.status, + }); + } + + this.scheduleCloudQueueFlush(session.taskId, "idle-run-recovery"); + } + private handleCloudTaskUpdate( taskRunId: string, update: CloudTaskUpdatePayload, @@ -3290,30 +3424,8 @@ export class SessionService { branch: update.branch, }); - // Recovery path for missed `turn_complete` notifications. `run_started` - // is normally the canonical "agent is ready" trigger and would race with - // `sendInitialTaskMessage` — but only while `session.status` is not yet - // "connected". Once status is "connected", the agent's handshake is - // done; if the run becomes `in_progress` and we still hold queued - // messages, attempt to drain. `sendQueuedCloudMessages` itself bails - // when `isPromptPending` is true, preserving the race protection. if (update.status === "in_progress") { - const sessionAfter = sessionStoreSetters.getSessions()[taskRunId]; - if ( - sessionAfter?.isCloud && - sessionAfter.status === "connected" && - sessionAfter.messageQueue.length > 0 - ) { - const taskId = sessionAfter.taskId; - setTimeout(() => { - this.sendQueuedCloudMessages(taskId).catch((err) => - log.error("status-driven cloud queue flush failed", { - taskId, - error: err, - }), - ); - }, 0); - } + this.tryRecoverIdleCloudQueue(taskRunId); } if (isTerminalStatus(update.status)) { diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts index 23257d7a7..718206228 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts @@ -99,6 +99,17 @@ export interface AgentSession { * against agent capabilities (especially relevant for cloud sandboxes * where the agent version can lag behind the desktop). */ agentVersion?: string; + /** Task run id for which the agent is idle. + * Set ONLY on `_posthog/turn_complete`, cleared when a + * `session/prompt` (or `sendCloudPrompt`) starts a turn. `run_started` + * does NOT set it: the initial/resume turn begins right after that + * handshake, so treating run_started as idle would drain a queued + * follow-up into the boot/resume turn race. Drives transport-drop queue + * recovery. Deliberately tracked independently of `isPromptPending`: + * `retryCloudTaskWatch()` forcibly clears `isPromptPending` on reconnect, + * so it cannot be trusted to mean "no remote turn in flight", using it + * for recovery would dispatch a queued follow-up mid-turn. */ + agentIdleForRunId?: string; } // --- Config Option Helpers ---