# Getting Started with Meridian Runtime

Welcome to your first interactive Meridian Runtime tutorial! This notebook walks through the basic concepts of building dataflow graphs: nodes, edges, subgraphs, and the scheduler.

## What You'll Learn

- How to create simple nodes and connect them
- How the basic graph execution model works
- How to explore message flow interactively
- How to visualize graph behavior in real time

## Prerequisites

Make sure you have the notebook dependencies installed:
```bash
uv sync --extra notebooks
```
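
If you are unsure whether the sync worked, a quick check like the one below can confirm that the third-party modules are importable. This is a minimal sketch: the module names are taken from this notebook's import cell, and `missing_modules` is a hypothetical helper, not part of Meridian Runtime.

```python
from importlib.util import find_spec


def missing_modules(names):
    """Return the top-level module names that cannot be found."""
    return [name for name in names if find_spec(name) is None]


# Module names taken from the import cell of this notebook.
required = ["matplotlib", "ipywidgets", "networkx", "pandas"]
missing = missing_modules(required)
if missing:
    print("Missing modules:", ", ".join(missing))
else:
    print("All notebook dependencies are importable.")
```

The check only looks modules up without importing them, so it is cheap to rerun after each `uv sync`.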


In [2]:
# Import required libraries
import sys
import time
import threading
from typing import List, Dict, Any
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display, clear_output
import networkx as nx
import pandas as pd

# Add the project root to the path
sys.path.insert(0, '../..')

# Import Meridian Runtime
from meridian.core import (
    Node, Message, MessageType, Subgraph, Scheduler, 
    PortSpec, Port, PortDirection
)
from meridian.observability.config import ObservabilityConfig, configure_observability
from meridian.observability.logging import get_logger, with_context

print("✅ Meridian Runtime imported successfully!")


✅ Meridian Runtime imported successfully!


## Step 1: Understanding the Basic Components

Meridian Runtime is built around four core concepts:

1. **Nodes**: Single-responsibility processing units
2. **Edges**: Bounded queues connecting nodes
3. **Subgraphs**: Composable groups of nodes
4. **Scheduler**: Coordinates execution and enforces backpressure
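
To make "bounded queue" concrete, here is a toy, standard-library-only model of an edge. It is a sketch for intuition, not Meridian's implementation: `BoundedEdge`, `try_put`, and `try_get` are hypothetical names, and the real edge type may behave differently when full.

```python
from collections import deque


class BoundedEdge:
    """Toy stand-in for an edge: a FIFO queue with a fixed capacity."""

    def __init__(self, capacity):
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.capacity = capacity
        self._items = deque()

    def try_put(self, item):
        """Enqueue item and return True, or return False when the edge is full."""
        if len(self._items) >= self.capacity:
            return False
        self._items.append(item)
        return True

    def try_get(self):
        """Dequeue and return the oldest item, or None when the edge is empty."""
        return self._items.popleft() if self._items else None

    def __len__(self):
        return len(self._items)


edge = BoundedEdge(capacity=2)
accepted = [edge.try_put(n) for n in range(3)]  # the third put finds the edge full
print(accepted)        # [True, True, False]
print(edge.try_get())  # 0 (oldest item first)
print(len(edge))       # 1
```

The rejected third `try_put` is the signal a scheduler can use to slow the producer down, which is the idea behind backpressure.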

Let's start by creating a simple producer-consumer pattern.


In [3]:
# Create a simple producer node
class InteractiveProducer(Node):
    """A producer that generates numbers with interactive controls."""
    
    def __init__(self, name: str = "producer", max_count: int = 10):
        super().__init__(
            name=name,
            inputs=[],
            outputs=[Port("output", PortDirection.OUTPUT, spec=PortSpec("output", int))]
        )
        self._max_count = max_count
        self._current = 0
        self._messages_sent = 0
        
    def _handle_tick(self):
        """Emit a number on each tick until we reach max_count."""
        if self._current < self._max_count:
            self.emit("output", Message(MessageType.DATA, self._current))
            self._current += 1
            self._messages_sent += 1
            
    def get_stats(self) -> Dict[str, Any]:
        """Get current statistics for visualization."""
        return {
            "messages_sent": self._messages_sent,
            "current_value": self._current,
            "is_complete": self._current >= self._max_count
        }

# Create a simple consumer node
class InteractiveConsumer(Node):
    """A consumer that collects and displays received messages."""
    
    def __init__(self, name: str = "consumer"):
        super().__init__(
            name=name,
            inputs=[Port("input", PortDirection.INPUT, spec=PortSpec("input", int))],
            outputs=[]
        )
        self._received_messages = []
        self._message_count = 0
        
    def _handle_message(self, port: str, msg: Message):
        """Process incoming messages."""
        if port == "input":
            self._received_messages.append(msg.payload)
            self._message_count += 1
            print(f"📨 Consumer received: {msg.payload}")
            
    def get_stats(self) -> Dict[str, Any]:
        """Get current statistics for visualization."""
        return {
            "messages_received": self._message_count,
            "last_values": self._received_messages[-5:] if self._received_messages else [],
            "all_values": self._received_messages.copy()
        }

print("✅ Producer and Consumer nodes created!")


✅ Producer and Consumer nodes created!


## Step 2: Building Your First Graph

Now let's create a simple graph that connects our producer to our consumer. We'll use interactive controls to explore different configurations.


In [5]:
# Interactive controls for graph configuration
max_count_slider = widgets.IntSlider(
    value=5, min=1, max=20, step=1,
    description='Max Count:',
    style={'description_width': 'initial'}
)

capacity_slider = widgets.IntSlider(
    value=8, min=1, max=32, step=1,
    description='Edge Capacity:',
    style={'description_width': 'initial'}
)

# Note: this slider is displayed, but no later cell in this notebook reads its value
tick_interval_slider = widgets.IntSlider(
    value=100, min=50, max=500, step=50,
    description='Tick Interval (ms):',
    style={'description_width': 'initial'}
)

# Display controls
print("🎛️  Graph Configuration Controls:")
display(max_count_slider, capacity_slider, tick_interval_slider)


🎛️  Graph Configuration Controls:



In [8]:
def build_interactive_graph(max_count: int, capacity: int) -> tuple[Subgraph, InteractiveProducer, InteractiveConsumer]:
    """Build a graph with the given configuration."""
    
    # Create nodes
    producer = InteractiveProducer(name="producer", max_count=max_count)
    consumer = InteractiveConsumer(name="consumer")
    
    # Create subgraph
    graph = Subgraph.from_nodes("interactive_demo", [producer, consumer])
    
    # Connect nodes with specified capacity
    graph.connect(("producer", "output"), ("consumer", "input"), capacity=capacity)
    
    return graph, producer, consumer

# Build the graph with current slider values
graph, producer, consumer = build_interactive_graph(
    max_count_slider.value,
    capacity_slider.value
)

print(f"✅ Graph built with {max_count_slider.value} messages and capacity {capacity_slider.value}")


✅ Graph built with 15 messages and capacity 32


## Step 3: Visualizing the Graph Structure

Let's create a visual representation of our graph to understand the topology.


In [9]:
def visualize_graph(graph: Subgraph):
    """Create a visual representation of the graph topology."""
    
    # Create NetworkX graph for visualization
    G = nx.DiGraph()
    
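    # Add nodes. graph.nodes may be a mapping keyed by node name (iterating it
    # then yields strings), so handle both a mapping and a plain sequence.
    nodes = graph.nodes.values() if isinstance(graph.nodes, dict) else graph.nodes
    for node in nodes:
        G.add_node(node.name, type=node.__class__.__name__)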
    
    # Add edges with capacity information
    for edge in graph.edges:
        G.add_edge(
            edge.source_node, 
            edge.target_node, 
            capacity=edge.capacity,
            policy=edge.policy.__class__.__name__
        )
    
    # Create the visualization
    plt.figure(figsize=(10, 6))
    pos = nx.spring_layout(G, seed=42)
    
    # Draw nodes
    nx.draw_networkx_nodes(G, pos, node_color='lightblue', node_size=2000)
    nx.draw_networkx_labels(G, pos, font_size=10, font_weight='bold')
    
    # Draw edges with labels
    edge_labels = {(u, v): f"cap={d['capacity']}" for u, v, d in G.edges(data=True)}
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_size=8)
    nx.draw_networkx_edges(G, pos, edge_color='gray', arrows=True, arrowsize=20)
    
    plt.title("Meridian Runtime Graph Topology", fontsize=14, fontweight='bold')
    plt.axis('off')
    plt.tight_layout()
    plt.show()
    
    # Print graph information
    print("📊 Graph Information:")
    print(f"   Nodes: {len(graph.nodes)}")
    print(f"   Edges: {len(graph.edges)}")
    for edge in graph.edges:
        print(f"   {edge.source_node} → {edge.target_node} (capacity: {edge.capacity})")

# Visualize our graph
visualize_graph(graph)


AttributeError: 'str' object has no attribute 'name'
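
The `AttributeError` recorded above is what Python raises when a loop iterates over a dict and treats each key as an object. The sketch below reproduces it with the standard library only. It assumes `graph.nodes` is a mapping from node name to node, which the error suggests but this notebook does not confirm; `FakeNode` and `node_names` are hypothetical stand-ins.

```python
class FakeNode:
    """Hypothetical stand-in for a node object with a name attribute."""

    def __init__(self, name):
        self.name = name


def node_names(nodes):
    """Collect names whether nodes is a mapping of name -> node or a sequence of nodes."""
    values = nodes.values() if isinstance(nodes, dict) else nodes
    return [node.name for node in values]


as_mapping = {"producer": FakeNode("producer"), "consumer": FakeNode("consumer")}
as_list = list(as_mapping.values())

# Iterating a dict yields its keys (strings), which have no .name attribute.
try:
    [node.name for node in as_mapping]
except AttributeError as exc:
    print(exc)  # 'str' object has no attribute 'name'

print(node_names(as_mapping))  # ['producer', 'consumer']
print(node_names(as_list))     # ['producer', 'consumer']
```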

## Step 4: Running the Graph with Real-time Monitoring

Now let's run our graph and watch the messages flow in real-time!


In [None]:
# Configure observability for better visibility
configure_observability(ObservabilityConfig(
    log_level="INFO",
    log_json=False,  # Human-readable logs for notebooks
    metrics_enabled=True,
    tracing_enabled=False
))

def run_graph_with_monitoring(graph: Subgraph, producer: InteractiveProducer, 
                              consumer: InteractiveConsumer, max_ticks: int = 50):
    """Run the graph with real-time monitoring and visualization."""
    
    # Create scheduler
    scheduler = Scheduler()
    scheduler.register(graph)
    
    # Monitoring data
    producer_stats = []
    consumer_stats = []
    tick_count = 0
    
    print("🚀 Starting graph execution...")
    print("=" * 50)
    
    # Run for a limited number of ticks to avoid infinite loops
    while tick_count < max_ticks:
        # Check if producer is complete
        producer_stat = producer.get_stats()
        if producer_stat["is_complete"]:
            print("✅ Producer completed!")
            break
            
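        # Run one tick. Note: _run_one_tick is a private method (leading
        # underscore), so it may change between releases.
        scheduler._run_one_tick()
        tick_count += 1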
        
        # Collect stats every few ticks
        if tick_count % 5 == 0:
            # Take fresh snapshots; producer_stat above predates this tick
            current_producer = producer.get_stats()
            current_consumer = consumer.get_stats()
            producer_stats.append(current_producer)
            consumer_stats.append(current_consumer)
            
            # Print progress
            print(f"Tick {tick_count}: Producer sent {current_producer['messages_sent']}, "
                  f"Consumer received {current_consumer['messages_received']}")
    
    print("=" * 50)
    print("🏁 Graph execution completed!")
    
    # Final statistics
    final_producer_stats = producer.get_stats()
    final_consumer_stats = consumer.get_stats()
    
    print("📈 Final Statistics:")
    print(f"   Producer sent: {final_producer_stats['messages_sent']} messages")
    print(f"   Consumer received: {final_consumer_stats['messages_received']} messages")
    print(f"   Messages received: {final_consumer_stats['all_values']}")
    
    return producer_stats, consumer_stats

# Run the graph
producer_stats, consumer_stats = run_graph_with_monitoring(graph, producer, consumer)
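
The loop above stops as soon as the producer reports completion. Depending on what a single scheduler tick does (not shown in this notebook), messages still queued on the edge at that moment may not reach the consumer. The toy loop below shows one way to keep ticking until the edge is drained. It is a sketch under stated assumptions: one tick lets the consumer handle at most one message and the producer emit at most one, and `run_ticks` is a hypothetical function, not Meridian's scheduler.

```python
from collections import deque


def run_ticks(max_count, capacity, max_ticks=50):
    """Toy tick loop: emit up to max_count integers, then drain the edge."""
    edge = deque()
    sent, received = 0, []
    for tick in range(1, max_ticks + 1):
        # Consumer phase: handle at most one queued message per tick.
        if edge:
            received.append(edge.popleft())
        # Producer phase: emit only while work remains and the edge has room.
        if sent < max_count and len(edge) < capacity:
            edge.append(sent)
            sent += 1
        # Stop only when the producer is done AND nothing is left in flight.
        if sent >= max_count and not edge:
            return tick, sent, received
    return max_ticks, sent, received


ticks, sent, received = run_ticks(max_count=5, capacity=2)
print(ticks, sent, received)  # 6 5 [0, 1, 2, 3, 4]
```

The run needs six ticks for five messages: the extra tick delivers the message that was still on the edge when the producer finished.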


## Step 5: Interactive Experimentation

Now let's create an interactive experiment where you can change parameters and see the results!


In [None]:
# Create an interactive experiment button
experiment_button = widgets.Button(
    description="Run New Experiment",
    button_style='success',
    tooltip='Click to run a new experiment with current settings'
)

output_area = widgets.Output()

def on_experiment_click(b):
    """Handle experiment button click."""
    with output_area:
        clear_output()
        
        # Build new graph with current settings
        new_graph, new_producer, new_consumer = build_interactive_graph(
            max_count_slider.value,
            capacity_slider.value
        )
        
        # Run the experiment
        run_graph_with_monitoring(new_graph, new_producer, new_consumer)
        
        # Show the graph visualization
        visualize_graph(new_graph)

experiment_button.on_click(on_experiment_click)

print("🔬 Interactive Experiment Controls:")
print("Adjust the sliders above, then click the button to run a new experiment!")
display(experiment_button, output_area)


## Step 6: Understanding Backpressure

Let's explore what happens when we create a bottleneck in our graph. This demonstrates Meridian's backpressure mechanism.
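
Before running the experiment against the runtime, a small simulation can show the effect in isolation. The sketch below assumes backpressure means "the producer waits while the edge is full"; Meridian's actual behavior depends on the edge policy, and `simulate_backpressure` is a hypothetical function built only on the standard library.

```python
from collections import deque


def simulate_backpressure(total, capacity, consumer_period):
    """Simulate a fast producer and a slow consumer over a bounded edge.

    The producer tries to emit one message per tick; the consumer handles one
    message every consumer_period ticks. Returns (ticks, blocked_ticks, peak_depth).
    """
    edge = deque()
    sent = received = blocked = peak = tick = 0
    while received < total:
        tick += 1
        if sent < total:
            if len(edge) < capacity:
                edge.append(sent)
                sent += 1
            else:
                blocked += 1  # edge full: the producer has to wait this tick
        peak = max(peak, len(edge))
        if tick % consumer_period == 0 and edge:
            edge.popleft()
            received += 1
    return tick, blocked, peak


for period in (1, 3):
    ticks, blocked, peak = simulate_backpressure(total=10, capacity=3, consumer_period=period)
    print(f"period={period}: ticks={ticks}, blocked={blocked}, peak_depth={peak}")
```

With a consumer that keeps up (`period=1`) the producer never waits. With a slower consumer (`period=3`) the producer spends ticks blocked, but the queue depth never exceeds the capacity of 3, which is the memory bound that backpressure buys.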


In [None]:
# Create a slow consumer to demonstrate backpressure
class SlowConsumer(InteractiveConsumer):
    """A consumer that processes messages slowly to create backpressure."""
    
    def __init__(self, name: str = "slow_consumer", delay_ms: int = 200):
        super().__init__(name=name)
        self._delay_ms = delay_ms
        
    def _handle_message(self, port: str, msg: Message):
        """Process messages with artificial delay."""
        if port == "input":
            # Simulate slow processing
            time.sleep(self._delay_ms / 1000.0)
            self._received_messages.append(msg.payload)
            self._message_count += 1
            print(f"🐌 Slow consumer processed: {msg.payload} (took {self._delay_ms}ms)")

# Interactive controls for backpressure experiment
delay_slider = widgets.IntSlider(
    value=100, min=0, max=500, step=50,
    description='Consumer Delay (ms):',
    style={'description_width': 'initial'}
)

backpressure_button = widgets.Button(
    description="Run Backpressure Experiment",
    button_style='warning',
    tooltip='Click to run backpressure experiment'
)

backpressure_output = widgets.Output()

def on_backpressure_click(b):
    """Handle backpressure experiment button click."""
    with backpressure_output:
        clear_output()
        
        print(f"🔍 Backpressure Experiment with {delay_slider.value}ms delay")
        print("=" * 60)
        
        # Create graph with slow consumer
        producer = InteractiveProducer(name="fast_producer", max_count=10)
        consumer = SlowConsumer(name="slow_consumer", delay_ms=delay_slider.value)
        
        graph = Subgraph.from_nodes("backpressure_demo", [producer, consumer])
        graph.connect(("fast_producer", "output"), ("slow_consumer", "input"), capacity=3)
        
        # Run with monitoring
        start_time = time.time()
        run_graph_with_monitoring(graph, producer, consumer, max_ticks=100)
        end_time = time.time()
        
        print(f"⏱️  Total execution time: {end_time - start_time:.2f} seconds")
        print(f"📊 Producer sent: {producer.get_stats()['messages_sent']} messages")
        print(f"📊 Consumer received: {consumer.get_stats()['messages_received']} messages")
        
        # Explain what happened
        if delay_slider.value > 0:
            print("\n💡 Backpressure Explanation:")
            print("   - The slow consumer created a bottleneck")
            print("   - The edge capacity (3) limited buffering")
            print("   - The producer was forced to wait (backpressure)")
            print("   - This prevents unbounded memory usage")

backpressure_button.on_click(on_backpressure_click)

print("🔍 Backpressure Experiment Controls:")
display(delay_slider, backpressure_button, backpressure_output)


## Summary

Congratulations! You've successfully completed your first Meridian Runtime tutorial. Here's what you've learned:

### Key Concepts
- ✅ **Nodes**: Single-responsibility processing units (Producer, Consumer)
- ✅ **Edges**: Bounded queues connecting nodes with configurable capacity
- ✅ **Subgraphs**: Composable groups of nodes
- ✅ **Scheduler**: Coordinates execution and manages the graph lifecycle
- ✅ **Backpressure**: Automatic flow control when consumers can't keep up

### Interactive Features
- ✅ **Real-time monitoring**: Watch messages flow through your graph
- ✅ **Visual graph topology**: See how nodes are connected
- ✅ **Parameter experimentation**: Adjust settings and see immediate results
- ✅ **Backpressure demonstration**: Understand flow control in action

### Next Steps

Ready to explore more advanced concepts? Check out:

1. **02-backpressure-policies.ipynb**: Different overflow policies (Drop, Latest, Coalesce)
2. **03-control-plane-priorities.ipynb**: Priority messaging and control flow
3. **04-observability-basics.ipynb**: Logs, metrics, and tracing

### Production Considerations

Remember that Jupyter notebooks are great for learning and experimentation, but for production:

- Use the command-line examples in the `examples/` directory
- Implement proper error handling and graceful shutdown
- Consider performance implications of your graph design
- Use appropriate observability and monitoring

Happy building with Meridian Runtime! 🚀
