# DAG Workflow Execution

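Execute multi-step workflows with dependencies using the DAG (Directed Acyclic Graph) executor. The cells below run each task by hand through the sandbox, so every step of dependency ordering and context passing is visible.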

**Features:**
- Parallel execution of independent tasks
- Dependency management between tasks
- State management and checkpoints
- Event streaming for real-time progress
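
The first two features rest on one rule: a task may start once every one of its dependencies has finished, so all tasks that become ready at the same moment can run together. A minimal sketch of that rule as Kahn's topological sort grouped into layers, assuming tasks declare a `dependencies` array like the DAGs below. `topologicalLayers` is a hypothetical helper for illustration, not part of `@casys/mcp-gateway`:

```typescript
// Hypothetical helper, not part of @casys/mcp-gateway: group tasks into
// layers so that every task depends only on tasks in earlier layers.
type TaskSpec = { dependencies: string[] };

function topologicalLayers(tasks: Record<string, TaskSpec>): string[][] {
  const unmet = new Map<string, number>(); // task -> dependencies not yet done
  const dependents = new Map<string, string[]>(); // task -> tasks waiting on it
  for (const name of Object.keys(tasks)) {
    unmet.set(name, tasks[name].dependencies.length);
    dependents.set(name, []);
  }
  for (const name of Object.keys(tasks)) {
    for (const dep of tasks[name].dependencies) {
      const waiting = dependents.get(dep);
      if (!waiting) throw new Error(`"${name}" depends on unknown task "${dep}"`);
      waiting.push(name);
    }
  }

  const layers: string[][] = [];
  let ready = Object.keys(tasks).filter((name) => unmet.get(name) === 0);
  let placed = 0;
  while (ready.length > 0) {
    layers.push(ready);
    placed += ready.length;
    const next: string[] = [];
    for (const done of ready) {
      for (const child of dependents.get(done)!) {
        const left = unmet.get(child)! - 1;
        unmet.set(child, left);
        if (left === 0) next.push(child); // all of child's dependencies are done
      }
    }
    ready = next;
  }

  // Any task never placed sits on a cycle or depends on one.
  if (placed !== Object.keys(tasks).length) {
    throw new Error("Dependency cycle detected");
  }
  return layers;
}

// Usage: the shape of the multi-layer example further down.
console.log(JSON.stringify(topologicalLayers({
  fetch: { dependencies: [] },
  extract: { dependencies: ["fetch"] },
  validate: { dependencies: ["fetch"] },
  enrich: { dependencies: ["fetch"] },
  transform: { dependencies: ["extract", "validate", "enrich"] },
  save: { dependencies: ["transform"] },
})));
// [["fetch"],["extract","validate","enrich"],["transform"],["save"]]
```

The layers match the hand-written "Layer 1" to "Layer 4" steps of the multi-layer example. A dependency cycle leaves some tasks permanently blocked, which is why the function throws instead of returning a partial order.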

## Setup

```typescript
import {
  DenoSandboxExecutor,
  ParallelExecutor,
  createInitialState,
  type DAGExecutionResult,
  type ToolExecutor
} from "jsr:@casys/mcp-gateway";

// Create sandbox for code execution
const sandbox = new DenoSandboxExecutor({
  timeout: 5000,
  memoryLimit: 128,
});

console.log("✅ DAG executor ready");
```

## Define a Simple DAG

Create a linear workflow of three tasks, each depending on the one before it:
```
fetchData → processData → aggregate
```

```typescript
const dag = {
  tasks: {
    fetchData: {
      tool: "sandbox",
      params: { code: "return { numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] }" },
      dependencies: []
    },
    processData: {
      tool: "sandbox",
      params: { code: "return context.numbers.map(n => n * n)" },
      dependencies: ["fetchData"]
    },
    aggregate: {
      tool: "sandbox",
      params: { code: "return { sum: context.reduce((a, b) => a + b, 0), count: context.length }" },
      dependencies: ["processData"]
    }
  }
};

console.log("📊 DAG defined:");
console.log("   Tasks:", Object.keys(dag.tasks).join(" → "));
```

## Execute DAG Manually

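Run the tasks one at a time in dependency order. Each call passes the previous task's result as the second argument to `sandbox.execute`, and the task's code reads it as `context`: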

```typescript
// Execute fetchData
console.log("\n📥 Step 1: fetchData");
const result1 = await sandbox.execute(dag.tasks.fetchData.params.code);
console.log("   Result:", result1.result);

// Execute processData with context from fetchData
console.log("\n⚙️  Step 2: processData");
const result2 = await sandbox.execute(
  dag.tasks.processData.params.code,
  result1.result // Pass previous result as context
);
console.log("   Result:", result2.result);

// Execute aggregate with context from processData
console.log("\n📊 Step 3: aggregate");
const result3 = await sandbox.execute(
  dag.tasks.aggregate.params.code,
  result2.result
);
console.log("   Result:", result3.result);

console.log("\n✅ Workflow complete!");
```
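
The three manual steps follow a single pattern: run a task, then pass its result to the next task as `context`. A minimal sketch that automates this for a linear chain, assuming an executor with the call shape used above (`execute(code, context)` resolving to an object with a `result` field). `runChain` and `localExecute` are hypothetical; `localExecute` evaluates code with `new Function` and provides none of the sandbox's isolation, so it is only for trying the pattern without the gateway package:

```typescript
// Call shape assumed from the cells above: execute(code, context) -> { result }.
type Execute = (code: string, context?: unknown) => Promise<{ result: unknown }>;

// Hypothetical stand-in for sandbox.execute: runs the code string as a function
// body with `context` in scope. NOT sandboxed; for local experiments only.
const localExecute: Execute = async (code, context) => ({
  result: new Function("context", code)(context),
});

// Run tasks in the given order, feeding each result into the next task.
async function runChain(codes: string[], execute: Execute): Promise<unknown> {
  let context: unknown = undefined; // the first task receives no context
  for (const code of codes) {
    context = (await execute(code, context)).result;
  }
  return context;
}

// Usage: the same three code strings as `dag` above.
runChain(
  [
    "return { numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] }",
    "return context.numbers.map(n => n * n)",
    "return { sum: context.reduce((a, b) => a + b, 0), count: context.length }",
  ],
  localExecute,
).then((result) => console.log(result)); // { sum: 385, count: 10 }
```

In the notebook, `(code, context) => sandbox.execute(code, context)` could take the place of `localExecute`, assuming its resolved value exposes `.result` as in the cells above.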

## Parallel DAG Execution

Next, a DAG in which `taskA` and `taskB` have no dependencies, so they can run in parallel; `combine` waits for both:
```
      ┌─ taskA ─┐
start─┤         ├─→ combine
      └─ taskB ─┘
```
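
Running independent tasks in parallel pays off when they spend their time waiting (on a subprocess, the network, or a remote tool). In JavaScript the usual tool is `Promise.all`, which starts every promise before awaiting any, so a group of waiting tasks takes roughly as long as its slowest member rather than the sum of all of them. Whether sandboxed code also runs on separate CPU cores depends on how the executor is implemented, which this notebook does not show. A self-contained sketch with simulated 100 ms tasks; `simulatedTask` is a hypothetical stand-in for `sandbox.execute`:

```typescript
// Hypothetical stand-in for an I/O-bound task such as a sandbox or tool call:
// resolves with `value` after `ms` milliseconds.
function simulatedTask<T>(value: T, ms: number): Promise<T> {
  return new Promise((resolve) => setTimeout(() => resolve(value), ms));
}

// Each task starts only after the previous one has finished.
async function runSequentially(): Promise<number> {
  const start = performance.now();
  await simulatedTask("A", 100);
  await simulatedTask("B", 100);
  return performance.now() - start;
}

// Both tasks start before either is awaited, so their waits overlap.
async function runInParallel(): Promise<number> {
  const start = performance.now();
  await Promise.all([simulatedTask("A", 100), simulatedTask("B", 100)]);
  return performance.now() - start;
}

runSequentially().then((seqMs) =>
  runInParallel().then((parMs) =>
    console.log(`sequential: ${Math.round(seqMs)} ms, parallel: ${Math.round(parMs)} ms`)
  )
);
```

On an idle machine this typically reports about 200 ms and about 100 ms; the exact numbers vary with timer resolution and load.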

```typescript
const parallelDag = {
  tasks: {
    taskA: {
      tool: "sandbox",
      params: { code: "return { source: 'A', value: Math.random() * 100 }" },
      dependencies: []
    },
    taskB: {
      tool: "sandbox",
      params: { code: "return { source: 'B', value: Math.random() * 100 }" },
      dependencies: []
    },
    combine: {
      tool: "sandbox",
      params: { code: "return { total: context.A.value + context.B.value, sources: [context.A.source, context.B.source] }" },
      dependencies: ["taskA", "taskB"]
    }
  }
};

console.log("🔀 Parallel DAG:");
console.log("   taskA (no deps)");
console.log("   taskB (no deps)");
console.log("   combine (depends on A and B)");
```

```typescript
// Execute A and B in parallel
console.log("\n⚡ Executing taskA and taskB in parallel...");

const [resultA, resultB] = await Promise.all([
  sandbox.execute(parallelDag.tasks.taskA.params.code),
  sandbox.execute(parallelDag.tasks.taskB.params.code)
]);

console.log("   taskA:", resultA.result);
console.log("   taskB:", resultB.result);

// Combine results
console.log("\n🔗 Combining results...");
const combined = await sandbox.execute(
  parallelDag.tasks.combine.params.code,
  { A: resultA.result, B: resultB.result }
);

console.log("   Combined:", combined.result);
console.log("\n✅ Parallel workflow complete!");
```
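
`Promise.all` rejects as soon as either task fails, and the other task's result is not available from its return value, even though that task keeps running to completion. When a workflow should record which independent tasks failed and keep the ones that succeeded, `Promise.allSettled` waits for every task and reports each outcome. A minimal sketch, assuming the `execute(code, context)` call shape used above; `runIndependent` is hypothetical and `localExecute` is an unsandboxed stand-in for `sandbox.execute`:

```typescript
type Execute = (code: string, context?: unknown) => Promise<{ result: unknown }>;

// Hypothetical unsandboxed stand-in for sandbox.execute (local experiments only).
const localExecute: Execute = async (code, context) => ({
  result: new Function("context", code)(context),
});

// Run tasks that share no dependencies concurrently and collect every outcome.
async function runIndependent(
  tasks: Record<string, string>, // task name -> code
  execute: Execute,
): Promise<{ results: Record<string, unknown>; errors: Record<string, string> }> {
  const names = Object.keys(tasks);
  const settled = await Promise.allSettled(names.map((name) => execute(tasks[name])));
  const results: Record<string, unknown> = {};
  const errors: Record<string, string> = {};
  settled.forEach((outcome, i) => {
    if (outcome.status === "fulfilled") results[names[i]] = outcome.value.result;
    else errors[names[i]] = String(outcome.reason);
  });
  return { results, errors };
}

// Usage: taskB throws, but taskA's result is still collected.
runIndependent(
  { taskA: "return { source: 'A', value: 42 }", taskB: "throw new Error('B failed')" },
  localExecute,
).then(({ results, errors }) => console.log(results, errors));
```

Whether a failed branch should stop the whole workflow or only the tasks that depend on it is a policy choice; this sketch leaves that decision to the caller.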

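## Complex DAG with Multiple Layers

Here `fetch` fans out to three independent tasks whose results `transform` merges before `save` runs. To keep the definition short, each task uses a compact shape (`code`, `deps`) instead of the `tool`/`params`/`dependencies` shape used above.
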
```
       ┌─ extract ──┐
fetch ─┼─ validate ─┼─ transform ─ save
       └─ enrich ───┘
```

```typescript
const complexDag = {
  tasks: {
    fetch: {
      code: "return { users: [{id: 1, name: 'Alice'}, {id: 2, name: 'Bob'}] }",
      deps: []
    },
    extract: {
      code: "return context.users.map(u => u.name)",
      deps: ["fetch"]
    },
    validate: {
      code: "return { valid: context.users.every(u => u.id && u.name), count: context.users.length }",
      deps: ["fetch"]
    },
    enrich: {
      code: "return context.users.map(u => ({ ...u, createdAt: new Date().toISOString() }))",
      deps: ["fetch"]
    },
    transform: {
      code: "return { names: context.extract, isValid: context.validate.valid, enriched: context.enrich }",
      deps: ["extract", "validate", "enrich"]
    },
    save: {
      code: "return { saved: true, summary: `Saved ${context.names.length} users, valid: ${context.isValid}` }",
      deps: ["transform"]
    }
  }
};

// Execute layer by layer
const results: Record<string, unknown> = {};

console.log("🏗️  Executing complex DAG...\n");

// Layer 1: fetch
console.log("Layer 1: fetch");
results.fetch = (await sandbox.execute(complexDag.tasks.fetch.code)).result;
console.log("   ✓ fetch complete");

// Layer 2: extract, validate, enrich (parallel)
console.log("\nLayer 2: extract, validate, enrich (parallel)");
const [extract, validate, enrich] = await Promise.all([
  sandbox.execute(complexDag.tasks.extract.code, results.fetch),
  sandbox.execute(complexDag.tasks.validate.code, results.fetch),
  sandbox.execute(complexDag.tasks.enrich.code, results.fetch)
]);
results.extract = extract.result;
results.validate = validate.result;
results.enrich = enrich.result;
console.log("   ✓ extract, validate, enrich complete");

// Layer 3: transform
console.log("\nLayer 3: transform");
results.transform = (await sandbox.execute(
  complexDag.tasks.transform.code,
  { extract: results.extract, validate: results.validate, enrich: results.enrich }
)).result;
console.log("   ✓ transform complete");

// Layer 4: save
console.log("\nLayer 4: save");
results.save = (await sandbox.execute(
  complexDag.tasks.save.code,
  results.transform
)).result;
console.log("   ✓ save complete");

console.log("\n📊 Final result:");
console.log(JSON.stringify(results.save, null, 2));
```
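
The four layers above are written out by hand, but they follow from the `deps` arrays alone: a task is ready once every one of its dependencies has a result. A minimal sketch of a generic runner that rescans the pending tasks each round, assuming the `execute(code, context)` call shape and the context convention of the cell above (one dependency passes its result directly; several pass an object keyed by dependency name). `executeDag`, `contextFor`, the `onEvent` callback, and `localExecute` are hypothetical and not taken from the gateway's `ParallelExecutor`, whose API this notebook does not show; `onEvent` only illustrates the kind of progress events the feature list mentions, and `localExecute` is an unsandboxed stand-in for `sandbox.execute`:

```typescript
type Execute = (code: string, context?: unknown) => Promise<{ result: unknown }>;
type Task = { code: string; deps: string[] };
type DagEvent =
  | { type: "layer_start"; tasks: string[] }
  | { type: "task_complete"; task: string };

// Hypothetical unsandboxed stand-in for sandbox.execute (local experiments only).
const localExecute: Execute = async (code, context) => ({
  result: new Function("context", code)(context),
});

// Context convention from the cell above: no deps -> undefined,
// one dep -> its result, several deps -> object keyed by dependency name.
function contextFor(task: Task, results: Record<string, unknown>): unknown {
  if (task.deps.length === 0) return undefined;
  if (task.deps.length === 1) return results[task.deps[0]];
  return Object.fromEntries(task.deps.map((dep) => [dep, results[dep]]));
}

async function executeDag(
  tasks: Record<string, Task>,
  execute: Execute,
  onEvent: (event: DagEvent) => void = () => {},
): Promise<Record<string, unknown>> {
  const results: Record<string, unknown> = {};
  const pending = new Set(Object.keys(tasks));
  while (pending.size > 0) {
    // Every pending task whose dependencies all have results can start now.
    const ready = [...pending].filter((name) =>
      tasks[name].deps.every((dep) => dep in results)
    );
    if (ready.length === 0) {
      throw new Error(`Cycle or unknown dependency among: ${[...pending].join(", ")}`);
    }
    onEvent({ type: "layer_start", tasks: ready });
    await Promise.all(
      ready.map(async (name) => {
        const { result } = await execute(tasks[name].code, contextFor(tasks[name], results));
        results[name] = result;
        onEvent({ type: "task_complete", task: name }); // fires as each task finishes
      }),
    );
    for (const name of ready) pending.delete(name);
  }
  return results;
}

// Usage: the parallel example's shape, with fixed values instead of Math.random().
executeDag(
  {
    taskA: { code: "return 2", deps: [] },
    taskB: { code: "return 3", deps: [] },
    combine: { code: "return context.taskA + context.taskB", deps: ["taskA", "taskB"] },
  },
  localExecute,
  (event) => console.log(event.type, event.type === "layer_start" ? event.tasks : event.task),
).then((results) => console.log(results.combine)); // 5
```

Given `complexDag.tasks` and a wrapper around `sandbox.execute`, this should visit the same four layers as the cell above. Like that cell, it waits for a whole layer before starting the next; a scheduler that starts each task as soon as its own dependencies finish can shorten runs when tasks in one layer take very different times.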

## Summary

**DAG Workflows enable:**
- ✅ Sequential task execution with dependencies
- ✅ Parallel execution of independent tasks
- ✅ Context passing between tasks
- ✅ Complex multi-layer workflows

**Use cases:**
- ETL pipelines
- Data processing workflows
- Multi-step AI agent tasks
- Orchestrated tool execution
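
The feature list at the top also mentions checkpoints, which none of the cells above exercise. For long-running pipelines such as ETL jobs or multi-step agent tasks, their value is that a failure partway through does not force completed tasks to run again. A minimal sketch of that idea, assuming results are JSON-serializable and tasks are listed in an order that respects their `deps` (for example, layer by layer). `runWithCheckpoint` and its snapshot format are hypothetical, not the gateway's state or checkpoint API, and `localExecute` is an unsandboxed stand-in for `sandbox.execute`:

```typescript
type Execute = (code: string, context?: unknown) => Promise<{ result: unknown }>;
type Task = { code: string; deps: string[] };

// Hypothetical unsandboxed stand-in for sandbox.execute (local experiments only).
const localExecute: Execute = async (code, context) => ({
  result: new Function("context", code)(context),
});

// Hypothetical: run tasks in an order that respects their deps, skip tasks that
// already have a result in the checkpoint, and persist state after each new result.
async function runWithCheckpoint(
  order: string[],
  tasks: Record<string, Task>,
  execute: Execute,
  checkpoint: Record<string, unknown>,
  persist: (snapshot: string) => void,
): Promise<Record<string, unknown>> {
  const results: Record<string, unknown> = { ...checkpoint };
  for (const name of order) {
    if (name in results) continue; // finished in an earlier run
    const { code, deps } = tasks[name];
    const context = deps.length === 0 ? undefined
      : deps.length === 1 ? results[deps[0]]
      : Object.fromEntries(deps.map((dep) => [dep, results[dep]]));
    results[name] = (await execute(code, context)).result;
    persist(JSON.stringify(results)); // e.g. write to a file or key-value store
  }
  return results;
}

// Usage: the first run fails at "b"; the retry resumes from the saved snapshot,
// so "a" is not executed again.
let snapshot = "{}";
const tasks: Record<string, Task> = {
  a: { code: "return 1", deps: [] },
  b: { code: "throw new Error('transient failure')", deps: ["a"] },
};
runWithCheckpoint(["a", "b"], tasks, localExecute, {}, (s) => { snapshot = s; })
  .catch(() => {
    tasks.b.code = "return context + 1"; // simulate fixing the failing task
    return runWithCheckpoint(["a", "b"], tasks, localExecute, JSON.parse(snapshot), (s) => { snapshot = s; });
  })
  .then((results) => console.log(results)); // { a: 1, b: 2 }
```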