Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/opencode/src/cli/cmd/tui/context/sync-v2.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ export const { use: useSyncV2, provider: SyncProviderV2 } = createSimpleContext(
})
break
case "session.next.tool.input.ended":
update(event.properties.sessionID, (draft) => {
const match = latestTool(activeAssistant(draft), event.properties.callID)
if (match?.state.status === "pending") match.state.input = event.properties.text
})
break
case "session.next.tool.called":
update(event.properties.sessionID, (draft) => {
Expand Down
29 changes: 24 additions & 5 deletions packages/opencode/src/server/routes/instance/httpapi/event.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { Bus } from "@/bus"
import { GlobalBus, type GlobalEvent as GlobalBusEvent } from "@/bus/global"
import * as InstanceState from "@/effect/instance-state"
import type { WorkspaceID } from "@/control-plane/schema"
import type { InstanceContext } from "@/project/instance"
import * as Log from "@opencode-ai/core/util/log"
import { Effect, Schema } from "effect"
import { Effect, Queue, Schema } from "effect"
import * as Stream from "effect/Stream"
import { HttpServerResponse } from "effect/unstable/http"
import { HttpApi, HttpApiBuilder, HttpApiEndpoint, HttpApiGroup, HttpApiSchema, OpenApi } from "effect/unstable/httpapi"
Expand Down Expand Up @@ -39,8 +43,21 @@ function eventData(data: unknown): Sse.Event {
}
}

function eventResponse(bus: Bus.Interface) {
const events = bus.subscribeAll().pipe(Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type))
function eventResponse(context: { instance: InstanceContext; workspace: WorkspaceID | undefined }) {
const events = Stream.callback<GlobalBusEvent>((queue) => {
const handler = (event: GlobalBusEvent) => {
if (event.directory !== context.instance.directory) return
if (event.workspace !== context.workspace) return
Queue.offerUnsafe(queue, event)
}
return Effect.acquireRelease(
Effect.sync(() => GlobalBus.on("event", handler)),
() => Effect.sync(() => GlobalBus.off("event", handler)),
)
}).pipe(
Stream.map((event) => event.payload),
Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type),
)
const heartbeat = Stream.tick("10 seconds").pipe(
Stream.drop(1),
Stream.map(() => ({ id: Bus.createID(), type: "server.heartbeat", properties: {} })),
Expand Down Expand Up @@ -68,11 +85,13 @@ function eventResponse(bus: Bus.Interface) {

export const eventHandlers = HttpApiBuilder.group(EventApi, "event", (handlers) =>
Effect.gen(function* () {
const bus = yield* Bus.Service
return handlers.handleRaw(
"subscribe",
Effect.fn("EventHttpApi.subscribe")(function* () {
return eventResponse(bus)
return eventResponse({
instance: yield* InstanceState.context,
workspace: yield* InstanceState.workspaceID,
})
}),
)
}),
Expand Down
30 changes: 27 additions & 3 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type ToolCall = {
messageID: MessageV2.ToolPart["messageID"]
sessionID: MessageV2.ToolPart["sessionID"]
done: Deferred.Deferred<void>
raw: string
}

interface ProcessorContext extends Input {
Expand Down Expand Up @@ -299,20 +300,44 @@ export const layer: Layer.Layer<
partID: part.id,
messageID: part.messageID,
sessionID: part.sessionID,
raw: ctx.toolcalls[value.id]?.raw ?? "",
}
return

case "tool-input-delta":
case "tool-input-delta": {
if (!value.delta) return
const call = ctx.toolcalls[value.id]
if (!call) return
call.raw += value.delta
EventV2.run(SessionEvent.Tool.Input.Delta.Sync, {
sessionID: ctx.sessionID,
callID: value.id,
delta: value.delta,
timestamp: DateTime.makeUnsafe(Date.now()),
})
return
}

case "tool-input-end": {
const raw = ctx.toolcalls[value.id]?.raw ?? ""
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Tool.Input.Ended.Sync, {
sessionID: ctx.sessionID,
callID: value.id,
text: "",
text: raw,
timestamp: DateTime.makeUnsafe(Date.now()),
})
yield* updateToolCall(value.id, (match) =>
match.state.status === "pending"
? {
...match,
state: {
...match.state,
raw,
},
}
: match,
)
return
}

Expand All @@ -337,7 +362,6 @@ export const layer: Layer.Layer<
...match,
tool: value.toolName,
state: {
...match.state,
status: "running",
input: value.input,
time: { start: Date.now() },
Expand Down
2 changes: 0 additions & 2 deletions packages/opencode/src/v2/event.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Identifier } from "@/id/id"
import { SyncEvent } from "@/sync"
import { withStatics } from "@opencode-ai/core/schema"
import { Flag } from "@opencode-ai/core/flag/flag"
import * as Schema from "effect/Schema"

export const ID = Schema.String.pipe(
Expand Down Expand Up @@ -46,7 +45,6 @@ export function run<Def extends SyncEvent.Definition>(
data: SyncEvent.Event<Def>["data"],
options?: { publish?: boolean },
) {
if (!Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) return
SyncEvent.run(def, data, options)
}

Expand Down
11 changes: 10 additions & 1 deletion packages/opencode/src/v2/session-message-updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,16 @@ export function update<Result>(adapter: Adapter<Result>, event: SessionEvent.Eve
)
}
},
"session.next.tool.input.ended": () => {},
"session.next.tool.input.ended": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.data.callID)
if (match && match.state.status === "pending") match.state.input = event.data.text
}),
)
}
},
"session.next.tool.called": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
Expand Down
30 changes: 28 additions & 2 deletions packages/opencode/test/provider/copilot/copilot-chat-model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@ describe("doStream", () => {

// Check tool calls
const toolParts = parts.filter(
(p) => p.type === "tool-input-start" || p.type === "tool-call" || p.type === "tool-input-end",
(p) =>
p.type === "tool-input-start" ||
p.type === "tool-input-delta" ||
p.type === "tool-call" ||
p.type === "tool-input-end",
)

expect(toolParts).toContainEqual({
Expand All @@ -196,6 +200,15 @@ describe("doStream", () => {
id: "call_def456",
toolName: "read_file",
})
expect(
toolParts.some(
(part) =>
part.type === "tool-input-delta" &&
part.id === "call_abc123" &&
typeof part.delta === "string" &&
part.delta.length > 0,
),
).toBe(true)

// Check finish
const finish = parts.find((p) => p.type === "finish")
Expand Down Expand Up @@ -367,7 +380,11 @@ describe("doStream", () => {

// Check tool call
const toolParts = parts.filter(
(p) => p.type === "tool-input-start" || p.type === "tool-call" || p.type === "tool-input-end",
(p) =>
p.type === "tool-input-start" ||
p.type === "tool-input-delta" ||
p.type === "tool-call" ||
p.type === "tool-input-end",
)

expect(toolParts).toContainEqual({
Expand All @@ -383,6 +400,15 @@ describe("doStream", () => {
toolName: "list_project_files",
}),
)
expect(
toolParts.some(
(part) =>
part.type === "tool-input-delta" &&
part.id === "call_MHxqRDd5WVo3NU8wUXRaMmc0MFE" &&
typeof part.delta === "string" &&
part.delta.length > 0,
),
).toBe(true)

// Check finish
const finish = parts.find((p) => p.type === "finish")
Expand Down
142 changes: 142 additions & 0 deletions packages/opencode/test/session/llm.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,46 @@ function createChatStream(text: string) {
})
}

function createToolChatStream(toolName: string, input: Record<string, unknown>) {
const args = JSON.stringify(input)
return createEventStream(
[
{
id: "chatcmpl-tool",
object: "chat.completion.chunk",
choices: [{ delta: { role: "assistant" } }],
},
{
id: "chatcmpl-tool",
object: "chat.completion.chunk",
choices: [
{
delta: {
tool_calls: [
{
index: 0,
id: "call_1",
type: "function",
function: {
name: toolName,
arguments: args,
},
},
],
},
},
],
},
{
id: "chatcmpl-tool",
object: "chat.completion.chunk",
choices: [{ delta: {}, finish_reason: "tool_calls" }],
},
],
true,
)
}

async function loadFixture(providerID: string, modelID: string) {
const fixturePath = path.join(import.meta.dir, "../tool/fixtures/models-api.json")
const data = await Filesystem.readJson<Record<string, ModelsDev.Provider>>(fixturePath)
Expand Down Expand Up @@ -300,6 +340,108 @@ function createEventResponse(chunks: unknown[], includeDone = false) {
}

describe("session.llm.stream", () => {
test("emits tool input streaming events for openai-compatible tool calls", async () => {
const server = state.server
if (!server) {
throw new Error("Server not initialized")
}

const providerID = "vivgrid"
const modelID = "gemini-3.1-pro-preview"
const fixture = await loadFixture(providerID, modelID)
const model = fixture.model

waitRequest(
"/chat/completions",
new Response(createToolChatStream("bash", { command: "pwd" }), {
status: 200,
headers: { "Content-Type": "text/event-stream" },
}),
)

await using tmp = await tmpdir({
init: async (dir) => {
await Bun.write(
path.join(dir, "opencode.json"),
JSON.stringify({
provider: {
[providerID]: {
...fixture.provider,
models: { [model.id]: model },
options: {
apiKey: "test-key",
baseURL: server.url.origin,
},
},
},
}),
)
},
})

const events = await WithInstance.provide({
directory: tmp.path,
fn: async () => {
const resolved = await getModel(ProviderID.make(providerID), ModelID.make(model.id))
const sessionID = SessionID.make("session-test-tool-stream")
const agent = {
name: "test",
mode: "primary",
options: {},
permission: [{ permission: "*", pattern: "*", action: "allow" }],
} satisfies Agent.Info
const user = {
id: MessageID.make("msg_user-tool-stream"),
sessionID,
role: "user",
time: { created: Date.now() },
agent: agent.name,
model: { providerID: ProviderID.make(providerID), modelID: resolved.id },
} satisfies MessageV2.User

return await llm.runPromise((svc) =>
svc
.stream({
user,
sessionID,
model: resolved,
agent,
system: ["You are a helpful assistant."],
messages: [{ role: "user", content: "Run bash" }],
tools: {
bash: tool({
description: "Stub bash tool",
inputSchema: z.object({ command: z.string() }),
execute: async () => ({ output: "pwd" }),
}),
},
})
.pipe(
Stream.runCollect,
Effect.map((chunks) => [...chunks]),
),
)
},
})

expect(events.some((event) => event.type === "tool-input-start" && event.id === "call_1")).toBe(true)
expect(
events.some(
(event) =>
event.type === "tool-input-delta" && event.id === "call_1" && event.delta === '{"command":"pwd"}',
),
).toBe(true)
expect(events.some((event) => event.type === "tool-input-end" && event.id === "call_1")).toBe(true)
expect(
events.some(
(event) =>
event.type === "tool-call" &&
event.toolCallId === "call_1" &&
JSON.stringify(event.input) === '{"command":"pwd"}',
),
).toBe(true)
})

test("sends temperature, tokens, and reasoning options for openai-compatible models", async () => {
const server = state.server
if (!server) {
Expand Down
Loading
Loading