diff --git a/README.md b/README.md index 5ce93a1..fe5efdb 100644 --- a/README.md +++ b/README.md @@ -148,7 +148,7 @@ Kapi does not own subagent orchestration. Use `pi-subagents` for agent delegatio ### Graph Execution Runtime Boundary -The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claims, leases, evidence expectations, and task-graph/readiness event records. +The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claim/lease ownership, stale-claim recovery, evidence expectations, and task-graph/readiness/claim event records. Phase presets serialize as a versioned `schemaVersion: 1` catalog. Legacy arrays migrate explicitly; unsupported versions, malformed catalogs, and gate evaluation with missing top-level required evidence refs or artifact ids fail closed. diff --git a/src/domain/task-graph.ts b/src/domain/task-graph.ts index ccc373c..2bf1a71 100644 --- a/src/domain/task-graph.ts +++ b/src/domain/task-graph.ts @@ -4,6 +4,13 @@ export interface TaskClaim { token: string; workerId: string; expiresAt: string; + claimedAt?: string; + recoveredFromToken?: string; +} + +export interface ClaimLease extends TaskClaim { + taskId: string; + claimedAt: string; } export interface TaskNode { @@ -52,7 +59,12 @@ export interface TaskGraphStatusProjection { export type TaskGraphRuntimeEvent = | { type: "task_graph.created"; taskIds: string[]; topologicalOrder: string[] } - | { type: "task.ready"; taskId: string; reason: string }; + | { type: "task.ready"; taskId: string; reason: string } + | { type: "claim.created"; taskId: string; workerId: string; token: string; expiresAt: string } + | { type: "lease.renewed"; taskId: string; workerId: string; token: string; expiresAt: string } + | { type: "lease.expired"; taskId: string; workerId: string; token: string; expiredAt: string } + | { type: "claim.released"; taskId: string; workerId: string; token: string; releasedAt: string } + | { type: "claim.recovered"; taskId: string; workerId: string; token: string; recoveredFromToken: string; expiresAt: string }; export interface PolicyGraphSketch { id: string; @@ -247,28 +259,97 @@ export function computeBlockedTaskIds(graph: TaskGraph): string[] { return computeTaskReadiness(graph).filter((item) => item.status === "blocked").map((item) => item.taskId); } -export function claimTask(graph: TaskGraph, request: { taskId: string; workerId: string; token: string; now: string; leaseExpiresAt: string }): TaskGraph { - const now = parseTimestamp(request.now); - const leaseExpiresAt = parseTimestamp(request.leaseExpiresAt); - if (leaseExpiresAt <= now) throw new Error("claim lease must expire after now"); +export function createClaimLease(input: { taskId: string; workerId: string; claimedAt: string; leaseExpiresAt: string; token?: string; recoveredFromToken?: string }): ClaimLease { + const claimedAt = parseTimestamp(input.claimedAt); + const leaseExpiresAt = parseTimestamp(input.leaseExpiresAt); + if (leaseExpiresAt <= claimedAt) throw new Error("claim lease must expire after claimedAt"); + return { + taskId: input.taskId, + workerId: input.workerId, + claimedAt: input.claimedAt, + expiresAt: input.leaseExpiresAt, + token: input.token ?? createClaimLeaseToken(input), + ...(input.recoveredFromToken ? { recoveredFromToken: input.recoveredFromToken } : {}), + }; +} + +export function createClaimLeaseToken(input: { taskId: string; workerId: string; claimedAt: string; leaseExpiresAt: string }): string { + const source = `${input.taskId}\u001f${input.workerId}\u001f${input.claimedAt}\u001f${input.leaseExpiresAt}`; + let hash = 2166136261; + for (let index = 0; index < source.length; index += 1) { + hash ^= source.charCodeAt(index); + hash = Math.imul(hash, 16777619) >>> 0; + } + return `claim-${hash.toString(36)}`; +} + +export function claimTask(graph: TaskGraph, request: { taskId: string; workerId: string; token?: string; now: string; leaseExpiresAt: string }): TaskGraph { + assertValidGraph(graph); + const lease = createClaimLease({ taskId: request.taskId, workerId: request.workerId, claimedAt: request.now, leaseExpiresAt: request.leaseExpiresAt, token: request.token }); return updateTask(graph, request.taskId, (task) => { if (task.status !== "ready") throw new Error(`task ${task.id} is not ready to claim`); - if (task.claim && parseTimestamp(task.claim.expiresAt) > now) throw new Error(`task ${task.id} already has an active claim`); + if (task.claim) throw new Error(`task ${task.id} already has a claim; use explicit recovery for expired leases`); return { ...task, status: "claimed", assignedWorker: request.workerId, attempts: (task.attempts ?? 0) + 1, - claim: { token: request.token, workerId: request.workerId, expiresAt: request.leaseExpiresAt }, + claim: lease, + }; + }); +} + +export function renewClaimLease(graph: TaskGraph, request: { taskId: string; claimToken: string; now: string; leaseExpiresAt: string }): TaskGraph { + const now = parseTimestamp(request.now); + const leaseExpiresAt = parseTimestamp(request.leaseExpiresAt); + if (leaseExpiresAt <= now) throw new Error("claim lease must expire after now"); + return updateTask(graph, request.taskId, (task) => { + const claim = requireActiveClaim(task, request.claimToken, now); + return { ...task, claim: { ...claim, expiresAt: request.leaseExpiresAt } }; + }); +} + +export function releaseClaim(graph: TaskGraph, request: { taskId: string; claimToken: string; now: string }): TaskGraph { + const now = parseTimestamp(request.now); + return updateTask(graph, request.taskId, (task) => { + requireActiveClaim(task, request.claimToken, now); + return { ...task, status: "ready", assignedWorker: undefined, claim: undefined }; + }); +} + +export function recoverExpiredClaim(graph: TaskGraph, request: { taskId: string; workerId: string; now: string; leaseExpiresAt: string; token?: string }): TaskGraph { + const now = parseTimestamp(request.now); + return updateTask(graph, request.taskId, (task) => { + if (!task.claim) throw new Error(`task ${task.id} has no claim to recover`); + if (parseTimestamp(task.claim.expiresAt) > now) throw new Error(`task ${task.id} claim is still active`); + const leaseInput = { + taskId: task.id, + workerId: request.workerId, + claimedAt: request.now, + leaseExpiresAt: request.leaseExpiresAt, + token: request.token, + recoveredFromToken: task.claim.token, }; + const lease = createClaimLease(leaseInput); + return { ...task, status: "claimed", assignedWorker: request.workerId, attempts: (task.attempts ?? 0) + 1, claim: lease }; }); } +export function createClaimLeaseRuntimeEvent(type: "claim.created" | "lease.renewed", lease: ClaimLease): TaskGraphRuntimeEvent; +export function createClaimLeaseRuntimeEvent(type: "lease.expired", lease: ClaimLease, occurredAt: string): TaskGraphRuntimeEvent; +export function createClaimLeaseRuntimeEvent(type: "claim.released", lease: ClaimLease, occurredAt: string): TaskGraphRuntimeEvent; +export function createClaimLeaseRuntimeEvent(type: "claim.recovered", lease: ClaimLease): TaskGraphRuntimeEvent; +export function createClaimLeaseRuntimeEvent(type: "claim.created" | "lease.renewed" | "lease.expired" | "claim.released" | "claim.recovered", lease: ClaimLease, occurredAt?: string): TaskGraphRuntimeEvent { + if (type === "lease.expired") return { type, taskId: lease.taskId, workerId: lease.workerId, token: lease.token, expiredAt: occurredAt ?? lease.expiresAt }; + if (type === "claim.released") return { type, taskId: lease.taskId, workerId: lease.workerId, token: lease.token, releasedAt: occurredAt ?? lease.expiresAt }; + if (type === "claim.recovered") return { type, taskId: lease.taskId, workerId: lease.workerId, token: lease.token, recoveredFromToken: lease.recoveredFromToken ?? "", expiresAt: lease.expiresAt }; + return { type, taskId: lease.taskId, workerId: lease.workerId, token: lease.token, expiresAt: lease.expiresAt }; +} + export function completeTask(graph: TaskGraph, request: { taskId: string; claimToken: string; evidenceRefs: string[]; now: string }): TaskGraph { const now = parseTimestamp(request.now); return updateTask(graph, request.taskId, (task) => { - if (!task.claim || task.claim.token !== request.claimToken) throw new Error(`task ${task.id} requires a matching claim token`); - if (parseTimestamp(task.claim.expiresAt) <= now) throw new Error(`task ${task.id} claim expired`); + requireActiveClaim(task, request.claimToken, now); if (request.evidenceRefs.length === 0) throw new Error(`task ${task.id} requires evidence refs before completion`); return { ...task, status: "completed", evidenceRefs: [...request.evidenceRefs] }; }); @@ -342,6 +423,12 @@ function getCompletedOrRepairedTaskIds(graph: TaskGraph): Set { return completedOrRepaired; } +function requireActiveClaim(task: TaskNode, claimToken: string, now: number): TaskClaim { + if (!task.claim || task.claim.token !== claimToken) throw new Error(`task ${task.id} requires a matching claim token`); + if (parseTimestamp(task.claim.expiresAt) <= now) throw new Error(`task ${task.id} claim expired`); + return task.claim; +} + function updateTask(graph: TaskGraph, taskId: string, update: (task: TaskNode) => TaskNode): TaskGraph { let found = false; const tasks = graph.tasks.map((task) => { diff --git a/test/task-graph.test.ts b/test/task-graph.test.ts index db90277..d3274a7 100644 --- a/test/task-graph.test.ts +++ b/test/task-graph.test.ts @@ -6,11 +6,17 @@ import { computeBlockedTaskIds, computeReadyTaskIds, computeTaskReadiness, + createClaimLease, + createClaimLeaseRuntimeEvent, + createClaimLeaseToken, createTaskGraphRuntimeEvents, getStaleWorkerIds, markWorkerStale, markWorkerTerminal, projectTaskGraphStatus, + recoverExpiredClaim, + releaseClaim, + renewClaimLease, topologicalTaskIds, transitionReadyTasks, validateTaskGraph, @@ -65,6 +71,31 @@ test("task graph exposes topological order, readiness reasons, projection, and e ]); }); +test("claim leases generate tokens, reject non-ready tasks, renew, release, and record events", () => { + const token = createClaimLeaseToken({ taskId: "build", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" }); + assert.equal(token, createClaimLeaseToken({ taskId: "build", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" })); + + const lease = createClaimLease({ taskId: "build", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" }); + assert.deepEqual(createClaimLeaseRuntimeEvent("claim.created", lease), { type: "claim.created", taskId: "build", workerId: "w1", token, expiresAt: "2026-01-01T00:10:00.000Z" }); + + const graph = { tasks: [{ id: "build", status: "ready" as const, dependencies: [] }] }; + const claimed = claimTask(graph, { taskId: "build", workerId: "w1", now: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" }); + assert.equal(claimed.tasks[0]?.status, "claimed"); + assert.equal(claimed.tasks[0]?.claim?.token, token); + + const renewed = renewClaimLease(claimed, { taskId: "build", claimToken: token, now: "2026-01-01T00:05:00.000Z", leaseExpiresAt: "2026-01-01T00:20:00.000Z" }); + assert.equal(renewed.tasks[0]?.claim?.expiresAt, "2026-01-01T00:20:00.000Z"); + assert.deepEqual(createClaimLeaseRuntimeEvent("lease.renewed", { ...lease, expiresAt: "2026-01-01T00:20:00.000Z" }), { type: "lease.renewed", taskId: "build", workerId: "w1", token, expiresAt: "2026-01-01T00:20:00.000Z" }); + + const released = releaseClaim(renewed, { taskId: "build", claimToken: token, now: "2026-01-01T00:06:00.000Z" }); + assert.equal(released.tasks[0]?.status, "ready"); + assert.equal(released.tasks[0]?.claim, undefined); + + for (const status of ["pending", "blocked", "completed"] as const) { + assert.throws(() => claimTask({ tasks: [{ id: status, status, dependencies: [] }] }, { taskId: status, workerId: "w1", now: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" }), /not ready/); + } +}); + test("claim manager rejects duplicate active ownership", () => { const graph = { tasks: [ @@ -86,7 +117,7 @@ test("claim manager rejects duplicate active ownership", () => { now: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z", }), - /active claim/, + /already has a claim/, ); }); @@ -121,6 +152,45 @@ test("task completion requires matching unexpired claim and evidence refs", () = assert.deepEqual(completed.tasks[0]?.evidenceRefs, ["verify.md#pass"]); }); +test("expired leases block normal completion and recover through explicit recovery", () => { + const graph = { + tasks: [ + { + id: "build", + status: "claimed" as const, + dependencies: [], + assignedWorker: "w1", + attempts: 1, + claim: { token: "old", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", expiresAt: "2026-01-01T00:10:00.000Z" }, + }, + ], + }; + + assert.throws(() => completeTask(graph, { taskId: "build", claimToken: "old", evidenceRefs: ["verify.md#pass"], now: "2026-01-01T00:11:00.000Z" }), /expired/); + assert.deepEqual(createClaimLeaseRuntimeEvent("lease.expired", { ...graph.tasks[0]!.claim!, taskId: "build", claimedAt: "2026-01-01T00:00:00.000Z" }, "2026-01-01T00:11:00.000Z"), { + type: "lease.expired", + taskId: "build", + workerId: "w1", + token: "old", + expiredAt: "2026-01-01T00:11:00.000Z", + }); + + const recovered = recoverExpiredClaim(graph, { taskId: "build", workerId: "w2", token: "new", now: "2026-01-01T00:11:00.000Z", leaseExpiresAt: "2026-01-01T00:20:00.000Z" }); + assert.equal(recovered.tasks[0]?.assignedWorker, "w2"); + assert.equal(recovered.tasks[0]?.attempts, 2); + assert.deepEqual(recovered.tasks[0]?.claim, { taskId: "build", workerId: "w2", token: "new", claimedAt: "2026-01-01T00:11:00.000Z", expiresAt: "2026-01-01T00:20:00.000Z", recoveredFromToken: "old" }); + assert.deepEqual(createClaimLeaseRuntimeEvent("claim.recovered", recovered.tasks[0]!.claim! as typeof recovered.tasks[0]["claim"] & { taskId: string; claimedAt: string }), { + type: "claim.recovered", + taskId: "build", + workerId: "w2", + token: "new", + recoveredFromToken: "old", + expiresAt: "2026-01-01T00:20:00.000Z", + }); + assert.doesNotThrow(() => completeTask(recovered, { taskId: "build", claimToken: "new", evidenceRefs: ["verify.md#pass"], now: "2026-01-01T00:12:00.000Z" })); + assert.throws(() => recoverExpiredClaim(recovered, { taskId: "build", workerId: "w3", now: "2026-01-01T00:12:00.000Z", leaseExpiresAt: "2026-01-01T00:30:00.000Z" }), /still active/); +}); + test("failed dependencies block downstream tasks unless a repair supersedes them", () => { const graph = { tasks: [