diff --git a/bun.lock b/bun.lock index a4746ad5cbb..2871e4d6e93 100644 --- a/bun.lock +++ b/bun.lock @@ -1903,7 +1903,7 @@ "@solidjs/router": ["@solidjs/router@0.15.4", "", { "peerDependencies": { "solid-js": "^1.8.6" } }, "sha512-WOpgg9a9T638cR+5FGbFi/IV4l2FpmBs1GpIMSPa0Ce9vyJN7Wts+X2PqMf9IYn0zUj2MlSJtm1gp7/HI/n5TQ=="], - "@solidjs/start": ["@solidjs/start@https://pkg.pr.new/@solidjs/start@dfb2020", { "dependencies": { "@babel/core": "^7.28.3", "@babel/traverse": "^7.28.3", "@babel/types": "^7.28.5", "@solidjs/meta": "^0.29.4", "@tanstack/server-functions-plugin": "1.134.5", "@types/babel__traverse": "^7.28.0", "@types/micromatch": "^4.0.9", "cookie-es": "^2.0.0", "defu": "^6.1.4", "error-stack-parser": "^2.1.4", "es-module-lexer": "^1.7.0", "esbuild": "^0.25.3", "fast-glob": "^3.3.3", "h3": "npm:h3@2.0.1-rc.4", "html-to-image": "^1.11.13", "micromatch": "^4.0.8", "path-to-regexp": "^8.2.0", "pathe": "^2.0.3", "radix3": "^1.1.2", "seroval": "^1.3.2", "seroval-plugins": "^1.2.1", "shiki": "^1.26.1", "solid-js": "^1.9.9", "source-map-js": "^1.2.1", "srvx": "^0.9.1", "terracotta": "^1.0.6", "vite": "7.1.10", "vite-plugin-solid": "^2.11.9", "vitest": "^4.0.10" } }, "sha512-7JjjA49VGNOsMRI8QRUhVudZmv0CnJ18SliSgK1ojszs/c3ijftgVkzvXdkSLN4miDTzbkXewf65D6ZBo6W+GQ=="], + "@solidjs/start": ["@solidjs/start@https://pkg.pr.new/@solidjs/start@dfb2020", { "dependencies": { "@babel/core": "^7.28.3", "@babel/traverse": "^7.28.3", "@babel/types": "^7.28.5", "@solidjs/meta": "^0.29.4", "@tanstack/server-functions-plugin": "1.134.5", "@types/babel__traverse": "^7.28.0", "@types/micromatch": "^4.0.9", "cookie-es": "^2.0.0", "defu": "^6.1.4", "error-stack-parser": "^2.1.4", "es-module-lexer": "^1.7.0", "esbuild": "^0.25.3", "fast-glob": "^3.3.3", "h3": "npm:h3@2.0.1-rc.4", "html-to-image": "^1.11.13", "micromatch": "^4.0.8", "path-to-regexp": "^8.2.0", "pathe": "^2.0.3", "radix3": "^1.1.2", "seroval": "^1.3.2", "seroval-plugins": "^1.2.1", "shiki": "^1.26.1", "solid-js": "^1.9.9", "source-map-js": "^1.2.1", "srvx": "^0.9.1", "terracotta": "^1.0.6", "vite": "7.1.10", "vite-plugin-solid": "^2.11.9", "vitest": "^4.0.10" } }], "@speed-highlight/core": ["@speed-highlight/core@1.2.14", "", {}, "sha512-G4ewlBNhUtlLvrJTb88d2mdy2KRijzs4UhnlrOSRT4bmjh/IqNElZa3zkrZ+TC47TwtlDWzVLFADljF1Ijp5hA=="], diff --git a/packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx b/packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx index c13b436514f..e0913f9edc0 100644 --- a/packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx +++ b/packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx @@ -1063,7 +1063,9 @@ export function Prompt(props: PromptProps) { flexDirection="row" gap={1} flexGrow={1} - justifyContent={status().type === "retry" ? "space-between" : "flex-start"} + justifyContent={ + status().type === "retry" || status().type === "reconnecting" ? "space-between" : "flex-start" + } > @@ -1128,6 +1130,41 @@ export function Prompt(props: PromptProps) { ) })()} + {(() => { + const reconnecting = createMemo(() => { + const s = status() + if (s.type !== "reconnecting") return + return s + }) + const [visible, setVisible] = createSignal(false) + let timer: ReturnType | undefined + createEffect(() => { + const r = reconnecting() + if (r) { + timer = setTimeout(() => setVisible(true), 1000) + } else { + clearTimeout(timer) + setVisible(false) + } + }) + onCleanup(() => clearTimeout(timer)) + const msg = createMemo(() => { + const r = reconnecting() + if (!r) return + if (r.message.length > 80) return r.message.slice(0, 80) + "..." + return r.message + }) + + return ( + + + + {msg()} [reconnecting attempt #{reconnecting()?.attempt}] + + + + ) + })()} 0 ? theme.primary : theme.text}> @@ -1138,7 +1175,7 @@ export function Prompt(props: PromptProps) { - + diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index bf5a0d3ce7f..22adf73904a 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -117,8 +117,29 @@ export namespace MCP { }) } + function isNetworkError(err: unknown): boolean { + if (err instanceof UnauthorizedError) return false + if (!(err instanceof Error)) return false + if ("code" in err && typeof (err as { code: unknown }).code === "number") return false + const msg = err.message.toLowerCase() + return ( + msg.includes("econnreset") || + msg.includes("econnrefused") || + msg.includes("etimedout") || + msg.includes("fetch failed") || + msg.includes("socket") || + msg.includes("network") || + msg.includes("connection") + ) + } + // Convert MCP tool definition to AI SDK Tool type - async function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Promise { + async function convertMcpTool( + mcpTool: MCPToolDef, + client: MCPClient, + timeout?: number, + reconnect?: () => Promise, + ): Promise { const inputSchema = mcpTool.inputSchema // Spread first, then override type to ensure it's always "object" @@ -133,17 +154,27 @@ export namespace MCP { description: mcpTool.description ?? "", inputSchema: jsonSchema(schema), execute: async (args: unknown) => { - return client.callTool( - { - name: mcpTool.name, - arguments: (args || {}) as Record, - }, - CallToolResultSchema, - { - resetTimeoutOnProgress: true, - timeout, - }, - ) + const call = (c: MCPClient) => + c.callTool( + { + name: mcpTool.name, + arguments: (args || {}) as Record, + }, + CallToolResultSchema, + { + resetTimeoutOnProgress: true, + timeout, + }, + ) + if (!reconnect) return call(client) + try { + return await call(client) + } catch (err) { + if (!isNetworkError(err)) throw err + const fresh = await reconnect().catch(() => undefined) + if (!fresh) throw err + return call(fresh) + } }, }) } @@ -636,10 +667,32 @@ export namespace MCP { const mcpConfig = config[clientName] const entry = isMcpConfigured(mcpConfig) ? mcpConfig : undefined const timeout = entry?.timeout ?? defaultTimeout + const reconnect: (() => Promise) | undefined = + entry && entry.type === "remote" + ? async () => { + const cur = await state() + const old = cur.clients[clientName] + if (old) { + await old.close().catch(() => {}) + delete cur.clients[clientName] + } + log.info("reconnecting remote mcp server after tool call failure", { clientName }) + const r = await create(clientName, entry).catch(() => undefined) + if (!r?.mcpClient) return undefined + cur.clients[clientName] = r.mcpClient + cur.status[clientName] = r.status + return r.mcpClient + } + : undefined for (const mcpTool of toolsResult.tools) { const sanitizedClientName = clientName.replace(/[^a-zA-Z0-9_-]/g, "_") const sanitizedToolName = mcpTool.name.replace(/[^a-zA-Z0-9_-]/g, "_") - result[sanitizedClientName + "_" + sanitizedToolName] = await convertMcpTool(mcpTool, client, timeout) + result[sanitizedClientName + "_" + sanitizedToolName] = await convertMcpTool( + mcpTool, + client, + timeout, + reconnect, + ) } } return result diff --git a/packages/opencode/src/provider/provider.ts b/packages/opencode/src/provider/provider.ts index 6ab45d028b9..1fe16c60699 100644 --- a/packages/opencode/src/provider/provider.ts +++ b/packages/opencode/src/provider/provider.ts @@ -1237,18 +1237,19 @@ export namespace Provider { if (existing) return existing const customFetch = options["fetch"] - const chunkTimeout = options["chunkTimeout"] + const chunkTimeoutRaw = options["chunkTimeout"] delete options["chunkTimeout"] + const chunkTimeout = typeof chunkTimeoutRaw === "number" && chunkTimeoutRaw > 0 ? chunkTimeoutRaw : 30_000 options["fetch"] = async (input: any, init?: BunFetchRequestInit) => { // Preserve custom fetch if it exists, wrap it with timeout logic const fetchFn = customFetch ?? fetch const opts = init ?? {} - const chunkAbortCtl = typeof chunkTimeout === "number" && chunkTimeout > 0 ? new AbortController() : undefined + const chunkAbortCtl = new AbortController() const signals: AbortSignal[] = [] if (opts.signal) signals.push(opts.signal) - if (chunkAbortCtl) signals.push(chunkAbortCtl.signal) + signals.push(chunkAbortCtl.signal) if (options["timeout"] !== undefined && options["timeout"] !== null && options["timeout"] !== false) signals.push(AbortSignal.timeout(options["timeout"])) @@ -1279,7 +1280,6 @@ export namespace Provider { timeout: false, }) - if (!chunkAbortCtl) return res return wrapSSE(res, chunkTimeout, chunkAbortCtl) } diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index f1335f6f21a..e461c871616 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -18,6 +18,16 @@ import type { Provider } from "@/provider/provider" import { ModelID, ProviderID } from "@/provider/schema" export namespace MessageV2 { + const NETWORK_ERROR_CODES = new Set([ + "ECONNRESET", + "ETIMEDOUT", + "ENETUNREACH", + "EHOSTUNREACH", + "ENOTFOUND", + "EPIPE", + "ECONNREFUSED", + ]) + export function isMedia(mime: string) { return mime.startsWith("image/") || mime === "application/pdf" } @@ -916,10 +926,10 @@ export namespace MessageV2 { }, { cause: e }, ).toObject() - case (e as SystemError)?.code === "ECONNRESET": + case NETWORK_ERROR_CODES.has((e as SystemError)?.code ?? ""): return new MessageV2.APIError( { - message: "Connection reset by server", + message: "Network error", isRetryable: true, metadata: { code: (e as SystemError).code ?? "", @@ -929,6 +939,17 @@ export namespace MessageV2 { }, { cause: e }, ).toObject() + case e instanceof Error && e.message === "SSE read timed out": + return new MessageV2.APIError( + { + message: "SSE read timed out", + isRetryable: true, + metadata: { + message: e.message, + }, + }, + { cause: e }, + ).toObject() case APICallError.isInstance(e): const parsed = ProviderError.parseAPICallError({ providerID: ctx.providerID, diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index ccb09e71ac7..fd0a02fdcb7 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -19,6 +19,7 @@ import type { SessionID, MessageID } from "./schema" export namespace SessionProcessor { const DOOM_LOOP_THRESHOLD = 3 + const MAX_NETWORK_RETRIES = 5 const log = Log.create({ service: "session.processor" }) export type Info = Awaited> @@ -34,7 +35,48 @@ export namespace SessionProcessor { let snapshot: string | undefined let blocked = false let attempt = 0 + let networkAttempt = 0 + let receivedChunk = false let needsCompaction = false + const cleanup = async () => { + const parts = await MessageV2.parts(input.assistantMessage.id) + for (const part of parts) { + if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") { + await Session.removePart({ + sessionID: input.sessionID, + messageID: input.assistantMessage.id, + partID: part.id, + }) + continue + } + if (part.type === "text") { + await Session.updatePart({ + ...part, + text: "", + time: part.time + ? { + start: part.time.start, + } + : undefined, + }) + continue + } + if (part.type === "reasoning") { + await Session.updatePart({ + ...part, + text: "", + time: { + start: part.time.start, + }, + }) + } + } + Object.keys(toolcalls).forEach((id) => { + delete toolcalls[id] + }) + input.assistantMessage.time.completed = undefined + await Session.updateMessage(input.assistantMessage) + } const result = { get message() { @@ -49,11 +91,13 @@ export namespace SessionProcessor { const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true while (true) { try { + receivedChunk = false let currentText: MessageV2.TextPart | undefined let reasoningMap: Record = {} const stream = await LLM.stream(streamInput) for await (const value of stream.fullStream) { + receivedChunk = true input.abort.throwIfAborted() switch (value.type) { case "start": @@ -366,16 +410,43 @@ export namespace SessionProcessor { } else { const retry = SessionRetry.retryable(error) if (retry !== undefined) { - attempt++ - const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined) - await SessionStatus.set(input.sessionID, { - type: "retry", - attempt, - message: retry, - next: Date.now() + delay, - }) - await SessionRetry.sleep(delay, input.abort).catch(() => {}) - continue + const network = + MessageV2.APIError.isInstance(error) && + error.data.isRetryable && + (error.data.message.includes("Network error") || + error.data.message.includes("SSE read timed out") || + error.data.message.includes("Connection reset by server")) + if (network) { + networkAttempt++ + if (networkAttempt <= MAX_NETWORK_RETRIES) { + const delay = Math.min(1000 * Math.pow(2, networkAttempt - 1), 5000) + await SessionStatus.set(input.sessionID, { + type: "reconnecting", + attempt: networkAttempt, + message: retry, + }) + if (receivedChunk) { + await cleanup() + } + await SessionRetry.sleep(delay, input.abort).catch(() => {}) + continue + } + } + if (!network) { + attempt++ + const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined) + await SessionStatus.set(input.sessionID, { + type: "retry", + attempt, + message: retry, + next: Date.now() + delay, + }) + if (receivedChunk) { + await cleanup() + } + await SessionRetry.sleep(delay, input.abort).catch(() => {}) + continue + } } input.assistantMessage.error = error Bus.publish(Session.Event.Error, { diff --git a/packages/opencode/src/session/status.ts b/packages/opencode/src/session/status.ts index 462d5ded488..b8e3768b136 100644 --- a/packages/opencode/src/session/status.ts +++ b/packages/opencode/src/session/status.ts @@ -18,6 +18,11 @@ export namespace SessionStatus { message: z.string(), next: z.number(), }), + z.object({ + type: z.literal("reconnecting"), + attempt: z.number(), + message: z.string(), + }), z.object({ type: z.literal("busy"), }), diff --git a/packages/opencode/test/session/reconnection.test.ts b/packages/opencode/test/session/reconnection.test.ts new file mode 100644 index 00000000000..cf4e9155089 --- /dev/null +++ b/packages/opencode/test/session/reconnection.test.ts @@ -0,0 +1,186 @@ +import { describe, test, expect } from "bun:test" +import path from "path" +import { Log } from "../../src/util/log" +import { Instance } from "../../src/project/instance" +import { Session } from "../../src/session" +import { SessionProcessor } from "../../src/session/processor" +import { SessionStatus } from "../../src/session/status" +import { SessionRetry } from "../../src/session/retry" +import { Bus } from "../../src/bus" +import { MessageV2 } from "../../src/session/message-v2" +import { LLM } from "../../src/session/llm" +import type { Provider } from "../../src/provider/provider" +import { MessageID, PartID } from "../../src/session/schema" +import { ProviderID, ModelID } from "../../src/provider/schema" + +const projectRoot = path.join(__dirname, "../..") +Log.init({ print: false }) + +const model: Provider.Model = { + id: ModelID.make("test-model"), + providerID: ProviderID.make("test"), + api: { id: "test", url: "http://localhost:9999", npm: "@ai-sdk/openai" }, + name: "Test Model", + capabilities: { + temperature: false, + reasoning: false, + attachment: false, + toolcall: false, + input: { text: true, audio: false, image: false, video: false, pdf: false }, + output: { text: true, audio: false, image: false, video: false, pdf: false }, + interleaved: false, + }, + cost: { input: 0, output: 0, cache: { read: 0, write: 0 } }, + limit: { context: 100000, output: 4096 }, + status: "active", + options: {}, + headers: {}, + release_date: "2024-01-01", +} + +async function makeMsg() { + const session = await Session.create({}) + const userID = MessageID.ascending() + await Session.updateMessage({ + id: userID, + sessionID: session.id, + role: "user", + time: { created: Date.now() }, + agent: "build", + model: { providerID: ProviderID.make("test"), modelID: ModelID.make("test") }, + } as unknown as MessageV2.Info) + const msg: MessageV2.Assistant = { + id: MessageID.ascending(), + sessionID: session.id, + role: "assistant", + time: { created: Date.now() }, + parentID: userID, + modelID: ModelID.make("test"), + providerID: ProviderID.make("test"), + mode: "primary", + agent: "build", + path: { cwd: projectRoot, root: projectRoot }, + cost: 0, + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + } + await Session.updateMessage(msg) + return { session, msg } +} + +async function* sseTimeout() { + yield { type: "start" } + throw new Error("SSE read timed out") +} + +async function* ok() { + yield { type: "start" } +} + +type Reconnecting = Extract + +describe("session.processor.reconnection", () => { + test("busy → reconnecting(1) → busy → success with partial part cleanup", async () => { + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const { session, msg } = await makeMsg() + + await Session.updatePart({ + id: PartID.ascending(), + sessionID: session.id, + messageID: msg.id, + type: "text", + text: "pre-existing partial", + time: { start: Date.now() }, + }) + + const statuses: SessionStatus.Info[] = [] + const unsub = Bus.subscribe(SessionStatus.Event.Status, (e) => { + statuses.push(e.properties.status) + }) + + const [prevStream, prevSleep] = [LLM.stream, SessionRetry.sleep] + ;(SessionRetry as any).sleep = async () => {} + + let call = 0 + ;(LLM as any).stream = async () => { + call++ + return { fullStream: call === 1 ? sseTimeout() : ok() } + } + + const ctrl = new AbortController() + const proc = SessionProcessor.create({ + assistantMessage: msg, + sessionID: session.id, + model, + abort: ctrl.signal, + }) + + const result = await proc.process({} as unknown as LLM.StreamInput) + + ;(LLM as any).stream = prevStream + ;(SessionRetry as any).sleep = prevSleep + unsub() + + expect(call).toBe(2) + expect(result).toBe("continue") + + const reconnecting = statuses.filter((s): s is Reconnecting => s.type === "reconnecting") + expect(reconnecting.length).toBe(1) + expect(reconnecting[0].attempt).toBe(1) + expect(reconnecting[0].message).toBe("SSE read timed out") + + expect(statuses.filter((s) => s.type === "busy").length).toBeGreaterThanOrEqual(2) + + const parts = await MessageV2.parts(msg.id) + const text = parts.find((p): p is MessageV2.TextPart => p.type === "text") + expect(text).toBeDefined() + expect(text?.text).toBe("") + + await Session.remove(session.id) + }, + }) + }, 30_000) + + test("max network retries exhausted: 5 reconnecting states → idle with error → stop", async () => { + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const { session, msg } = await makeMsg() + + const statuses: SessionStatus.Info[] = [] + const unsub = Bus.subscribe(SessionStatus.Event.Status, (e) => { + statuses.push(e.properties.status) + }) + + const [prevStream, prevSleep] = [LLM.stream, SessionRetry.sleep] + ;(SessionRetry as any).sleep = async () => {} + ;(LLM as any).stream = async () => ({ fullStream: sseTimeout() }) + + const ctrl = new AbortController() + const proc = SessionProcessor.create({ + assistantMessage: msg, + sessionID: session.id, + model, + abort: ctrl.signal, + }) + + const result = await proc.process({} as unknown as LLM.StreamInput) + + ;(LLM as any).stream = prevStream + ;(SessionRetry as any).sleep = prevSleep + unsub() + + expect(result).toBe("stop") + + const reconnecting = statuses.filter((s): s is Reconnecting => s.type === "reconnecting") + expect(reconnecting.length).toBe(5) + expect(reconnecting.map((s) => s.attempt)).toStrictEqual([1, 2, 3, 4, 5]) + + expect(statuses.at(-1)?.type).toBe("idle") + + await Session.remove(session.id) + }, + }) + }, 30_000) +}) diff --git a/packages/opencode/test/session/retry.test.ts b/packages/opencode/test/session/retry.test.ts index 621ad99e9b4..13b49acac2d 100644 --- a/packages/opencode/test/session/retry.test.ts +++ b/packages/opencode/test/session/retry.test.ts @@ -157,7 +157,7 @@ describe("session.message-v2.fromError", () => { expect(MessageV2.APIError.isInstance(result)).toBe(true) expect((result as MessageV2.APIError).data.isRetryable).toBe(true) - expect((result as MessageV2.APIError).data.message).toBe("Connection reset by server") + expect((result as MessageV2.APIError).data.message).toBe("Network error") expect((result as MessageV2.APIError).data.metadata?.code).toBe("ECONNRESET") expect((result as MessageV2.APIError).data.metadata?.message).toInclude("socket connection") }, @@ -166,14 +166,14 @@ describe("session.message-v2.fromError", () => { test("ECONNRESET socket error is retryable", () => { const error = new MessageV2.APIError({ - message: "Connection reset by server", + message: "Network error", isRetryable: true, metadata: { code: "ECONNRESET", message: "The socket connection was closed unexpectedly" }, }).toObject() as MessageV2.APIError const retryable = SessionRetry.retryable(error) expect(retryable).toBeDefined() - expect(retryable).toBe("Connection reset by server") + expect(retryable).toBe("Network error") }) test("marks OpenAI 404 status codes as retryable", () => { diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index 86a0c7e4256..766b830995c 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -588,6 +588,11 @@ export type SessionStatus = message: string next: number } + | { + type: "reconnecting" + attempt: number + message: string + } | { type: "busy" }