Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,15 @@ Kapi does not own subagent orchestration. Use `pi-subagents` for agent delegatio

### Graph Execution Runtime Boundary

The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claim/lease ownership, stale-claim recovery, evidence expectations, and task-graph/readiness/claim event records.
The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claim/lease ownership, worker dispatch state, heartbeat/staleness transitions, structured worker reports, stale-claim recovery, evidence expectations, and task-graph/readiness/claim event records.

Phase presets serialize as a versioned `schemaVersion: 1` catalog. Legacy arrays migrate explicitly; unsupported versions, malformed catalogs, and gate evaluation with missing top-level required evidence refs or artifact ids fail closed.

Agent execution stays adapter-neutral. The `AgentAdapterContract` describes required launch/send/capture/health/readiness/report/substrate behavior for Pi, Codex, and Claude Code compatible workers without coupling the domain layer to any one CLI. Readiness requires a nonce-equivalent proof, worker reports require `taskId`, `status`, `evidenceRefs`, and `summary`, and health can be supported or best-effort but not absent.

The runtime state schema is separately versioned as `RuntimeState.schemaVersion: 1`. It defines additive boundaries for RunObjective, PolicySelection, TaskGraph refs, WorkerState, EvidenceRef, EvaluationResult, RewardRecord, and IntegrationCandidate data; unknown newer versions fail closed, and RunContract-facing artifact refs expose only objective, policy-selection, and evaluation artifacts.

The domain boundary is explicit: `Decomposer` creates the concrete graph, `Scheduler` computes ready tasks, `WorkerRuntime` dispatches and heartbeats work, `Verifier` validates evidence refs, and `GateEngine` decides transitions.
The domain boundary is explicit: `Decomposer` creates the concrete graph, `Scheduler` computes ready tasks, `WorkerRuntime` dispatches and heartbeats work, `WorkerExecutionState` records dispatch/report progress for claimed tasks, `Verifier` validates evidence refs, and `GateEngine` decides transitions.

Runtime events are append-only `schemaVersion: 1` envelopes with monotonic `seq`, stable `eventId`, `idempotencyKey`, `type`, `category`, timestamp, run id, and a typed payload. The event taxonomy covers run lifecycle, objective/policy decisions, graph readiness, claim/lease transitions, worker readiness/heartbeat/retention/safe-close, evidence, evaluation, repair, integration, and reward records. Replay applies events by ascending sequence into derived state; duplicate event ids must be byte-equivalent, stale leases and missing workers recover only through explicit events, and `run.sealed` is terminal.

Expand Down
8 changes: 4 additions & 4 deletions src/domain/runtime-state.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
export const RUNTIME_STATE_SCHEMA_VERSION = 1;

export type RuntimeStatus = "draft" | "active" | "blocked" | "failed" | "sealed";
export type RuntimeTaskStatus = "pending" | "ready" | "claimed" | "verifying" | "completed" | "blocked" | "failed" | "repair_required";
export type RuntimeWorkerStatus = "ready" | "busy" | "unhealthy" | "completed-retained" | "safe-to-close" | "stale-registry";
export type RuntimeTaskStatus = "pending" | "ready" | "claimed" | "in_progress" | "verifying" | "completed" | "blocked" | "failed" | "repair_required";
export type RuntimeWorkerStatus = "ready" | "busy" | "unhealthy" | "completed-retained" | "safe-to-close" | "stale-registry" | "cleanup-released" | "closed";
export type RuntimeArtifactKind = "run-objective" | "policy-selection" | "task-graph" | "worker-state" | "evidence" | "evaluation" | "reward" | "integration-candidate" | "final-report";
export type EvaluationVerdict = "pass" | "fail" | "repair" | "blocked";
export type IntegrationCandidateStatus = "pending" | "accepted" | "rejected" | "repair_required";
Expand Down Expand Up @@ -35,8 +35,8 @@ export interface RuntimeState {
export type RuntimeStateParseResult = { ok: true; state: RuntimeState } | { ok: false; reason: "malformed" | "unsupported-newer-schema"; issues: string[] };

const runtimeStatuses = new Set<RuntimeStatus>(["draft", "active", "blocked", "failed", "sealed"]);
const taskStatuses = new Set<RuntimeTaskStatus>(["pending", "ready", "claimed", "verifying", "completed", "blocked", "failed", "repair_required"]);
const workerStatuses = new Set<RuntimeWorkerStatus>(["ready", "busy", "unhealthy", "completed-retained", "safe-to-close", "stale-registry"]);
const taskStatuses = new Set<RuntimeTaskStatus>(["pending", "ready", "claimed", "in_progress", "verifying", "completed", "blocked", "failed", "repair_required"]);
const workerStatuses = new Set<RuntimeWorkerStatus>(["ready", "busy", "unhealthy", "completed-retained", "safe-to-close", "stale-registry", "cleanup-released", "closed"]);
const artifactKinds = new Set<RuntimeArtifactKind>(["run-objective", "policy-selection", "task-graph", "worker-state", "evidence", "evaluation", "reward", "integration-candidate", "final-report"]);
const evaluationVerdicts = new Set<EvaluationVerdict>(["pass", "fail", "repair", "blocked"]);
const integrationStatuses = new Set<IntegrationCandidateStatus>(["pending", "accepted", "rejected", "repair_required"]);
Expand Down
135 changes: 135 additions & 0 deletions src/domain/task-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,34 @@ export interface WorkerRuntimeRef {
id: string;
state: WorkerRuntimeState;
lastHeartbeatAt?: string;
readinessNonce?: string;
}

export type WorkerTaskReportStatus = "completed" | "failed" | "repair_required" | "blocked";

export interface WorkerTaskDispatch {
taskId: string;
workerId: string;
claimToken: string;
dispatchedAt: string;
readinessNonce?: string;
}

export interface WorkerTaskReport {
taskId: string;
workerId: string;
claimToken: string;
status: WorkerTaskReportStatus;
summary: string;
evidenceRefs: string[];
reportedAt: string;
}

export interface WorkerExecutionState {
graph: TaskGraph;
workers: WorkerRuntimeRef[];
dispatches: WorkerTaskDispatch[];
reports: WorkerTaskReport[];
}

export function validateTaskGraph(graph: TaskGraph): TaskGraphValidationIssue[] {
Expand Down Expand Up @@ -362,6 +390,70 @@ export function getStaleWorkerIds(workers: WorkerRuntimeRef[], options: { now: s
.map((worker) => worker.id);
}

export function recordWorkerReady(workers: WorkerRuntimeRef[], input: { workerId: string; now: string; readinessNonce?: string }): WorkerRuntimeRef[] {
requireText(input.workerId, "workerId");
if (input.readinessNonce !== undefined) requireText(input.readinessNonce, "readinessNonce");
parseTimestamp(input.now);
return upsertWorker(workers, {
id: input.workerId,
state: "ready",
lastHeartbeatAt: input.now,
...(input.readinessNonce ? { readinessNonce: input.readinessNonce } : {}),
});
}

export function recordWorkerHeartbeat(workers: WorkerRuntimeRef[], input: { workerId: string; now: string }): WorkerRuntimeRef[] {
parseTimestamp(input.now);
return updateWorker(workers, input.workerId, (worker) => ({ ...worker, lastHeartbeatAt: input.now }));
}

export function markMissingHeartbeats(workers: WorkerRuntimeRef[], options: { now: string; heartbeatTimeoutSeconds: number }): WorkerRuntimeRef[] {
const stale = new Set(getStaleWorkerIds(workers, options));
return workers.map((worker) => (stale.has(worker.id) ? { ...worker, state: "unhealthy" } : worker));
}

export function dispatchClaimedTask(state: WorkerExecutionState, request: { taskId: string; workerId: string; claimToken: string; now: string; readinessNonce?: string }): WorkerExecutionState {
const now = parseTimestamp(request.now);
const worker = findWorker(state.workers, request.workerId);
if (worker.state !== "ready") throw new Error(`worker ${worker.id} is not ready`);
if (worker.readinessNonce && request.readinessNonce !== worker.readinessNonce) throw new Error(`worker ${worker.id} readiness nonce mismatch`);
const task = findTask(state.graph, request.taskId);
if (task.status !== "claimed") throw new Error(`task ${task.id} is not claimed`);
const claim = requireActiveClaim(task, request.claimToken, now);
if (claim.workerId !== request.workerId) throw new Error(`task ${task.id} is claimed by ${claim.workerId}`);
if (state.dispatches.some((dispatch) => dispatch.taskId === task.id && dispatch.claimToken === request.claimToken)) throw new Error(`task ${task.id} claim is already dispatched`);
const graph = updateTask(state.graph, task.id, (current) => ({ ...current, status: "in_progress" }));
return {
...state,
graph,
workers: updateWorker(state.workers, worker.id, (current) => ({ ...current, state: "busy", lastHeartbeatAt: request.now })),
dispatches: [...state.dispatches, { taskId: task.id, workerId: worker.id, claimToken: request.claimToken, dispatchedAt: request.now, ...(request.readinessNonce ? { readinessNonce: request.readinessNonce } : {}) }],
};
}

export function captureWorkerReport(state: WorkerExecutionState, report: WorkerTaskReport): WorkerExecutionState {
validateWorkerReport(state, report);
const nextStatus = report.status === "completed" ? "verifying" : report.status;
return {
...state,
graph: updateTask(state.graph, report.taskId, (task) => ({ ...task, status: nextStatus, evidenceRefs: [...report.evidenceRefs] })),
workers: updateWorker(state.workers, report.workerId, (worker) => ({ ...worker, state: report.status === "completed" ? "completed-retained" : "unhealthy", lastHeartbeatAt: report.reportedAt })),
reports: [...state.reports, { ...report, evidenceRefs: [...report.evidenceRefs] }],
};
}

export function completeTaskFromWorkerReport(state: WorkerExecutionState, request: { taskId: string; workerId: string; now: string; releaseWorker?: boolean }): WorkerExecutionState {
const report = [...state.reports].reverse().find((item) => item.taskId === request.taskId && item.workerId === request.workerId);
if (!report) throw new Error(`task ${request.taskId} has no worker report`);
if (report.status !== "completed") throw new Error(`task ${request.taskId} report is not completed`);
if (report.evidenceRefs.length === 0) throw new Error(`task ${request.taskId} report requires evidence refs`);
return {
...state,
graph: completeTask(state.graph, { taskId: request.taskId, claimToken: report.claimToken, evidenceRefs: report.evidenceRefs, now: request.now }),
workers: updateWorker(state.workers, request.workerId, (worker) => markWorkerTerminal(worker, { retained: !request.releaseWorker })),
};
}

export function markWorkerStale(worker: WorkerRuntimeRef): WorkerRuntimeRef {
return { ...worker, state: "stale-registry" };
}
Expand All @@ -375,6 +467,45 @@ function assertValidGraph(graph: TaskGraph): void {
if (issues.length > 0) throw new Error(`invalid task graph: ${issues.map((issue) => issue.code).join(", ")}`);
}

function validateWorkerReport(state: WorkerExecutionState, report: WorkerTaskReport): void {
const reportedAt = parseTimestamp(report.reportedAt);
if (!report.summary.trim()) throw new Error(`task ${report.taskId} report requires summary`);
if (report.status === "completed" && report.evidenceRefs.length === 0) throw new Error(`task ${report.taskId} report requires evidence refs`);
const task = findTask(state.graph, report.taskId);
if (task.status !== "in_progress") throw new Error(`task ${report.taskId} is not reportable`);
const claim = requireActiveClaim(task, report.claimToken, reportedAt);
if (claim.workerId !== report.workerId) throw new Error(`task ${report.taskId} is claimed by ${claim.workerId}`);
const dispatch = state.dispatches.find((item) => item.taskId === report.taskId && item.workerId === report.workerId && item.claimToken === report.claimToken);
if (!dispatch) throw new Error(`task ${report.taskId} was not dispatched to worker ${report.workerId}`);
}

function findTask(graph: TaskGraph, taskId: string): TaskNode {
const task = graph.tasks.find((item) => item.id === taskId);
if (!task) throw new Error(`unknown task ${taskId}`);
return task;
}

function findWorker(workers: WorkerRuntimeRef[], workerId: string): WorkerRuntimeRef {
const worker = workers.find((item) => item.id === workerId);
if (!worker) throw new Error(`unknown worker ${workerId}`);
return worker;
}

function upsertWorker(workers: WorkerRuntimeRef[], worker: WorkerRuntimeRef): WorkerRuntimeRef[] {
return workers.some((item) => item.id === worker.id) ? updateWorker(workers, worker.id, () => worker) : [...workers, worker];
}

function updateWorker(workers: WorkerRuntimeRef[], workerId: string, update: (worker: WorkerRuntimeRef) => WorkerRuntimeRef): WorkerRuntimeRef[] {
let found = false;
const next = workers.map((worker) => {
if (worker.id !== workerId) return worker;
found = true;
return update(worker);
});
if (!found) throw new Error(`unknown worker ${workerId}`);
return next;
}

function computeFailedBlockersByTask(graph: TaskGraph, failed: Set<string>): Map<string, string[]> {
const tasksById = new Map(graph.tasks.map((task) => [task.id, task]));
const memo = new Map<string, string[]>();
Expand Down Expand Up @@ -465,6 +596,10 @@ function findCycleTaskId(graph: TaskGraph): string | undefined {
return undefined;
}

function requireText(value: string, label: string): void {
if (!value.trim()) throw new Error(`${label} is required`);
}

function parseTimestamp(value: string): number {
const parsed = Date.parse(value);
if (!Number.isFinite(parsed)) throw new Error(`invalid timestamp: ${value}`);
Expand Down
4 changes: 2 additions & 2 deletions test/runtime-state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ test("run contract runtime refs expose objective, policy, and evaluation artifac
});

test("runtime state can represent successful and repair-required examples", () => {
const success: RuntimeState = { ...state(), status: "sealed", objective: { id: "objective", goal: "ship MVP", successCriteria: ["tests pass"], constraints: ["no destructive cleanup"] }, tasks: [{ id: "build", title: "Build slice", status: "completed", dependsOn: [], evidenceRefs: [{ kind: "evidence", artifactId: "verify", path: "verify.md" }], evaluationRefs: [] }] };
const repair: RuntimeState = { ...state(), status: "blocked", tasks: [{ id: "build", title: "Build slice", status: "repair_required", dependsOn: [], evidenceRefs: [], evaluationRefs: [{ kind: "evaluation", artifactId: "eval", path: "evaluation.json" }] }], integrationCandidates: [{ id: "candidate", taskIds: ["build"], status: "repair_required", evidenceRefs: [] }] };
const success: RuntimeState = { ...state(), status: "sealed", objective: { id: "objective", goal: "ship MVP", successCriteria: ["tests pass"], constraints: ["no destructive cleanup"] }, tasks: [{ id: "build", title: "Build slice", status: "completed", dependsOn: [], evidenceRefs: [{ kind: "evidence", artifactId: "verify", path: "verify.md" }], evaluationRefs: [] }, { id: "parallel", title: "Parallel slice", status: "in_progress", dependsOn: [], evidenceRefs: [], evaluationRefs: [] }], workers: [{ id: "w1", adapterId: "local", status: "closed", readinessNonce: "nonce", lastHeartbeatAt: now }] };
const repair: RuntimeState = { ...state(), status: "blocked", tasks: [{ id: "build", title: "Build slice", status: "repair_required", dependsOn: [], evidenceRefs: [], evaluationRefs: [{ kind: "evaluation", artifactId: "eval", path: "evaluation.json" }] }], workers: [{ id: "w2", adapterId: "local", status: "cleanup-released" }], integrationCandidates: [{ id: "candidate", taskIds: ["build"], status: "repair_required", evidenceRefs: [] }] };
assert.equal(parseRuntimeState(success).ok, true);
assert.equal(parseRuntimeState(repair).ok, true);
});
Loading
Loading