diff --git a/README.md b/README.md index 60f5f5f..5ce93a1 100644 --- a/README.md +++ b/README.md @@ -148,7 +148,7 @@ Kapi does not own subagent orchestration. Use `pi-subagents` for agent delegatio ### Graph Execution Runtime Boundary -The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, dependencies, readiness, claims, leases, and evidence expectations. +The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claims, leases, evidence expectations, and task-graph/readiness event records. Phase presets serialize as a versioned `schemaVersion: 1` catalog. Legacy arrays migrate explicitly; unsupported versions, malformed catalogs, and gate evaluation with missing top-level required evidence refs or artifact ids fail closed. diff --git a/src/domain/task-graph.ts b/src/domain/task-graph.ts index bc71428..ccc373c 100644 --- a/src/domain/task-graph.ts +++ b/src/domain/task-graph.ts @@ -8,6 +8,8 @@ export interface TaskClaim { export interface TaskNode { id: string; + title?: string; + description?: string; status: TaskStatus; dependencies: string[]; supersedes?: string[]; @@ -22,6 +24,36 @@ export interface TaskGraph { tasks: TaskNode[]; } +export interface RuntimeTask extends TaskNode { + title: string; + description: string; +} + +export type TaskReadinessStatus = "ready" | "waiting" | "blocked" | "not_pending"; + +export interface TaskReadiness { + taskId: string; + status: TaskReadinessStatus; + ready: boolean; + waitingFor: string[]; + blockedBy: string[]; + reasons: string[]; +} + +export interface TaskGraphStatusProjection { + total: number; + ready: string[]; + blocked: string[]; + waiting: string[]; + completed: string[]; + failed: string[]; + readiness: TaskReadiness[]; +} + +export type TaskGraphRuntimeEvent = + | { type: "task_graph.created"; taskIds: string[]; topologicalOrder: string[] } + | { type: "task.ready"; taskId: string; reason: string }; + export interface PolicyGraphSketch { id: string; purpose: string; @@ -145,11 +177,64 @@ export function validateTaskGraph(graph: TaskGraph): TaskGraphValidationIssue[] } export function computeReadyTaskIds(graph: TaskGraph): string[] { + return computeTaskReadiness(graph).filter((item) => item.ready).map((item) => item.taskId); +} + +export function computeTaskReadiness(graph: TaskGraph): TaskReadiness[] { + assertValidGraph(graph); + const tasksById = new Map(graph.tasks.map((task) => [task.id, task])); const completedOrRepaired = getCompletedOrRepairedTaskIds(graph); - const blocked = new Set(computeBlockedTaskIds(graph)); - return graph.tasks - .filter((task) => task.status === "pending" && !blocked.has(task.id) && task.dependencies.every((dependency) => completedOrRepaired.has(dependency))) - .map((task) => task.id); + const failed = getFailedBlockingTaskIds(graph); + const failedBlockersByTask = computeFailedBlockersByTask(graph, failed); + return graph.tasks.map((task) => { + if (task.status !== "pending") { + return { taskId: task.id, status: "not_pending", ready: false, waitingFor: [], blockedBy: [], reasons: [`task is ${task.status}`] }; + } + const blockedBy = failedBlockersByTask.get(task.id) ?? []; + const waitingFor = task.dependencies.filter((dependency) => !completedOrRepaired.has(dependency) && !failed.has(dependency) && (failedBlockersByTask.get(dependency) ?? []).length === 0); + if (blockedBy.length > 0) { + return { taskId: task.id, status: "blocked", ready: false, waitingFor: [], blockedBy, reasons: blockedBy.map((dependency) => describeFailedBlocker(task, dependency, tasksById, failed)) }; + } + if (waitingFor.length > 0) return { taskId: task.id, status: "waiting", ready: false, waitingFor, blockedBy, reasons: waitingFor.map((dependency) => `waiting for dependency ${dependency}`) }; + return { taskId: task.id, status: "ready", ready: true, waitingFor, blockedBy, reasons: [task.dependencies.length === 0 ? "no dependencies" : "all dependencies completed"] }; + }); +} + +export function projectTaskGraphStatus(graph: TaskGraph): TaskGraphStatusProjection { + const readiness = computeTaskReadiness(graph); + const readinessById = new Map(readiness.map((item) => [item.taskId, item])); + return { + total: graph.tasks.length, + ready: readiness.filter((item) => item.ready).map((item) => item.taskId), + blocked: readiness.filter((item) => item.status === "blocked").map((item) => item.taskId), + waiting: readiness.filter((item) => item.status === "waiting").map((item) => item.taskId), + completed: graph.tasks.filter((task) => task.status === "completed").map((task) => task.id), + failed: graph.tasks.filter((task) => task.status === "failed" && readinessById.get(task.id)?.status !== "blocked").map((task) => task.id), + readiness, + }; +} + +export function topologicalTaskIds(graph: TaskGraph): string[] { + assertValidGraph(graph); + const remaining = new Map(graph.tasks.map((task) => [task.id, new Set(task.dependencies)])); + const ordered: string[] = []; + while (remaining.size > 0) { + const next = Array.from(remaining.entries()).filter(([, dependencies]) => dependencies.size === 0).map(([taskId]) => taskId).sort(); + if (next.length === 0) throw new Error("invalid task graph: dependency_cycle"); + for (const taskId of next) { + ordered.push(taskId); + remaining.delete(taskId); + for (const dependencies of Array.from(remaining.values())) dependencies.delete(taskId); + } + } + return ordered; +} + +export function createTaskGraphRuntimeEvents(graph: TaskGraph): TaskGraphRuntimeEvent[] { + return [ + { type: "task_graph.created", taskIds: graph.tasks.map((task) => task.id), topologicalOrder: topologicalTaskIds(graph) }, + ...computeTaskReadiness(graph).filter((item) => item.ready).map((item) => ({ type: "task.ready" as const, taskId: item.taskId, reason: item.reasons.join("; ") })), + ]; } export function transitionReadyTasks(graph: TaskGraph): TaskGraph { @@ -159,9 +244,7 @@ export function transitionReadyTasks(graph: TaskGraph): TaskGraph { } export function computeBlockedTaskIds(graph: TaskGraph): string[] { - const completedOrRepaired = getCompletedOrRepairedTaskIds(graph); - const failed = new Set(graph.tasks.filter((task) => task.status === "failed" && !completedOrRepaired.has(task.id)).map((task) => task.id)); - return graph.tasks.filter((task) => task.status === "pending" && task.dependencies.some((dependency) => failed.has(dependency))).map((task) => task.id); + return computeTaskReadiness(graph).filter((item) => item.status === "blocked").map((item) => item.taskId); } export function claimTask(graph: TaskGraph, request: { taskId: string; workerId: string; token: string; now: string; leaseExpiresAt: string }): TaskGraph { @@ -211,6 +294,45 @@ function assertValidGraph(graph: TaskGraph): void { if (issues.length > 0) throw new Error(`invalid task graph: ${issues.map((issue) => issue.code).join(", ")}`); } +function computeFailedBlockersByTask(graph: TaskGraph, failed: Set): Map { + const tasksById = new Map(graph.tasks.map((task) => [task.id, task])); + const memo = new Map(); + const collect = (taskId: string): string[] => { + const cached = memo.get(taskId); + if (cached) return cached; + const task = tasksById.get(taskId); + if (!task) return []; + const blockers = new Set(); + for (const dependency of task.dependencies) { + if (failed.has(dependency)) blockers.add(dependency); + for (const upstream of collect(dependency)) blockers.add(upstream); + } + const result = Array.from(blockers).sort(); + memo.set(taskId, result); + return result; + }; + for (const task of graph.tasks) collect(task.id); + return memo; +} + +function describeFailedBlocker(task: TaskNode, failedTaskId: string, tasksById: Map, failed: Set): string { + if (task.dependencies.includes(failedTaskId) && failed.has(failedTaskId)) return `dependency ${failedTaskId} failed`; + const via = task.dependencies.find((dependency) => dependsOnFailedTask(dependency, failedTaskId, tasksById)); + return via ? `blocked by failed upstream dependency ${failedTaskId} via ${via}` : `blocked by failed upstream dependency ${failedTaskId}`; +} + +function dependsOnFailedTask(taskId: string, failedTaskId: string, tasksById: Map): boolean { + const task = tasksById.get(taskId); + if (!task) return false; + if (task.dependencies.includes(failedTaskId)) return true; + return task.dependencies.some((dependency) => dependsOnFailedTask(dependency, failedTaskId, tasksById)); +} + +function getFailedBlockingTaskIds(graph: TaskGraph): Set { + const completedOrRepaired = getCompletedOrRepairedTaskIds(graph); + return new Set(graph.tasks.filter((task) => task.status === "failed" && !completedOrRepaired.has(task.id)).map((task) => task.id)); +} + function getCompletedOrRepairedTaskIds(graph: TaskGraph): Set { const completedOrRepaired = new Set(graph.tasks.filter((task) => task.status === "completed").map((task) => task.id)); for (const task of graph.tasks) { diff --git a/test/task-graph.test.ts b/test/task-graph.test.ts index b605e3a..db90277 100644 --- a/test/task-graph.test.ts +++ b/test/task-graph.test.ts @@ -5,9 +5,13 @@ import { completeTask, computeBlockedTaskIds, computeReadyTaskIds, + computeTaskReadiness, + createTaskGraphRuntimeEvents, getStaleWorkerIds, markWorkerStale, markWorkerTerminal, + projectTaskGraphStatus, + topologicalTaskIds, transitionReadyTasks, validateTaskGraph, } from "../src/domain/task-graph.js"; @@ -24,6 +28,43 @@ test("task graph readiness waits for completed dependencies", () => { assert.deepEqual(ready, ["build"]); }); +test("task graph exposes topological order, readiness reasons, projection, and events", () => { + const graph = { + tasks: [ + { id: "prepare", title: "Prepare", description: "Prepare inputs", status: "completed" as const, dependencies: [] }, + { id: "branch-a", title: "A", description: "Parallel A", status: "pending" as const, dependencies: ["prepare"] }, + { id: "branch-b", title: "B", description: "Parallel B", status: "pending" as const, dependencies: ["prepare"] }, + { id: "join", title: "Join", description: "Join branches", status: "pending" as const, dependencies: ["branch-a", "branch-b"] }, + { id: "seal", title: "Seal", description: "Seal run", status: "pending" as const, dependencies: ["join"] }, + ], + }; + + assert.deepEqual(topologicalTaskIds(graph), ["prepare", "branch-a", "branch-b", "join", "seal"]); + assert.deepEqual(computeReadyTaskIds(graph), ["branch-a", "branch-b"]); + assert.deepEqual(computeTaskReadiness(graph).find((item) => item.taskId === "join"), { + taskId: "join", + status: "waiting", + ready: false, + waitingFor: ["branch-a", "branch-b"], + blockedBy: [], + reasons: ["waiting for dependency branch-a", "waiting for dependency branch-b"], + }); + assert.deepEqual(projectTaskGraphStatus(graph), { + total: 5, + ready: ["branch-a", "branch-b"], + blocked: [], + waiting: ["join", "seal"], + completed: ["prepare"], + failed: [], + readiness: computeTaskReadiness(graph), + }); + assert.deepEqual(createTaskGraphRuntimeEvents(graph), [ + { type: "task_graph.created", taskIds: ["prepare", "branch-a", "branch-b", "join", "seal"], topologicalOrder: ["prepare", "branch-a", "branch-b", "join", "seal"] }, + { type: "task.ready", taskId: "branch-a", reason: "all dependencies completed" }, + { type: "task.ready", taskId: "branch-b", reason: "all dependencies completed" }, + ]); +}); + test("claim manager rejects duplicate active ownership", () => { const graph = { tasks: [ @@ -86,13 +127,22 @@ test("failed dependencies block downstream tasks unless a repair supersedes them { id: "build", status: "failed" as const, dependencies: [] }, { id: "repair-build", status: "completed" as const, dependencies: [], supersedes: ["build"] }, { id: "verify", status: "pending" as const, dependencies: ["build"] }, - { id: "publish", status: "pending" as const, dependencies: ["missing-fix"] }, { id: "missing-fix", status: "failed" as const, dependencies: [] }, + { id: "publish", status: "pending" as const, dependencies: ["missing-fix"] }, + { id: "announce", status: "pending" as const, dependencies: ["publish"] }, ], }; assert.deepEqual(computeReadyTaskIds(graph), ["verify"]); - assert.deepEqual(computeBlockedTaskIds(graph), ["publish"]); + assert.deepEqual(computeBlockedTaskIds(graph), ["publish", "announce"]); + assert.deepEqual(computeTaskReadiness(graph).find((item) => item.taskId === "announce"), { + taskId: "announce", + status: "blocked", + ready: false, + waitingFor: [], + blockedBy: ["missing-fix"], + reasons: ["blocked by failed upstream dependency missing-fix via publish"], + }); }); test("worker heartbeat stale detection and terminal retention are modeled", () => {