-
-
Notifications
You must be signed in to change notification settings - Fork 34
Workflow Engine
DAG (Directed Acyclic Graph) workflow engine for CortexPrism, enabling complex multi-step automation pipelines with branching, parallel execution, goto control flow, and approval wait nodes.
Single file in src/workflow/:
| File | Purpose |
|---|---|
engine.ts |
Workflow class, WorkflowContext, run records, built-in workflows |
type WorkflowNode =
| { kind: 'step'; step: WorkflowStep } // Single async step
| { kind: 'branch'; branch: WorkflowBranch } // Conditional branch
| { kind: 'parallel'; parallel: WorkflowParallel } // Parallel execution
| { kind: 'goto'; target: string } // Jump to labeled step
| { kind: 'wait' }; // Pause for external approvalinterface WorkflowStep {
name: string;
fn: (ctx: WorkflowContext) => Promise<void>;
}interface WorkflowBranch {
condition: (ctx: WorkflowContext) => boolean;
yes: Workflow; // Sub-workflow when true
no: Workflow; // Sub-workflow when false
}interface WorkflowParallel {
name: string;
steps: WorkflowStep[];
}new Workflow(name: string, description?: string)| Method | Description |
|---|---|
.step(name, fn) |
Add an execution step |
.branch(condition, { then, else }) |
Add conditional branching with sub-workflows |
.parallel(name, steps) |
Add parallel execution group |
.goto(target) |
Jump to a labeled step |
.waitForApproval() |
Pause until approve() is called |
async execute(
ctx?: WorkflowContext,
onStepStart?: (step: string) => void,
onStepEnd?: (step: string, ok: boolean, dur: number) => void
): Promise<WorkflowResult>interface WorkflowResult {
name: string;
success: boolean;
error?: string;
durationMs: number;
stepsCompleted: number;
stepsTotal: number;
context: Record<string, unknown>;
}| Node | Behavior |
|---|---|
step |
Execute function, catch and surface errors |
branch |
Evaluate condition, execute yes or no sub-workflow. If sub-workflow fails, parent fails |
parallel |
Execute all steps via Promise.allSettled, fail on first rejection |
goto |
Jump to step by name label. Labels are auto-assigned at execution start |
wait |
Pause execution. Call workflow.approve() to resume. Returns partial result with error: "wait_for_approval"
|
A key-value store passed through all steps in a workflow:
class WorkflowContext {
get<T>(key: string): T | undefined;
set(key: string, value: unknown): void;
getAll(): Record<string, unknown>;
}Context is shared across all steps within a single execution. Branch sub-workflows receive the same context instance. Parallel steps share context (use with caution for race conditions).
Workflow executions are persisted to dataDir/workflow-runs.json (capped at 200 entries):
interface WorkflowRunRecord {
workflowName: string;
status: 'completed' | 'failed';
success: boolean;
started: string;
durationMs: number;
stepsCompleted: number;
stepsTotal: number;
error?: string;
context: Record<string, unknown>;
}Newest runs appear first. The context snapshot is stored for debugging.
| Function | Description |
|---|---|
registerWorkflow(wf) |
Register a workflow by name |
getWorkflow(name) |
Retrieve by name |
listWorkflows() |
List all registered workflows |
deleteWorkflow(name) |
Remove from registry |
Pre-registered workflow with three steps:
step: check-disk → runs df -h /
step: check-memory → runs free -m
step: summarize → aggregates disk + memory output
import { Workflow, registerWorkflow } from './workflow/engine.ts';
const deployWf = new Workflow('deploy', 'Deploy application');
deployWf
.step('build', async (ctx) => {
const cmd = new Deno.Command('npm', { args: ['run', 'build'] });
const out = await cmd.output();
ctx.set('buildSuccess', out.code === 0);
})
.branch(
(ctx) => ctx.get<boolean>('buildSuccess') === true,
{
then: new Workflow('deploy-branch')
.step('deploy', async (ctx) => {
// deploy logic
ctx.set('deployed', true);
}),
else: new Workflow('fail-branch')
.step('notify', async (ctx) => {
ctx.set('status', 'build_failed');
}),
},
)
.waitForApproval()
.step('verify', async (ctx) => {
// post-deploy checks
});
registerWorkflow(deployWf);
const result = await deployWf.execute();
if (result.error === 'wait_for_approval') {
// prompt user, then:
deployWf.approve();
// re-execute with context from partial result
}cortex workflow list # List registered workflows
cortex workflow run <name> # Execute a workflow
cortex workflow runs # Show execution history
cortex workflow approve <name> # Approve a waiting workflow| Method | Path | Description |
|---|---|---|
GET |
/api/workflows |
List registered workflows |
POST |
/api/workflows/:name/run |
Execute a workflow |
GET |
/api/workflows/:name/runs |
Get run history |
POST |
/api/workflows/:name/approve |
Approve waiting workflow |
- Triggers — Event-driven triggers that can invoke workflows
- Eval-System — Evaluation suites as specialized workflows
- Update-System — Update flow follows a similar step-based pattern
CortexPrism — Open-source agentic AI harness · MIT License · Built with Deno 2.x + TypeScript
- Agent Loop
- Metacognition
- Memory System
- Skills System
- Sub-Agents
- Built-in Tools
- Code Intelligence
- Code Sandbox
- Cross-Agent Context Protocol
- Prompt Lab
- PKM Assistant
- Voice Pipeline
- Computer Use
- Browser Tool
- Git & GitHub
- Scheduler & Jobs
- Dashboard
- Observability
- A2A Protocol
- MCP Gateway
- Distributed Nodes
- Memori Checkpoints
- Eval System
- Workflow Engine
- Triggers
- Projects
- TUI
- Glossary
- Update System