# Exploring the Execution Graph Builder

This notebook explores the new execution graph framework for async dataset generation.
The graph uses a **hybrid representation**: column structure is stored explicitly,
while cell-level nodes are computed on demand so that millions of records can be handled efficiently.

## Key Concepts

- **CellNodeId**: Identifies a single cell (row, column) in the dataset
- **BarrierNodeId**: Synchronization point where all inputs must complete before outputs begin
- **ExecutionTraits**: Flags describing generator behavior (START, CELL_BY_CELL, ROW_STREAMABLE, BARRIER)
- **ColumnDescriptor**: Metadata about a column including its traits and dependencies
- **ExecutionGraph**: The main graph structure with virtual node iteration
- **CompletionTracker**: Memory-efficient tracking of completed nodes


In [1]:
from data_designer.engine.execution_graph import (
    BarrierNodeId,
    CellNodeId,
    ColumnDescriptor,
    CompletionTracker,
    ExecutionGraph,
    ExecutionTraits,
    GraphBuilder,
    NodeId,
)

## 1. Node Identification

Nodes are identified using frozen dataclasses with `slots=True` for memory efficiency.


In [2]:
# CellNodeId identifies a single cell (row, column)
cell = CellNodeId(row=5, column="question")
print(f"Cell node: {cell}")
print(f"  Row: {cell.row}, Column: {cell.column}")

# BarrierNodeId identifies a synchronization point for a column
barrier = BarrierNodeId(column="validation")
print(f"\nBarrier node: {barrier}")
print(f"  Column: {barrier.column}")

Cell node: Cell(5, 'question')
  Row: 5, Column: question

Barrier node: Barrier('validation')
  Column: validation


In [3]:
# Nodes are hashable and can be used in sets/dicts
completed_nodes: set[NodeId] = {
    CellNodeId(0, "category"),
    CellNodeId(1, "category"),
    BarrierNodeId("validation"),
}

print(f"Completed nodes: {completed_nodes}")
print(f"\nCellNodeId(0, 'category') in set: {CellNodeId(0, 'category') in completed_nodes}")
print(f"CellNodeId(2, 'category') in set: {CellNodeId(2, 'category') in completed_nodes}")

Completed nodes: {Barrier('validation'), Cell(0, 'category'), Cell(1, 'category')}

CellNodeId(0, 'category') in set: True
CellNodeId(2, 'category') in set: False


## 2. Execution Traits

Traits describe the execution characteristics of column generators. They're inferred from generator properties (not class names) to support plugin generators.
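The sketch below shows property-based inference, assuming each generator class exposes boolean attributes. The attribute names (`can_generate_from_scratch`, `is_cell_by_cell`, `is_row_streamable`, `requires_full_column`) and `TraitsSketch` are hypothetical, because the real generator properties are not shown in this notebook. A plugin class with an arbitrary name still gets the right traits, provided it declares the right attributes.

```python
from enum import Flag


class TraitsSketch(Flag):
    """Stand-in for ExecutionTraits, using the values printed in this notebook."""

    NONE = 0
    START = 1
    CELL_BY_CELL = 2
    ROW_STREAMABLE = 4
    BARRIER = 8


# Hypothetical attribute names; the real generator properties are not shown here.
_TRAIT_ATTRIBUTES = {
    "can_generate_from_scratch": TraitsSketch.START,
    "is_cell_by_cell": TraitsSketch.CELL_BY_CELL,
    "is_row_streamable": TraitsSketch.ROW_STREAMABLE,
    "requires_full_column": TraitsSketch.BARRIER,
}


def infer_traits(generator_cls: type) -> TraitsSketch:
    """Combine traits from class attributes; the class name is never consulted."""
    traits = TraitsSketch.NONE
    for attribute, trait in _TRAIT_ATTRIBUTES.items():
        if getattr(generator_cls, attribute, False):
            traits |= trait
    return traits


class MyPluginSampler:  # hypothetical third-party generator
    can_generate_from_scratch = True
    is_row_streamable = True


class MyPluginValidator:  # hypothetical third-party generator
    requires_full_column = True


print(TraitsSketch.START in infer_traits(MyPluginSampler))  # True
print(TraitsSketch.BARRIER in infer_traits(MyPluginValidator))  # True
```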


In [4]:
# ExecutionTraits is a Flag enum - traits can be combined
print("Available traits:")
for trait in ExecutionTraits:
    if trait != ExecutionTraits.NONE:
        print(f"  - {trait.name}: {trait.value}")

Available traits:
  - START: 1
  - CELL_BY_CELL: 2
  - ROW_STREAMABLE: 4
  - BARRIER: 8


In [5]:
# Traits can be combined using bitwise OR
sampler_traits = ExecutionTraits.START | ExecutionTraits.CELL_BY_CELL | ExecutionTraits.ROW_STREAMABLE
print(f"Sampler traits: {sampler_traits}")
print(f"  Has START: {bool(sampler_traits & ExecutionTraits.START)}")
print(f"  Has BARRIER: {bool(sampler_traits & ExecutionTraits.BARRIER)}")

# Validator/barrier column traits
barrier_traits = ExecutionTraits.BARRIER
print(f"\nBarrier traits: {barrier_traits}")
print(f"  Has BARRIER: {bool(barrier_traits & ExecutionTraits.BARRIER)}")

Sampler traits: ExecutionTraits.START|CELL_BY_CELL|ROW_STREAMABLE
  Has START: True
  Has BARRIER: False

Barrier traits: ExecutionTraits.BARRIER
  Has BARRIER: True


## 3. Column Descriptors

ColumnDescriptor stores metadata about a column, including its configuration, generator class, execution traits, dependencies, and side effects.
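The `is_*` properties used in the next cells can be modeled as bit tests on `traits`. The following minimal sketch assumes that design. `DescriptorSketch` is a hypothetical stand-in that omits `config` and `generator_cls`.

```python
from dataclasses import dataclass, field
from enum import Flag


class TraitsSketch(Flag):
    """Stand-in for ExecutionTraits, using the values printed in this notebook."""

    NONE = 0
    START = 1
    CELL_BY_CELL = 2
    ROW_STREAMABLE = 4
    BARRIER = 8


@dataclass
class DescriptorSketch:
    """Hypothetical stand-in for ColumnDescriptor (config and generator_cls omitted)."""

    name: str
    traits: TraitsSketch
    dependencies: list[str] = field(default_factory=list)
    side_effects: list[str] = field(default_factory=list)

    # Each is_* property is a single bit test on the traits value.
    @property
    def is_start_column(self) -> bool:
        return bool(self.traits & TraitsSketch.START)

    @property
    def is_cell_by_cell(self) -> bool:
        return bool(self.traits & TraitsSketch.CELL_BY_CELL)

    @property
    def is_row_streamable(self) -> bool:
        return bool(self.traits & TraitsSketch.ROW_STREAMABLE)

    @property
    def is_barrier(self) -> bool:
        return bool(self.traits & TraitsSketch.BARRIER)

    @property
    def has_dependencies(self) -> bool:
        return len(self.dependencies) > 0


validation = DescriptorSketch("validation", TraitsSketch.BARRIER, ["question"])
print(validation.is_barrier, validation.is_row_streamable, validation.has_dependencies)
# True False True
```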


In [6]:
# Import the base generator class to use as a placeholder (no real config is needed here)
from data_designer.engine.column_generators.generators.base import ColumnGenerator

# For demonstration, we'll create descriptors manually
# In practice, GraphBuilder creates these from DataDesignerConfig

# A start column (sampler) - can generate data from scratch
category_desc = ColumnDescriptor(
    name="category",
    config=None,  # Would be a real config in practice
    generator_cls=ColumnGenerator,  # Would be a real generator
    traits=ExecutionTraits.START | ExecutionTraits.CELL_BY_CELL | ExecutionTraits.ROW_STREAMABLE,
    dependencies=[],
    side_effects=[],
)

print("Category column:")
print(f"  is_start_column: {category_desc.is_start_column}")
print(f"  is_cell_by_cell: {category_desc.is_cell_by_cell}")
print(f"  is_row_streamable: {category_desc.is_row_streamable}")
print(f"  is_barrier: {category_desc.is_barrier}")

Category column:
  is_start_column: True
  is_cell_by_cell: True
  is_row_streamable: True
  is_barrier: False


In [7]:
# A dependent column (LLM text) - processes cells independently
question_desc = ColumnDescriptor(
    name="question",
    config=None,
    generator_cls=ColumnGenerator,
    traits=ExecutionTraits.CELL_BY_CELL | ExecutionTraits.ROW_STREAMABLE,
    dependencies=["category"],  # Depends on category column
    side_effects=[],
)

print("Question column:")
print(f"  is_start_column: {question_desc.is_start_column}")
print(f"  has_dependencies: {question_desc.has_dependencies}")
print(f"  dependencies: {question_desc.dependencies}")

Question column:
  is_start_column: False
  has_dependencies: True
  dependencies: ['category']


In [8]:
# A barrier column (validator) - needs all inputs before producing any output
validation_desc = ColumnDescriptor(
    name="validation",
    config=None,
    generator_cls=ColumnGenerator,
    traits=ExecutionTraits.BARRIER,
    dependencies=["question"],
    side_effects=[],
)

print("Validation column:")
print(f"  is_barrier: {validation_desc.is_barrier}")
print(f"  is_row_streamable: {validation_desc.is_row_streamable}")

Validation column:
  is_barrier: True
  is_row_streamable: False


## 4. Execution Graph

The ExecutionGraph models cell-level dependencies while maintaining memory efficiency through a hybrid representation:

- **Explicit**: Column structure (ColumnDescriptors) is stored in memory
- **Virtual**: Cell-level nodes are computed on demand, not stored

This allows handling datasets with millions of records without creating millions of explicit nodes.
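The `num_nodes` values printed in this notebook are 16 for 5 records, 3,000,001 for 1M records, and 200 for the two-column graph in section 10. All three match a simple count: one cell per record per column, plus one barrier node per barrier column. The sketch below shows that arithmetic. `count_virtual_nodes` is a hypothetical helper.

```python
def count_virtual_nodes(num_records: int, column_is_barrier: list[bool]) -> int:
    """Hypothetical helper: number of virtual nodes implied by the column list."""
    # Every column contributes one cell per record.
    num_cells = num_records * len(column_is_barrier)
    # Each barrier column contributes one extra BarrierNodeId.
    num_barriers = sum(column_is_barrier)
    return num_cells + num_barriers


columns = [False, False, True]  # category, question, validation (barrier)
print(count_virtual_nodes(5, columns))  # 16
print(count_virtual_nodes(1_000_000, columns))  # 3000001
```

Only the three descriptors need to exist in memory. The 3,000,001 nodes exist only as a count, not as stored objects.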


In [9]:
# Create an execution graph with our descriptors
# Descriptors must be in topological order (dependencies before dependents)
graph = ExecutionGraph(
    num_records=5,
    column_descriptors=[category_desc, question_desc, validation_desc],
)

print("Graph properties:")
print(f"  num_records: {graph.num_records}")
print(f"  num_columns: {graph.num_columns}")
print(f"  num_nodes: {graph.num_nodes}")
print(f"  start_columns: {graph.start_columns}")
print(f"  column_names: {graph.column_names}")

Graph properties:
  num_records: 5
  num_columns: 3
  num_nodes: 16
  start_columns: ['category']
  column_names: ['category', 'question', 'validation']


In [10]:
# Memory efficiency demonstration
# With 1M records and 3 columns, the graph has ~3M virtual nodes,
# but it stores only O(C) column metadata, not O(C × R) node objects

large_graph = ExecutionGraph(
    num_records=1_000_000,
    column_descriptors=[category_desc, question_desc, validation_desc],
)

print("Large graph with 1M records:")
print(f"  num_nodes: {large_graph.num_nodes:,}")
print(f"  Memory: O({large_graph.num_columns}) columns, not O({large_graph.num_nodes:,}) nodes")

Large graph with 1M records:
  num_nodes: 3,000,001
  Memory: O(3) columns, not O(3,000,001) nodes


## 5. Node Iteration

The graph provides several iteration methods for execution engines.
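Virtual iteration can be modeled with a Python generator, which produces nodes only as they are consumed. The following minimal sketch assumes nodes are encoded as plain tuples. It also assumes each barrier precedes its column's output cells, matching the ordering that `iter_nodes()` prints below. `iter_nodes_sketch` is hypothetical.

```python
from collections.abc import Iterator
from itertools import islice

# Hypothetical column list: (name, is_barrier), already in topological order.
COLUMNS = [("category", False), ("question", False), ("validation", True)]


def iter_nodes_sketch(num_records: int, columns: list[tuple[str, bool]]) -> Iterator[tuple]:
    """Yield nodes lazily as tuples; nothing is stored between yields."""
    for name, is_barrier in columns:
        if is_barrier:
            # A barrier column's synchronization node precedes its output cells.
            yield ("barrier", name)
        for row in range(num_records):
            yield ("cell", row, name)


# Even with a billion records, only the three requested nodes are ever created.
first_three = list(islice(iter_nodes_sketch(1_000_000_000, COLUMNS), 3))
print(first_three)
# [('cell', 0, 'category'), ('cell', 1, 'category'), ('cell', 2, 'category')]
```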


In [11]:
# iter_start_nodes() - nodes that can begin immediately (no dependencies)
print("Start nodes (first 10):")
for i, node in enumerate(graph.iter_start_nodes()):
    if i >= 10:
        print("  ...")
        break
    print(f"  {node}")

Start nodes (first 10):
  Cell(0, 'category')
  Cell(1, 'category')
  Cell(2, 'category')
  Cell(3, 'category')
  Cell(4, 'category')


In [12]:
# iter_nodes() - all virtual nodes in topological order
print("All nodes:")
for node in graph.iter_nodes():
    print(f"  {node}")

All nodes:
  Cell(0, 'category')
  Cell(1, 'category')
  Cell(2, 'category')
  Cell(3, 'category')
  Cell(4, 'category')
  Cell(0, 'question')
  Cell(1, 'question')
  Cell(2, 'question')
  Cell(3, 'question')
  Cell(4, 'question')
  Barrier('validation')
  Cell(0, 'validation')
  Cell(1, 'validation')
  Cell(2, 'validation')
  Cell(3, 'validation')
  Cell(4, 'validation')


## 6. Dependency Resolution

Dependencies are resolved differently based on column traits:

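- **CELL_BY_CELL / ROW_STREAMABLE**: Row-local dependencies (the same row in each dependency column)
- **BARRIER**: The column's BarrierNodeId depends on ALL cells of its dependency columns, and each output cell of the column depends only on that BarrierNodeId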


In [13]:
# Dependencies for a start column (none)
cell = CellNodeId(row=2, column="category")
deps = graph.get_dependencies(cell)
print(f"Dependencies for {cell}: {deps}")

Dependencies for Cell(2, 'category'): []


In [14]:
# Dependencies for a cell-by-cell column (row-local)
cell = CellNodeId(row=2, column="question")
deps = graph.get_dependencies(cell)
print(f"Dependencies for {cell}: {deps}")
print("  -> Same row in dependency column (category)")

Dependencies for Cell(2, 'question'): [Cell(2, 'category')]
  -> Same row in dependency column (category)


In [15]:
# Dependencies for a barrier node (ALL cells of dependency columns)
barrier = BarrierNodeId(column="validation")
deps = graph.get_dependencies(barrier)
print(f"Dependencies for {barrier}:")
for dep in deps:
    print(f"  {dep}")
print("  -> ALL cells of dependency column (question)")

Dependencies for Barrier('validation'):
  Cell(0, 'question')
  Cell(1, 'question')
  Cell(2, 'question')
  Cell(3, 'question')
  Cell(4, 'question')
  -> ALL cells of dependency column (question)


In [16]:
# Dependencies for output cells of a barrier column
# They depend on the barrier node, not directly on input cells
cell = CellNodeId(row=2, column="validation")
deps = graph.get_dependencies(cell)
print(f"Dependencies for {cell}: {deps}")
print("  -> Depends on the barrier being complete")

Dependencies for Cell(2, 'validation'): [Barrier('validation')]
  -> Depends on the barrier being complete


## 7. Completion Tracking

The CompletionTracker provides memory-efficient tracking:

- Tracks fully completed columns as a set of names: O(C)
- Only stores partial completion progress for in-progress columns
- Automatically compacts when columns fully complete
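The following sketch illustrates the compaction idea. It assumes cells are tracked per column as sets of row indices. When a column's set reaches `num_records` rows, the set is replaced by the column name alone. `CompactingTrackerSketch` is hypothetical and ignores barrier nodes for brevity.

```python
from collections import defaultdict


class CompactingTrackerSketch:
    """Hypothetical tracker for cell completion, keyed by (row, column).

    Barrier nodes are left out to keep the sketch short.
    """

    def __init__(self, num_records: int) -> None:
        self.num_records = num_records
        self._complete_columns: set[str] = set()  # fully done: one name per column
        self._partial: defaultdict[str, set[int]] = defaultdict(set)  # rows done so far

    def mark_complete(self, row: int, column: str) -> None:
        if column in self._complete_columns:
            return
        rows = self._partial[column]
        rows.add(row)
        if len(rows) == self.num_records:
            # Compact: forget the individual rows, keep only the column name.
            del self._partial[column]
            self._complete_columns.add(column)

    def is_complete(self, row: int, column: str) -> bool:
        # .get avoids creating an empty entry for columns not yet started.
        return column in self._complete_columns or row in self._partial.get(column, set())

    def is_column_complete(self, column: str) -> bool:
        return column in self._complete_columns

    def __len__(self) -> int:
        full = len(self._complete_columns) * self.num_records
        return full + sum(len(rows) for rows in self._partial.values())


tracker = CompactingTrackerSketch(num_records=5)
for row in range(5):
    tracker.mark_complete(row, "category")
tracker.mark_complete(0, "question")
print(len(tracker), tracker.is_column_complete("category"), tracker.is_complete(0, "question"))
# 6 True True
```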


In [17]:
# Create a completion tracker
tracker = CompletionTracker(num_records=graph.num_records)

print("Initial state:")
print(f"  Completed nodes: {len(tracker)}")
print(f"  category complete: {tracker.is_column_complete('category')}")

Initial state:
  Completed nodes: 0
  category complete: False


In [18]:
# Mark individual cells complete
tracker.mark_complete(CellNodeId(0, "category"))
tracker.mark_complete(CellNodeId(1, "category"))

print("After marking 2 cells complete:")
print(f"  Completed nodes: {len(tracker)}")
print(f"  category completion count: {tracker.column_completion_count('category')}")
print(f"  category complete: {tracker.is_column_complete('category')}")

After marking 2 cells complete:
  Completed nodes: 2
  category completion count: 2
  category complete: False


In [19]:
# Check node completion using 'in' syntax
print(f"CellNodeId(0, 'category') in tracker: {CellNodeId(0, 'category') in tracker}")
print(f"CellNodeId(2, 'category') in tracker: {CellNodeId(2, 'category') in tracker}")

CellNodeId(0, 'category') in tracker: True
CellNodeId(2, 'category') in tracker: False


In [20]:
# Complete the rest of the category column
for row in range(2, graph.num_records):
    tracker.mark_complete(CellNodeId(row, "category"))

print("After completing category column:")
print(f"  Completed nodes: {len(tracker)}")
print(f"  category complete: {tracker.is_column_complete('category')}")

After completing category column:
  Completed nodes: 5
  category complete: True


## 8. Ready Node Iteration

The `iter_ready_nodes()` method is the primary interface for async execution engines.
It yields nodes whose dependencies are all satisfied.
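A node is ready when it has not run yet and all of its dependencies have. The following minimal sketch applies that check to a generic graph, assuming dependencies are available through a callable. `iter_ready_nodes_sketch` is hypothetical. It takes a plain set where the real method takes a `CompletionTracker`.

```python
from collections.abc import Callable, Hashable, Iterable, Iterator


def iter_ready_nodes_sketch(
    nodes: Iterable[Hashable],
    get_dependencies: Callable[[Hashable], list[Hashable]],
    completed: set[Hashable],
) -> Iterator[Hashable]:
    """Hypothetical helper: yield nodes not yet completed whose dependencies all are."""
    for node in nodes:
        if node in completed:
            continue
        if all(dep in completed for dep in get_dependencies(node)):
            yield node


# Tiny hand-written graph: b and c need a; d needs both b and c.
deps = {"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}
done = {"a"}
print(list(iter_ready_nodes_sketch(deps, deps.__getitem__, done)))  # ['b', 'c']
done.update({"b", "c"})
print(list(iter_ready_nodes_sketch(deps, deps.__getitem__, done)))  # ['d']
```

This linear scan revisits every node on each call. An engine that also uses `get_dependents()` could avoid the full scan: it would keep a per-node counter of unmet dependencies and update only the dependents of each newly completed node.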


In [21]:
# Now that category is complete, question cells should be ready
ready_nodes = list(graph.iter_ready_nodes(tracker))

print("Ready nodes after category completion:")
for node in ready_nodes:
    print(f"  {node}")

Ready nodes after category completion:
  Cell(0, 'question')
  Cell(1, 'question')
  Cell(2, 'question')
  Cell(3, 'question')
  Cell(4, 'question')


In [22]:
# Complete all question cells
for row in range(graph.num_records):
    tracker.mark_complete(CellNodeId(row, "question"))

# Now the validation barrier should be ready
ready_nodes = list(graph.iter_ready_nodes(tracker))

print("Ready nodes after question completion:")
for node in ready_nodes:
    print(f"  {node}")

Ready nodes after question completion:
  Barrier('validation')


In [23]:
# Complete the barrier
tracker.mark_complete(BarrierNodeId("validation"))

# Now validation output cells should be ready
ready_nodes = list(graph.iter_ready_nodes(tracker))

print("Ready nodes after barrier completion:")
for node in ready_nodes:
    print(f"  {node}")

Ready nodes after barrier completion:
  Cell(0, 'validation')
  Cell(1, 'validation')
  Cell(2, 'validation')
  Cell(3, 'validation')
  Cell(4, 'validation')


In [24]:
# Complete validation cells
for row in range(graph.num_records):
    tracker.mark_complete(CellNodeId(row, "validation"))

# Check if graph is complete
print(f"Graph complete: {graph.is_complete(tracker)}")
print(f"Total completed nodes: {len(tracker)}")

Graph complete: True
Total completed nodes: 16


## 9. Simulation: Full Execution Loop

Let's simulate a complete execution loop using the graph.


In [25]:
# Create a fresh graph and tracker
graph = ExecutionGraph(
    num_records=3,
    column_descriptors=[category_desc, question_desc, validation_desc],
)
tracker = CompletionTracker(num_records=graph.num_records)

print("=== Starting Execution Simulation ===")
print(f"Graph: {graph.num_records} records, {graph.num_columns} columns\n")

iteration = 0
while not graph.is_complete(tracker):
    iteration += 1
    ready = list(graph.iter_ready_nodes(tracker))

    if not ready:
        print("ERROR: No ready nodes but graph not complete!")
        break

    print(f"Iteration {iteration}: {len(ready)} ready nodes")
    for node in ready:
        # Simulate execution
        print(f"  Executing: {node}")
        tracker.mark_complete(node)
    print()

print("=== Execution Complete ===")
print(f"Total iterations: {iteration}")
print(f"Total nodes processed: {len(tracker)}")

=== Starting Execution Simulation ===
Graph: 3 records, 3 columns

Iteration 1: 3 ready nodes
  Executing: Cell(0, 'category')
  Executing: Cell(1, 'category')
  Executing: Cell(2, 'category')

Iteration 2: 3 ready nodes
  Executing: Cell(0, 'question')
  Executing: Cell(1, 'question')
  Executing: Cell(2, 'question')

Iteration 3: 1 ready nodes
  Executing: Barrier('validation')

Iteration 4: 3 ready nodes
  Executing: Cell(0, 'validation')
  Executing: Cell(1, 'validation')
  Executing: Cell(2, 'validation')

=== Execution Complete ===
Total iterations: 4
Total nodes processed: 10
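The loop above advances in waves: every ready node finishes before the next `iter_ready_nodes()` call. An async engine could instead start each node as soon as its own dependencies finish, so one slow cell does not hold up unrelated rows. The sketch below shows that event-driven scheduling, which is Kahn's algorithm on an asyncio event loop. It assumes string node ids and a `deps` dict. `run_graph` is hypothetical, and `asyncio.sleep(0)` is a placeholder for real work.

```python
import asyncio
from collections import defaultdict


async def run_graph(deps: dict[str, list[str]], max_concurrency: int = 4) -> list[str]:
    """Hypothetical scheduler: run each node once all of its dependencies finish.

    Returns the order in which nodes completed.
    """
    # Count unmet dependencies per node and build reverse (dependent) edges.
    remaining = {node: len(node_deps) for node, node_deps in deps.items()}
    dependents: defaultdict[str, list[str]] = defaultdict(list)
    for node, node_deps in deps.items():
        for dep in node_deps:
            dependents[dep].append(node)

    semaphore = asyncio.Semaphore(max_concurrency)
    finished: list[str] = []

    async def execute(node: str) -> None:
        async with semaphore:
            await asyncio.sleep(0)  # placeholder for real work, e.g. a model call
        finished.append(node)
        # Start every dependent whose last unmet dependency was this node.
        ready_children = []
        for child in dependents[node]:
            remaining[child] -= 1
            if remaining[child] == 0:
                ready_children.append(asyncio.create_task(execute(child)))
        await asyncio.gather(*ready_children)

    roots = [node for node, count in remaining.items() if count == 0]
    await asyncio.gather(*(execute(node) for node in roots))

    # Nodes on a cycle, or waiting on an unknown id, never reach zero.
    if len(finished) != len(deps):
        raise RuntimeError("some nodes never became ready (cycle or missing dependency)")
    return finished


# A two-record version of this notebook's graph, with string ids.
toy_graph = {
    "category[0]": [],
    "category[1]": [],
    "question[0]": ["category[0]"],
    "question[1]": ["category[1]"],
    "barrier[validation]": ["question[0]", "question[1]"],
    "validation[0]": ["barrier[validation]"],
    "validation[1]": ["barrier[validation]"],
}
order = asyncio.run(run_graph(toy_graph))
print(order)
```

In a plain script, `asyncio.run(...)` runs the scheduler as shown. Inside Jupyter, an event loop is already running, so `await run_graph(toy_graph)` is the equivalent. The semaphore caps how many nodes do work at once, which is useful for rate-limited model calls.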


## 10. Using GraphBuilder with Real Configs

In practice, you'll use `GraphBuilder` to construct graphs from `DataDesignerConfig` objects.
The builder infers execution traits from generator properties automatically.
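Section 4 noted that `ExecutionGraph` expects descriptors in topological order. This notebook does not show how `GraphBuilder` produces that order. One way to do it is the standard-library `graphlib` module, which can order columns from their dependency lists and reject cycles. The sketch below uses `graphlib` for that purpose; `order_columns` is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter


def order_columns(dependencies: dict[str, list[str]]) -> list[str]:
    """Hypothetical helper: list columns so each appears after its dependencies."""
    # TopologicalSorter takes a mapping of node -> predecessors.
    return list(TopologicalSorter(dependencies).static_order())


# Column -> columns it reads from (mirrors the graph built earlier in this notebook).
cols = {"validation": ["question"], "question": ["category"], "category": []}
print(order_columns(cols))  # ['category', 'question', 'validation']

try:
    order_columns({"a": ["b"], "b": ["a"]})
except CycleError:
    print("cycle detected")
```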


In [26]:
# Import the config API and registry
import data_designer.config as dd
from data_designer.engine.column_generators.registry import create_default_column_generator_registry

# Create a simple config
config_builder = dd.DataDesignerConfigBuilder()

# Add a sampler column (START trait)
config_builder.add_column(
    dd.SamplerColumnConfig(
        name="category",
        sampler_type=dd.SamplerType.CATEGORY,
        params=dd.CategorySamplerParams(values=["science", "history", "math"]),
    )
)

# Add an LLM text column (depends on category)
config_builder.add_column(
    dd.LLMTextColumnConfig(
        name="question",
        prompt="Generate a trivia question about {{ category }}",
        model_alias="gpt-4o-mini",
    )
)

config = config_builder.build()
print(f"Config columns: {[c.name for c in config.columns]}")

Config columns: ['category', 'question']


In [27]:
# Build the graph using GraphBuilder
builder = GraphBuilder(create_default_column_generator_registry())

graph = builder.build(config, num_records=100)

print("Built graph:")
print(f"  num_records: {graph.num_records}")
print(f"  num_columns: {graph.num_columns}")
print(f"  num_nodes: {graph.num_nodes}")
print(f"  start_columns: {graph.start_columns}")
print(f"  column_names: {graph.column_names}")

Built graph:
  num_records: 100
  num_columns: 2
  num_nodes: 200
  start_columns: ['category']
  column_names: ['category', 'question']


In [28]:
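# Inspect the inferred traits. The sampler is inferred as START|ROW_STREAMABLE,
# without the CELL_BY_CELL flag used in the hand-built descriptor in section 3.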
for col_name in graph.column_names:
    desc = graph.get_column_descriptor(col_name)
    print(f"\n{col_name}:")
    print(f"  traits: {desc.traits}")
    print(f"  is_start_column: {desc.is_start_column}")
    print(f"  is_barrier: {desc.is_barrier}")
    print(f"  dependencies: {desc.dependencies}")


category:
  traits: ExecutionTraits.START|ROW_STREAMABLE
  is_start_column: True
  is_barrier: False
  dependencies: []

question:
  traits: ExecutionTraits.CELL_BY_CELL|ROW_STREAMABLE
  is_start_column: False
  is_barrier: False
  dependencies: ['category']


## Summary

The execution graph framework provides:

1. **Memory Efficiency**: O(C) storage for column metadata, with virtual nodes computed on demand
2. **Trait-Based Execution**: Generators describe their execution characteristics via properties
3. **Flexible Dependencies**: Row-local for streaming, barrier synchronization for batch operations
4. **Async-Ready**: `iter_ready_nodes()` enables efficient async/parallel execution
5. **Plugin Support**: Traits inferred from properties, not class names

### Key API Methods

| Method                           | Description                              |
| -------------------------------- | ---------------------------------------- |
| `iter_nodes()`                   | All virtual nodes in topological order   |
| `iter_start_nodes()`             | Nodes that can begin immediately         |
| `iter_ready_nodes(tracker)`      | Nodes with satisfied dependencies        |
| `get_dependencies(node)`         | Dependencies for a node                  |
| `get_dependents(node)`           | Nodes that depend on this node           |
| `get_column_descriptor(name)`    | `ColumnDescriptor` for a column          |
| `get_generator_and_config(node)` | Generator class and config for execution |
| `is_complete(tracker)`           | Whether every node has been completed    |
