Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ Kapi does not own subagent orchestration. Use `pi-subagents` for agent delegatio

### Graph Execution Runtime Boundary

The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claims, leases, evidence expectations, and task-graph/readiness event records.
The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claim/lease ownership, stale-claim recovery, evidence expectations, and task-graph/readiness/claim event records.

Phase presets serialize as a versioned `schemaVersion: 1` catalog. Legacy arrays migrate explicitly; unsupported versions, malformed catalogs, and gate evaluation with missing top-level required evidence refs or artifact ids fail closed.

Expand Down
105 changes: 96 additions & 9 deletions src/domain/task-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ export interface TaskClaim {
token: string;
workerId: string;
expiresAt: string;
claimedAt?: string;
recoveredFromToken?: string;
}

export interface ClaimLease extends TaskClaim {
taskId: string;
claimedAt: string;
}

export interface TaskNode {
Expand Down Expand Up @@ -52,7 +59,12 @@ export interface TaskGraphStatusProjection {

export type TaskGraphRuntimeEvent =
| { type: "task_graph.created"; taskIds: string[]; topologicalOrder: string[] }
| { type: "task.ready"; taskId: string; reason: string };
| { type: "task.ready"; taskId: string; reason: string }
| { type: "claim.created"; taskId: string; workerId: string; token: string; expiresAt: string }
| { type: "lease.renewed"; taskId: string; workerId: string; token: string; expiresAt: string }
| { type: "lease.expired"; taskId: string; workerId: string; token: string; expiredAt: string }
| { type: "claim.released"; taskId: string; workerId: string; token: string; releasedAt: string }
| { type: "claim.recovered"; taskId: string; workerId: string; token: string; recoveredFromToken: string; expiresAt: string };

export interface PolicyGraphSketch {
id: string;
Expand Down Expand Up @@ -247,28 +259,97 @@ export function computeBlockedTaskIds(graph: TaskGraph): string[] {
return computeTaskReadiness(graph).filter((item) => item.status === "blocked").map((item) => item.taskId);
}

export function claimTask(graph: TaskGraph, request: { taskId: string; workerId: string; token: string; now: string; leaseExpiresAt: string }): TaskGraph {
const now = parseTimestamp(request.now);
const leaseExpiresAt = parseTimestamp(request.leaseExpiresAt);
if (leaseExpiresAt <= now) throw new Error("claim lease must expire after now");
export function createClaimLease(input: { taskId: string; workerId: string; claimedAt: string; leaseExpiresAt: string; token?: string; recoveredFromToken?: string }): ClaimLease {
const claimedAt = parseTimestamp(input.claimedAt);
const leaseExpiresAt = parseTimestamp(input.leaseExpiresAt);
if (leaseExpiresAt <= claimedAt) throw new Error("claim lease must expire after claimedAt");
return {
taskId: input.taskId,
workerId: input.workerId,
claimedAt: input.claimedAt,
expiresAt: input.leaseExpiresAt,
token: input.token ?? createClaimLeaseToken(input),
...(input.recoveredFromToken ? { recoveredFromToken: input.recoveredFromToken } : {}),
};
}

export function createClaimLeaseToken(input: { taskId: string; workerId: string; claimedAt: string; leaseExpiresAt: string }): string {
const source = `${input.taskId}\u001f${input.workerId}\u001f${input.claimedAt}\u001f${input.leaseExpiresAt}`;
let hash = 2166136261;
for (let index = 0; index < source.length; index += 1) {
hash ^= source.charCodeAt(index);
hash = Math.imul(hash, 16777619) >>> 0;
}
return `claim-${hash.toString(36)}`;
}

export function claimTask(graph: TaskGraph, request: { taskId: string; workerId: string; token?: string; now: string; leaseExpiresAt: string }): TaskGraph {
assertValidGraph(graph);
const lease = createClaimLease({ taskId: request.taskId, workerId: request.workerId, claimedAt: request.now, leaseExpiresAt: request.leaseExpiresAt, token: request.token });
return updateTask(graph, request.taskId, (task) => {
if (task.status !== "ready") throw new Error(`task ${task.id} is not ready to claim`);
if (task.claim && parseTimestamp(task.claim.expiresAt) > now) throw new Error(`task ${task.id} already has an active claim`);
if (task.claim) throw new Error(`task ${task.id} already has a claim; use explicit recovery for expired leases`);
return {
...task,
status: "claimed",
assignedWorker: request.workerId,
attempts: (task.attempts ?? 0) + 1,
claim: { token: request.token, workerId: request.workerId, expiresAt: request.leaseExpiresAt },
claim: lease,
};
});
}

export function renewClaimLease(graph: TaskGraph, request: { taskId: string; claimToken: string; now: string; leaseExpiresAt: string }): TaskGraph {
const now = parseTimestamp(request.now);
const leaseExpiresAt = parseTimestamp(request.leaseExpiresAt);
if (leaseExpiresAt <= now) throw new Error("claim lease must expire after now");
return updateTask(graph, request.taskId, (task) => {
const claim = requireActiveClaim(task, request.claimToken, now);
return { ...task, claim: { ...claim, expiresAt: request.leaseExpiresAt } };
});
}

export function releaseClaim(graph: TaskGraph, request: { taskId: string; claimToken: string; now: string }): TaskGraph {
const now = parseTimestamp(request.now);
return updateTask(graph, request.taskId, (task) => {
requireActiveClaim(task, request.claimToken, now);
return { ...task, status: "ready", assignedWorker: undefined, claim: undefined };
});
}

export function recoverExpiredClaim(graph: TaskGraph, request: { taskId: string; workerId: string; now: string; leaseExpiresAt: string; token?: string }): TaskGraph {
const now = parseTimestamp(request.now);
return updateTask(graph, request.taskId, (task) => {
if (!task.claim) throw new Error(`task ${task.id} has no claim to recover`);
if (parseTimestamp(task.claim.expiresAt) > now) throw new Error(`task ${task.id} claim is still active`);
const leaseInput = {
taskId: task.id,
workerId: request.workerId,
claimedAt: request.now,
leaseExpiresAt: request.leaseExpiresAt,
token: request.token,
recoveredFromToken: task.claim.token,
};
const lease = createClaimLease(leaseInput);
return { ...task, status: "claimed", assignedWorker: request.workerId, attempts: (task.attempts ?? 0) + 1, claim: lease };
});
}

export function createClaimLeaseRuntimeEvent(type: "claim.created" | "lease.renewed", lease: ClaimLease): TaskGraphRuntimeEvent;
export function createClaimLeaseRuntimeEvent(type: "lease.expired", lease: ClaimLease, occurredAt: string): TaskGraphRuntimeEvent;
export function createClaimLeaseRuntimeEvent(type: "claim.released", lease: ClaimLease, occurredAt: string): TaskGraphRuntimeEvent;
export function createClaimLeaseRuntimeEvent(type: "claim.recovered", lease: ClaimLease): TaskGraphRuntimeEvent;
export function createClaimLeaseRuntimeEvent(type: "claim.created" | "lease.renewed" | "lease.expired" | "claim.released" | "claim.recovered", lease: ClaimLease, occurredAt?: string): TaskGraphRuntimeEvent {
if (type === "lease.expired") return { type, taskId: lease.taskId, workerId: lease.workerId, token: lease.token, expiredAt: occurredAt ?? lease.expiresAt };
if (type === "claim.released") return { type, taskId: lease.taskId, workerId: lease.workerId, token: lease.token, releasedAt: occurredAt ?? lease.expiresAt };
if (type === "claim.recovered") return { type, taskId: lease.taskId, workerId: lease.workerId, token: lease.token, recoveredFromToken: lease.recoveredFromToken ?? "", expiresAt: lease.expiresAt };
return { type, taskId: lease.taskId, workerId: lease.workerId, token: lease.token, expiresAt: lease.expiresAt };
}

export function completeTask(graph: TaskGraph, request: { taskId: string; claimToken: string; evidenceRefs: string[]; now: string }): TaskGraph {
const now = parseTimestamp(request.now);
return updateTask(graph, request.taskId, (task) => {
if (!task.claim || task.claim.token !== request.claimToken) throw new Error(`task ${task.id} requires a matching claim token`);
if (parseTimestamp(task.claim.expiresAt) <= now) throw new Error(`task ${task.id} claim expired`);
requireActiveClaim(task, request.claimToken, now);
if (request.evidenceRefs.length === 0) throw new Error(`task ${task.id} requires evidence refs before completion`);
return { ...task, status: "completed", evidenceRefs: [...request.evidenceRefs] };
});
Expand Down Expand Up @@ -342,6 +423,12 @@ function getCompletedOrRepairedTaskIds(graph: TaskGraph): Set<string> {
return completedOrRepaired;
}

function requireActiveClaim(task: TaskNode, claimToken: string, now: number): TaskClaim {
if (!task.claim || task.claim.token !== claimToken) throw new Error(`task ${task.id} requires a matching claim token`);
if (parseTimestamp(task.claim.expiresAt) <= now) throw new Error(`task ${task.id} claim expired`);
return task.claim;
}

function updateTask(graph: TaskGraph, taskId: string, update: (task: TaskNode) => TaskNode): TaskGraph {
let found = false;
const tasks = graph.tasks.map((task) => {
Expand Down
72 changes: 71 additions & 1 deletion test/task-graph.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@ import {
computeBlockedTaskIds,
computeReadyTaskIds,
computeTaskReadiness,
createClaimLease,
createClaimLeaseRuntimeEvent,
createClaimLeaseToken,
createTaskGraphRuntimeEvents,
getStaleWorkerIds,
markWorkerStale,
markWorkerTerminal,
projectTaskGraphStatus,
recoverExpiredClaim,
releaseClaim,
renewClaimLease,
topologicalTaskIds,
transitionReadyTasks,
validateTaskGraph,
Expand Down Expand Up @@ -65,6 +71,31 @@ test("task graph exposes topological order, readiness reasons, projection, and e
]);
});

test("claim leases generate tokens, reject non-ready tasks, renew, release, and record events", () => {
const token = createClaimLeaseToken({ taskId: "build", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" });
assert.equal(token, createClaimLeaseToken({ taskId: "build", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" }));

const lease = createClaimLease({ taskId: "build", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" });
assert.deepEqual(createClaimLeaseRuntimeEvent("claim.created", lease), { type: "claim.created", taskId: "build", workerId: "w1", token, expiresAt: "2026-01-01T00:10:00.000Z" });

const graph = { tasks: [{ id: "build", status: "ready" as const, dependencies: [] }] };
const claimed = claimTask(graph, { taskId: "build", workerId: "w1", now: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" });
assert.equal(claimed.tasks[0]?.status, "claimed");
assert.equal(claimed.tasks[0]?.claim?.token, token);

const renewed = renewClaimLease(claimed, { taskId: "build", claimToken: token, now: "2026-01-01T00:05:00.000Z", leaseExpiresAt: "2026-01-01T00:20:00.000Z" });
assert.equal(renewed.tasks[0]?.claim?.expiresAt, "2026-01-01T00:20:00.000Z");
assert.deepEqual(createClaimLeaseRuntimeEvent("lease.renewed", { ...lease, expiresAt: "2026-01-01T00:20:00.000Z" }), { type: "lease.renewed", taskId: "build", workerId: "w1", token, expiresAt: "2026-01-01T00:20:00.000Z" });

const released = releaseClaim(renewed, { taskId: "build", claimToken: token, now: "2026-01-01T00:06:00.000Z" });
assert.equal(released.tasks[0]?.status, "ready");
assert.equal(released.tasks[0]?.claim, undefined);

for (const status of ["pending", "blocked", "completed"] as const) {
assert.throws(() => claimTask({ tasks: [{ id: status, status, dependencies: [] }] }, { taskId: status, workerId: "w1", now: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" }), /not ready/);
}
});

test("claim manager rejects duplicate active ownership", () => {
const graph = {
tasks: [
Expand All @@ -86,7 +117,7 @@ test("claim manager rejects duplicate active ownership", () => {
now: "2026-01-01T00:00:00.000Z",
leaseExpiresAt: "2026-01-01T00:10:00.000Z",
}),
/active claim/,
/already has a claim/,
);
});

Expand Down Expand Up @@ -121,6 +152,45 @@ test("task completion requires matching unexpired claim and evidence refs", () =
assert.deepEqual(completed.tasks[0]?.evidenceRefs, ["verify.md#pass"]);
});

test("expired leases block normal completion and recover through explicit recovery", () => {
const graph = {
tasks: [
{
id: "build",
status: "claimed" as const,
dependencies: [],
assignedWorker: "w1",
attempts: 1,
claim: { token: "old", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", expiresAt: "2026-01-01T00:10:00.000Z" },
},
],
};

assert.throws(() => completeTask(graph, { taskId: "build", claimToken: "old", evidenceRefs: ["verify.md#pass"], now: "2026-01-01T00:11:00.000Z" }), /expired/);
assert.deepEqual(createClaimLeaseRuntimeEvent("lease.expired", { ...graph.tasks[0]!.claim!, taskId: "build", claimedAt: "2026-01-01T00:00:00.000Z" }, "2026-01-01T00:11:00.000Z"), {
type: "lease.expired",
taskId: "build",
workerId: "w1",
token: "old",
expiredAt: "2026-01-01T00:11:00.000Z",
});

const recovered = recoverExpiredClaim(graph, { taskId: "build", workerId: "w2", token: "new", now: "2026-01-01T00:11:00.000Z", leaseExpiresAt: "2026-01-01T00:20:00.000Z" });
assert.equal(recovered.tasks[0]?.assignedWorker, "w2");
assert.equal(recovered.tasks[0]?.attempts, 2);
assert.deepEqual(recovered.tasks[0]?.claim, { taskId: "build", workerId: "w2", token: "new", claimedAt: "2026-01-01T00:11:00.000Z", expiresAt: "2026-01-01T00:20:00.000Z", recoveredFromToken: "old" });
assert.deepEqual(createClaimLeaseRuntimeEvent("claim.recovered", recovered.tasks[0]!.claim! as typeof recovered.tasks[0]["claim"] & { taskId: string; claimedAt: string }), {
type: "claim.recovered",
taskId: "build",
workerId: "w2",
token: "new",
recoveredFromToken: "old",
expiresAt: "2026-01-01T00:20:00.000Z",
});
assert.doesNotThrow(() => completeTask(recovered, { taskId: "build", claimToken: "new", evidenceRefs: ["verify.md#pass"], now: "2026-01-01T00:12:00.000Z" }));
assert.throws(() => recoverExpiredClaim(recovered, { taskId: "build", workerId: "w3", now: "2026-01-01T00:12:00.000Z", leaseExpiresAt: "2026-01-01T00:30:00.000Z" }), /still active/);
});

test("failed dependencies block downstream tasks unless a repair supersedes them", () => {
const graph = {
tasks: [
Expand Down
Loading