Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ Kapi does not own subagent orchestration. Use `pi-subagents` for agent delegatio

### Graph Execution Runtime Boundary

The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, dependencies, readiness, claims, leases, and evidence expectations.
The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claims, leases, evidence expectations, and task-graph/readiness event records.

Phase presets serialize as a versioned `schemaVersion: 1` catalog. Legacy arrays migrate explicitly; unsupported versions, malformed catalogs, and gate evaluation with missing top-level required evidence refs or artifact ids fail closed.

Expand Down
136 changes: 129 additions & 7 deletions src/domain/task-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export interface TaskClaim {

export interface TaskNode {
id: string;
title?: string;
description?: string;
status: TaskStatus;
dependencies: string[];
supersedes?: string[];
Expand All @@ -22,6 +24,36 @@ export interface TaskGraph {
tasks: TaskNode[];
}

export interface RuntimeTask extends TaskNode {
title: string;
description: string;
}

export type TaskReadinessStatus = "ready" | "waiting" | "blocked" | "not_pending";

export interface TaskReadiness {
taskId: string;
status: TaskReadinessStatus;
ready: boolean;
waitingFor: string[];
blockedBy: string[];
reasons: string[];
}

export interface TaskGraphStatusProjection {
total: number;
ready: string[];
blocked: string[];
waiting: string[];
completed: string[];
failed: string[];
readiness: TaskReadiness[];
}

export type TaskGraphRuntimeEvent =
| { type: "task_graph.created"; taskIds: string[]; topologicalOrder: string[] }
| { type: "task.ready"; taskId: string; reason: string };

export interface PolicyGraphSketch {
id: string;
purpose: string;
Expand Down Expand Up @@ -145,11 +177,64 @@ export function validateTaskGraph(graph: TaskGraph): TaskGraphValidationIssue[]
}

export function computeReadyTaskIds(graph: TaskGraph): string[] {
return computeTaskReadiness(graph).filter((item) => item.ready).map((item) => item.taskId);
}

export function computeTaskReadiness(graph: TaskGraph): TaskReadiness[] {
assertValidGraph(graph);
const tasksById = new Map(graph.tasks.map((task) => [task.id, task]));
const completedOrRepaired = getCompletedOrRepairedTaskIds(graph);
const blocked = new Set(computeBlockedTaskIds(graph));
return graph.tasks
.filter((task) => task.status === "pending" && !blocked.has(task.id) && task.dependencies.every((dependency) => completedOrRepaired.has(dependency)))
.map((task) => task.id);
const failed = getFailedBlockingTaskIds(graph);
const failedBlockersByTask = computeFailedBlockersByTask(graph, failed);
return graph.tasks.map((task) => {
if (task.status !== "pending") {
return { taskId: task.id, status: "not_pending", ready: false, waitingFor: [], blockedBy: [], reasons: [`task is ${task.status}`] };
}
const blockedBy = failedBlockersByTask.get(task.id) ?? [];
const waitingFor = task.dependencies.filter((dependency) => !completedOrRepaired.has(dependency) && !failed.has(dependency) && (failedBlockersByTask.get(dependency) ?? []).length === 0);
if (blockedBy.length > 0) {
return { taskId: task.id, status: "blocked", ready: false, waitingFor: [], blockedBy, reasons: blockedBy.map((dependency) => describeFailedBlocker(task, dependency, tasksById, failed)) };
}
if (waitingFor.length > 0) return { taskId: task.id, status: "waiting", ready: false, waitingFor, blockedBy, reasons: waitingFor.map((dependency) => `waiting for dependency ${dependency}`) };
return { taskId: task.id, status: "ready", ready: true, waitingFor, blockedBy, reasons: [task.dependencies.length === 0 ? "no dependencies" : "all dependencies completed"] };
});
}

export function projectTaskGraphStatus(graph: TaskGraph): TaskGraphStatusProjection {
const readiness = computeTaskReadiness(graph);
const readinessById = new Map(readiness.map((item) => [item.taskId, item]));
return {
total: graph.tasks.length,
ready: readiness.filter((item) => item.ready).map((item) => item.taskId),
blocked: readiness.filter((item) => item.status === "blocked").map((item) => item.taskId),
waiting: readiness.filter((item) => item.status === "waiting").map((item) => item.taskId),
completed: graph.tasks.filter((task) => task.status === "completed").map((task) => task.id),
failed: graph.tasks.filter((task) => task.status === "failed" && readinessById.get(task.id)?.status !== "blocked").map((task) => task.id),
readiness,
};
}

export function topologicalTaskIds(graph: TaskGraph): string[] {
assertValidGraph(graph);
const remaining = new Map(graph.tasks.map((task) => [task.id, new Set(task.dependencies)]));
const ordered: string[] = [];
while (remaining.size > 0) {
const next = Array.from(remaining.entries()).filter(([, dependencies]) => dependencies.size === 0).map(([taskId]) => taskId).sort();
if (next.length === 0) throw new Error("invalid task graph: dependency_cycle");
for (const taskId of next) {
ordered.push(taskId);
remaining.delete(taskId);
for (const dependencies of Array.from(remaining.values())) dependencies.delete(taskId);
}
}
return ordered;
}

export function createTaskGraphRuntimeEvents(graph: TaskGraph): TaskGraphRuntimeEvent[] {
return [
{ type: "task_graph.created", taskIds: graph.tasks.map((task) => task.id), topologicalOrder: topologicalTaskIds(graph) },
...computeTaskReadiness(graph).filter((item) => item.ready).map((item) => ({ type: "task.ready" as const, taskId: item.taskId, reason: item.reasons.join("; ") })),
];
}

export function transitionReadyTasks(graph: TaskGraph): TaskGraph {
Expand All @@ -159,9 +244,7 @@ export function transitionReadyTasks(graph: TaskGraph): TaskGraph {
}

export function computeBlockedTaskIds(graph: TaskGraph): string[] {
const completedOrRepaired = getCompletedOrRepairedTaskIds(graph);
const failed = new Set(graph.tasks.filter((task) => task.status === "failed" && !completedOrRepaired.has(task.id)).map((task) => task.id));
return graph.tasks.filter((task) => task.status === "pending" && task.dependencies.some((dependency) => failed.has(dependency))).map((task) => task.id);
return computeTaskReadiness(graph).filter((item) => item.status === "blocked").map((item) => item.taskId);
}

export function claimTask(graph: TaskGraph, request: { taskId: string; workerId: string; token: string; now: string; leaseExpiresAt: string }): TaskGraph {
Expand Down Expand Up @@ -211,6 +294,45 @@ function assertValidGraph(graph: TaskGraph): void {
if (issues.length > 0) throw new Error(`invalid task graph: ${issues.map((issue) => issue.code).join(", ")}`);
}

function computeFailedBlockersByTask(graph: TaskGraph, failed: Set<string>): Map<string, string[]> {
const tasksById = new Map(graph.tasks.map((task) => [task.id, task]));
const memo = new Map<string, string[]>();
const collect = (taskId: string): string[] => {
const cached = memo.get(taskId);
if (cached) return cached;
const task = tasksById.get(taskId);
if (!task) return [];
const blockers = new Set<string>();
for (const dependency of task.dependencies) {
if (failed.has(dependency)) blockers.add(dependency);
for (const upstream of collect(dependency)) blockers.add(upstream);
}
const result = Array.from(blockers).sort();
memo.set(taskId, result);
return result;
};
for (const task of graph.tasks) collect(task.id);
return memo;
}

function describeFailedBlocker(task: TaskNode, failedTaskId: string, tasksById: Map<string, TaskNode>, failed: Set<string>): string {
if (task.dependencies.includes(failedTaskId) && failed.has(failedTaskId)) return `dependency ${failedTaskId} failed`;
const via = task.dependencies.find((dependency) => dependsOnFailedTask(dependency, failedTaskId, tasksById));
return via ? `blocked by failed upstream dependency ${failedTaskId} via ${via}` : `blocked by failed upstream dependency ${failedTaskId}`;
}

function dependsOnFailedTask(taskId: string, failedTaskId: string, tasksById: Map<string, TaskNode>): boolean {
const task = tasksById.get(taskId);
if (!task) return false;
if (task.dependencies.includes(failedTaskId)) return true;
return task.dependencies.some((dependency) => dependsOnFailedTask(dependency, failedTaskId, tasksById));
}

function getFailedBlockingTaskIds(graph: TaskGraph): Set<string> {
const completedOrRepaired = getCompletedOrRepairedTaskIds(graph);
return new Set(graph.tasks.filter((task) => task.status === "failed" && !completedOrRepaired.has(task.id)).map((task) => task.id));
}

function getCompletedOrRepairedTaskIds(graph: TaskGraph): Set<string> {
const completedOrRepaired = new Set(graph.tasks.filter((task) => task.status === "completed").map((task) => task.id));
for (const task of graph.tasks) {
Expand Down
54 changes: 52 additions & 2 deletions test/task-graph.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ import {
completeTask,
computeBlockedTaskIds,
computeReadyTaskIds,
computeTaskReadiness,
createTaskGraphRuntimeEvents,
getStaleWorkerIds,
markWorkerStale,
markWorkerTerminal,
projectTaskGraphStatus,
topologicalTaskIds,
transitionReadyTasks,
validateTaskGraph,
} from "../src/domain/task-graph.js";
Expand All @@ -24,6 +28,43 @@ test("task graph readiness waits for completed dependencies", () => {
assert.deepEqual(ready, ["build"]);
});

test("task graph exposes topological order, readiness reasons, projection, and events", () => {
const graph = {
tasks: [
{ id: "prepare", title: "Prepare", description: "Prepare inputs", status: "completed" as const, dependencies: [] },
{ id: "branch-a", title: "A", description: "Parallel A", status: "pending" as const, dependencies: ["prepare"] },
{ id: "branch-b", title: "B", description: "Parallel B", status: "pending" as const, dependencies: ["prepare"] },
{ id: "join", title: "Join", description: "Join branches", status: "pending" as const, dependencies: ["branch-a", "branch-b"] },
{ id: "seal", title: "Seal", description: "Seal run", status: "pending" as const, dependencies: ["join"] },
],
};

assert.deepEqual(topologicalTaskIds(graph), ["prepare", "branch-a", "branch-b", "join", "seal"]);
assert.deepEqual(computeReadyTaskIds(graph), ["branch-a", "branch-b"]);
assert.deepEqual(computeTaskReadiness(graph).find((item) => item.taskId === "join"), {
taskId: "join",
status: "waiting",
ready: false,
waitingFor: ["branch-a", "branch-b"],
blockedBy: [],
reasons: ["waiting for dependency branch-a", "waiting for dependency branch-b"],
});
assert.deepEqual(projectTaskGraphStatus(graph), {
total: 5,
ready: ["branch-a", "branch-b"],
blocked: [],
waiting: ["join", "seal"],
completed: ["prepare"],
failed: [],
readiness: computeTaskReadiness(graph),
});
assert.deepEqual(createTaskGraphRuntimeEvents(graph), [
{ type: "task_graph.created", taskIds: ["prepare", "branch-a", "branch-b", "join", "seal"], topologicalOrder: ["prepare", "branch-a", "branch-b", "join", "seal"] },
{ type: "task.ready", taskId: "branch-a", reason: "all dependencies completed" },
{ type: "task.ready", taskId: "branch-b", reason: "all dependencies completed" },
]);
});

test("claim manager rejects duplicate active ownership", () => {
const graph = {
tasks: [
Expand Down Expand Up @@ -86,13 +127,22 @@ test("failed dependencies block downstream tasks unless a repair supersedes them
{ id: "build", status: "failed" as const, dependencies: [] },
{ id: "repair-build", status: "completed" as const, dependencies: [], supersedes: ["build"] },
{ id: "verify", status: "pending" as const, dependencies: ["build"] },
{ id: "publish", status: "pending" as const, dependencies: ["missing-fix"] },
{ id: "missing-fix", status: "failed" as const, dependencies: [] },
{ id: "publish", status: "pending" as const, dependencies: ["missing-fix"] },
{ id: "announce", status: "pending" as const, dependencies: ["publish"] },
],
};

assert.deepEqual(computeReadyTaskIds(graph), ["verify"]);
assert.deepEqual(computeBlockedTaskIds(graph), ["publish"]);
assert.deepEqual(computeBlockedTaskIds(graph), ["publish", "announce"]);
assert.deepEqual(computeTaskReadiness(graph).find((item) => item.taskId === "announce"), {
taskId: "announce",
status: "blocked",
ready: false,
waitingFor: [],
blockedBy: ["missing-fix"],
reasons: ["blocked by failed upstream dependency missing-fix via publish"],
});
});

test("worker heartbeat stale detection and terminal retention are modeled", () => {
Expand Down
Loading