From 370783d5ce5a8fbcc879ce8c0bf2ef1171436536 Mon Sep 17 00:00:00 2001 From: Shoubhit Dash Date: Tue, 26 May 2026 11:27:31 +0530 Subject: [PATCH] feat(acp): add acp-next session lifecycle --- packages/opencode/src/acp-next/agent.ts | 20 +++ packages/opencode/src/acp-next/service.ts | 154 ++++++++++++++++++ .../test/acp-next/service-session.test.ts | 154 +++++++++++++++++- .../cli/acp-next/acp-next-process.test.ts | 56 ++++++- 4 files changed, 378 insertions(+), 6 deletions(-) diff --git a/packages/opencode/src/acp-next/agent.ts b/packages/opencode/src/acp-next/agent.ts index a4ae00695659..040c1947e2ed 100644 --- a/packages/opencode/src/acp-next/agent.ts +++ b/packages/opencode/src/acp-next/agent.ts @@ -4,10 +4,14 @@ import { type AgentSideConnection, type AuthenticateRequest, type CancelNotification, + type CloseSessionRequest, + type ForkSessionRequest, type InitializeRequest, + type ListSessionsRequest, type LoadSessionRequest, type NewSessionRequest, type PromptRequest, + type ResumeSessionRequest, type SetSessionConfigOptionRequest, type SetSessionModelRequest, type SetSessionModeRequest, @@ -44,6 +48,22 @@ export class Agent implements ACPAgent { return run(this.service.loadSession(params)) } + listSessions(params: ListSessionsRequest) { + return run(this.service.listSessions(params)) + } + + resumeSession(params: ResumeSessionRequest) { + return run(this.service.resumeSession(params)) + } + + closeSession(params: CloseSessionRequest) { + return run(this.service.closeSession(params)) + } + + unstable_forkSession(params: ForkSessionRequest) { + return run(this.service.forkSession(params)) + } + setSessionConfigOption(params: SetSessionConfigOptionRequest) { return run(this.service.setSessionConfigOption(params)) } diff --git a/packages/opencode/src/acp-next/service.ts b/packages/opencode/src/acp-next/service.ts index ce1ea1808585..36352634c4a6 100644 --- a/packages/opencode/src/acp-next/service.ts +++ b/packages/opencode/src/acp-next/service.ts @@ -4,8 +4,14 @@ import { type AuthenticateResponse, type AuthMethod, type CancelNotification, + type CloseSessionRequest, + type CloseSessionResponse, + type ForkSessionRequest, + type ForkSessionResponse, type InitializeRequest, type InitializeResponse, + type ListSessionsRequest, + type ListSessionsResponse, type LoadSessionRequest, type LoadSessionResponse, type McpServer, @@ -13,6 +19,9 @@ import { type NewSessionResponse, type PromptRequest, type PromptResponse, + type ResumeSessionRequest, + type ResumeSessionResponse, + type SessionInfo, type SetSessionConfigOptionRequest, type SetSessionConfigOptionResponse, type SetSessionModelRequest, @@ -21,6 +30,7 @@ import { type SetSessionModeResponse, } from "@agentclientprotocol/sdk" import { InstallationVersion } from "@opencode-ai/core/installation/version" +import * as Log from "@opencode-ai/core/util/log" import type { OpencodeClient } from "@opencode-ai/sdk/v2" import { Context, Effect, Layer, ManagedRuntime } from "effect" import * as ACPNextError from "./error" @@ -32,6 +42,7 @@ import { Provider } from "@/provider/provider" import type { Command } from "@/command" export const AuthMethodID = "opencode-login" +const log = Log.create({ service: "acp-next-service" }) export type Error = ACPNextError.Error @@ -40,6 +51,10 @@ export type Interface = { readonly authenticate: (input: AuthenticateRequest) => Effect.Effect readonly newSession: (input: NewSessionRequest) => Effect.Effect readonly loadSession: (input: LoadSessionRequest) => Effect.Effect + readonly listSessions: (input: ListSessionsRequest) => Effect.Effect + readonly resumeSession: (input: ResumeSessionRequest) => Effect.Effect + readonly closeSession: (input: CloseSessionRequest) => Effect.Effect + readonly forkSession: (input: ForkSessionRequest) => Effect.Effect readonly setSessionConfigOption: ( input: SetSessionConfigOptionRequest, ) => Effect.Effect @@ -90,6 +105,12 @@ export function make(input: { embeddedContext: true, image: true, }, + sessionCapabilities: { + close: {}, + fork: {}, + list: {}, + resume: {}, + }, }, authMethods: [authMethod], agentInfo: { @@ -181,6 +202,135 @@ export function make(input: { yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers) yield* sendAvailableCommands(input.connection, state.id, snapshot) + return { + configOptions: configOptions(snapshot, { + model: state.model ?? model, + variant: state.variant, + modeId: state.modeId, + }), + } + }) + + const listSessions = Effect.fn("ACPNext.listSessions")(function* (params: ListSessionsRequest) { + const cursor = params.cursor ? Number(params.cursor) : undefined + const limit = 100 + const sessions = yield* request( + () => + input.sdk.session.list( + { + ...(params.cwd ? { directory: params.cwd } : {}), + roots: true, + }, + { throwOnError: true }, + ), + "session", + ) + const sorted = sessions.toSorted((a, b) => b.time.updated - a.time.updated) + const filtered = + cursor === undefined || !Number.isFinite(cursor) ? sorted : sorted.filter((item) => item.time.updated < cursor) + const page = filtered.slice(0, limit) + const last = page.at(-1) + return { + sessions: page.map((item): SessionInfo => ({ + sessionId: item.id, + cwd: item.directory, + title: item.title, + updatedAt: new Date(item.time.updated).toISOString(), + })), + ...(filtered.length > limit && last ? { nextCursor: String(last.time.updated) } : {}), + } + }) + + const resumeSession = Effect.fn("ACPNext.resumeSession")(function* (params: ResumeSessionRequest) { + const snapshot = yield* directorySnapshot(params.cwd) + yield* request( + () => input.sdk.session.get({ directory: params.cwd, sessionID: params.sessionId }, { throwOnError: true }), + "session", + ) + const messages = yield* request( + () => + input.sdk.session.messages( + { directory: params.cwd, sessionID: params.sessionId, limit: 20 }, + { throwOnError: true }, + ), + "session", + ) + const restored = restoreFromMessages(messages.map((item) => item.info)) + const model = restored.model ?? selectDefaultModel(snapshot) + const state = yield* session.load({ + id: params.sessionId, + cwd: params.cwd, + mcpServers: params.mcpServers ?? [], + model, + variant: restored.variant ?? selectVariant(snapshot, model), + modeId: restored.modeId ?? (snapshot.availableModes.length > 0 ? snapshot.defaultModeID : undefined), + }) + + yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers ?? []) + yield* sendAvailableCommands(input.connection, state.id, snapshot) + + return { + configOptions: configOptions(snapshot, { + model: state.model ?? model, + variant: state.variant, + modeId: state.modeId, + }), + } + }) + + const closeSession = Effect.fn("ACPNext.closeSession")(function* (params: CloseSessionRequest) { + const removed = yield* session.remove(params.sessionId) + registeredMcp.delete(params.sessionId) + if (!removed) return {} + + yield* request( + () => input.sdk.session.abort({ directory: removed.cwd, sessionID: params.sessionId }, { throwOnError: true }), + "session", + ).pipe( + Effect.catch((error) => + Effect.sync(() => { + log.error("failed to abort session while closing ACP session", { error, sessionID: params.sessionId }) + }), + ), + ) + return {} + }) + + const forkSession = Effect.fn("ACPNext.forkSession")(function* (params: ForkSessionRequest) { + const snapshot = yield* directorySnapshot(params.cwd) + const forked = yield* request( + () => + input.sdk.session.fork( + { + directory: params.cwd, + sessionID: params.sessionId, + }, + { throwOnError: true }, + ), + "session", + ) + const messages = yield* request( + () => + input.sdk.session.messages( + { directory: params.cwd, sessionID: forked.id, limit: 20 }, + { throwOnError: true }, + ), + "session", + ) + const restored = restoreFromMessages(messages.map((item) => item.info)) + const model = restored.model ?? selectDefaultModel(snapshot) + const state = yield* session.load({ + id: forked.id, + cwd: params.cwd, + mcpServers: params.mcpServers ?? [], + model, + variant: restored.variant ?? selectVariant(snapshot, model), + modeId: restored.modeId ?? (snapshot.availableModes.length > 0 ? snapshot.defaultModeID : undefined), + }) + + yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers ?? []) + yield* sendAvailableCommands(input.connection, state.id, snapshot) + return { sessionId: state.id, configOptions: configOptions(snapshot, { @@ -278,6 +428,10 @@ export function make(input: { authenticate, newSession, loadSession, + listSessions, + resumeSession, + closeSession, + forkSession, setSessionConfigOption, setSessionMode, setSessionModel, diff --git a/packages/opencode/test/acp-next/service-session.test.ts b/packages/opencode/test/acp-next/service-session.test.ts index a024758aafc0..96d4066f7ee5 100644 --- a/packages/opencode/test/acp-next/service-session.test.ts +++ b/packages/opencode/test/acp-next/service-session.test.ts @@ -1,16 +1,19 @@ import { describe, expect, it } from "bun:test" import type { AgentSideConnection, + ForkSessionResponse, LoadSessionResponse, NewSessionResponse, + ResumeSessionResponse, SessionConfigOption, SessionConfigSelectOption, SetSessionConfigOptionResponse, } from "@agentclientprotocol/sdk" import type { OpencodeClient } from "@opencode-ai/sdk/v2" -import { Effect } from "effect" +import { Effect, ManagedRuntime } from "effect" import * as ACPNextService from "@/acp-next/service" import * as ACPNextError from "@/acp-next/error" +import { ACPNextSession } from "@/acp-next/session" import { ModelID, ProviderID } from "@/provider/schema" import type { Provider } from "@/provider/provider" @@ -140,6 +143,14 @@ describe("ACP next service sessions", () => { const makeService = (messages: readonly { info: unknown; parts: readonly unknown[] }[] = []) => { const updates: unknown[] = [] const mcpAdds: string[] = [] + const aborts: string[] = [] + const forks: string[] = [] + const sessions = Array.from({ length: 102 }, (_, index) => ({ + id: `ses_${index + 1}`, + directory: index % 2 === 0 ? "/workspace" : "/other", + title: `Session ${index + 1}`, + time: { created: index + 1, updated: index + 1 }, + })) const sdk = { config: { providers: () => Promise.resolve({ data: { providers: [provider], default: { test: modelID } } }), @@ -168,8 +179,19 @@ describe("ACP next service sessions", () => { session: { create: () => Promise.resolve({ data: { id: "ses_new" } }), get: () => Promise.resolve({ data: { id: "ses_loaded" } }), - list: () => Promise.resolve({ data: [] }), + list: (input: { directory?: string }) => + Promise.resolve({ + data: input.directory ? sessions.filter((session) => session.directory === input.directory) : sessions, + }), messages: () => Promise.resolve({ data: messages }), + abort: (input: { sessionID: string }) => { + aborts.push(input.sessionID) + return Promise.resolve({ data: true }) + }, + fork: (input: { sessionID: string }) => { + forks.push(input.sessionID) + return Promise.resolve({ data: { id: `fork_${input.sessionID}` } }) + }, }, mcp: { add: (input: { name?: string }) => { @@ -185,7 +207,7 @@ describe("ACP next service sessions", () => { }, } as Pick - return { service: ACPNextService.make({ sdk, connection }), updates, mcpAdds } + return { service: ACPNextService.make({ sdk, connection }), updates, mcpAdds, aborts, forks } } it("creates a backed session with config options and command update", async () => { @@ -233,6 +255,125 @@ describe("ACP next service sessions", () => { expect(result.configOptions?.find((option) => option.id === "mode")?.currentValue).toBe("plan") }) + it("lists sessions sorted by updated time with cursor support", async () => { + const { service } = makeService() + const first = await Effect.runPromise(service.listSessions({ cwd: "/workspace" })) + const second = await Effect.runPromise(service.listSessions({ cwd: "/workspace", cursor: first.nextCursor })) + + expect(first.sessions).toHaveLength(51) + expect(first.sessions[0]?.sessionId).toBe("ses_101") + expect(first.sessions.at(-1)?.sessionId).toBe("ses_1") + expect(first.nextCursor).toBeUndefined() + expect(second.sessions).toEqual(first.sessions) + }) + + it("lists all sessions with next cursor when the first page is full", async () => { + const { service } = makeService() + const first = await Effect.runPromise(service.listSessions({})) + const second = await Effect.runPromise(service.listSessions({ cursor: first.nextCursor })) + + expect(first.sessions).toHaveLength(100) + expect(first.sessions[0]?.sessionId).toBe("ses_102") + expect(first.sessions.at(-1)?.sessionId).toBe("ses_3") + expect(first.nextCursor).toBe("3") + expect(second.sessions.map((session) => session.sessionId)).toEqual(["ses_2", "ses_1"]) + }) + + it("resumes a session and stores restored state", async () => { + const { service } = makeService([ + { + info: { + role: "user", + model: { providerID: "test", modelID: "test-model", variant: "high" }, + agent: "plan", + }, + parts: [], + }, + ]) + const resumed = await Effect.runPromise( + service.resumeSession({ cwd: "/workspace", sessionId: "ses_resume", mcpServers: [] }), + ) + const updated = await Effect.runPromise( + service.setSessionConfigOption({ sessionId: "ses_resume", configId: "effort", value: "default" }), + ) + + expect(select(resumed, "effort")?.currentValue).toBe("high") + expect(select(updated, "effort")?.currentValue).toBe("default") + }) + + it("closes local ACP state and aborts the backing session best-effort", async () => { + const { service, aborts } = makeService() + const created = await Effect.runPromise(service.newSession({ cwd: "/workspace", mcpServers: [] })) + + expect(await Effect.runPromise(service.closeSession({ sessionId: created.sessionId }))).toEqual({}) + const missing = await Effect.runPromise( + service + .setSessionConfigOption({ sessionId: created.sessionId, configId: "effort", value: "high" }) + .pipe(Effect.mapError(ACPNextError.toRequestError), Effect.flip), + ) + expect(missing.code).toBe(-32602) + expect(aborts).toEqual([created.sessionId]) + expect(await Effect.runPromise(service.closeSession({ sessionId: "missing" }))).toEqual({}) + }) + + it("does not fail close when backing abort fails", async () => { + const sessionService = ManagedRuntime.make(ACPNextSession.defaultLayer).runSync( + ACPNextSession.Service.use((service) => Effect.succeed(service)), + ) + const { service } = makeService() + const sdk = { + config: { + providers: () => Promise.resolve({ data: { providers: [provider], default: { test: modelID } } }), + get: () => Promise.resolve({ data: {} }), + }, + app: { + agents: () => Promise.resolve({ data: [{ name: "build", mode: "primary", permission: [], options: {} }] }), + skills: () => Promise.resolve({ data: [] }), + }, + command: { + list: () => Promise.resolve({ data: [] }), + }, + session: { + abort: () => Promise.reject(new Error("nope")), + }, + mcp: { + add: () => Promise.resolve({ data: {} }), + }, + } as unknown as OpencodeClient + const closing = ACPNextService.make({ sdk, session: sessionService }) + await Effect.runPromise(sessionService.create({ id: "ses_close", cwd: "/workspace" })) + + expect(await Effect.runPromise(closing.closeSession({ sessionId: "ses_close" }))).toEqual({}) + expect(await Effect.runPromise(service.closeSession({ sessionId: "missing" }))).toEqual({}) + }) + + it("forks a session, loads fork state, and returns config options", async () => { + const { service, forks } = makeService([ + { + info: { + role: "assistant", + providerID: "test", + modelID: "second-model", + variant: "medium", + mode: "plan", + }, + parts: [], + }, + ]) + const forked = await Effect.runPromise( + service.forkSession({ cwd: "/workspace", sessionId: "ses_parent", mcpServers: [] }), + ) + const updated = await Effect.runPromise( + service.setSessionConfigOption({ sessionId: forked.sessionId, configId: "effort", value: "low" }), + ) + + expect(forked.sessionId).toBe("fork_ses_parent") + expect(select(forked, "model")?.currentValue).toBe("test/second-model") + expect(select(forked, "effort")?.currentValue).toBe("medium") + expect(select(updated, "effort")?.currentValue).toBe("low") + expect(forks).toEqual(["ses_parent"]) + }) + it("restores model variant and mode from the latest user message", async () => { const { service } = makeService([ { @@ -522,8 +663,11 @@ function categories(result: NewSessionResponse | LoadSessionResponse) { return result.configOptions?.map((option) => option.category) ?? [] } -function select(result: SetSessionConfigOptionResponse, id: string) { - return result.configOptions.find( +function select( + result: SetSessionConfigOptionResponse | ResumeSessionResponse | NewSessionResponse | ForkSessionResponse, + id: string, +) { + return result.configOptions?.find( (option): option is Extract => option.id === id && option.type === "select", ) diff --git a/packages/opencode/test/cli/acp-next/acp-next-process.test.ts b/packages/opencode/test/cli/acp-next/acp-next-process.test.ts index c4e88fb74ecd..95b905543b47 100644 --- a/packages/opencode/test/cli/acp-next/acp-next-process.test.ts +++ b/packages/opencode/test/cli/acp-next/acp-next-process.test.ts @@ -1,9 +1,11 @@ import { describe, expect } from "bun:test" import type { AuthenticateResponse, + CloseSessionResponse, InitializeResponse, LoadSessionResponse, NewSessionResponse, + ResumeSessionResponse, SessionNotification, SetSessionConfigOptionResponse, } from "@agentclientprotocol/sdk" @@ -31,7 +33,10 @@ describe("opencode acp-next (subprocess)", () => { expect(initialized.agentCapabilities?.mcpCapabilities?.http).toBe(true) expect(initialized.agentCapabilities?.mcpCapabilities?.sse).toBe(true) expect(initialized.agentCapabilities?.loadSession).toBe(true) - expect(initialized.agentCapabilities?.sessionCapabilities).toBeUndefined() + expect(initialized.agentCapabilities?.sessionCapabilities?.close).toEqual({}) + expect(initialized.agentCapabilities?.sessionCapabilities?.fork).toEqual({}) + expect(initialized.agentCapabilities?.sessionCapabilities?.list).toEqual({}) + expect(initialized.agentCapabilities?.sessionCapabilities?.resume).toEqual({}) expect(initialized.agentInfo?.name).toBe("OpenCode") expect(initialized.authMethods?.[0]?.id).toBe("opencode-login") expect(initialized.authMethods?.[0]?._meta?.["terminal-auth"]).toBeDefined() @@ -159,6 +164,55 @@ describe("opencode acp-next (subprocess)", () => { 60_000, ) + cliIt.live( + "advertises and supports close behind OPENCODE_ACP_NEXT", + ({ home, llm, opencode }) => + Effect.gen(function* () { + const acp = createAcpClient( + yield* opencode.acp({ + env: { + OPENCODE_ACP_NEXT: "1", + OPENCODE_CONFIG_CONTENT: JSON.stringify(verifierConfig(llm.url)), + }, + }), + ) + const initialized = expectOk(yield* acp.request("initialize", { protocolVersion: 1 })) + expect(initialized.agentCapabilities?.sessionCapabilities?.close).toEqual({}) + const session = expectOk(yield* acp.request("session/new", { cwd: home, mcpServers: [] })) + + expectOk(yield* acp.request("session/close", { sessionId: session.sessionId })) + }), + 60_000, + ) + + cliIt.live( + "advertises and supports resume behind OPENCODE_ACP_NEXT", + ({ home, llm, opencode }) => + Effect.gen(function* () { + const acp = createAcpClient( + yield* opencode.acp({ + env: { + OPENCODE_ACP_NEXT: "1", + OPENCODE_CONFIG_CONTENT: JSON.stringify(verifierConfig(llm.url)), + }, + }), + ) + const initialized = expectOk(yield* acp.request("initialize", { protocolVersion: 1 })) + expect(initialized.agentCapabilities?.sessionCapabilities?.resume).toEqual({}) + const session = expectOk(yield* acp.request("session/new", { cwd: home, mcpServers: [] })) + const resumed = expectOk( + yield* acp.request("session/resume", { + cwd: home, + sessionId: session.sessionId, + mcpServers: [], + }), + ) + + expect(selectConfigOption(resumed.configOptions, "model")?.category).toBe("model") + }), + 60_000, + ) + cliIt.live( "exits cleanly when flagged stdin is closed", ({ opencode }) =>