From 7a5dc9b6032ea502f769d18e38dbeefeaf66d8bf Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 30 Mar 2026 17:20:37 -0400 Subject: [PATCH 1/5] refactor(storage): effectify Storage service Migrate from promise-based to Effect-based implementation using ServiceMap.Service, TxReentrantLock, and AppFileSystem. Add Zod schemas for migration validation and storage tests. --- packages/opencode/src/storage/storage.ts | 396 ++++++++++++------ .../opencode/test/storage/storage.test.ts | 59 +++ 2 files changed, 325 insertions(+), 130 deletions(-) create mode 100644 packages/opencode/test/storage/storage.test.ts diff --git a/packages/opencode/src/storage/storage.ts b/packages/opencode/src/storage/storage.ts index a78607cdfd57..ba4232c7d96f 100644 --- a/packages/opencode/src/storage/storage.ts +++ b/packages/opencode/src/storage/storage.ts @@ -1,19 +1,17 @@ import { Log } from "../util/log" import path from "path" -import fs from "fs/promises" import { Global } from "../global" -import { Filesystem } from "../util/filesystem" -import { lazy } from "../util/lazy" -import { Lock } from "../util/lock" import { NamedError } from "@opencode-ai/util/error" import z from "zod" -import { Glob } from "../util/glob" import { git } from "@/util/git" +import { AppFileSystem } from "@/filesystem" +import { makeRuntime } from "@/effect/run-service" +import { Effect, Layer, ServiceMap, SynchronizedRef, TxReentrantLock } from "effect" export namespace Storage { const log = Log.create({ service: "storage" }) - type Migration = (dir: string) => Promise + type Migration = (dir: string, fs: AppFileSystem.Interface) => Effect.Effect export const NotFoundError = NamedError.create( "NotFoundError", @@ -22,36 +20,101 @@ export namespace Storage { }), ) + export type Error = AppFileSystem.Error | InstanceType + + const RootFile = z + .object({ + path: z + .object({ + root: z.string().optional(), + }) + .optional(), + }) + .passthrough() + + const SessionFile = z + .object({ + id: z.string(), + }) + .passthrough() + + const MessageFile = z + .object({ + id: z.string(), + }) + .passthrough() + + const DiffFile = z + .object({ + additions: z.number(), + deletions: z.number(), + }) + .passthrough() + + const SummaryFile = z + .object({ + id: z.string(), + projectID: z.string(), + summary: z.object({ diffs: z.array(DiffFile) }), + }) + .passthrough() + + export interface Interface { + readonly remove: (key: string[]) => Effect.Effect + readonly read: (key: string[]) => Effect.Effect + readonly update: (key: string[], fn: (draft: T) => void) => Effect.Effect + readonly write: (key: string[], content: T) => Effect.Effect + readonly list: (prefix: string[]) => Effect.Effect + } + + export class Service extends ServiceMap.Service()("@opencode/Storage") {} + + function file(dir: string, key: string[]) { + return path.join(dir, ...key) + ".json" + } + + function missing(err: unknown) { + if (!err || typeof err !== "object") return false + if ("code" in err && err.code === "ENOENT") return true + if ("reason" in err && err.reason && typeof err.reason === "object" && "_tag" in err.reason) { + return err.reason._tag === "NotFound" + } + return false + } + const MIGRATIONS: Migration[] = [ - async (dir) => { + Effect.fn("Storage.migration.1")(function* (dir: string, fs: AppFileSystem.Interface) { const project = path.resolve(dir, "../project") - if (!(await Filesystem.isDir(project))) return - const projectDirs = await Glob.scan("*", { + if (!(yield* fs.isDir(project))) return + const projectDirs = yield* fs.glob("*", { cwd: project, include: "all", }) for (const projectDir of projectDirs) { - const fullPath = path.join(project, projectDir) - if (!(await Filesystem.isDir(fullPath))) continue + const full = path.join(project, projectDir) + if (!(yield* fs.isDir(full))) continue log.info(`migrating project ${projectDir}`) let projectID = projectDir - const fullProjectDir = path.join(project, projectDir) let worktree = "/" if (projectID !== "global") { - for (const msgFile of await Glob.scan("storage/session/message/*/*.json", { - cwd: path.join(project, projectDir), + for (const msgFile of yield* fs.glob("storage/session/message/*/*.json", { + cwd: full, absolute: true, })) { - const json = await Filesystem.readJson(msgFile) - worktree = json.path?.root - if (worktree) break + const json = RootFile.parse(yield* fs.readJson(msgFile)) + const root = json.path?.root + if (!root) continue + worktree = root + break } if (!worktree) continue - if (!(await Filesystem.isDir(worktree))) continue - const result = await git(["rev-list", "--max-parents=0", "--all"], { - cwd: worktree, - }) + if (!(yield* fs.isDir(worktree))) continue + const result = yield* Effect.promise(() => + git(["rev-list", "--max-parents=0", "--all"], { + cwd: worktree, + }), + ) const [id] = result .text() .split("\n") @@ -61,157 +124,230 @@ export namespace Storage { if (!id) continue projectID = id - await Filesystem.writeJson(path.join(dir, "project", projectID + ".json"), { - id, - vcs: "git", - worktree, - time: { - created: Date.now(), - initialized: Date.now(), - }, - }) + yield* fs.writeWithDirs( + path.join(dir, "project", projectID + ".json"), + JSON.stringify( + { + id, + vcs: "git", + worktree, + time: { + created: Date.now(), + initialized: Date.now(), + }, + }, + null, + 2, + ), + ) log.info(`migrating sessions for project ${projectID}`) - for (const sessionFile of await Glob.scan("storage/session/info/*.json", { - cwd: fullProjectDir, + for (const sessionFile of yield* fs.glob("storage/session/info/*.json", { + cwd: full, absolute: true, })) { const dest = path.join(dir, "session", projectID, path.basename(sessionFile)) - log.info("copying", { - sessionFile, - dest, - }) - const session = await Filesystem.readJson(sessionFile) - await Filesystem.writeJson(dest, session) + log.info("copying", { sessionFile, dest }) + const session = SessionFile.parse(yield* fs.readJson(sessionFile)) + yield* fs.writeWithDirs(dest, JSON.stringify(session, null, 2)) log.info(`migrating messages for session ${session.id}`) - for (const msgFile of await Glob.scan(`storage/session/message/${session.id}/*.json`, { - cwd: fullProjectDir, + for (const msgFile of yield* fs.glob(`storage/session/message/${session.id}/*.json`, { + cwd: full, absolute: true, })) { - const dest = path.join(dir, "message", session.id, path.basename(msgFile)) + const next = path.join(dir, "message", session.id, path.basename(msgFile)) log.info("copying", { msgFile, - dest, + dest: next, }) - const message = await Filesystem.readJson(msgFile) - await Filesystem.writeJson(dest, message) + const message = MessageFile.parse(yield* fs.readJson(msgFile)) + yield* fs.writeWithDirs(next, JSON.stringify(message, null, 2)) log.info(`migrating parts for message ${message.id}`) - for (const partFile of await Glob.scan(`storage/session/part/${session.id}/${message.id}/*.json`, { - cwd: fullProjectDir, + for (const partFile of yield* fs.glob(`storage/session/part/${session.id}/${message.id}/*.json`, { + cwd: full, absolute: true, })) { - const dest = path.join(dir, "part", message.id, path.basename(partFile)) - const part = await Filesystem.readJson(partFile) + const out = path.join(dir, "part", message.id, path.basename(partFile)) + const part = yield* fs.readJson(partFile) log.info("copying", { partFile, - dest, + dest: out, }) - await Filesystem.writeJson(dest, part) + yield* fs.writeWithDirs(out, JSON.stringify(part, null, 2)) } } } } } - }, - async (dir) => { - for (const item of await Glob.scan("session/*/*.json", { + }), + Effect.fn("Storage.migration.2")(function* (dir: string, fs: AppFileSystem.Interface) { + for (const item of yield* fs.glob("session/*/*.json", { cwd: dir, absolute: true, })) { - const session = await Filesystem.readJson(item) - if (!session.projectID) continue - if (!session.summary?.diffs) continue - const { diffs } = session.summary - await Filesystem.write(path.join(dir, "session_diff", session.id + ".json"), JSON.stringify(diffs)) - await Filesystem.writeJson(path.join(dir, "session", session.projectID, session.id + ".json"), { - ...session, - summary: { - additions: diffs.reduce((sum: any, x: any) => sum + x.additions, 0), - deletions: diffs.reduce((sum: any, x: any) => sum + x.deletions, 0), - }, - }) + const session = SummaryFile.safeParse(yield* fs.readJson(item)) + if (!session.success) continue + const diffs = session.data.summary.diffs + yield* fs.writeWithDirs( + path.join(dir, "session_diff", session.data.id + ".json"), + JSON.stringify(diffs, null, 2), + ) + yield* fs.writeWithDirs( + path.join(dir, "session", session.data.projectID, session.data.id + ".json"), + JSON.stringify( + { + ...session.data, + summary: { + additions: diffs.reduce((sum, x) => sum + x.additions, 0), + deletions: diffs.reduce((sum, x) => sum + x.deletions, 0), + }, + }, + null, + 2, + ), + ) } - }, + }), ] - const state = lazy(async () => { - const dir = path.join(Global.Path.data, "storage") - const migration = await Filesystem.readJson(path.join(dir, "migration")) - .then((x) => parseInt(x)) - .catch(() => 0) - for (let index = migration; index < MIGRATIONS.length; index++) { - log.info("running migration", { index }) - const migration = MIGRATIONS[index] - await migration(dir).catch(() => log.error("failed to run migration", { index })) - await Filesystem.write(path.join(dir, "migration"), (index + 1).toString()) - } - return { - dir, - } - }) + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const locks = yield* SynchronizedRef.make(new Map()) + const load = Effect.fn("Storage.state")(function* () { + const dir = path.join(Global.Path.data, "storage") + const marker = path.join(dir, "migration") + const migration = yield* fs.readFileString(marker).pipe( + Effect.map((x) => Number.parseInt(x, 10)), + Effect.catchIf(missing, () => Effect.succeed(0)), + Effect.orElseSucceed(() => 0), + ) + for (let i = migration; i < MIGRATIONS.length; i++) { + log.info("running migration", { index: i }) + const step = MIGRATIONS[i]! + yield* step(dir, fs).pipe( + Effect.catchCause((cause) => + Effect.sync(() => { + log.error("failed to run migration", { index: i, cause }) + }), + ), + ) + yield* fs.writeWithDirs(marker, String(i + 1)) + } + return { dir } + }) + const state = yield* Effect.cached(load()) + + const get = Effect.fn("Storage.lock")(function* (key: string) { + return yield* SynchronizedRef.modifyEffect(locks, (map) => + Effect.gen(function* () { + const existing = map.get(key) + if (existing) return [existing, map] + const next = yield* TxReentrantLock.make() + map.set(key, next) + return [next, map] + }), + ) + }) + + const fail = (target: string): Effect.Effect> => + Effect.fail(new NotFoundError({ message: `Resource not found: ${target}` })) + + const wrap = (target: string, body: Effect.Effect) => + body.pipe(Effect.catchIf(missing, () => fail(target))) + + const writeJson = Effect.fnUntraced(function* (target: string, content: unknown) { + yield* fs.writeWithDirs(target, JSON.stringify(content, null, 2)) + }) + + const remove: Interface["remove"] = Effect.fn("Storage.remove")(function* (key: string[]) { + const dir = (yield* state).dir + const target = file(dir, key) + const rw = yield* get(target) + yield* TxReentrantLock.withWriteLock(rw, fs.remove(target).pipe(Effect.catchIf(missing, () => Effect.void))) + }) + + const read: Interface["read"] = (key: string[]) => + Effect.gen(function* () { + const dir = (yield* state).dir + const target = file(dir, key) + const rw = yield* get(target) + const value = yield* TxReentrantLock.withReadLock(rw, wrap(target, fs.readJson(target))) + return value as T + }) + + const update: Interface["update"] = (key: string[], fn: (draft: T) => void) => + Effect.gen(function* () { + const dir = (yield* state).dir + const target = file(dir, key) + const rw = yield* get(target) + const value = yield* TxReentrantLock.withWriteLock( + rw, + Effect.gen(function* () { + const content = yield* wrap(target, fs.readJson(target)) + fn(content as T) + yield* writeJson(target, content) + return content + }), + ) + return value as T + }) + + const write: Interface["write"] = (key: string[], content: unknown) => + Effect.gen(function* () { + const dir = (yield* state).dir + const target = file(dir, key) + const rw = yield* get(target) + yield* TxReentrantLock.withWriteLock(rw, writeJson(target, content)) + }) + + const list: Interface["list"] = Effect.fn("Storage.list")(function* (prefix: string[]) { + const dir = (yield* state).dir + const cwd = path.join(dir, ...prefix) + const result = yield* fs + .glob("**/*", { + cwd, + include: "file", + }) + .pipe(Effect.catch(() => Effect.succeed([]))) + return result + .map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)]) + .toSorted((a, b) => a.join("/").localeCompare(b.join("/"))) + }) + + return Service.of({ + remove, + read, + update, + write, + list, + }) + }), + ) + + export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer)) + + const { runPromise } = makeRuntime(Service, defaultLayer) export async function remove(key: string[]) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - return withErrorHandling(async () => { - await fs.unlink(target).catch(() => {}) - }) + return runPromise((svc) => svc.remove(key)) } export async function read(key: string[]) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - return withErrorHandling(async () => { - using _ = await Lock.read(target) - const result = await Filesystem.readJson(target) - return result as T - }) + return runPromise((svc) => svc.read(key)) } export async function update(key: string[], fn: (draft: T) => void) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - return withErrorHandling(async () => { - using _ = await Lock.write(target) - const content = await Filesystem.readJson(target) - fn(content as T) - await Filesystem.writeJson(target, content) - return content - }) + return runPromise((svc) => svc.update(key, fn)) } export async function write(key: string[], content: T) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - return withErrorHandling(async () => { - using _ = await Lock.write(target) - await Filesystem.writeJson(target, content) - }) - } - - async function withErrorHandling(body: () => Promise) { - return body().catch((e) => { - if (!(e instanceof Error)) throw e - const errnoException = e as NodeJS.ErrnoException - if (errnoException.code === "ENOENT") { - throw new NotFoundError({ message: `Resource not found: ${errnoException.path}` }) - } - throw e - }) + return runPromise((svc) => svc.write(key, content)) } export async function list(prefix: string[]) { - const dir = await state().then((x) => x.dir) - try { - const result = await Glob.scan("**/*", { - cwd: path.join(dir, ...prefix), - include: "file", - }).then((results) => results.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)])) - result.sort() - return result - } catch { - return [] - } + return runPromise((svc) => svc.list(prefix)) } } diff --git a/packages/opencode/test/storage/storage.test.ts b/packages/opencode/test/storage/storage.test.ts new file mode 100644 index 000000000000..ab55e77c7fd9 --- /dev/null +++ b/packages/opencode/test/storage/storage.test.ts @@ -0,0 +1,59 @@ +import { afterAll, beforeEach, describe, expect, test } from "bun:test" +import fs from "fs/promises" +import path from "path" +import { Global } from "../../src/global" +import { Storage } from "../../src/storage/storage" + +const dir = path.join(Global.Path.data, "storage") + +async function reset() { + await fs.rm(dir, { recursive: true, force: true }) +} + +describe("Storage", () => { + beforeEach(reset) + afterAll(reset) + + test("round-trips JSON content", async () => { + const key = ["session_diff", "roundtrip"] + const value = [{ file: "a.ts", additions: 2, deletions: 1 }] + + await Storage.write(key, value) + + expect(await Storage.read(key)).toEqual(value) + }) + + test("maps missing reads to NotFoundError", async () => { + await expect(Storage.read(["missing", "value"])).rejects.toMatchObject({ name: "NotFoundError" }) + }) + + test("serializes concurrent updates for the same key", async () => { + const key = ["counter", "shared"] + await Storage.write(key, { value: 0 }) + + await Promise.all( + Array.from({ length: 25 }, () => + Storage.update<{ value: number }>(key, (draft) => { + draft.value += 1 + }), + ), + ) + + expect(await Storage.read<{ value: number }>(key)).toEqual({ value: 25 }) + }) + + test("lists and removes stored entries", async () => { + await Storage.write(["list", "b"], { value: 2 }) + await Storage.write(["list", "a"], { value: 1 }) + + expect(await Storage.list(["list"])).toEqual([ + ["list", "a"], + ["list", "b"], + ]) + + await Storage.remove(["list", "a"]) + + expect(await Storage.list(["list"])).toEqual([["list", "b"]]) + await expect(Storage.read(["list", "a"])).rejects.toMatchObject({ name: "NotFoundError" }) + }) +}) From 8c8d888cc02a0056ac040201b27414e132284d6f Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 30 Mar 2026 17:41:15 -0400 Subject: [PATCH 2/5] refactor(storage): extract resolve helper to deduplicate CRUD setup Extract repeated dir/target/lock resolution into a single `resolve` helper, removing 3-line boilerplate from each CRUD operation. --- packages/opencode/src/storage/storage.ts | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/packages/opencode/src/storage/storage.ts b/packages/opencode/src/storage/storage.ts index ba4232c7d96f..c9b3f7e41518 100644 --- a/packages/opencode/src/storage/storage.ts +++ b/packages/opencode/src/storage/storage.ts @@ -262,27 +262,27 @@ export namespace Storage { yield* fs.writeWithDirs(target, JSON.stringify(content, null, 2)) }) - const remove: Interface["remove"] = Effect.fn("Storage.remove")(function* (key: string[]) { + const resolve = Effect.fnUntraced(function* (key: string[]) { const dir = (yield* state).dir const target = file(dir, key) - const rw = yield* get(target) + return [target, yield* get(target)] as const + }) + + const remove: Interface["remove"] = Effect.fn("Storage.remove")(function* (key: string[]) { + const [target, rw] = yield* resolve(key) yield* TxReentrantLock.withWriteLock(rw, fs.remove(target).pipe(Effect.catchIf(missing, () => Effect.void))) }) const read: Interface["read"] = (key: string[]) => Effect.gen(function* () { - const dir = (yield* state).dir - const target = file(dir, key) - const rw = yield* get(target) + const [target, rw] = yield* resolve(key) const value = yield* TxReentrantLock.withReadLock(rw, wrap(target, fs.readJson(target))) return value as T }) const update: Interface["update"] = (key: string[], fn: (draft: T) => void) => Effect.gen(function* () { - const dir = (yield* state).dir - const target = file(dir, key) - const rw = yield* get(target) + const [target, rw] = yield* resolve(key) const value = yield* TxReentrantLock.withWriteLock( rw, Effect.gen(function* () { @@ -297,9 +297,7 @@ export namespace Storage { const write: Interface["write"] = (key: string[], content: unknown) => Effect.gen(function* () { - const dir = (yield* state).dir - const target = file(dir, key) - const rw = yield* get(target) + const [target, rw] = yield* resolve(key) yield* TxReentrantLock.withWriteLock(rw, writeJson(target, content)) }) From 81a68150f5924d06a2622bc07d0c9044c43047c2 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 30 Mar 2026 17:54:42 -0400 Subject: [PATCH 3/5] refactor(storage): simplify state init, add comprehensive tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drop Effect.fn wrapper from cached state (runs once, no trace needed). Rename get→lock for clarity, use fnUntraced for internal helper. Add tests for update-on-missing, write-overwrite, remove-noop, list-empty, concurrent reads, and nested keys. --- packages/opencode/src/storage/storage.ts | 49 ++++++++++--------- .../opencode/test/storage/storage.test.ts | 45 ++++++++++++++++- 2 files changed, 69 insertions(+), 25 deletions(-) diff --git a/packages/opencode/src/storage/storage.ts b/packages/opencode/src/storage/storage.ts index c9b3f7e41518..199529c3b640 100644 --- a/packages/opencode/src/storage/storage.ts +++ b/packages/opencode/src/storage/storage.ts @@ -216,31 +216,32 @@ export namespace Storage { Effect.gen(function* () { const fs = yield* AppFileSystem.Service const locks = yield* SynchronizedRef.make(new Map()) - const load = Effect.fn("Storage.state")(function* () { - const dir = path.join(Global.Path.data, "storage") - const marker = path.join(dir, "migration") - const migration = yield* fs.readFileString(marker).pipe( - Effect.map((x) => Number.parseInt(x, 10)), - Effect.catchIf(missing, () => Effect.succeed(0)), - Effect.orElseSucceed(() => 0), - ) - for (let i = migration; i < MIGRATIONS.length; i++) { - log.info("running migration", { index: i }) - const step = MIGRATIONS[i]! - yield* step(dir, fs).pipe( - Effect.catchCause((cause) => - Effect.sync(() => { - log.error("failed to run migration", { index: i, cause }) - }), - ), + const state = yield* Effect.cached( + Effect.gen(function* () { + const dir = path.join(Global.Path.data, "storage") + const marker = path.join(dir, "migration") + const migration = yield* fs.readFileString(marker).pipe( + Effect.map((x) => Number.parseInt(x, 10)), + Effect.catchIf(missing, () => Effect.succeed(0)), + Effect.orElseSucceed(() => 0), ) - yield* fs.writeWithDirs(marker, String(i + 1)) - } - return { dir } - }) - const state = yield* Effect.cached(load()) + for (let i = migration; i < MIGRATIONS.length; i++) { + log.info("running migration", { index: i }) + const step = MIGRATIONS[i]! + yield* step(dir, fs).pipe( + Effect.catchCause((cause) => + Effect.sync(() => { + log.error("failed to run migration", { index: i, cause }) + }), + ), + ) + yield* fs.writeWithDirs(marker, String(i + 1)) + } + return { dir } + }), + ) - const get = Effect.fn("Storage.lock")(function* (key: string) { + const lock = Effect.fnUntraced(function* (key: string) { return yield* SynchronizedRef.modifyEffect(locks, (map) => Effect.gen(function* () { const existing = map.get(key) @@ -265,7 +266,7 @@ export namespace Storage { const resolve = Effect.fnUntraced(function* (key: string[]) { const dir = (yield* state).dir const target = file(dir, key) - return [target, yield* get(target)] as const + return [target, yield* lock(target)] as const }) const remove: Interface["remove"] = Effect.fn("Storage.remove")(function* (key: string[]) { diff --git a/packages/opencode/test/storage/storage.test.ts b/packages/opencode/test/storage/storage.test.ts index ab55e77c7fd9..10e4d4335223 100644 --- a/packages/opencode/test/storage/storage.test.ts +++ b/packages/opencode/test/storage/storage.test.ts @@ -27,6 +27,30 @@ describe("Storage", () => { await expect(Storage.read(["missing", "value"])).rejects.toMatchObject({ name: "NotFoundError" }) }) + test("update on missing key throws NotFoundError", async () => { + await expect( + Storage.update<{ value: number }>(["missing", "key"], (draft) => { + draft.value += 1 + }), + ).rejects.toMatchObject({ name: "NotFoundError" }) + }) + + test("write overwrites existing value", async () => { + const key = ["overwrite", "test"] + await Storage.write<{ v: number }>(key, { v: 1 }) + await Storage.write<{ v: number }>(key, { v: 2 }) + + expect(await Storage.read<{ v: number }>(key)).toEqual({ v: 2 }) + }) + + test("remove on missing key is a no-op", async () => { + await expect(Storage.remove(["nonexistent", "key"])).resolves.toBeUndefined() + }) + + test("list on missing prefix returns empty", async () => { + expect(await Storage.list(["nonexistent"])).toEqual([]) + }) + test("serializes concurrent updates for the same key", async () => { const key = ["counter", "shared"] await Storage.write(key, { value: 0 }) @@ -42,6 +66,25 @@ describe("Storage", () => { expect(await Storage.read<{ value: number }>(key)).toEqual({ value: 25 }) }) + test("concurrent reads do not block each other", async () => { + const key = ["concurrent", "reads"] + await Storage.write(key, { ok: true }) + + const results = await Promise.all(Array.from({ length: 10 }, () => Storage.read(key))) + + expect(results).toHaveLength(10) + for (const r of results) expect(r).toEqual({ ok: true }) + }) + + test("nested keys create deep paths", async () => { + const key = ["a", "b", "c", "deep"] + await Storage.write<{ nested: boolean }>(key, { nested: true }) + + expect(await Storage.read<{ nested: boolean }>(key)).toEqual({ nested: true }) + const items = await Storage.list(["a"]) + expect(items).toEqual([["a", "b", "c", "deep"]]) + }) + test("lists and removes stored entries", async () => { await Storage.write(["list", "b"], { value: 2 }) await Storage.write(["list", "a"], { value: 1 }) @@ -56,4 +99,4 @@ describe("Storage", () => { expect(await Storage.list(["list"])).toEqual([["list", "b"]]) await expect(Storage.read(["list", "a"])).rejects.toMatchObject({ name: "NotFoundError" }) }) -}) +}) \ No newline at end of file From 5592e09bf5ca3266c6234d441531c3eea1d31c7f Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 30 Mar 2026 20:01:32 -0400 Subject: [PATCH 4/5] fix(test): stabilize Windows session and storage tests Scope storage tests to unique directories so they do not race other package tests. Disable the file watcher only inside session message server tests on Windows to avoid the Bun watcher crash without breaking watcher-dependent coverage. --- .../test/server/session-messages.test.ts | 156 ++++++++++-------- .../opencode/test/storage/storage.test.ts | 130 +++++++++------ 2 files changed, 164 insertions(+), 122 deletions(-) diff --git a/packages/opencode/test/server/session-messages.test.ts b/packages/opencode/test/server/session-messages.test.ts index d7e44cbecc36..89e6fba5c5fd 100644 --- a/packages/opencode/test/server/session-messages.test.ts +++ b/packages/opencode/test/server/session-messages.test.ts @@ -13,6 +13,18 @@ afterEach(async () => { await Instance.disposeAll() }) +async function withoutWatcher(fn: () => Promise) { + if (process.platform !== "win32") return fn() + const prev = process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER + process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = "true" + try { + return await fn() + } finally { + if (prev === undefined) delete process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER + else process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = prev + } +} + async function fill(sessionID: SessionID, count: number, time = (i: number) => Date.now() + i) { const ids = [] as MessageID[] for (let i = 0; i < count; i++) { @@ -42,86 +54,94 @@ async function fill(sessionID: SessionID, count: number, time = (i: number) => D describe("session messages endpoint", () => { test("returns cursor headers for older pages", async () => { await using tmp = await tmpdir({ git: true }) - await Instance.provide({ - directory: tmp.path, - fn: async () => { - const session = await Session.create({}) - const ids = await fill(session.id, 5) - const app = Server.Default() - - const a = await app.request(`/session/${session.id}/message?limit=2`) - expect(a.status).toBe(200) - const aBody = (await a.json()) as MessageV2.WithParts[] - expect(aBody.map((item) => item.info.id)).toEqual(ids.slice(-2)) - const cursor = a.headers.get("x-next-cursor") - expect(cursor).toBeTruthy() - expect(a.headers.get("link")).toContain('rel="next"') - - const b = await app.request(`/session/${session.id}/message?limit=2&before=${encodeURIComponent(cursor!)}`) - expect(b.status).toBe(200) - const bBody = (await b.json()) as MessageV2.WithParts[] - expect(bBody.map((item) => item.info.id)).toEqual(ids.slice(-4, -2)) - - await Session.remove(session.id) - }, - }) + await withoutWatcher(() => + Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({}) + const ids = await fill(session.id, 5) + const app = Server.Default() + + const a = await app.request(`/session/${session.id}/message?limit=2`) + expect(a.status).toBe(200) + const aBody = (await a.json()) as MessageV2.WithParts[] + expect(aBody.map((item) => item.info.id)).toEqual(ids.slice(-2)) + const cursor = a.headers.get("x-next-cursor") + expect(cursor).toBeTruthy() + expect(a.headers.get("link")).toContain('rel="next"') + + const b = await app.request(`/session/${session.id}/message?limit=2&before=${encodeURIComponent(cursor!)}`) + expect(b.status).toBe(200) + const bBody = (await b.json()) as MessageV2.WithParts[] + expect(bBody.map((item) => item.info.id)).toEqual(ids.slice(-4, -2)) + + await Session.remove(session.id) + }, + }), + ) }) test("keeps full-history responses when limit is omitted", async () => { await using tmp = await tmpdir({ git: true }) - await Instance.provide({ - directory: tmp.path, - fn: async () => { - const session = await Session.create({}) - const ids = await fill(session.id, 3) - const app = Server.Default() - - const res = await app.request(`/session/${session.id}/message`) - expect(res.status).toBe(200) - const body = (await res.json()) as MessageV2.WithParts[] - expect(body.map((item) => item.info.id)).toEqual(ids) - - await Session.remove(session.id) - }, - }) + await withoutWatcher(() => + Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({}) + const ids = await fill(session.id, 3) + const app = Server.Default() + + const res = await app.request(`/session/${session.id}/message`) + expect(res.status).toBe(200) + const body = (await res.json()) as MessageV2.WithParts[] + expect(body.map((item) => item.info.id)).toEqual(ids) + + await Session.remove(session.id) + }, + }), + ) }) test("rejects invalid cursors and missing sessions", async () => { await using tmp = await tmpdir({ git: true }) - await Instance.provide({ - directory: tmp.path, - fn: async () => { - const session = await Session.create({}) - const app = Server.Default() - - const bad = await app.request(`/session/${session.id}/message?limit=2&before=bad`) - expect(bad.status).toBe(400) - - const miss = await app.request(`/session/ses_missing/message?limit=2`) - expect(miss.status).toBe(404) - - await Session.remove(session.id) - }, - }) + await withoutWatcher(() => + Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({}) + const app = Server.Default() + + const bad = await app.request(`/session/${session.id}/message?limit=2&before=bad`) + expect(bad.status).toBe(400) + + const miss = await app.request(`/session/ses_missing/message?limit=2`) + expect(miss.status).toBe(404) + + await Session.remove(session.id) + }, + }), + ) }) test("does not truncate large legacy limit requests", async () => { await using tmp = await tmpdir({ git: true }) - await Instance.provide({ - directory: tmp.path, - fn: async () => { - const session = await Session.create({}) - await fill(session.id, 520) - const app = Server.Default() - - const res = await app.request(`/session/${session.id}/message?limit=510`) - expect(res.status).toBe(200) - const body = (await res.json()) as MessageV2.WithParts[] - expect(body).toHaveLength(510) - - await Session.remove(session.id) - }, - }) + await withoutWatcher(() => + Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({}) + await fill(session.id, 520) + const app = Server.Default() + + const res = await app.request(`/session/${session.id}/message?limit=510`) + expect(res.status).toBe(200) + const body = (await res.json()) as MessageV2.WithParts[] + expect(body).toHaveLength(510) + + await Session.remove(session.id) + }, + }), + ) }) }) diff --git a/packages/opencode/test/storage/storage.test.ts b/packages/opencode/test/storage/storage.test.ts index 10e4d4335223..ccc836d66896 100644 --- a/packages/opencode/test/storage/storage.test.ts +++ b/packages/opencode/test/storage/storage.test.ts @@ -1,4 +1,4 @@ -import { afterAll, beforeEach, describe, expect, test } from "bun:test" +import { describe, expect, test } from "bun:test" import fs from "fs/promises" import path from "path" import { Global } from "../../src/global" @@ -6,97 +6,119 @@ import { Storage } from "../../src/storage/storage" const dir = path.join(Global.Path.data, "storage") -async function reset() { - await fs.rm(dir, { recursive: true, force: true }) +async function withScope(fn: (root: string[]) => Promise) { + const root = ["storage_test", crypto.randomUUID()] + try { + return await fn(root) + } finally { + await fs.rm(path.join(dir, ...root), { recursive: true, force: true }) + } } describe("Storage", () => { - beforeEach(reset) - afterAll(reset) - test("round-trips JSON content", async () => { - const key = ["session_diff", "roundtrip"] - const value = [{ file: "a.ts", additions: 2, deletions: 1 }] + await withScope(async (root) => { + const key = [...root, "session_diff", "roundtrip"] + const value = [{ file: "a.ts", additions: 2, deletions: 1 }] - await Storage.write(key, value) + await Storage.write(key, value) - expect(await Storage.read(key)).toEqual(value) + expect(await Storage.read(key)).toEqual(value) + }) }) test("maps missing reads to NotFoundError", async () => { - await expect(Storage.read(["missing", "value"])).rejects.toMatchObject({ name: "NotFoundError" }) + await withScope(async (root) => { + await expect(Storage.read([...root, "missing", "value"])).rejects.toMatchObject({ name: "NotFoundError" }) + }) }) test("update on missing key throws NotFoundError", async () => { - await expect( - Storage.update<{ value: number }>(["missing", "key"], (draft) => { - draft.value += 1 - }), - ).rejects.toMatchObject({ name: "NotFoundError" }) + await withScope(async (root) => { + await expect( + Storage.update<{ value: number }>([...root, "missing", "key"], (draft) => { + draft.value += 1 + }), + ).rejects.toMatchObject({ name: "NotFoundError" }) + }) }) test("write overwrites existing value", async () => { - const key = ["overwrite", "test"] - await Storage.write<{ v: number }>(key, { v: 1 }) - await Storage.write<{ v: number }>(key, { v: 2 }) + await withScope(async (root) => { + const key = [...root, "overwrite", "test"] + await Storage.write<{ v: number }>(key, { v: 1 }) + await Storage.write<{ v: number }>(key, { v: 2 }) - expect(await Storage.read<{ v: number }>(key)).toEqual({ v: 2 }) + expect(await Storage.read<{ v: number }>(key)).toEqual({ v: 2 }) + }) }) test("remove on missing key is a no-op", async () => { - await expect(Storage.remove(["nonexistent", "key"])).resolves.toBeUndefined() + await withScope(async (root) => { + await expect(Storage.remove([...root, "nonexistent", "key"])).resolves.toBeUndefined() + }) }) test("list on missing prefix returns empty", async () => { - expect(await Storage.list(["nonexistent"])).toEqual([]) + await withScope(async (root) => { + expect(await Storage.list([...root, "nonexistent"])).toEqual([]) + }) }) test("serializes concurrent updates for the same key", async () => { - const key = ["counter", "shared"] - await Storage.write(key, { value: 0 }) - - await Promise.all( - Array.from({ length: 25 }, () => - Storage.update<{ value: number }>(key, (draft) => { - draft.value += 1 - }), - ), - ) - - expect(await Storage.read<{ value: number }>(key)).toEqual({ value: 25 }) + await withScope(async (root) => { + const key = [...root, "counter", "shared"] + await Storage.write(key, { value: 0 }) + + await Promise.all( + Array.from({ length: 25 }, () => + Storage.update<{ value: number }>(key, (draft) => { + draft.value += 1 + }), + ), + ) + + expect(await Storage.read<{ value: number }>(key)).toEqual({ value: 25 }) + }) }) test("concurrent reads do not block each other", async () => { - const key = ["concurrent", "reads"] - await Storage.write(key, { ok: true }) + await withScope(async (root) => { + const key = [...root, "concurrent", "reads"] + await Storage.write(key, { ok: true }) - const results = await Promise.all(Array.from({ length: 10 }, () => Storage.read(key))) + const results = await Promise.all(Array.from({ length: 10 }, () => Storage.read(key))) - expect(results).toHaveLength(10) - for (const r of results) expect(r).toEqual({ ok: true }) + expect(results).toHaveLength(10) + for (const r of results) expect(r).toEqual({ ok: true }) + }) }) test("nested keys create deep paths", async () => { - const key = ["a", "b", "c", "deep"] - await Storage.write<{ nested: boolean }>(key, { nested: true }) + await withScope(async (root) => { + const key = [...root, "a", "b", "c", "deep"] + await Storage.write<{ nested: boolean }>(key, { nested: true }) - expect(await Storage.read<{ nested: boolean }>(key)).toEqual({ nested: true }) - const items = await Storage.list(["a"]) - expect(items).toEqual([["a", "b", "c", "deep"]]) + expect(await Storage.read<{ nested: boolean }>(key)).toEqual({ nested: true }) + expect(await Storage.list([...root, "a"])).toEqual([key]) + }) }) test("lists and removes stored entries", async () => { - await Storage.write(["list", "b"], { value: 2 }) - await Storage.write(["list", "a"], { value: 1 }) + await withScope(async (root) => { + const a = [...root, "list", "a"] + const b = [...root, "list", "b"] + const prefix = [...root, "list"] + + await Storage.write(b, { value: 2 }) + await Storage.write(a, { value: 1 }) - expect(await Storage.list(["list"])).toEqual([ - ["list", "a"], - ["list", "b"], - ]) + expect(await Storage.list(prefix)).toEqual([a, b]) - await Storage.remove(["list", "a"]) + await Storage.remove(a) - expect(await Storage.list(["list"])).toEqual([["list", "b"]]) - await expect(Storage.read(["list", "a"])).rejects.toMatchObject({ name: "NotFoundError" }) + expect(await Storage.list(prefix)).toEqual([b]) + await expect(Storage.read(a)).rejects.toMatchObject({ name: "NotFoundError" }) + }) }) -}) \ No newline at end of file +}) From d421faa5d9a9e6a7858e1364791b897c735bc1cd Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 30 Mar 2026 21:09:18 -0400 Subject: [PATCH 5/5] fix(storage): harden legacy migration handling Handle invalid migration markers and malformed legacy records without skipping or corrupting migrations. Add focused migration coverage and switch the storage lock cache to RcMap so per-key locks are cleaned up automatically. --- packages/opencode/src/storage/storage.ts | 189 +++++++++--------- .../opencode/test/storage/storage.test.ts | 171 ++++++++++++++++ 2 files changed, 266 insertions(+), 94 deletions(-) diff --git a/packages/opencode/src/storage/storage.ts b/packages/opencode/src/storage/storage.ts index 199529c3b640..268b18f6876b 100644 --- a/packages/opencode/src/storage/storage.ts +++ b/packages/opencode/src/storage/storage.ts @@ -6,7 +6,7 @@ import z from "zod" import { git } from "@/util/git" import { AppFileSystem } from "@/filesystem" import { makeRuntime } from "@/effect/run-service" -import { Effect, Layer, ServiceMap, SynchronizedRef, TxReentrantLock } from "effect" +import { Effect, Exit, Layer, Option, RcMap, Schema, ServiceMap, TxReentrantLock } from "effect" export namespace Storage { const log = Log.create({ service: "storage" }) @@ -22,42 +22,37 @@ export namespace Storage { export type Error = AppFileSystem.Error | InstanceType - const RootFile = z - .object({ - path: z - .object({ - root: z.string().optional(), - }) - .optional(), - }) - .passthrough() - - const SessionFile = z - .object({ - id: z.string(), - }) - .passthrough() - - const MessageFile = z - .object({ - id: z.string(), - }) - .passthrough() - - const DiffFile = z - .object({ - additions: z.number(), - deletions: z.number(), - }) - .passthrough() - - const SummaryFile = z - .object({ - id: z.string(), - projectID: z.string(), - summary: z.object({ diffs: z.array(DiffFile) }), - }) - .passthrough() + const RootFile = Schema.Struct({ + path: Schema.optional( + Schema.Struct({ + root: Schema.optional(Schema.String), + }), + ), + }) + + const SessionFile = Schema.Struct({ + id: Schema.String, + }) + + const MessageFile = Schema.Struct({ + id: Schema.String, + }) + + const DiffFile = Schema.Struct({ + additions: Schema.Number, + deletions: Schema.Number, + }) + + const SummaryFile = Schema.Struct({ + id: Schema.String, + projectID: Schema.String, + summary: Schema.Struct({ diffs: Schema.Array(DiffFile) }), + }) + + const decodeRoot = Schema.decodeUnknownOption(RootFile) + const decodeSession = Schema.decodeUnknownOption(SessionFile) + const decodeMessage = Schema.decodeUnknownOption(MessageFile) + const decodeSummary = Schema.decodeUnknownOption(SummaryFile) export interface Interface { readonly remove: (key: string[]) => Effect.Effect @@ -82,6 +77,11 @@ export namespace Storage { return false } + function parseMigration(text: string) { + const value = Number.parseInt(text, 10) + return Number.isNaN(value) ? 0 : value + } + const MIGRATIONS: Migration[] = [ Effect.fn("Storage.migration.1")(function* (dir: string, fs: AppFileSystem.Interface) { const project = path.resolve(dir, "../project") @@ -102,8 +102,8 @@ export namespace Storage { cwd: full, absolute: true, })) { - const json = RootFile.parse(yield* fs.readJson(msgFile)) - const root = json.path?.root + const json = decodeRoot(yield* fs.readJson(msgFile), { onExcessProperty: "preserve" }) + const root = Option.isSome(json) ? json.value.path?.root : undefined if (!root) continue worktree = root break @@ -148,27 +148,31 @@ export namespace Storage { })) { const dest = path.join(dir, "session", projectID, path.basename(sessionFile)) log.info("copying", { sessionFile, dest }) - const session = SessionFile.parse(yield* fs.readJson(sessionFile)) + const session = yield* fs.readJson(sessionFile) + const info = decodeSession(session, { onExcessProperty: "preserve" }) yield* fs.writeWithDirs(dest, JSON.stringify(session, null, 2)) - log.info(`migrating messages for session ${session.id}`) - for (const msgFile of yield* fs.glob(`storage/session/message/${session.id}/*.json`, { + if (Option.isNone(info)) continue + log.info(`migrating messages for session ${info.value.id}`) + for (const msgFile of yield* fs.glob(`storage/session/message/${info.value.id}/*.json`, { cwd: full, absolute: true, })) { - const next = path.join(dir, "message", session.id, path.basename(msgFile)) + const next = path.join(dir, "message", info.value.id, path.basename(msgFile)) log.info("copying", { msgFile, dest: next, }) - const message = MessageFile.parse(yield* fs.readJson(msgFile)) + const message = yield* fs.readJson(msgFile) + const item = decodeMessage(message, { onExcessProperty: "preserve" }) yield* fs.writeWithDirs(next, JSON.stringify(message, null, 2)) + if (Option.isNone(item)) continue - log.info(`migrating parts for message ${message.id}`) - for (const partFile of yield* fs.glob(`storage/session/part/${session.id}/${message.id}/*.json`, { + log.info(`migrating parts for message ${item.value.id}`) + for (const partFile of yield* fs.glob(`storage/session/part/${info.value.id}/${item.value.id}/*.json`, { cwd: full, absolute: true, })) { - const out = path.join(dir, "part", message.id, path.basename(partFile)) + const out = path.join(dir, "part", item.value.id, path.basename(partFile)) const part = yield* fs.readJson(partFile) log.info("copying", { partFile, @@ -186,18 +190,19 @@ export namespace Storage { cwd: dir, absolute: true, })) { - const session = SummaryFile.safeParse(yield* fs.readJson(item)) - if (!session.success) continue - const diffs = session.data.summary.diffs + const raw = yield* fs.readJson(item) + const session = decodeSummary(raw, { onExcessProperty: "preserve" }) + if (Option.isNone(session)) continue + const diffs = session.value.summary.diffs yield* fs.writeWithDirs( - path.join(dir, "session_diff", session.data.id + ".json"), + path.join(dir, "session_diff", session.value.id + ".json"), JSON.stringify(diffs, null, 2), ) yield* fs.writeWithDirs( - path.join(dir, "session", session.data.projectID, session.data.id + ".json"), + path.join(dir, "session", session.value.projectID, session.value.id + ".json"), JSON.stringify( { - ...session.data, + ...(raw as Record), summary: { additions: diffs.reduce((sum, x) => sum + x.additions, 0), deletions: diffs.reduce((sum, x) => sum + x.deletions, 0), @@ -215,44 +220,33 @@ export namespace Storage { Service, Effect.gen(function* () { const fs = yield* AppFileSystem.Service - const locks = yield* SynchronizedRef.make(new Map()) + const locks = yield* RcMap.make({ + lookup: () => TxReentrantLock.make(), + idleTimeToLive: 0, + }) const state = yield* Effect.cached( Effect.gen(function* () { const dir = path.join(Global.Path.data, "storage") const marker = path.join(dir, "migration") const migration = yield* fs.readFileString(marker).pipe( - Effect.map((x) => Number.parseInt(x, 10)), + Effect.map(parseMigration), Effect.catchIf(missing, () => Effect.succeed(0)), Effect.orElseSucceed(() => 0), ) for (let i = migration; i < MIGRATIONS.length; i++) { log.info("running migration", { index: i }) const step = MIGRATIONS[i]! - yield* step(dir, fs).pipe( - Effect.catchCause((cause) => - Effect.sync(() => { - log.error("failed to run migration", { index: i, cause }) - }), - ), - ) + const exit = yield* Effect.exit(step(dir, fs)) + if (Exit.isFailure(exit)) { + log.error("failed to run migration", { index: i, cause: exit.cause }) + break + } yield* fs.writeWithDirs(marker, String(i + 1)) } return { dir } }), ) - const lock = Effect.fnUntraced(function* (key: string) { - return yield* SynchronizedRef.modifyEffect(locks, (map) => - Effect.gen(function* () { - const existing = map.get(key) - if (existing) return [existing, map] - const next = yield* TxReentrantLock.make() - map.set(key, next) - return [next, map] - }), - ) - }) - const fail = (target: string): Effect.Effect> => Effect.fail(new NotFoundError({ message: `Resource not found: ${target}` })) @@ -263,43 +257,50 @@ export namespace Storage { yield* fs.writeWithDirs(target, JSON.stringify(content, null, 2)) }) - const resolve = Effect.fnUntraced(function* (key: string[]) { - const dir = (yield* state).dir - const target = file(dir, key) - return [target, yield* lock(target)] as const - }) + const withResolved = ( + key: string[], + fn: (target: string, rw: TxReentrantLock.TxReentrantLock) => Effect.Effect, + ): Effect.Effect => + Effect.scoped( + Effect.gen(function* () { + const target = file((yield* state).dir, key) + return yield* fn(target, yield* RcMap.get(locks, target)) + }), + ) const remove: Interface["remove"] = Effect.fn("Storage.remove")(function* (key: string[]) { - const [target, rw] = yield* resolve(key) - yield* TxReentrantLock.withWriteLock(rw, fs.remove(target).pipe(Effect.catchIf(missing, () => Effect.void))) + yield* withResolved(key, (target, rw) => + TxReentrantLock.withWriteLock(rw, fs.remove(target).pipe(Effect.catchIf(missing, () => Effect.void))), + ) }) const read: Interface["read"] = (key: string[]) => Effect.gen(function* () { - const [target, rw] = yield* resolve(key) - const value = yield* TxReentrantLock.withReadLock(rw, wrap(target, fs.readJson(target))) + const value = yield* withResolved(key, (target, rw) => + TxReentrantLock.withReadLock(rw, wrap(target, fs.readJson(target))), + ) return value as T }) const update: Interface["update"] = (key: string[], fn: (draft: T) => void) => Effect.gen(function* () { - const [target, rw] = yield* resolve(key) - const value = yield* TxReentrantLock.withWriteLock( - rw, - Effect.gen(function* () { - const content = yield* wrap(target, fs.readJson(target)) - fn(content as T) - yield* writeJson(target, content) - return content - }), + const value = yield* withResolved(key, (target, rw) => + TxReentrantLock.withWriteLock( + rw, + Effect.gen(function* () { + const content = yield* wrap(target, fs.readJson(target)) + fn(content as T) + yield* writeJson(target, content) + return content + }), + ), ) return value as T }) const write: Interface["write"] = (key: string[], content: unknown) => Effect.gen(function* () { - const [target, rw] = yield* resolve(key) - yield* TxReentrantLock.withWriteLock(rw, writeJson(target, content)) + yield* withResolved(key, (target, rw) => TxReentrantLock.withWriteLock(rw, writeJson(target, content))) }) const list: Interface["list"] = Effect.fn("Storage.list")(function* (prefix: string[]) { diff --git a/packages/opencode/test/storage/storage.test.ts b/packages/opencode/test/storage/storage.test.ts index ccc836d66896..e5a04c082dc5 100644 --- a/packages/opencode/test/storage/storage.test.ts +++ b/packages/opencode/test/storage/storage.test.ts @@ -1,8 +1,11 @@ import { describe, expect, test } from "bun:test" import fs from "fs/promises" import path from "path" +import { Effect, Layer, ManagedRuntime } from "effect" +import { AppFileSystem } from "../../src/filesystem" import { Global } from "../../src/global" import { Storage } from "../../src/storage/storage" +import { tmpdir } from "../fixture/fixture" const dir = path.join(Global.Path.data, "storage") @@ -15,6 +18,60 @@ async function withScope(fn: (root: string[]) => Promise) { } } +function map(root: string, file: string) { + if (file === Global.Path.data) return root + if (file.startsWith(Global.Path.data + path.sep)) return path.join(root, path.relative(Global.Path.data, file)) + return file +} + +function layer(root: string) { + return Layer.effect( + AppFileSystem.Service, + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + return AppFileSystem.Service.of({ + ...fs, + isDir: (file) => fs.isDir(map(root, file)), + readJson: (file) => fs.readJson(map(root, file)), + writeWithDirs: (file, content, mode) => fs.writeWithDirs(map(root, file), content, mode), + readFileString: (file) => fs.readFileString(map(root, file)), + remove: (file) => fs.remove(map(root, file)), + glob: (pattern, options) => + fs.glob(pattern, options?.cwd ? { ...options, cwd: map(root, options.cwd) } : options), + }) + }), + ).pipe(Layer.provide(AppFileSystem.defaultLayer)) +} + +async function withStorage( + root: string, + fn: (run: (body: Effect.Effect) => Promise) => Promise, +) { + const rt = ManagedRuntime.make(Storage.layer.pipe(Layer.provide(layer(root)))) + try { + return await fn((body) => rt.runPromise(body)) + } finally { + await rt.dispose() + } +} + +async function write(file: string, value: unknown) { + await fs.mkdir(path.dirname(file), { recursive: true }) + await Bun.write(file, JSON.stringify(value, null, 2)) +} + +async function text(file: string, value: string) { + await fs.mkdir(path.dirname(file), { recursive: true }) + await Bun.write(file, value) +} + +async function exists(file: string) { + return fs + .stat(file) + .then(() => true) + .catch(() => false) +} + describe("Storage", () => { test("round-trips JSON content", async () => { await withScope(async (root) => { @@ -121,4 +178,118 @@ describe("Storage", () => { await expect(Storage.read(a)).rejects.toMatchObject({ name: "NotFoundError" }) }) }) + + test("migration 2 runs when marker contents are invalid", async () => { + await using tmp = await tmpdir() + const storage = path.join(tmp.path, "storage") + const diffs = [ + { additions: 2, deletions: 1 }, + { additions: 3, deletions: 4 }, + ] + + await text(path.join(storage, "migration"), "wat") + await write(path.join(storage, "session", "proj_test", "ses_test.json"), { + id: "ses_test", + projectID: "proj_test", + title: "legacy", + summary: { diffs }, + }) + + await withStorage(tmp.path, async (run) => { + expect(await run(Storage.Service.use((svc) => svc.list(["session_diff"])))).toEqual([ + ["session_diff", "ses_test"], + ]) + expect(await run(Storage.Service.use((svc) => svc.read(["session_diff", "ses_test"])))).toEqual( + diffs, + ) + expect( + await run( + Storage.Service.use((svc) => + svc.read<{ + id: string + projectID: string + title: string + summary: { + additions: number + deletions: number + } + }>(["session", "proj_test", "ses_test"]), + ), + ), + ).toEqual({ + id: "ses_test", + projectID: "proj_test", + title: "legacy", + summary: { + additions: 5, + deletions: 5, + }, + }) + }) + + expect(await Bun.file(path.join(storage, "migration")).text()).toBe("2") + }) + + test("migration 1 tolerates malformed legacy records", async () => { + await using tmp = await tmpdir({ git: true }) + const storage = path.join(tmp.path, "storage") + const legacy = path.join(tmp.path, "project", "legacy") + + await write(path.join(legacy, "storage", "session", "message", "probe", "0.json"), []) + await write(path.join(legacy, "storage", "session", "message", "probe", "1.json"), { + path: { root: tmp.path }, + }) + await write(path.join(legacy, "storage", "session", "info", "ses_legacy.json"), { + id: "ses_legacy", + title: "legacy", + }) + await write(path.join(legacy, "storage", "session", "message", "ses_legacy", "msg_legacy.json"), { + role: "user", + text: "hello", + }) + + await withStorage(tmp.path, async (run) => { + const projects = await run(Storage.Service.use((svc) => svc.list(["project"]))) + expect(projects).toHaveLength(1) + const project = projects[0]![1] + + expect(await run(Storage.Service.use((svc) => svc.list(["session", project])))).toEqual([ + ["session", project, "ses_legacy"], + ]) + expect( + await run( + Storage.Service.use((svc) => svc.read<{ id: string; title: string }>(["session", project, "ses_legacy"])), + ), + ).toEqual({ + id: "ses_legacy", + title: "legacy", + }) + expect( + await run( + Storage.Service.use((svc) => + svc.read<{ role: string; text: string }>(["message", "ses_legacy", "msg_legacy"]), + ), + ), + ).toEqual({ + role: "user", + text: "hello", + }) + }) + + expect(await Bun.file(path.join(storage, "migration")).text()).toBe("2") + }) + + test("failed migrations do not advance the marker", async () => { + await using tmp = await tmpdir() + const storage = path.join(tmp.path, "storage") + const legacy = path.join(tmp.path, "project", "legacy") + + await text(path.join(legacy, "storage", "session", "message", "probe", "0.json"), "{") + + await withStorage(tmp.path, async (run) => { + expect(await run(Storage.Service.use((svc) => svc.list(["project"])))).toEqual([]) + }) + + expect(await exists(path.join(storage, "migration"))).toBe(false) + }) })