# ControlledExecutor: Advanced DAG Execution

The `ControlledExecutor` extends parallel DAG execution with:

- **Event Streaming**: Real-time execution progress
- **Command Queue**: Pause/resume/abort control
- **Checkpoints**: Save/restore execution state
- **AIL/HIL Decision Points**: Agent-in-Loop / Human-in-Loop
- **Episodic Memory**: Capture learning events
- **Speculative Execution**: Try uncertain paths with rollback

## Setup

In [1]:
import { ControlledExecutor } from "../../src/dag/controlled-executor.ts";
import { DenoSandboxExecutor } from "../../src/sandbox/deno-sandbox.ts";
import { EpisodicMemoryStore } from "../../src/memory/store.ts";
import {
  dagToMermaid,
  displayDag,
  displayEvolution,
  displayGraphrag,
  displayTimeline,
  type ExecutionEvent,
  executionTimelineToMermaid,
  graphragEvolutionToMermaid,
  graphragToMermaid,
  layersToMermaid,
  mapExecutorEvent,
  type ToolEdge,
} from "../lib/viz.ts";

// Create sandbox and episodic memory
const sandbox = new DenoSandboxExecutor({ timeout: 5000, memoryLimit: 128 });
const memory = new EpisodicMemoryStore({ dbPath: ":memory:" });

// Create controlled executor
const executor = new ControlledExecutor(sandbox, {
  maxConcurrency: 3,
  timeout: 10000,
});

executor.setEpisodicMemoryStore(memory);

console.log("‚úÖ ControlledExecutor ready (with display functions)");

TypeError: Module not found "file:///home/ubuntu/CascadeProjects/AgentCards/src/sandbox/deno-sandbox.ts".

## Example 1: Event Streaming

Watch DAG execution in real-time:

In [None]:
const dag = {
  id: "stream-example",
  tasks: [
    {
      id: "taskA",
      tool_name: "compute",
      tool: "sandbox",
      params: { code: "return { value: Math.random() * 100 }" },
      dependencies: [],
    },
    {
      id: "taskB",
      tool_name: "compute",
      tool: "sandbox",
      params: { code: "return { value: Math.random() * 100 }" },
      dependencies: [],
    },
    {
      id: "taskC",
      tool_name: "sum",
      tool: "sandbox",
      params: { code: "return { sum: context.taskA.value + context.taskB.value }" },
      dependencies: ["taskA", "taskB"],
    },
  ],
};

console.log("üìä DAG Structure:\n");

// Display visual diagram
await displayDag(dag);

In [None]:
console.log("‚ö° Executing with event stream:\n");

const events: ExecutionEvent[] = [];

for await (const event of executor.executeStream(dag, "wf-1")) {
  const execEvent = mapExecutorEvent(event);
  if (execEvent) events.push(execEvent);

  console.log(`[${event.type}]`, event.task_id || `layer ${event.layer}` || "");
}

console.log("\n‚úÖ Execution complete!");

console.log("üïê Execution Timeline:"); console.log("üí° taskA and taskB execute in parallel (Layer
0), taskC waits (Layer 1)\n");

// Display visual timeline await displayTimeline(events);

In [None]:
console.log("üïê Execution Timeline:\n");
console.log(executionTimelineToMermaid(events));
console.log("\nüí° taskA and taskB execute in parallel (Layer 0), taskC waits (Layer 1)");

## Example 2: AIL Decision Point

Agent decides whether to continue execution:

In [None]:
const dagWithDecision = {
  id: "ail-example",
  tasks: [
    {
      id: "validate",
      tool_name: "check",
      tool: "sandbox",
      params: { code: "return { valid: Math.random() > 0.3, confidence: 0.7 }" },
      dependencies: [],
    },
    {
      id: "decide",
      tool_name: "ail_decision",
      tool: "ail_checkpoint",
      params: {
        checkpoint_id: "decision-1",
        decision_fn: (context: any) => {
          // Agent logic: continue if valid and high confidence
          const shouldContinue = context.validate.valid && context.validate.confidence > 0.6;
          return {
            outcome: shouldContinue ? "continue" : "abort",
            reasoning:
              `Validation: ${context.validate.valid}, Confidence: ${context.validate.confidence}`,
          };
        },
      },
      dependencies: ["validate"],
    },
    {
      id: "process",
      tool_name: "compute",
      tool: "sandbox",
      params: { code: "return { result: 'processed' }" },
      dependencies: ["decide"],
    },
  ],
};

console.log("ü§ñ Executing with AIL decision point:\n");

try {
  for await (const event of executor.executeStream(dagWithDecision, "wf-2")) {
    if (event.type === "ail_decision") {
      console.log(`üîç AIL Decision: ${event.outcome}`);
      console.log(`   Reasoning: ${event.reasoning}`);
    } else {
      console.log(`[${event.type}]`, event.task_id || "");
    }
  }
  console.log("\n‚úÖ Workflow completed");
} catch (error) {
  console.log(`\n‚ö†Ô∏è  Workflow aborted: ${error.message}`);
}

## Example 3: Episodic Memory Capture

Every execution is captured for learning:

In [None]:
// Execute a simple DAG
const learningDag = {
  id: "learning-example",
  tasks: [
    { id: "read_data", tool_name: "read", tool: "sandbox", params: { code: "return { data: [1, 2, 3] }" }, dependencies: [] },
    { id: "transform", tool_name: "map", tool: "sandbox", params: { code: "return context.data.map(x => x * 2)" }, dependencies: ["read_data"] }
  ]
};

console.log("üìù Executing workflow with memory capture...\n");

for await (const event of executor.executeStream(learningDag, "wf-3")) {
  console.log(`[${event.type}]`, event.task_id || "");
}

console.log("\nüß† Querying episodic memory:\n");

// Query memory
const recentEvents = await memory.query({
  limit: 10,
  eventTypes: ["task_complete"]
});

console.log(`Found ${recentEvents.length} task completion events:");
for (const event of recentEvents) {
  console.log(`  ‚Ä¢ ${event.metadata.taskId}: ${event.metadata.status}`);
}

// Simulate GraphRAG edges before execution const edgesBefore: ToolEdge[] = [ { source: "read",
target: "map", weight: 0.3, relationship: "co-used" }, { source: "map", target: "filter", weight:
0.2, relationship: "co-used" } ];

console.log("üìä GraphRAG Before Execution:\n");

// Display visual diagram await displayGraphrag(edgesBefore);

In [None]:
// Execute workflow (read ‚Üí map)
for await (const event of executor.executeStream(learningDag, "wf-4")) {
  // Silent execution
}

// Simulate updated edges (weight increased due to co-usage)
const edgesAfter: ToolEdge[] = [
  { source: "read", target: "map", weight: 0.5, relationship: "co-used" }, // Increased!
  { source: "map", target: "filter", weight: 0.2, relationship: "co-used" },
];

console.log("üìä GraphRAG After Execution (Learning Applied):");
console.log("üí° read‚Üímap edge strengthened from 0.3 to 0.5!\n");

// Display visual evolution comparison
await displayEvolution(edgesBefore, edgesAfter);

In [None]:
// Simulate GraphRAG edges before execution
const edgesBefore: ToolEdge[] = [
  { source: "read", target: "map", weight: 0.3, relationship: "co-used" },
  { source: "map", target: "filter", weight: 0.2, relationship: "co-used" },
];

console.log("üìä GraphRAG Before Execution:\n");
console.log(graphragToMermaid(edgesBefore));

// Execute workflow (read ‚Üí map)
for await (const event of executor.executeStream(learningDag, "wf-4")) {
  // Silent execution
}

// Simulate updated edges (weight increased due to co-usage)
const edgesAfter: ToolEdge[] = [
  { source: "read", target: "map", weight: 0.5, relationship: "co-used" }, // Increased!
  { source: "map", target: "filter", weight: 0.2, relationship: "co-used" },
];

console.log("\nüìä GraphRAG After Execution (Learning Applied):\n");
console.log(graphragEvolutionToMermaid(edgesBefore, edgesAfter));
console.log("\nüí° read‚Üímap edge strengthened from 0.3 to 0.5!");

## Example 5: Command Queue Control

Pause, resume, and abort execution dynamically:

In [None]:
const longDag = {
  id: "control-example",
  tasks: [
    {
      id: "step1",
      tool_name: "work",
      tool: "sandbox",
      params: { code: "await new Promise(r => setTimeout(r, 1000)); return { done: 1 }" },
      dependencies: [],
    },
    {
      id: "step2",
      tool_name: "work",
      tool: "sandbox",
      params: { code: "await new Promise(r => setTimeout(r, 1000)); return { done: 2 }" },
      dependencies: ["step1"],
    },
    {
      id: "step3",
      tool_name: "work",
      tool: "sandbox",
      params: { code: "await new Promise(r => setTimeout(r, 1000)); return { done: 3 }" },
      dependencies: ["step2"],
    },
  ],
};

console.log("‚èØÔ∏è  Demonstrating command queue control:\n");

// Start execution
const execution = executor.executeStream(longDag, "wf-5");

// Pause after 1.5 seconds
setTimeout(() => {
  console.log("\n‚è∏Ô∏è  PAUSE command sent");
  executor.sendCommand({ type: "pause", workflowId: "wf-5" });
}, 1500);

// Resume after 3 seconds
setTimeout(() => {
  console.log("‚ñ∂Ô∏è  RESUME command sent\n");
  executor.sendCommand({ type: "resume", workflowId: "wf-5" });
}, 3000);

for await (const event of execution) {
  console.log(`[${event.type}]`, event.task_id || "");
}

console.log("\n‚úÖ Execution complete with pause/resume!");

## Summary

**ControlledExecutor adds:**

- ‚úÖ **Event Streaming**: Real-time progress monitoring
- ‚úÖ **Command Queue**: Pause/resume/abort control
- ‚úÖ **AIL/HIL Decisions**: Agent and human decision points
- ‚úÖ **Episodic Memory**: Automatic learning from executions
- ‚úÖ **GraphRAG Evolution**: Tool relationships strengthen over time
- ‚úÖ **Checkpoints**: Save/restore execution state
- ‚úÖ **Speculative Execution**: Try uncertain paths with rollback

**Perfect for:**

- Long-running workflows
- Interactive agent systems
- Learning from execution patterns
- Human-in-the-loop AI systems