Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,47 @@ describe("orchestration watcher resilience", () => {
await svc.dispose();
});

it("blocks manifest writes after an external manifest runId swap", async () => {
const svc = createOrchestrationService({ resolveLaneWorktree: () => lane });
const { manifest, etag } = await svc.runCreate({
laneId: "L-1",
leadSessionId: "S-lead",
bundleRoot: lane,
title: "Original run",
});
await svc.subscribe(manifest.runId, manifest.bundlePath);
const manifestPath = path.join(manifest.bundlePath, "manifest.json");
const foreign = {
...JSON.parse(await fsp.readFile(manifestPath, "utf-8")),
runId: "R-foreign-checkout",
etag: "etag-foreign",
title: "Foreign branch manifest",
};
// Wait past the self-write suppression window used by persistManifest.
await new Promise((resolve) => setTimeout(resolve, 1_100));
await fsp.writeFile(manifestPath, JSON.stringify(foreign, null, 2));
await new Promise((resolve) => setTimeout(resolve, 120));

const patch = await svc.manifestPatch(
{
runId: manifest.runId,
ifMatchEtag: etag,
actorRole: "lead",
actorSessionId: "S-lead",
patches: [{ op: "replace", path: "/title", value: "Stale write attempt" }],
},
manifest.bundlePath,
);
expect(patch.ok).toBe(false);
if (patch.ok) return;
expect(patch.message).toContain("suspended");

const onDisk = JSON.parse(await fsp.readFile(manifestPath, "utf-8"));
expect(onDisk.runId).toBe("R-foreign-checkout");
expect(onDisk.title).toBe("Foreign branch manifest");
await svc.dispose();
});

it("planAppend produces an event with the new contents", async () => {
const svc = createOrchestrationService({ resolveLaneWorktree: () => lane });
const { manifest } = await svc.runCreate({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ const WATCHER_IDLE_CLOSE_MS = 30_000;
const ORCHESTRATION_INDEX_VERSION = 1;
const RUN_LIST_DEFAULT_LIMIT = 100;
const RUN_LIST_MAX_LIMIT = 250;
const RUN_SUSPENDED_MESSAGE =
"orchestration run is suspended (bundle changed externally); re-open the run or restore the correct branch";

export type OrchestrationServiceEvents = {
event: (payload: OrchestrationEventPayload) => void;
Expand Down Expand Up @@ -449,9 +451,10 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
);
}
if (manifest.runId !== runtime.runId) {
throw new Error(
`manifest.runId ${manifest.runId} does not match expected ${runtime.runId}`,
);
runtime.suspended = true;
runtime.manifest = null;
runtime.planMd = null;
return;
}
runtime.manifest = normalizeManifestShape(manifest);
} catch (err) {
Expand Down Expand Up @@ -565,6 +568,8 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
// file), do not blindly etag-bump; mark suspended and ignore.
if (next.runId !== runtime.runId) {
runtime.suspended = true;
runtime.manifest = null;
runtime.planMd = null;
emit({
runId: runtime.runId,
kind: "lifecycle",
Expand Down Expand Up @@ -626,6 +631,12 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
runtime.planMd = plan;
}

function assertRunWritable(runtime: RunRuntime): void {
if (runtime.suspended) {
throw new Error(RUN_SUSPENDED_MESSAGE);
}
}

// --------------------------------------------------------------------------
// Public API
// --------------------------------------------------------------------------
Expand Down Expand Up @@ -731,6 +742,13 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
const runtime = getOrCreateRuntime(req.runId, bundlePath);
return runtime.mutex.run(async () => {
await loadIntoRuntime(runtime);
if (runtime.suspended) {
return {
ok: false,
error: "validation_failed",
message: RUN_SUSPENDED_MESSAGE,
};
}
const current = runtime.manifest;
if (!current) {
return {
Expand Down Expand Up @@ -849,6 +867,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
const runtime = getOrCreateRuntime(req.runId, bundlePath);
return runtime.mutex.run(async () => {
await loadIntoRuntime(runtime);
assertRunWritable(runtime);
if (!runtime.manifest) throw new Error(`run ${req.runId} not found`);
const prev = runtime.planMd ?? "";
const heading = req.section.startsWith("#")
Expand Down Expand Up @@ -876,6 +895,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
const runtime = getOrCreateRuntime(req.runId, bundlePath);
return runtime.mutex.run(async () => {
await loadIntoRuntime(runtime);
assertRunWritable(runtime);
if (!runtime.manifest) throw new Error(`run ${req.runId} not found`);
if (runtime.manifest.etag !== req.ifMatchEtag) {
return { error: "etag_conflict", etag: runtime.manifest.etag };
Expand All @@ -900,6 +920,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
const runtime = getOrCreateRuntime(req.runId, bundlePath);
return runtime.mutex.run(async () => {
await loadIntoRuntime(runtime);
assertRunWritable(runtime);
if (!runtime.manifest) throw new Error(`run ${req.runId} not found`);
const id = `A-${runtime.manifest.assets.length + 1}-${shortRand()}`;
const asset: OrchestrationAsset = {
Expand Down Expand Up @@ -935,6 +956,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
const runtime = getOrCreateRuntime(req.runId, bundlePath);
return runtime.mutex.run(async () => {
await loadIntoRuntime(runtime);
assertRunWritable(runtime);
const manifest = runtime.manifest!;
const registeredAgent = manifest.agents.find(
(agent) => agent.sessionId === req.sessionId,
Expand Down Expand Up @@ -1014,6 +1036,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
const runtime = getOrCreateRuntime(req.runId, bundlePath);
return runtime.mutex.run(async () => {
await loadIntoRuntime(runtime);
assertRunWritable(runtime);
const manifest = runtime.manifest;
if (!manifest) throw new Error(`run ${req.runId} not found`);
const task = manifest.tasks.find((entry) => entry.id === req.taskId);
Expand Down Expand Up @@ -1114,6 +1137,13 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
const runtime = getOrCreateRuntime(req.runId, bundlePath);
return runtime.mutex.run(async () => {
await loadIntoRuntime(runtime);
if (runtime.suspended) {
return {
ok: false,
error: "validation_failed",
message: RUN_SUSPENDED_MESSAGE,
};
}
const manifest = runtime.manifest;
if (!manifest) {
return {
Expand Down Expand Up @@ -1267,6 +1297,9 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
const runtime = getOrCreateRuntime(req.runId, bundlePath);
return runtime.mutex.run(async () => {
await loadIntoRuntime(runtime);
if (runtime.suspended) {
return { ok: false, reason: RUN_SUSPENDED_MESSAGE };
}
const manifest = runtime.manifest;
if (!manifest) return { ok: false, reason: `run ${req.runId} not found` };
if (!manifest.agents.some((agent) => agent.sessionId === req.sessionId)) {
Expand Down Expand Up @@ -1401,6 +1434,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
patches: readonly ManifestPatchOp[],
summary: string,
): Promise<{ manifest: OrchestrationManifest; etag: string }> {
assertRunWritable(runtime);
if (!runtime.manifest) throw new Error("manifest not loaded");
const next = normalizeManifestShape(applyPatches(runtime.manifest, patches));
const updatedAt = nowIso();
Expand Down Expand Up @@ -1439,6 +1473,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
const runtime = getOrCreateRuntime(runId, bundlePath);
return runtime.mutex.run(async () => {
await loadIntoRuntime(runtime);
assertRunWritable(runtime);
if (!runtime.manifest) {
return { ok: false, error: "run_not_found", message: `run ${runId} not found` };
}
Expand Down
Loading