From 4d49a72905bd45e077603ca6123dd9580ef75c47 Mon Sep 17 00:00:00 2001 From: Shoubhit Dash Date: Tue, 12 May 2026 15:03:12 +0530 Subject: [PATCH 1/2] feat(background): add job service --- packages/opencode/src/background/job.ts | 172 ++++++++++++++++++ packages/opencode/src/effect/app-runtime.ts | 2 + packages/opencode/src/id/id.ts | 1 + packages/opencode/test/background/job.test.ts | 94 ++++++++++ 4 files changed, 269 insertions(+) create mode 100644 packages/opencode/src/background/job.ts create mode 100644 packages/opencode/test/background/job.test.ts diff --git a/packages/opencode/src/background/job.ts b/packages/opencode/src/background/job.ts new file mode 100644 index 000000000000..457d57eae17f --- /dev/null +++ b/packages/opencode/src/background/job.ts @@ -0,0 +1,172 @@ +import { InstanceState } from "@/effect/instance-state" +import { Identifier } from "@/id/id" +import { Cause, Clock, Context, Deferred, Effect, Fiber, Layer, Scope } from "effect" + +export type Status = "running" | "completed" | "error" | "cancelled" + +export type Info = { + id: string + type: string + title?: string + status: Status + started_at: number + completed_at?: number + output?: string + error?: string + metadata?: Record +} + +type Active = { + info: Info + done: Deferred.Deferred + fiber?: Fiber.Fiber +} + +type State = { + jobs: Map + scope: Scope.Scope +} + +export type StartInput = { + id?: string + type: string + title?: string + metadata?: Record + run: Effect.Effect +} + +export type WaitInput = { + id: string + timeout?: number +} + +export type WaitResult = { + info?: Info + timedOut: boolean +} + +export interface Interface { + readonly list: () => Effect.Effect + readonly get: (id: string) => Effect.Effect + readonly start: (input: StartInput) => Effect.Effect + readonly wait: (input: WaitInput) => Effect.Effect + readonly cancel: (id: string) => Effect.Effect +} + +export class Service extends Context.Service()("@opencode/BackgroundJob") {} + +function snapshot(job: Active): Info { + return { + ...job.info, + ...(job.info.metadata ? { metadata: { ...job.info.metadata } } : {}), + } +} + +function errorText(error: unknown) { + if (error instanceof Error) return error.message + return String(error) +} + +export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const state = yield* InstanceState.make( + Effect.fn("BackgroundJob.state")(function* () { + return { + jobs: new Map(), + scope: yield* Scope.Scope, + } + }), + ) + + const finish = Effect.fn("BackgroundJob.finish")(function* ( + job: Active, + status: Exclude, + data?: { output?: string; error?: string }, + ) { + if (job.info.status !== "running") return snapshot(job) + job.info.status = status + job.info.completed_at = yield* Clock.currentTimeMillis + if (data?.output !== undefined) job.info.output = data.output + if (data?.error !== undefined) job.info.error = data.error + job.fiber = undefined + const info = snapshot(job) + yield* Deferred.succeed(job.done, info).pipe(Effect.ignore) + return info + }) + + const list: Interface["list"] = Effect.fn("BackgroundJob.list")(function* () { + return Array.from((yield* InstanceState.get(state)).jobs.values()) + .map(snapshot) + .toSorted((a, b) => a.started_at - b.started_at) + }) + + const get: Interface["get"] = Effect.fn("BackgroundJob.get")(function* (id) { + const job = (yield* InstanceState.get(state)).jobs.get(id) + if (!job) return + return snapshot(job) + }) + + const start: Interface["start"] = Effect.fn("BackgroundJob.start")(function* (input) { + const s = yield* InstanceState.get(state) + const id = input.id ?? Identifier.ascending("job") + const existing = s.jobs.get(id) + if (existing?.info.status === "running") return snapshot(existing) + + const job: Active = { + info: { + id, + type: input.type, + title: input.title, + status: "running", + started_at: yield* Clock.currentTimeMillis, + metadata: input.metadata, + }, + done: yield* Deferred.make(), + } + s.jobs.set(id, job) + job.fiber = yield* input.run.pipe( + Effect.matchCauseEffect({ + onSuccess: (output) => finish(job, "completed", { output }), + onFailure: (cause) => + finish(job, Cause.hasInterruptsOnly(cause) ? "cancelled" : "error", { + error: errorText(Cause.squash(cause)), + }), + }), + Effect.asVoid, + Effect.forkIn(s.scope, { startImmediately: true }), + ) + return snapshot(job) + }) + + const wait: Interface["wait"] = Effect.fn("BackgroundJob.wait")(function* (input) { + const job = (yield* InstanceState.get(state)).jobs.get(input.id) + if (!job) return { timedOut: false } + if (job.info.status !== "running") return { info: snapshot(job), timedOut: false } + if (input.timeout === undefined) return { info: yield* Deferred.await(job.done), timedOut: false } + if (input.timeout <= 0) return { info: snapshot(job), timedOut: true } + const info = yield* Deferred.await(job.done).pipe(Effect.timeoutOption(input.timeout)) + if (info._tag === "Some") return { info: info.value, timedOut: false } + return { info: snapshot(job), timedOut: true } + }) + + const cancel: Interface["cancel"] = Effect.fn("BackgroundJob.cancel")(function* (id) { + const job = (yield* InstanceState.get(state)).jobs.get(id) + if (!job) return + if (job.info.status !== "running") return snapshot(job) + const fiber = job.fiber + const info = yield* finish(job, "cancelled") + if (fiber) { + yield* Fiber.interrupt(fiber).pipe(Effect.ignore) + yield* Fiber.await(fiber).pipe(Effect.ignore) + } + return info + }) + + return Service.of({ list, get, start, wait, cancel }) + }), +) + +export const defaultLayer = layer + +export * as BackgroundJob from "./job" diff --git a/packages/opencode/src/effect/app-runtime.ts b/packages/opencode/src/effect/app-runtime.ts index 4c1637006c92..b0efab1ae98f 100644 --- a/packages/opencode/src/effect/app-runtime.ts +++ b/packages/opencode/src/effect/app-runtime.ts @@ -55,6 +55,7 @@ import { SyncEvent } from "@/sync" import { Npm } from "@opencode-ai/core/npm" import { memoMap } from "@opencode-ai/core/effect/memo-map" import { DataMigration } from "@/data-migration" +import { BackgroundJob } from "@/background/job" export const AppLayer = Layer.mergeAll( Npm.defaultLayer, @@ -81,6 +82,7 @@ export const AppLayer = Layer.mergeAll( Todo.defaultLayer, Session.defaultLayer, SessionStatus.defaultLayer, + BackgroundJob.defaultLayer, SessionRunState.defaultLayer, SessionProcessor.defaultLayer, SessionCompaction.defaultLayer, diff --git a/packages/opencode/src/id/id.ts b/packages/opencode/src/id/id.ts index 9e163cd6b8c6..847a5c032924 100644 --- a/packages/opencode/src/id/id.ts +++ b/packages/opencode/src/id/id.ts @@ -1,6 +1,7 @@ import { randomBytes } from "crypto" const prefixes = { + job: "job", event: "evt", session: "ses", message: "msg", diff --git a/packages/opencode/test/background/job.test.ts b/packages/opencode/test/background/job.test.ts new file mode 100644 index 000000000000..240cb6cd6b9d --- /dev/null +++ b/packages/opencode/test/background/job.test.ts @@ -0,0 +1,94 @@ +import { describe, expect } from "bun:test" +import { Deferred, Effect } from "effect" +import { BackgroundJob } from "@/background/job" +import { testEffect } from "../lib/effect" + +const it = testEffect(BackgroundJob.defaultLayer) + +describe("background.job", () => { + it.instance("tracks started jobs through completion", () => + Effect.gen(function* () { + const jobs = yield* BackgroundJob.Service + const latch = yield* Deferred.make() + const job = yield* jobs.start({ + type: "test", + title: "test job", + run: Deferred.await(latch).pipe(Effect.as("done")), + }) + + expect(job.id.startsWith("job_")).toBe(true) + expect(job.status).toBe("running") + expect(job.title).toBe("test job") + + yield* Deferred.succeed(latch, undefined) + const done = yield* jobs.wait({ id: job.id }) + + expect(done.timedOut).toBe(false) + expect(done.info?.status).toBe("completed") + expect(done.info?.output).toBe("done") + expect((yield* jobs.list()).map((item) => item.id)).toEqual([job.id]) + }), + ) + + it.instance("returns a running snapshot when wait times out", () => + Effect.gen(function* () { + const jobs = yield* BackgroundJob.Service + const job = yield* jobs.start({ + type: "test", + run: Effect.never, + }) + + const result = yield* jobs.wait({ id: job.id, timeout: 1 }) + + expect(result.timedOut).toBe(true) + expect(result.info?.status).toBe("running") + }), + ) + + it.instance("records failed jobs", () => + Effect.gen(function* () { + const jobs = yield* BackgroundJob.Service + const job = yield* jobs.start({ + type: "test", + run: Effect.fail(new Error("boom")), + }) + + const result = yield* jobs.wait({ id: job.id }) + + expect(result.info?.status).toBe("error") + expect(result.info?.error).toBe("boom") + }), + ) + + it.instance("can cancel running jobs", () => + Effect.gen(function* () { + const jobs = yield* BackgroundJob.Service + const interrupted = yield* Deferred.make() + const job = yield* jobs.start({ + type: "test", + run: Effect.never.pipe(Effect.ensuring(Deferred.succeed(interrupted, undefined))), + }) + + const cancelled = yield* jobs.cancel(job.id) + + expect(cancelled?.status).toBe("cancelled") + yield* Deferred.await(interrupted).pipe(Effect.timeout("1 second")) + expect((yield* jobs.get(job.id))?.status).toBe("cancelled") + }), + ) + + it.instance("returns immutable snapshots", () => + Effect.gen(function* () { + const jobs = yield* BackgroundJob.Service + const job = yield* jobs.start({ + type: "test", + metadata: { value: "initial" }, + run: Effect.succeed("done"), + }) + + if (job.metadata) job.metadata.value = "changed" + + expect((yield* jobs.get(job.id))?.metadata?.value).toBe("initial") + }), + ) +}) From 048f70ead5d4c345ff9441bb74d4bcd5cb26ccb6 Mon Sep 17 00:00:00 2001 From: Shoubhit Dash Date: Tue, 12 May 2026 15:12:31 +0530 Subject: [PATCH 2/2] fix(background): make job state transitions atomic --- packages/opencode/src/background/job.ts | 124 +++++++++++------- packages/opencode/test/background/job.test.ts | 33 +++++ 2 files changed, 109 insertions(+), 48 deletions(-) diff --git a/packages/opencode/src/background/job.ts b/packages/opencode/src/background/job.ts index 457d57eae17f..3ea228f048c5 100644 --- a/packages/opencode/src/background/job.ts +++ b/packages/opencode/src/background/job.ts @@ -1,6 +1,6 @@ import { InstanceState } from "@/effect/instance-state" import { Identifier } from "@/id/id" -import { Cause, Clock, Context, Deferred, Effect, Fiber, Layer, Scope } from "effect" +import { Cause, Clock, Context, Deferred, Effect, Fiber, Layer, Scope, SynchronizedRef } from "effect" export type Status = "running" | "completed" | "error" | "cancelled" @@ -23,10 +23,15 @@ type Active = { } type State = { - jobs: Map + jobs: SynchronizedRef.SynchronizedRef> scope: Scope.Scope } +type FinishResult = { + info?: Info + done?: Deferred.Deferred +} + export type StartInput = { id?: string type: string @@ -73,74 +78,98 @@ export const layer = Layer.effect( const state = yield* InstanceState.make( Effect.fn("BackgroundJob.state")(function* () { return { - jobs: new Map(), + jobs: yield* SynchronizedRef.make(new Map()), scope: yield* Scope.Scope, } }), ) const finish = Effect.fn("BackgroundJob.finish")(function* ( - job: Active, + id: string, status: Exclude, data?: { output?: string; error?: string }, ) { - if (job.info.status !== "running") return snapshot(job) - job.info.status = status - job.info.completed_at = yield* Clock.currentTimeMillis - if (data?.output !== undefined) job.info.output = data.output - if (data?.error !== undefined) job.info.error = data.error - job.fiber = undefined - const info = snapshot(job) - yield* Deferred.succeed(job.done, info).pipe(Effect.ignore) - return info + const completed_at = yield* Clock.currentTimeMillis + const result = yield* SynchronizedRef.modify( + (yield* InstanceState.get(state)).jobs, + (jobs): readonly [FinishResult, Map] => { + const job = jobs.get(id) + if (!job) return [{}, jobs] + if (job.info.status !== "running") return [{ info: snapshot(job) }, jobs] + const next = { + ...job, + fiber: undefined, + info: { + ...job.info, + status, + completed_at, + ...(data?.output !== undefined ? { output: data.output } : {}), + ...(data?.error !== undefined ? { error: data.error } : {}), + }, + } + return [{ info: snapshot(next), done: job.done }, new Map(jobs).set(id, next)] + }, + ) + if (result.info && result.done) yield* Deferred.succeed(result.done, result.info).pipe(Effect.ignore) + return result.info }) const list: Interface["list"] = Effect.fn("BackgroundJob.list")(function* () { - return Array.from((yield* InstanceState.get(state)).jobs.values()) + return Array.from((yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).values()) .map(snapshot) .toSorted((a, b) => a.started_at - b.started_at) }) const get: Interface["get"] = Effect.fn("BackgroundJob.get")(function* (id) { - const job = (yield* InstanceState.get(state)).jobs.get(id) + const job = (yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).get(id) if (!job) return return snapshot(job) }) const start: Interface["start"] = Effect.fn("BackgroundJob.start")(function* (input) { - const s = yield* InstanceState.get(state) - const id = input.id ?? Identifier.ascending("job") - const existing = s.jobs.get(id) - if (existing?.info.status === "running") return snapshot(existing) - - const job: Active = { - info: { - id, - type: input.type, - title: input.title, - status: "running", - started_at: yield* Clock.currentTimeMillis, - metadata: input.metadata, - }, - done: yield* Deferred.make(), - } - s.jobs.set(id, job) - job.fiber = yield* input.run.pipe( - Effect.matchCauseEffect({ - onSuccess: (output) => finish(job, "completed", { output }), - onFailure: (cause) => - finish(job, Cause.hasInterruptsOnly(cause) ? "cancelled" : "error", { - error: errorText(Cause.squash(cause)), + return yield* Effect.uninterruptibleMask((restore) => + Effect.gen(function* () { + const s = yield* InstanceState.get(state) + const id = input.id ?? Identifier.ascending("job") + const started_at = yield* Clock.currentTimeMillis + const done = yield* Deferred.make() + return yield* SynchronizedRef.modifyEffect( + s.jobs, + Effect.fnUntraced(function* (jobs) { + const existing = jobs.get(id) + if (existing?.info.status === "running") return [snapshot(existing), jobs] as const + const fiber = yield* restore(input.run).pipe( + Effect.matchCauseEffect({ + onSuccess: (output) => finish(id, "completed", { output }), + onFailure: (cause) => + finish(id, Cause.hasInterruptsOnly(cause) ? "cancelled" : "error", { + error: errorText(Cause.squash(cause)), + }), + }), + Effect.asVoid, + Effect.forkIn(s.scope, { startImmediately: true }), + ) + const job = { + info: { + id, + type: input.type, + title: input.title, + status: "running" as const, + started_at, + metadata: input.metadata, + }, + done, + fiber, + } + return [snapshot(job), new Map(jobs).set(id, job)] as const }), + ) }), - Effect.asVoid, - Effect.forkIn(s.scope, { startImmediately: true }), ) - return snapshot(job) }) const wait: Interface["wait"] = Effect.fn("BackgroundJob.wait")(function* (input) { - const job = (yield* InstanceState.get(state)).jobs.get(input.id) + const job = (yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).get(input.id) if (!job) return { timedOut: false } if (job.info.status !== "running") return { info: snapshot(job), timedOut: false } if (input.timeout === undefined) return { info: yield* Deferred.await(job.done), timedOut: false } @@ -151,15 +180,14 @@ export const layer = Layer.effect( }) const cancel: Interface["cancel"] = Effect.fn("BackgroundJob.cancel")(function* (id) { - const job = (yield* InstanceState.get(state)).jobs.get(id) + const job = (yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).get(id) if (!job) return if (job.info.status !== "running") return snapshot(job) - const fiber = job.fiber - const info = yield* finish(job, "cancelled") - if (fiber) { - yield* Fiber.interrupt(fiber).pipe(Effect.ignore) - yield* Fiber.await(fiber).pipe(Effect.ignore) + if (job.fiber) { + yield* Fiber.interrupt(job.fiber).pipe(Effect.ignore) + yield* Fiber.await(job.fiber).pipe(Effect.ignore) } + const info = yield* finish(id, "cancelled") return info }) diff --git a/packages/opencode/test/background/job.test.ts b/packages/opencode/test/background/job.test.ts index 240cb6cd6b9d..afc7260bb82c 100644 --- a/packages/opencode/test/background/job.test.ts +++ b/packages/opencode/test/background/job.test.ts @@ -45,6 +45,39 @@ describe("background.job", () => { }), ) + it.instance("deduplicates concurrent starts for a running id", () => + Effect.gen(function* () { + const jobs = yield* BackgroundJob.Service + const started = yield* Deferred.make() + const id = "job_test" + const [first, second] = yield* Effect.all( + [ + jobs.start({ + id, + type: "test", + run: Deferred.succeed(started, undefined).pipe(Effect.andThen(Effect.never)), + }), + jobs.start({ + id, + type: "test", + run: Effect.fail(new Error("duplicate started")), + }), + ], + { concurrency: "unbounded" }, + ) + + yield* Deferred.await(started) + + expect(first.id).toBe(id) + expect(second.id).toBe(id) + expect(first.status).toBe("running") + expect(second.status).toBe("running") + expect((yield* jobs.list()).map((item) => item.id)).toEqual([id]) + + yield* jobs.cancel(id) + }), + ) + it.instance("records failed jobs", () => Effect.gen(function* () { const jobs = yield* BackgroundJob.Service