Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 200 additions & 0 deletions packages/opencode/src/background/job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import { InstanceState } from "@/effect/instance-state"
import { Identifier } from "@/id/id"
import { Cause, Clock, Context, Deferred, Effect, Fiber, Layer, Scope, SynchronizedRef } from "effect"

export type Status = "running" | "completed" | "error" | "cancelled"

export type Info = {
id: string
type: string
title?: string
status: Status
started_at: number
completed_at?: number
output?: string
error?: string
metadata?: Record<string, unknown>
}

type Active = {
info: Info
done: Deferred.Deferred<Info>
fiber?: Fiber.Fiber<void, unknown>
}

type State = {
jobs: SynchronizedRef.SynchronizedRef<Map<string, Active>>
scope: Scope.Scope
}

type FinishResult = {
info?: Info
done?: Deferred.Deferred<Info>
}

export type StartInput = {
id?: string
type: string
title?: string
metadata?: Record<string, unknown>
run: Effect.Effect<string, unknown>
}

export type WaitInput = {
id: string
timeout?: number
}

export type WaitResult = {
info?: Info
timedOut: boolean
}

export interface Interface {
readonly list: () => Effect.Effect<Info[]>
readonly get: (id: string) => Effect.Effect<Info | undefined>
readonly start: (input: StartInput) => Effect.Effect<Info>
readonly wait: (input: WaitInput) => Effect.Effect<WaitResult>
readonly cancel: (id: string) => Effect.Effect<Info | undefined>
}

export class Service extends Context.Service<Service, Interface>()("@opencode/BackgroundJob") {}

function snapshot(job: Active): Info {
return {
...job.info,
...(job.info.metadata ? { metadata: { ...job.info.metadata } } : {}),
}
}

function errorText(error: unknown) {
if (error instanceof Error) return error.message
return String(error)
}

export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const state = yield* InstanceState.make<State>(
Effect.fn("BackgroundJob.state")(function* () {
return {
jobs: yield* SynchronizedRef.make(new Map()),
scope: yield* Scope.Scope,
}
}),
)

const finish = Effect.fn("BackgroundJob.finish")(function* (
id: string,
status: Exclude<Status, "running">,
data?: { output?: string; error?: string },
) {
const completed_at = yield* Clock.currentTimeMillis
const result = yield* SynchronizedRef.modify(
(yield* InstanceState.get(state)).jobs,
(jobs): readonly [FinishResult, Map<string, Active>] => {
const job = jobs.get(id)
if (!job) return [{}, jobs]
if (job.info.status !== "running") return [{ info: snapshot(job) }, jobs]
const next = {
...job,
fiber: undefined,
info: {
...job.info,
status,
completed_at,
...(data?.output !== undefined ? { output: data.output } : {}),
...(data?.error !== undefined ? { error: data.error } : {}),
},
}
return [{ info: snapshot(next), done: job.done }, new Map(jobs).set(id, next)]
},
)
if (result.info && result.done) yield* Deferred.succeed(result.done, result.info).pipe(Effect.ignore)
return result.info
})

const list: Interface["list"] = Effect.fn("BackgroundJob.list")(function* () {
return Array.from((yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).values())
.map(snapshot)
.toSorted((a, b) => a.started_at - b.started_at)
})

const get: Interface["get"] = Effect.fn("BackgroundJob.get")(function* (id) {
const job = (yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).get(id)
if (!job) return
return snapshot(job)
})

const start: Interface["start"] = Effect.fn("BackgroundJob.start")(function* (input) {
return yield* Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
const s = yield* InstanceState.get(state)
const id = input.id ?? Identifier.ascending("job")
const started_at = yield* Clock.currentTimeMillis
const done = yield* Deferred.make<Info>()
return yield* SynchronizedRef.modifyEffect(
s.jobs,
Effect.fnUntraced(function* (jobs) {
const existing = jobs.get(id)
if (existing?.info.status === "running") return [snapshot(existing), jobs] as const
const fiber = yield* restore(input.run).pipe(
Effect.matchCauseEffect({
onSuccess: (output) => finish(id, "completed", { output }),
onFailure: (cause) =>
finish(id, Cause.hasInterruptsOnly(cause) ? "cancelled" : "error", {
error: errorText(Cause.squash(cause)),
}),
}),
Effect.asVoid,
Effect.forkIn(s.scope, { startImmediately: true }),
)
const job = {
info: {
id,
type: input.type,
title: input.title,
status: "running" as const,
started_at,
metadata: input.metadata,
},
done,
fiber,
}
return [snapshot(job), new Map(jobs).set(id, job)] as const
}),
)
}),
)
})

const wait: Interface["wait"] = Effect.fn("BackgroundJob.wait")(function* (input) {
const job = (yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).get(input.id)
if (!job) return { timedOut: false }
if (job.info.status !== "running") return { info: snapshot(job), timedOut: false }
if (input.timeout === undefined) return { info: yield* Deferred.await(job.done), timedOut: false }
if (input.timeout <= 0) return { info: snapshot(job), timedOut: true }
const info = yield* Deferred.await(job.done).pipe(Effect.timeoutOption(input.timeout))
if (info._tag === "Some") return { info: info.value, timedOut: false }
return { info: snapshot(job), timedOut: true }
})

const cancel: Interface["cancel"] = Effect.fn("BackgroundJob.cancel")(function* (id) {
const job = (yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).get(id)
if (!job) return
if (job.info.status !== "running") return snapshot(job)
if (job.fiber) {
yield* Fiber.interrupt(job.fiber).pipe(Effect.ignore)
yield* Fiber.await(job.fiber).pipe(Effect.ignore)
}
const info = yield* finish(id, "cancelled")
return info
})

return Service.of({ list, get, start, wait, cancel })
}),
)

export const defaultLayer = layer

export * as BackgroundJob from "./job"
2 changes: 2 additions & 0 deletions packages/opencode/src/effect/app-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import { SyncEvent } from "@/sync"
import { Npm } from "@opencode-ai/core/npm"
import { memoMap } from "@opencode-ai/core/effect/memo-map"
import { DataMigration } from "@/data-migration"
import { BackgroundJob } from "@/background/job"

export const AppLayer = Layer.mergeAll(
Npm.defaultLayer,
Expand All @@ -81,6 +82,7 @@ export const AppLayer = Layer.mergeAll(
Todo.defaultLayer,
Session.defaultLayer,
SessionStatus.defaultLayer,
BackgroundJob.defaultLayer,
SessionRunState.defaultLayer,
SessionProcessor.defaultLayer,
SessionCompaction.defaultLayer,
Expand Down
1 change: 1 addition & 0 deletions packages/opencode/src/id/id.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { randomBytes } from "crypto"

const prefixes = {
job: "job",
event: "evt",
session: "ses",
message: "msg",
Expand Down
127 changes: 127 additions & 0 deletions packages/opencode/test/background/job.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import { describe, expect } from "bun:test"
import { Deferred, Effect } from "effect"
import { BackgroundJob } from "@/background/job"
import { testEffect } from "../lib/effect"

const it = testEffect(BackgroundJob.defaultLayer)

describe("background.job", () => {
it.instance("tracks started jobs through completion", () =>
Effect.gen(function* () {
const jobs = yield* BackgroundJob.Service
const latch = yield* Deferred.make<void>()
const job = yield* jobs.start({
type: "test",
title: "test job",
run: Deferred.await(latch).pipe(Effect.as("done")),
})

expect(job.id.startsWith("job_")).toBe(true)
expect(job.status).toBe("running")
expect(job.title).toBe("test job")

yield* Deferred.succeed(latch, undefined)
const done = yield* jobs.wait({ id: job.id })

expect(done.timedOut).toBe(false)
expect(done.info?.status).toBe("completed")
expect(done.info?.output).toBe("done")
expect((yield* jobs.list()).map((item) => item.id)).toEqual([job.id])
}),
)

it.instance("returns a running snapshot when wait times out", () =>
Effect.gen(function* () {
const jobs = yield* BackgroundJob.Service
const job = yield* jobs.start({
type: "test",
run: Effect.never,
})

const result = yield* jobs.wait({ id: job.id, timeout: 1 })

expect(result.timedOut).toBe(true)
expect(result.info?.status).toBe("running")
}),
)

it.instance("deduplicates concurrent starts for a running id", () =>
Effect.gen(function* () {
const jobs = yield* BackgroundJob.Service
const started = yield* Deferred.make<void>()
const id = "job_test"
const [first, second] = yield* Effect.all(
[
jobs.start({
id,
type: "test",
run: Deferred.succeed(started, undefined).pipe(Effect.andThen(Effect.never)),
}),
jobs.start({
id,
type: "test",
run: Effect.fail(new Error("duplicate started")),
}),
],
{ concurrency: "unbounded" },
)

yield* Deferred.await(started)

expect(first.id).toBe(id)
expect(second.id).toBe(id)
expect(first.status).toBe("running")
expect(second.status).toBe("running")
expect((yield* jobs.list()).map((item) => item.id)).toEqual([id])

yield* jobs.cancel(id)
}),
)

it.instance("records failed jobs", () =>
Effect.gen(function* () {
const jobs = yield* BackgroundJob.Service
const job = yield* jobs.start({
type: "test",
run: Effect.fail(new Error("boom")),
})

const result = yield* jobs.wait({ id: job.id })

expect(result.info?.status).toBe("error")
expect(result.info?.error).toBe("boom")
}),
)

it.instance("can cancel running jobs", () =>
Effect.gen(function* () {
const jobs = yield* BackgroundJob.Service
const interrupted = yield* Deferred.make<void>()
const job = yield* jobs.start({
type: "test",
run: Effect.never.pipe(Effect.ensuring(Deferred.succeed(interrupted, undefined))),
})

const cancelled = yield* jobs.cancel(job.id)

expect(cancelled?.status).toBe("cancelled")
yield* Deferred.await(interrupted).pipe(Effect.timeout("1 second"))
expect((yield* jobs.get(job.id))?.status).toBe("cancelled")
}),
)

it.instance("returns immutable snapshots", () =>
Effect.gen(function* () {
const jobs = yield* BackgroundJob.Service
const job = yield* jobs.start({
type: "test",
metadata: { value: "initial" },
run: Effect.succeed("done"),
})

if (job.metadata) job.metadata.value = "changed"

expect((yield* jobs.get(job.id))?.metadata?.value).toBe("initial")
}),
)
})
Loading