Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 72 additions & 32 deletions packages/opencode/src/mcp/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { dynamicTool, type Tool, jsonSchema, type JSONSchema7 } from "ai"
import { Client } from "@modelcontextprotocol/sdk/client/index.js"
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"
import { isTransportError } from "./transport-error"
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"
import { UnauthorizedError } from "@modelcontextprotocol/sdk/client/auth.js"
Expand Down Expand Up @@ -119,37 +120,6 @@ function remoteURL(key: string, value: string) {
log.warn("invalid remote mcp url", { key })
}

// Convert MCP tool definition to AI SDK Tool type
function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool {
const inputSchema = mcpTool.inputSchema

// Spread first, then override type to ensure it's always "object"
const schema: JSONSchema7 = {
...(inputSchema as JSONSchema7),
type: "object",
properties: (inputSchema.properties ?? {}) as JSONSchema7["properties"],
additionalProperties: false,
}

return dynamicTool({
description: mcpTool.description ?? "",
inputSchema: jsonSchema(schema),
execute: async (args: unknown) => {
return client.callTool(
{
name: mcpTool.name,
arguments: (args || {}) as Record<string, unknown>,
},
CallToolResultSchema,
{
resetTimeoutOnProgress: true,
timeout,
},
)
},
})
}

function defs(key: string, client: MCPClient, timeout?: number) {
return Effect.tryPromise({
try: () => withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT),
Expand Down Expand Up @@ -243,6 +213,8 @@ export const layer = Layer.effect(
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
const auth = yield* McpAuth.Service
const bus = yield* Bus.Service
const layerBridge = yield* EffectBridge.make()
const reconnecting = new Map<string, Promise<boolean>>()

type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport

Expand Down Expand Up @@ -635,6 +607,69 @@ export const layer = Layer.effect(
const config = cfg.mcp ?? {}
const defaultTimeout = cfg.experimental?.mcp_timeout

// Single-flight reconnect: concurrent tool calls for the same MCP name
// share one in-flight Promise instead of each triggering a new connect.
// The entry is removed on both success and failure.
const reconnectClient = (name: string): Promise<boolean> => {
const existing = reconnecting.get(name)
if (existing) return existing
const p = layerBridge
.promise(getMcpConfig(name))
.then((mcp) => {
if (!mcp) return false
return layerBridge
.promise(createAndStore(name, { ...mcp, enabled: true }))
.then((status) => status.status === "connected")
})
.catch((err) => {
log.error("mcp reconnect failed", { name, error: err instanceof Error ? err.message : String(err) })
return false
})
.finally(() => {
reconnecting.delete(name)
})
reconnecting.set(name, p)
return p
}

// Wraps an MCP tool as an AI SDK dynamicTool. The key piece is the
// catch branch in execute: on a transport error, call reconnectClient
// and retry once with the fresh client. Non-transport errors and
// failed reconnects are rethrown as-is so business errors stay visible.
const makeTool = (clientName: string, mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool => {
const schema: JSONSchema7 = {
...(mcpTool.inputSchema as JSONSchema7),
type: "object",
properties: (mcpTool.inputSchema.properties ?? {}) as JSONSchema7["properties"],
additionalProperties: false,
}
return dynamicTool({
description: mcpTool.description ?? "",
inputSchema: jsonSchema(schema),
execute: (args: unknown) => {
const payload = {
name: mcpTool.name,
arguments: (args || {}) as Record<string, unknown>,
}
const opts = { resetTimeoutOnProgress: true, timeout }
return client.callTool(payload, CallToolResultSchema, opts).catch(async (e) => {
if (!isTransportError(e)) throw e
log.warn("mcp transport error, attempting reconnect", {
clientName,
tool: mcpTool.name,
error: e instanceof Error ? e.message : String(e),
})
const ok = await reconnectClient(clientName)
if (!ok) throw e
const next = await layerBridge.promise(InstanceState.get(state))
const fresh = next.clients[clientName]
if (!fresh || next.status[clientName]?.status !== "connected") throw e
return fresh.callTool(payload, CallToolResultSchema, opts)
})
},
})
}

const connectedClients = Object.entries(s.clients).filter(
([clientName]) => s.status[clientName]?.status === "connected",
)
Expand All @@ -654,7 +689,12 @@ export const layer = Layer.effect(

const timeout = entry?.timeout ?? defaultTimeout
for (const mcpTool of listed) {
result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout)
result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = makeTool(
clientName,
mcpTool,
client,
timeout,
)
}
}),
{ concurrency: "unbounded" },
Expand Down
48 changes: 48 additions & 0 deletions packages/opencode/src/mcp/transport-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { StreamableHTTPError } from "@modelcontextprotocol/sdk/client/streamableHttp.js"

// Fast-path unit tests live at `test/mcp/transport-error.test.ts`; they feed
// synthetic error objects into `isTransportError`. Whenever this classifier
// changes, also run `bun run test:mcp-probe` (test/mcp/transport-error-probe.mjs)
// to verify those synthetic shapes still match what real servers / sockets
// emit under `bun` and `node`.

const TRANSPORT_ERROR_CODES = new Set([
// Node / Undici
"ECONNRESET",
"ECONNREFUSED",
"ETIMEDOUT",
"EHOSTUNREACH",
"ENOTFOUND",
"EPIPE",
"ECONNABORTED",
"UND_ERR_SOCKET",
"UND_ERR_CLOSED",
// Bun (uses PascalCase identifiers on err.code)
"ConnectionRefused",
"ConnectionReset",
"ConnectionAborted",
"ConnectionClosed",
"Timeout",
"SocketClosed",
"NotConnected",
"FailedToOpenSocket",
])

export function isTransportError(e: unknown): boolean {
if (e instanceof StreamableHTTPError) {
// -1 = SDK protocol-level breakage (unexpected content-type, etc.)
if (e.code === -1) return true
if (typeof e.code !== "number") return false
// 401/403 belong to auth flow, not transport
if (e.code === 401 || e.code === 403) return false
// Anything else 4xx (stale session 404, bad request 400, gone 410, ...) or 5xx counts
return e.code >= 400
}
if (!(e instanceof Error)) return false
const err = e as Error & { code?: string; cause?: { code?: string } }
if (err.cause?.code && TRANSPORT_ERROR_CODES.has(err.cause.code)) return true
if (err.code && TRANSPORT_ERROR_CODES.has(err.code)) return true
if (err.message.includes("fetch failed")) return true
if (err.message.includes("Unable to connect")) return true
return false
}
8 changes: 8 additions & 0 deletions packages/opencode/test/mcp/headers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ void mock.module("@modelcontextprotocol/sdk/client/streamableHttp.js", () => ({
throw new Error("Mock transport cannot connect")
}
},
// Re-export so `src/mcp/transport-error.ts` can still `instanceof`-check.
StreamableHTTPError: class StreamableHTTPError extends Error {
readonly code: number | undefined
constructor(code: number | undefined, message: string | undefined) {
super(message)
this.code = code
}
},
}))

void mock.module("@modelcontextprotocol/sdk/client/sse.js", () => ({
Expand Down
Loading
Loading