From 4e218610b26f88d844dd30aa7f3603fb9e8f4b17 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Sun, 26 Apr 2026 21:08:33 -0400 Subject: [PATCH 1/4] fix(session): harden shell cancellation --- packages/opencode/src/effect/runner.ts | 35 ++-- packages/opencode/src/session/prompt.ts | 257 ++++++++++++------------ 2 files changed, 151 insertions(+), 141 deletions(-) diff --git a/packages/opencode/src/effect/runner.ts b/packages/opencode/src/effect/runner.ts index 0e923b1194d0..e03cbffb11e5 100644 --- a/packages/opencode/src/effect/runner.ts +++ b/packages/opencode/src/effect/runner.ts @@ -18,6 +18,7 @@ interface RunHandle { interface ShellHandle { id: number + cancelled: Deferred.Deferred fiber: Fiber.Fiber } @@ -59,6 +60,9 @@ export const make = ( ? Deferred.fail(done, new Cancelled()).pipe(Effect.asVoid) : Deferred.done(done, exit).pipe(Effect.asVoid) + const awaitDone = (done: Deferred.Deferred) => + Deferred.await(done).pipe(Effect.catchTag("RunnerCancelled", (e) => onInterrupt ?? Effect.die(e))) + const idleIfCurrent = () => SynchronizedRef.modify(ref, (st) => [st._tag === "Idle" ? idle : Effect.void, st] as const).pipe(Effect.flatten) @@ -89,7 +93,9 @@ export const make = ( SynchronizedRef.modifyEffect( ref, Effect.fnUntraced(function* (st) { - if (st._tag === "Shell" && st.shell.id === id) return [idle, { _tag: "Idle" }] as const + if (st._tag === "Shell" && st.shell.id === id) { + return [idle, { _tag: "Idle" }] as const + } if (st._tag === "ShellThenRun" && st.shell.id === id) { const run = yield* startRun(st.run.work, st.run.done) return [Effect.void, { _tag: "Running", run }] as const @@ -98,7 +104,11 @@ export const make = ( }), ).pipe(Effect.flatten) - const stopShell = (shell: ShellHandle) => Fiber.interrupt(shell.fiber) + const stopShell = (shell: ShellHandle) => + Effect.gen(function* () { + yield* Deferred.succeed(shell.cancelled, undefined).pipe(Effect.asVoid) + yield* Fiber.interrupt(shell.fiber) + }) const ensureRunning = (work: Effect.Effect) => SynchronizedRef.modifyEffect( @@ -107,28 +117,23 @@ export const make = ( switch (st._tag) { case "Running": case "ShellThenRun": - return [Deferred.await(st.run.done), st] as const + return [awaitDone(st.run.done), st] as const case "Shell": { const run = { id: next(), done: yield* Deferred.make(), work, } satisfies PendingHandle - return [Deferred.await(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const + return [awaitDone(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const } case "Idle": { const done = yield* Deferred.make() const run = yield* startRun(work, done) - return [Deferred.await(done), { _tag: "Running", run }] as const + return [awaitDone(done), { _tag: "Running", run }] as const } } }), - ).pipe( - Effect.flatten, - Effect.catch( - (e): Effect.Effect => (e instanceof Cancelled ? (onInterrupt ?? Effect.die(e)) : Effect.fail(e as E)), - ), - ) + ).pipe(Effect.flatten) const startShell = (work: Effect.Effect) => SynchronizedRef.modifyEffect( @@ -145,13 +150,17 @@ export const make = ( } yield* busy const id = next() + const cancelled = yield* Deferred.make() const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild) - const shell = { id, fiber } satisfies ShellHandle + const shell = { id, cancelled, fiber } satisfies ShellHandle return [ Effect.gen(function* () { const exit = yield* Fiber.await(fiber) if (Exit.isSuccess(exit)) return exit.value - if (Cause.hasInterruptsOnly(exit.cause) && onInterrupt) return yield* onInterrupt + if ((yield* Deferred.isDone(cancelled)) || Cause.hasInterruptsOnly(exit.cause)) { + if (onInterrupt) return yield* onInterrupt + return yield* Effect.die(new Cancelled()) + } return yield* Effect.failCause(exit.cause) }), { _tag: "Shell", shell }, diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index f7306280fef6..a94f520c30f1 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -721,142 +721,143 @@ NOTE: At any point in time through this workflow you should feel free to ask the }) const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput) { - const ctx = yield* InstanceState.context - const run = yield* runner() - const session = yield* sessions.get(input.sessionID) - if (session.revert) { - yield* revert.cleanup(session) - } - const agent = yield* agents.get(input.agent) - if (!agent) { - const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name) - const hint = available.length ? ` Available agents: ${available.join(", ")}` : "" - const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` }) - yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() }) - throw error - } - const model = input.model ?? agent.model ?? (yield* lastModel(input.sessionID)) - const userMsg: MessageV2.User = { - id: input.messageID ?? MessageID.ascending(), - sessionID: input.sessionID, - time: { created: Date.now() }, - role: "user", - agent: input.agent, - model: { providerID: model.providerID, modelID: model.modelID }, - } - yield* sessions.updateMessage(userMsg) - const userPart: MessageV2.Part = { - type: "text", - id: PartID.ascending(), - messageID: userMsg.id, - sessionID: input.sessionID, - text: "The following tool was executed by the user", - synthetic: true, - } - yield* sessions.updatePart(userPart) - - const msg: MessageV2.Assistant = { - id: MessageID.ascending(), - sessionID: input.sessionID, - parentID: userMsg.id, - mode: input.agent, - agent: input.agent, - cost: 0, - path: { cwd: ctx.directory, root: ctx.worktree }, - time: { created: Date.now() }, - role: "assistant", - tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, - modelID: model.modelID, - providerID: model.providerID, - } - yield* sessions.updateMessage(msg) - const part: MessageV2.ToolPart = { - type: "tool", - id: PartID.ascending(), - messageID: msg.id, - sessionID: input.sessionID, - tool: "bash", - callID: ulid(), - state: { - status: "running", - time: { start: Date.now() }, - input: { command: input.command }, - }, - } - yield* sessions.updatePart(part) - - const cfg = yield* config.get() - const sh = Shell.preferred(cfg.shell) - const cwd = ctx.directory - const args = Shell.args(sh, input.command, cwd) - const shellEnv = yield* plugin.trigger( - "shell.env", - { cwd, sessionID: input.sessionID, callID: part.callID }, - { env: {} }, - ) - - const cmd = ChildProcess.make(sh, args, { - cwd, - extendEnv: true, - env: { ...shellEnv.env, TERM: "dumb" }, - stdin: "ignore", - forceKillAfter: "3 seconds", - }) - - let output = "" - let aborted = false - const finish = Effect.uninterruptible( + return yield* Effect.uninterruptibleMask((restore) => Effect.gen(function* () { - if (aborted) { - output += "\n\n" + ["", "User aborted the command", ""].join("\n") - } - if (!msg.time.completed) { - msg.time.completed = Date.now() + const { msg, part, cwd } = yield* Effect.gen(function* () { + const ctx = yield* InstanceState.context + const session = yield* sessions.get(input.sessionID) + if (session.revert) { + yield* revert.cleanup(session) + } + const agent = yield* agents.get(input.agent) + if (!agent) { + const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name) + const hint = available.length ? ` Available agents: ${available.join(", ")}` : "" + const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` }) + yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() }) + throw error + } + const model = input.model ?? agent.model ?? (yield* lastModel(input.sessionID)) + const userMsg: MessageV2.User = { + id: input.messageID ?? MessageID.ascending(), + sessionID: input.sessionID, + time: { created: Date.now() }, + role: "user", + agent: input.agent, + model: { providerID: model.providerID, modelID: model.modelID }, + } + yield* sessions.updateMessage(userMsg) + const userPart: MessageV2.Part = { + type: "text", + id: PartID.ascending(), + messageID: userMsg.id, + sessionID: input.sessionID, + text: "The following tool was executed by the user", + synthetic: true, + } + yield* sessions.updatePart(userPart) + + const msg: MessageV2.Assistant = { + id: MessageID.ascending(), + sessionID: input.sessionID, + parentID: userMsg.id, + mode: input.agent, + agent: input.agent, + cost: 0, + path: { cwd: ctx.directory, root: ctx.worktree }, + time: { created: Date.now() }, + role: "assistant", + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + modelID: model.modelID, + providerID: model.providerID, + } yield* sessions.updateMessage(msg) - } - if (part.state.status === "running") { - part.state = { - status: "completed", - time: { ...part.state.time, end: Date.now() }, - input: part.state.input, - title: "", - metadata: { output, description: "" }, - output, + const part: MessageV2.ToolPart = { + type: "tool", + id: PartID.ascending(), + messageID: msg.id, + sessionID: input.sessionID, + tool: "bash", + callID: ulid(), + state: { + status: "running", + time: { start: Date.now() }, + input: { command: input.command }, + }, } yield* sessions.updatePart(part) - } - }), - ) + return { msg, part, cwd: ctx.directory } + }) - const exit = yield* Effect.gen(function* () { - const handle = yield* spawner.spawn(cmd) - yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) => - Effect.sync(() => { - output += chunk - if (part.state.status === "running") { - part.state.metadata = { output, description: "" } - void run.fork(sessions.updatePart(part)) - } - }), - ) - yield* handle.exitCode - }).pipe( - Effect.scoped, - Effect.onInterrupt(() => - Effect.sync(() => { + const cfg = yield* config.get() + const sh = Shell.preferred(cfg.shell) + const args = Shell.args(sh, input.command, cwd) + let output = "" + let aborted = false + + const finish = Effect.uninterruptible( + Effect.gen(function* () { + if (aborted) { + output += "\n\n" + ["", "User aborted the command", ""].join("\n") + } + if (!msg.time.completed) { + msg.time.completed = Date.now() + yield* sessions.updateMessage(msg) + } + if (part.state.status === "running") { + part.state = { + status: "completed", + time: { ...part.state.time, end: Date.now() }, + input: part.state.input, + title: "", + metadata: { output, description: "" }, + output, + } + yield* sessions.updatePart(part) + } + }), + ) + + const exit = yield* restore( + Effect.gen(function* () { + const shellEnv = yield* plugin.trigger( + "shell.env", + { cwd, sessionID: input.sessionID, callID: part.callID }, + { env: {} }, + ) + const cmd = ChildProcess.make(sh, args, { + cwd, + extendEnv: true, + env: { ...shellEnv.env, TERM: "dumb" }, + stdin: "ignore", + forceKillAfter: "3 seconds", + }) + const handle = yield* spawner.spawn(cmd) + yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) => + Effect.gen(function* () { + output += chunk + if (part.state.status === "running") { + part.state.metadata = { output, description: "" } + yield* sessions.updatePart(part) + } + }), + ) + yield* handle.exitCode + }).pipe(Effect.scoped, Effect.orDie), + ).pipe(Effect.exit) + + if (Exit.isFailure(exit) && Cause.hasInterrupts(exit.cause)) { aborted = true - }), - ), - Effect.orDie, - Effect.ensuring(finish), - Effect.exit, - ) + } + yield* finish - if (Exit.isFailure(exit) && !Cause.hasInterruptsOnly(exit.cause)) { - return yield* Effect.failCause(exit.cause) - } + if (Exit.isFailure(exit) && !aborted && !Cause.hasInterruptsOnly(exit.cause)) { + return yield* Effect.failCause(exit.cause) + } - return { info: msg, parts: [part] } + return { info: msg, parts: [part] } + }), + ) }) const getModel = Effect.fn("SessionPrompt.getModel")(function* ( From 042748a1b19aa1f7a069f2e7cc57429a5f017fda Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 27 Apr 2026 15:34:16 -0400 Subject: [PATCH 2/4] fix(session): close shell cancellation races --- packages/opencode/src/effect/runner.ts | 12 +++++++----- packages/opencode/src/session/prompt.ts | 11 +++++++---- packages/opencode/src/session/run-state.ts | 6 ++++-- packages/opencode/test/effect/runner.test.ts | 16 ++++++++++++++++ packages/opencode/test/session/prompt.test.ts | 4 ++++ 5 files changed, 38 insertions(+), 11 deletions(-) diff --git a/packages/opencode/src/effect/runner.ts b/packages/opencode/src/effect/runner.ts index e03cbffb11e5..bbc85309e81b 100644 --- a/packages/opencode/src/effect/runner.ts +++ b/packages/opencode/src/effect/runner.ts @@ -4,7 +4,7 @@ export interface Runner { readonly state: State readonly busy: boolean readonly ensureRunning: (work: Effect.Effect) => Effect.Effect - readonly startShell: (work: Effect.Effect) => Effect.Effect + readonly startShell: (work: Effect.Effect, ready?: Deferred.Deferred) => Effect.Effect readonly cancel: Effect.Effect } @@ -19,6 +19,7 @@ interface RunHandle { interface ShellHandle { id: number cancelled: Deferred.Deferred + ready?: Deferred.Deferred fiber: Fiber.Fiber } @@ -106,6 +107,7 @@ export const make = ( const stopShell = (shell: ShellHandle) => Effect.gen(function* () { + if (shell.ready) yield* Deferred.await(shell.ready).pipe(Effect.exit, Effect.asVoid) yield* Deferred.succeed(shell.cancelled, undefined).pipe(Effect.asVoid) yield* Fiber.interrupt(shell.fiber) }) @@ -135,7 +137,7 @@ export const make = ( }), ).pipe(Effect.flatten) - const startShell = (work: Effect.Effect) => + const startShell = (work: Effect.Effect, ready?: Deferred.Deferred) => SynchronizedRef.modifyEffect( ref, Effect.fnUntraced(function* (st) { @@ -152,12 +154,12 @@ export const make = ( const id = next() const cancelled = yield* Deferred.make() const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild) - const shell = { id, cancelled, fiber } satisfies ShellHandle + const shell = { id, cancelled, ready, fiber } satisfies ShellHandle return [ Effect.gen(function* () { const exit = yield* Fiber.await(fiber) if (Exit.isSuccess(exit)) return exit.value - if ((yield* Deferred.isDone(cancelled)) || Cause.hasInterruptsOnly(exit.cause)) { + if (Cause.hasInterruptsOnly(exit.cause) || ((yield* Deferred.isDone(cancelled)) && !Cause.hasDies(exit.cause))) { if (onInterrupt) return yield* onInterrupt return yield* Effect.die(new Cancelled()) } @@ -192,8 +194,8 @@ export const make = ( case "ShellThenRun": return [ Effect.gen(function* () { - yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid) yield* stopShell(st.shell) + yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid) yield* idleIfCurrent() }), { _tag: "Idle" } as const, diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index a94f520c30f1..2f359610a3cf 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -45,7 +45,7 @@ import { AppFileSystem } from "@opencode-ai/core/filesystem" import { Truncate } from "@/tool/truncate" import { decodeDataUrl } from "@/util/data-url" import { Process } from "@/util/process" -import { Cause, Effect, Exit, Layer, Option, Scope, Context, Schema } from "effect" +import { Cause, Deferred, Effect, Exit, Layer, Option, Scope, Context, Schema } from "effect" import { zod } from "@/util/effect-zod" import { withStatics } from "@/util/schema" import * as EffectLogger from "@opencode-ai/core/effect/logger" @@ -720,9 +720,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the } satisfies MessageV2.TextPart) }) - const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput) { + const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, ready?: Deferred.Deferred) { return yield* Effect.uninterruptibleMask((restore) => Effect.gen(function* () { + const markReady = ready ? Deferred.succeed(ready, undefined).pipe(Effect.asVoid) : Effect.void const { msg, part, cwd } = yield* Effect.gen(function* () { const ctx = yield* InstanceState.context const session = yield* sessions.get(input.sessionID) @@ -786,8 +787,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the }, } yield* sessions.updatePart(part) + yield* markReady return { msg, part, cwd: ctx.directory } - }) + }).pipe(Effect.onExit(() => markReady)) const cfg = yield* config.get() const sh = Shell.preferred(cfg.shell) @@ -1508,7 +1510,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the const shell: (input: ShellInput) => Effect.Effect = Effect.fn("SessionPrompt.shell")( function* (input: ShellInput) { - return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input)) + const ready = yield* Deferred.make() + return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input, ready), ready) }, ) diff --git a/packages/opencode/src/session/run-state.ts b/packages/opencode/src/session/run-state.ts index 4b210d63d717..5ad95c9c7d1f 100644 --- a/packages/opencode/src/session/run-state.ts +++ b/packages/opencode/src/session/run-state.ts @@ -1,6 +1,6 @@ import { InstanceState } from "@/effect/instance-state" import { Runner } from "@/effect/runner" -import { Effect, Layer, Scope, Context } from "effect" +import { Deferred, Effect, Layer, Scope, Context } from "effect" import * as Session from "./session" import { MessageV2 } from "./message-v2" import { SessionID } from "./schema" @@ -18,6 +18,7 @@ export interface Interface { sessionID: SessionID, onInterrupt: Effect.Effect, work: Effect.Effect, + ready?: Deferred.Deferred, ) => Effect.Effect } @@ -95,8 +96,9 @@ export const layer = Layer.effect( sessionID: SessionID, onInterrupt: Effect.Effect, work: Effect.Effect, + ready?: Deferred.Deferred, ) { - return yield* (yield* runner(sessionID, onInterrupt)).startShell(work) + return yield* (yield* runner(sessionID, onInterrupt)).startShell(work, ready) }) return Service.of({ assertNotBusy, cancel, ensureRunning, startShell }) diff --git a/packages/opencode/test/effect/runner.test.ts b/packages/opencode/test/effect/runner.test.ts index 4b0fbc1b51fa..ee99050a8c83 100644 --- a/packages/opencode/test/effect/runner.test.ts +++ b/packages/opencode/test/effect/runner.test.ts @@ -334,6 +334,22 @@ describe("Runner", () => { }), ) + it.live( + "cancel does not mask shell defects", + Effect.gen(function* () { + const s = yield* Scope.Scope + const runner = Runner.make(s, { onInterrupt: Effect.succeed("interrupted") }) + + const sh = yield* runner + .startShell(Effect.never.pipe(Effect.ensuring(Effect.die("boom")), Effect.as("ignored"))) + .pipe(Effect.forkChild) + yield* Effect.sleep("10 millis") + + yield* runner.cancel + expect(Exit.isFailure(yield* Fiber.await(sh))).toBe(true) + }), + ) + // --- shell→run handoff --- it.live( diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index 422c1400c9fc..53305694018c 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -1470,6 +1470,10 @@ unix( const exit = yield* Fiber.await(loop) expect(Exit.isSuccess(exit)).toBe(true) + if (Exit.isSuccess(exit)) { + const tool = completedTool(exit.value.parts) + expect(tool?.state.output).toContain("User aborted the command") + } yield* Fiber.await(sh) }), From 8df9f54c1def26c91f222868c969f4a9165695a7 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 27 Apr 2026 16:33:06 -0400 Subject: [PATCH 3/4] fix(session): narrow shell cancel fallback --- packages/opencode/src/effect/runner.ts | 5 ++++- packages/opencode/src/session/prompt.ts | 5 ++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/opencode/src/effect/runner.ts b/packages/opencode/src/effect/runner.ts index bbc85309e81b..cbd2359e912c 100644 --- a/packages/opencode/src/effect/runner.ts +++ b/packages/opencode/src/effect/runner.ts @@ -159,7 +159,10 @@ export const make = ( Effect.gen(function* () { const exit = yield* Fiber.await(fiber) if (Exit.isSuccess(exit)) return exit.value - if (Cause.hasInterruptsOnly(exit.cause) || ((yield* Deferred.isDone(cancelled)) && !Cause.hasDies(exit.cause))) { + if ( + Cause.hasInterruptsOnly(exit.cause) || + ((yield* Deferred.isDone(cancelled)) && Cause.hasInterrupts(exit.cause) && !Cause.hasDies(exit.cause)) + ) { if (onInterrupt) return yield* onInterrupt return yield* Effect.die(new Cancelled()) } diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 2f359610a3cf..22d1da4e8f6e 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -787,9 +787,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the }, } yield* sessions.updatePart(part) - yield* markReady return { msg, part, cwd: ctx.directory } - }).pipe(Effect.onExit(() => markReady)) + }).pipe(Effect.ensuring(markReady)) const cfg = yield* config.get() const sh = Shell.preferred(cfg.shell) @@ -848,7 +847,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the }).pipe(Effect.scoped, Effect.orDie), ).pipe(Effect.exit) - if (Exit.isFailure(exit) && Cause.hasInterrupts(exit.cause)) { + if (Exit.isFailure(exit) && Cause.hasInterrupts(exit.cause) && !Cause.hasDies(exit.cause)) { aborted = true } yield* finish From 9a362bd06de365ab61a9897673c308400c2790de Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 27 Apr 2026 16:39:13 -0400 Subject: [PATCH 4/4] refactor(session): use latch for shell readiness --- packages/opencode/src/effect/runner.ts | 10 +++++----- packages/opencode/src/session/prompt.ts | 8 ++++---- packages/opencode/src/session/run-state.ts | 6 +++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/packages/opencode/src/effect/runner.ts b/packages/opencode/src/effect/runner.ts index cbd2359e912c..1e7d4c2966c7 100644 --- a/packages/opencode/src/effect/runner.ts +++ b/packages/opencode/src/effect/runner.ts @@ -1,10 +1,10 @@ -import { Cause, Deferred, Effect, Exit, Fiber, Schema, Scope, SynchronizedRef } from "effect" +import { Cause, Deferred, Effect, Exit, Fiber, Latch, Schema, Scope, SynchronizedRef } from "effect" export interface Runner { readonly state: State readonly busy: boolean readonly ensureRunning: (work: Effect.Effect) => Effect.Effect - readonly startShell: (work: Effect.Effect, ready?: Deferred.Deferred) => Effect.Effect + readonly startShell: (work: Effect.Effect, ready?: Latch.Latch) => Effect.Effect readonly cancel: Effect.Effect } @@ -19,7 +19,7 @@ interface RunHandle { interface ShellHandle { id: number cancelled: Deferred.Deferred - ready?: Deferred.Deferred + ready?: Latch.Latch fiber: Fiber.Fiber } @@ -107,7 +107,7 @@ export const make = ( const stopShell = (shell: ShellHandle) => Effect.gen(function* () { - if (shell.ready) yield* Deferred.await(shell.ready).pipe(Effect.exit, Effect.asVoid) + if (shell.ready) yield* shell.ready.await.pipe(Effect.exit, Effect.asVoid) yield* Deferred.succeed(shell.cancelled, undefined).pipe(Effect.asVoid) yield* Fiber.interrupt(shell.fiber) }) @@ -137,7 +137,7 @@ export const make = ( }), ).pipe(Effect.flatten) - const startShell = (work: Effect.Effect, ready?: Deferred.Deferred) => + const startShell = (work: Effect.Effect, ready?: Latch.Latch) => SynchronizedRef.modifyEffect( ref, Effect.fnUntraced(function* (st) { diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 22d1da4e8f6e..4c259e4aef5a 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -45,7 +45,7 @@ import { AppFileSystem } from "@opencode-ai/core/filesystem" import { Truncate } from "@/tool/truncate" import { decodeDataUrl } from "@/util/data-url" import { Process } from "@/util/process" -import { Cause, Deferred, Effect, Exit, Layer, Option, Scope, Context, Schema } from "effect" +import { Cause, Effect, Exit, Latch, Layer, Option, Scope, Context, Schema } from "effect" import { zod } from "@/util/effect-zod" import { withStatics } from "@/util/schema" import * as EffectLogger from "@opencode-ai/core/effect/logger" @@ -720,10 +720,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the } satisfies MessageV2.TextPart) }) - const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, ready?: Deferred.Deferred) { + const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, ready?: Latch.Latch) { return yield* Effect.uninterruptibleMask((restore) => Effect.gen(function* () { - const markReady = ready ? Deferred.succeed(ready, undefined).pipe(Effect.asVoid) : Effect.void + const markReady = ready ? ready.open.pipe(Effect.asVoid) : Effect.void const { msg, part, cwd } = yield* Effect.gen(function* () { const ctx = yield* InstanceState.context const session = yield* sessions.get(input.sessionID) @@ -1509,7 +1509,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the const shell: (input: ShellInput) => Effect.Effect = Effect.fn("SessionPrompt.shell")( function* (input: ShellInput) { - const ready = yield* Deferred.make() + const ready = yield* Latch.make() return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input, ready), ready) }, ) diff --git a/packages/opencode/src/session/run-state.ts b/packages/opencode/src/session/run-state.ts index 5ad95c9c7d1f..9d4986f17436 100644 --- a/packages/opencode/src/session/run-state.ts +++ b/packages/opencode/src/session/run-state.ts @@ -1,6 +1,6 @@ import { InstanceState } from "@/effect/instance-state" import { Runner } from "@/effect/runner" -import { Deferred, Effect, Layer, Scope, Context } from "effect" +import { Effect, Latch, Layer, Scope, Context } from "effect" import * as Session from "./session" import { MessageV2 } from "./message-v2" import { SessionID } from "./schema" @@ -18,7 +18,7 @@ export interface Interface { sessionID: SessionID, onInterrupt: Effect.Effect, work: Effect.Effect, - ready?: Deferred.Deferred, + ready?: Latch.Latch, ) => Effect.Effect } @@ -96,7 +96,7 @@ export const layer = Layer.effect( sessionID: SessionID, onInterrupt: Effect.Effect, work: Effect.Effect, - ready?: Deferred.Deferred, + ready?: Latch.Latch, ) { return yield* (yield* runner(sessionID, onInterrupt)).startShell(work, ready) })