Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions packages/app/src/context/directory-sync-error.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { describe, expect, test } from "bun:test"
import { isSessionNotFoundError, recoverSessionNotFound } from "./directory-sync-error"

describe("isSessionNotFoundError", () => {
test("matches SDK-wrapped session not found errors", () => {
const error = new Error("Session not found: ses_1", {
cause: {
body: {
name: "NotFoundError",
data: {
message: "Session not found: ses_1",
},
},
status: 404,
},
})

expect(isSessionNotFoundError(error)).toBe(true)
})

test("matches tagged session not found errors", () => {
const error = new Error("Session not found", {
cause: {
body: {
name: "SessionNotFoundError",
},
status: 404,
},
})

expect(isSessionNotFoundError(error)).toBe(true)
})

test("does not match unrelated not found errors", () => {
const error = new Error("Provider not found", {
cause: {
body: {
name: "NotFoundError",
data: {
message: "Provider not found: openai",
},
},
status: 404,
},
})

expect(isSessionNotFoundError(error)).toBe(false)
})

test("does not match non-404 session errors", () => {
const error = new Error("Session not found: ses_1", {
cause: {
body: {
name: "NotFoundError",
data: {
message: "Session not found: ses_1",
},
},
status: 500,
},
})

expect(isSessionNotFoundError(error)).toBe(false)
})

test("recovers session not found rejections", async () => {
let recovered = false

await recoverSessionNotFound(
Promise.reject(
new Error("Session not found: ses_1", {
cause: {
body: {
name: "NotFoundError",
data: {
message: "Session not found: ses_1",
},
},
status: 404,
},
}),
),
() => {
recovered = true
},
)

expect(recovered).toBe(true)
})

test("rethrows unrelated rejections", async () => {
const error = new Error("Provider not found", {
cause: {
body: {
name: "NotFoundError",
data: {
message: "Provider not found: openai",
},
},
status: 404,
},
})
let result: unknown

try {
await recoverSessionNotFound(Promise.reject(error), () => {
result = "recovered"
})
} catch (caught) {
result = caught
}

expect(result).toBe(error)
})
})
26 changes: 26 additions & 0 deletions packages/app/src/context/directory-sync-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null
}

export function isSessionNotFoundError(error: unknown) {
if (!(error instanceof Error)) return false
if (!isRecord(error.cause)) return false
if (error.cause.status !== 404) return false
if (!isRecord(error.cause.body)) return false

if (error.cause.body.name === "SessionNotFoundError") return true
if (error.cause.body._tag === "SessionNotFoundError") return true
if (error.cause.body.name !== "NotFoundError") return false

const data = isRecord(error.cause.body.data) ? error.cause.body.data : undefined
return typeof data?.message === "string" && data.message.startsWith("Session not found:")
}

export async function recoverSessionNotFound<T>(promise: Promise<T>, recover: () => void) {
try {
return await promise
} catch (error) {
if (!isSessionNotFoundError(error)) throw error
recover()
}
}
169 changes: 97 additions & 72 deletions packages/app/src/context/directory-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import type { Message, OpencodeClient, Part } from "@opencode-ai/sdk/v2/client"
import { SESSION_CACHE_LIMIT, dropSessionCaches, pickSessionCacheEvictions } from "./global-sync/session-cache"
import { diffs as list, message as clean } from "@/utils/diffs"
import { useServerSDK } from "./server-sdk"
import { recoverSessionNotFound } from "./directory-sync-error"

const SKIP_PARTS = new Set(["patch", "step-start", "step-finish"])

Expand Down Expand Up @@ -279,6 +280,18 @@ export const createDirSyncContext = (directory: string, serverSync: ReturnType<t
clearMeta(directory, sessionIDs)
}

const forgetMissingSession = (directory: string, setStore: Setter, sessionID: string) => {
seenFor(directory).delete(sessionID)
evict(directory, setStore, [sessionID])
setStore(
"session",
produce((draft) => {
const match = Binary.search(draft, sessionID, (s) => s.id)
if (match.found) draft.splice(match.index, 1)
}),
)
}

const touch = (directory: string, setStore: Setter, sessionID: string) => {
const stale = pickSessionCacheEvictions({
seen: seenFor(directory),
Expand Down Expand Up @@ -435,71 +448,77 @@ export const createDirSyncContext = (directory: string, serverSync: ReturnType<t
})
}

return runInflight(inflight, key, async () => {
const pending = getSessionPrefetchPromise(directory, sessionID)
if (pending) {
await pending
const seeded = getSessionPrefetch(directory, sessionID)
if (seeded && store.message[sessionID] !== undefined && meta.limit[key] === undefined) {
batch(() => {
setMeta("limit", key, seeded.limit)
setMeta("cursor", key, seeded.cursor)
setMeta("complete", key, seeded.complete)
setMeta("loading", key, false)
})
}
}

const hasSession = Binary.search(store.session, sessionID, (s) => s.id).found
const cached = store.message[sessionID] !== undefined && meta.limit[key] !== undefined
if (cached && hasSession && !opts?.force) return

const limit = meta.limit[key] ?? initialMessagePageSize
const sessionReq =
hasSession && !opts?.force
? Promise.resolve()
: retry(() => client.session.get({ sessionID })).then((session) => {
if (!tracked(directory, sessionID)) return
const data = session.data
if (!data) return
setStore(
"session",
produce((draft) => {
const match = Binary.search(draft, sessionID, (s) => s.id)
if (match.found) {
draft[match.index] = data
return
}
draft.splice(match.index, 0, data)
}),
)
})

const messagesReq =
cached && !opts?.force
? Promise.resolve()
: loadMessages({
directory,
client,
setStore,
sessionID,
limit,
return recoverSessionNotFound(
runInflight(inflight, key, async () => {
const pending = getSessionPrefetchPromise(directory, sessionID)
if (pending) {
await pending
const seeded = getSessionPrefetch(directory, sessionID)
if (seeded && store.message[sessionID] !== undefined && meta.limit[key] === undefined) {
batch(() => {
setMeta("limit", key, seeded.limit)
setMeta("cursor", key, seeded.cursor)
setMeta("complete", key, seeded.complete)
setMeta("loading", key, false)
})
}
}

await Promise.all([sessionReq, messagesReq])
})
const hasSession = Binary.search(store.session, sessionID, (s) => s.id).found
const cached = store.message[sessionID] !== undefined && meta.limit[key] !== undefined
if (cached && hasSession && !opts?.force) return

const limit = meta.limit[key] ?? initialMessagePageSize
const sessionReq =
hasSession && !opts?.force
? Promise.resolve()
: retry(() => client.session.get({ sessionID })).then((session) => {
if (!tracked(directory, sessionID)) return
const data = session.data
if (!data) return
setStore(
"session",
produce((draft) => {
const match = Binary.search(draft, sessionID, (s) => s.id)
if (match.found) {
draft[match.index] = data
return
}
draft.splice(match.index, 0, data)
}),
)
})

const messagesReq =
cached && !opts?.force
? Promise.resolve()
: loadMessages({
directory,
client,
setStore,
sessionID,
limit,
})

await Promise.all([sessionReq, messagesReq])
}),
() => forgetMissingSession(directory, setStore, sessionID),
)
},
async diff(sessionID: string, opts?: { force?: boolean }) {
const [store, setStore] = serverSync.child(directory)
touch(directory, setStore, sessionID)
if (store.session_diff[sessionID] !== undefined && !opts?.force) return

const key = keyFor(directory, sessionID)
return runInflight(inflightDiff, key, () =>
retry(() => client.session.diff({ sessionID })).then((diff) => {
if (!tracked(directory, sessionID)) return
setStore("session_diff", sessionID, reconcile(list(diff.data), { key: "file" }))
}),
return recoverSessionNotFound(
runInflight(inflightDiff, key, () =>
retry(() => client.session.diff({ sessionID })).then((diff) => {
if (!tracked(directory, sessionID)) return
setStore("session_diff", sessionID, reconcile(list(diff.data), { key: "file" }))
}),
),
() => forgetMissingSession(directory, setStore, sessionID),
)
},
async todo(sessionID: string, opts?: { force?: boolean }) {
Expand All @@ -519,13 +538,16 @@ export const createDirSyncContext = (directory: string, serverSync: ReturnType<t
}

const key = keyFor(directory, sessionID)
return runInflight(inflightTodo, key, () =>
retry(() => client.session.todo({ sessionID })).then((todo) => {
if (!tracked(directory, sessionID)) return
const list = todo.data ?? []
setStore("todo", sessionID, reconcile(list, { key: "id" }))
serverSync.todo.set(sessionID, list)
}),
return recoverSessionNotFound(
runInflight(inflightTodo, key, () =>
retry(() => client.session.todo({ sessionID })).then((todo) => {
if (!tracked(directory, sessionID)) return
const list = todo.data ?? []
setStore("todo", sessionID, reconcile(list, { key: "id" }))
serverSync.todo.set(sessionID, list)
}),
),
() => forgetMissingSession(directory, setStore, sessionID),
)
},
history: {
Expand All @@ -551,15 +573,18 @@ export const createDirSyncContext = (directory: string, serverSync: ReturnType<t
const before = meta.cursor[key]
if (!before) return

await loadMessages({
directory,
client,
setStore,
sessionID,
limit: step,
before,
mode: "prepend",
})
await recoverSessionNotFound(
loadMessages({
directory,
client,
setStore,
sessionID,
limit: step,
before,
mode: "prepend",
}),
() => forgetMissingSession(directory, setStore, sessionID),
)
},
},
evict(sessionID: string, _directory = directory) {
Expand Down
Loading