# Graph Event System Examples

**All examples print actual event output!**

In [1]:
import json

from lfx.debug.event_recorder import record_graph_with_events
from lfx.debug.events import GraphMutationEvent
from lfx.graph import Graph

  from .autonotebook import tqdm as notebook_tqdm


## Example 1: Dependency Events

The most important event - when run_predecessors and run_map change.

In [None]:
# Direct API call to see dependency events
graph = Graph()

async def print_deps(event: GraphMutationEvent):
    if event.event_type == "dependency_added":
        print(f"\n{event.timing.upper()} Event (Step {event.step}):")
        print(f"  Vertex: {event.changes['vertex']}")
        print(f"  Predecessor: {event.changes['predecessor']}")

        if event.timing == "after":
            print("\n  State AFTER mutation:")
            print(json.dumps(event.state_after, indent=4))
            print(f"\n  ✅ run_predecessors changed: {event.changes['run_predecessors_changed']}")
            print(f"  ✅ run_map changed: {event.changes['run_map_changed']}")

graph.register_observer(print_deps)

print('Calling graph.add_dynamic_dependency("ComponentA", "ComponentB")...')
await graph.add_dynamic_dependency("ComponentA", "ComponentB")


Calling graph.add_dynamic_dependency("ComponentA", "ComponentB")...

BEFORE Event (Step 0):
  Vertex: ComponentA
  Predecessor: ComponentB

AFTER Event (Step 1):
  Vertex: ComponentA
  Predecessor: ComponentB

  State AFTER mutation:
{
    "run_predecessors": {
        "ComponentA": [
            "ComponentB"
        ]
    },
    "run_map": {
        "ComponentB": [
            "ComponentA"
        ]
    }
}

  ✅ run_predecessors changed: True
  ✅ run_map changed: True

💡 Both structures updated atomically - impossible to desync!


## Example 2: Queue Events

See how the execution queue changes.

In [3]:
graph2 = Graph()

async def print_queue(event: GraphMutationEvent):
    if "queue" in event.event_type and event.timing == "after":
        before_size = event.changes.get("queue_size_before", 0)
        after_size = event.changes.get("queue_size_after", 0)

        print(f"Step {event.step}: {event.event_type}")
        print(f"  Queue: {before_size} → {after_size} items")

        if event.changes.get("added"):
            print(f"  Added: {event.changes['added']}")
        if event.changes.get("removed"):
            print(f"  Removed: {event.changes['removed']}")

graph2.register_observer(print_queue)

await graph2.extend_run_queue(["v1", "v2"])
await graph2.extend_run_queue(["v3"])
await graph2.get_next_in_queue()

Step 1: queue_extended
  Queue: 0 → 2 items
  Added: ['v1', 'v2']
Step 3: queue_extended
  Queue: 2 → 3 items
  Added: ['v3']
Step 5: queue_dequeued
  Queue: 3 → 2 items
  Removed: v1


'v1'

## Example 3: Vertex State Events

When vertices change state (ACTIVE/INACTIVE).

In [4]:
from lfx import components as cp
from lfx.graph.vertex.base import VertexStates

# Create a graph with vertices
inp = cp.ChatInput()
out = cp.ChatOutput().set(input_value=inp.message_response)
graph3 = Graph(inp, out)
graph3.prepare()

async def print_states(event: GraphMutationEvent):
    if event.event_type == "vertex_marked" and event.timing == "after":
        vertex = event.vertex_id.split("-")[0] if event.vertex_id else "?"
        old = event.changes.get("old_state")
        new = event.changes.get("new_state")
        print(f"Step {event.step}: {vertex} state {old} → {new}")

graph3.register_observer(print_states)

# Manually mark a vertex to trigger event
vertex_id = graph3.vertices[0].id
print(f'Marking {vertex_id.split("-")[0]} as INACTIVE...')
await graph3.mark_vertex(vertex_id, VertexStates.INACTIVE)

Marking ChatInput as INACTIVE...
Step 1: ChatInput state VertexStates.ACTIVE → VertexStates.INACTIVE


## Example 4: Complete Event Structure

See all fields in an event.

In [5]:
# Record a simple flow
recording = await record_graph_with_events(Graph(inp, out), "Demo")

print(f"Captured {len(recording.events)} events")

# Show complete structure of first event
if recording.events:
    print("\nComplete event structure:")
    print(json.dumps(recording.events[0].to_dict(), indent=2, default=str))

Captured 8 events

Complete event structure:
{
  "event_type": "queue_dequeued",
  "vertex_id": "ChatInput-vRY7R",
  "state_before": {
    "queue": [
      "ChatInput-vRY7R"
    ]
  },
  "state_after": {
    "queue": [
      "ChatInput-vRY7R"
    ]
  },
  "changes": {
    "about_to_remove": "ChatInput-vRY7R"
  },
  "graph_snapshot": {
    "run_manager": {
      "run_map": {
        "ChatInput-vRY7R": [
          "ChatOutput-tB2oc"
        ]
      },
      "run_predecessors": {
        "ChatOutput-tB2oc": []
      },
      "vertices_to_run": "{'ChatOutput-tB2oc', 'ChatInput-vRY7R'}",
      "vertices_being_run": "set()",
      "ran_at_least_once": "{'ChatOutput-tB2oc', 'ChatInput-vRY7R'}"
    },
    "queue": [
      "ChatInput-vRY7R"
    ],
    "vertices_layers": [
      [
        "ChatOutput-tB2oc"
      ]
    ],
    "context": {},
    "inactivated_vertices": "set()",
    "activated_vertices": [],
    "conditionally_excluded": "set()"
  },
  "step": 0,
  "metadata": {},
  "timing": "bef

## Summary

**Key Event Types:**
- `dependency_added`: run_predecessors & run_map updated
- `vertex_marked`: State changes (ACTIVE/INACTIVE)
- `queue_extended/dequeued`: Queue operations

**Every event contains:**
- state_before & state_after
- changes (what changed)
- graph_snapshot (complete context)
- timing (before/after)

**Benefits:**
- Zero overhead without observers
- Complete state history
- Impossible to desync structures
- Extensible for custom tracking