Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 39 additions & 2 deletions packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,9 @@ export function Prompt(props: PromptProps) {
flexDirection="row"
gap={1}
flexGrow={1}
justifyContent={status().type === "retry" ? "space-between" : "flex-start"}
justifyContent={
status().type === "retry" || status().type === "reconnecting" ? "space-between" : "flex-start"
}
>
<box flexShrink={0} flexDirection="row" gap={1}>
<box marginLeft={1}>
Expand Down Expand Up @@ -1128,6 +1130,41 @@ export function Prompt(props: PromptProps) {
</Show>
)
})()}
{(() => {
const reconnecting = createMemo(() => {
const s = status()
if (s.type !== "reconnecting") return
return s
})
const [visible, setVisible] = createSignal(false)
let timer: ReturnType<typeof setTimeout> | undefined
createEffect(() => {
const r = reconnecting()
if (r) {
timer = setTimeout(() => setVisible(true), 1000)
} else {
clearTimeout(timer)
setVisible(false)
}
})
onCleanup(() => clearTimeout(timer))
const msg = createMemo(() => {
const r = reconnecting()
if (!r) return
if (r.message.length > 80) return r.message.slice(0, 80) + "..."
return r.message
})

return (
<Show when={visible() && reconnecting()}>
<box>
<text fg={theme.warning}>
{msg()} [reconnecting attempt #{reconnecting()?.attempt}]
</text>
</box>
</Show>
)
})()}
</box>
</box>
<text fg={store.interrupt > 0 ? theme.primary : theme.text}>
Expand All @@ -1138,7 +1175,7 @@ export function Prompt(props: PromptProps) {
</text>
</box>
</Show>
<Show when={status().type !== "retry"}>
<Show when={status().type !== "retry" && status().type !== "reconnecting"}>
<box gap={2} flexDirection="row">
<Switch>
<Match when={store.mode === "normal"}>
Expand Down
79 changes: 66 additions & 13 deletions packages/opencode/src/mcp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,29 @@ export namespace MCP {
})
}

function isNetworkError(err: unknown): boolean {
if (err instanceof UnauthorizedError) return false
if (!(err instanceof Error)) return false
if ("code" in err && typeof (err as { code: unknown }).code === "number") return false
const msg = err.message.toLowerCase()
return (
msg.includes("econnreset") ||
msg.includes("econnrefused") ||
msg.includes("etimedout") ||
msg.includes("fetch failed") ||
msg.includes("socket") ||
msg.includes("network") ||
msg.includes("connection")
)
}

// Convert MCP tool definition to AI SDK Tool type
async function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Promise<Tool> {
async function convertMcpTool(
mcpTool: MCPToolDef,
client: MCPClient,
timeout?: number,
reconnect?: () => Promise<MCPClient | undefined>,
): Promise<Tool> {
const inputSchema = mcpTool.inputSchema

// Spread first, then override type to ensure it's always "object"
Expand All @@ -133,17 +154,27 @@ export namespace MCP {
description: mcpTool.description ?? "",
inputSchema: jsonSchema(schema),
execute: async (args: unknown) => {
return client.callTool(
{
name: mcpTool.name,
arguments: (args || {}) as Record<string, unknown>,
},
CallToolResultSchema,
{
resetTimeoutOnProgress: true,
timeout,
},
)
const call = (c: MCPClient) =>
c.callTool(
{
name: mcpTool.name,
arguments: (args || {}) as Record<string, unknown>,
},
CallToolResultSchema,
{
resetTimeoutOnProgress: true,
timeout,
},
)
if (!reconnect) return call(client)
try {
return await call(client)
} catch (err) {
if (!isNetworkError(err)) throw err
const fresh = await reconnect().catch(() => undefined)
if (!fresh) throw err
return call(fresh)
}
},
})
}
Expand Down Expand Up @@ -636,10 +667,32 @@ export namespace MCP {
const mcpConfig = config[clientName]
const entry = isMcpConfigured(mcpConfig) ? mcpConfig : undefined
const timeout = entry?.timeout ?? defaultTimeout
const reconnect: (() => Promise<MCPClient | undefined>) | undefined =
entry && entry.type === "remote"
? async () => {
const cur = await state()
const old = cur.clients[clientName]
if (old) {
await old.close().catch(() => {})
delete cur.clients[clientName]
}
log.info("reconnecting remote mcp server after tool call failure", { clientName })
const r = await create(clientName, entry).catch(() => undefined)
if (!r?.mcpClient) return undefined
cur.clients[clientName] = r.mcpClient
cur.status[clientName] = r.status
return r.mcpClient
}
: undefined
for (const mcpTool of toolsResult.tools) {
const sanitizedClientName = clientName.replace(/[^a-zA-Z0-9_-]/g, "_")
const sanitizedToolName = mcpTool.name.replace(/[^a-zA-Z0-9_-]/g, "_")
result[sanitizedClientName + "_" + sanitizedToolName] = await convertMcpTool(mcpTool, client, timeout)
result[sanitizedClientName + "_" + sanitizedToolName] = await convertMcpTool(
mcpTool,
client,
timeout,
reconnect,
)
}
}
return result
Expand Down
8 changes: 4 additions & 4 deletions packages/opencode/src/provider/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1237,18 +1237,19 @@ export namespace Provider {
if (existing) return existing

const customFetch = options["fetch"]
const chunkTimeout = options["chunkTimeout"]
const chunkTimeoutRaw = options["chunkTimeout"]
delete options["chunkTimeout"]
const chunkTimeout = typeof chunkTimeoutRaw === "number" && chunkTimeoutRaw > 0 ? chunkTimeoutRaw : 30_000

options["fetch"] = async (input: any, init?: BunFetchRequestInit) => {
// Preserve custom fetch if it exists, wrap it with timeout logic
const fetchFn = customFetch ?? fetch
const opts = init ?? {}
const chunkAbortCtl = typeof chunkTimeout === "number" && chunkTimeout > 0 ? new AbortController() : undefined
const chunkAbortCtl = new AbortController()
const signals: AbortSignal[] = []

if (opts.signal) signals.push(opts.signal)
if (chunkAbortCtl) signals.push(chunkAbortCtl.signal)
signals.push(chunkAbortCtl.signal)
if (options["timeout"] !== undefined && options["timeout"] !== null && options["timeout"] !== false)
signals.push(AbortSignal.timeout(options["timeout"]))

Expand Down Expand Up @@ -1279,7 +1280,6 @@ export namespace Provider {
timeout: false,
})

if (!chunkAbortCtl) return res
return wrapSSE(res, chunkTimeout, chunkAbortCtl)
}

Expand Down
25 changes: 23 additions & 2 deletions packages/opencode/src/session/message-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ import type { Provider } from "@/provider/provider"
import { ModelID, ProviderID } from "@/provider/schema"

export namespace MessageV2 {
const NETWORK_ERROR_CODES = new Set([
"ECONNRESET",
"ETIMEDOUT",
"ENETUNREACH",
"EHOSTUNREACH",
"ENOTFOUND",
"EPIPE",
"ECONNREFUSED",
])

export function isMedia(mime: string) {
return mime.startsWith("image/") || mime === "application/pdf"
}
Expand Down Expand Up @@ -916,10 +926,10 @@ export namespace MessageV2 {
},
{ cause: e },
).toObject()
case (e as SystemError)?.code === "ECONNRESET":
case NETWORK_ERROR_CODES.has((e as SystemError)?.code ?? ""):
return new MessageV2.APIError(
{
message: "Connection reset by server",
message: "Network error",
isRetryable: true,
metadata: {
code: (e as SystemError).code ?? "",
Expand All @@ -929,6 +939,17 @@ export namespace MessageV2 {
},
{ cause: e },
).toObject()
case e instanceof Error && e.message === "SSE read timed out":
return new MessageV2.APIError(
{
message: "SSE read timed out",
isRetryable: true,
metadata: {
message: e.message,
},
},
{ cause: e },
).toObject()
case APICallError.isInstance(e):
const parsed = ProviderError.parseAPICallError({
providerID: ctx.providerID,
Expand Down
91 changes: 81 additions & 10 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import type { SessionID, MessageID } from "./schema"

export namespace SessionProcessor {
const DOOM_LOOP_THRESHOLD = 3
const MAX_NETWORK_RETRIES = 5
const log = Log.create({ service: "session.processor" })

export type Info = Awaited<ReturnType<typeof create>>
Expand All @@ -34,7 +35,48 @@ export namespace SessionProcessor {
let snapshot: string | undefined
let blocked = false
let attempt = 0
let networkAttempt = 0
let receivedChunk = false
let needsCompaction = false
const cleanup = async () => {
const parts = await MessageV2.parts(input.assistantMessage.id)
for (const part of parts) {
if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") {
await Session.removePart({
sessionID: input.sessionID,
messageID: input.assistantMessage.id,
partID: part.id,
})
continue
}
if (part.type === "text") {
await Session.updatePart({
...part,
text: "",
time: part.time
? {
start: part.time.start,
}
: undefined,
})
continue
}
if (part.type === "reasoning") {
await Session.updatePart({
...part,
text: "",
time: {
start: part.time.start,
},
})
}
}
Object.keys(toolcalls).forEach((id) => {
delete toolcalls[id]
})
input.assistantMessage.time.completed = undefined
await Session.updateMessage(input.assistantMessage)
}

const result = {
get message() {
Expand All @@ -49,11 +91,13 @@ export namespace SessionProcessor {
const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true
while (true) {
try {
receivedChunk = false
let currentText: MessageV2.TextPart | undefined
let reasoningMap: Record<string, MessageV2.ReasoningPart> = {}
const stream = await LLM.stream(streamInput)

for await (const value of stream.fullStream) {
receivedChunk = true
input.abort.throwIfAborted()
switch (value.type) {
case "start":
Expand Down Expand Up @@ -366,16 +410,43 @@ export namespace SessionProcessor {
} else {
const retry = SessionRetry.retryable(error)
if (retry !== undefined) {
attempt++
const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined)
await SessionStatus.set(input.sessionID, {
type: "retry",
attempt,
message: retry,
next: Date.now() + delay,
})
await SessionRetry.sleep(delay, input.abort).catch(() => {})
continue
const network =
MessageV2.APIError.isInstance(error) &&
error.data.isRetryable &&
(error.data.message.includes("Network error") ||
error.data.message.includes("SSE read timed out") ||
error.data.message.includes("Connection reset by server"))
if (network) {
networkAttempt++
if (networkAttempt <= MAX_NETWORK_RETRIES) {
const delay = Math.min(1000 * Math.pow(2, networkAttempt - 1), 5000)
await SessionStatus.set(input.sessionID, {
type: "reconnecting",
attempt: networkAttempt,
message: retry,
})
if (receivedChunk) {
await cleanup()
}
await SessionRetry.sleep(delay, input.abort).catch(() => {})
continue
}
}
if (!network) {
attempt++
const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined)
await SessionStatus.set(input.sessionID, {
type: "retry",
attempt,
message: retry,
next: Date.now() + delay,
})
if (receivedChunk) {
await cleanup()
}
await SessionRetry.sleep(delay, input.abort).catch(() => {})
continue
}
}
input.assistantMessage.error = error
Bus.publish(Session.Event.Error, {
Expand Down
5 changes: 5 additions & 0 deletions packages/opencode/src/session/status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ export namespace SessionStatus {
message: z.string(),
next: z.number(),
}),
z.object({
type: z.literal("reconnecting"),
attempt: z.number(),
message: z.string(),
}),
z.object({
type: z.literal("busy"),
}),
Expand Down
Loading
Loading