# Runnable in LangGraph: The Executable Interface

A Runnable in LangGraph is the compiled, executable form of a StateGraph. It's the interface that allows you to invoke, stream, and interact with your graph after it has been compiled. Runnables provide the actual execution capabilities for your LangGraph applications.

## Key Features:
- **Execution Interface**: Provides methods to run your compiled graph
- **State Management**: Handles state flow during execution
- **Multiple Execution Modes**: Support for sync, async, and streaming execution
- **Error Handling**: Built-in error management during execution

## Creating a Runnable:

### **Basic Runnable Creation**
```python
from langgraph import StateGraph
from typing import TypedDict

class MyState(TypedDict):
    user_input: str
    result: str

# Create and compile graph
graph = StateGraph(MyState)
graph.add_node("process", process_input)
graph.add_edge("process", "generate")
graph.set_entry_point("process")

# Compile into Runnable
runnable = graph.compile()
```

### **Runnable with Configuration**
```python
# Compile with custom configuration
runnable = graph.compile(
    debug=True,
    checkpointer=None,
    interrupt_before=None,
    interrupt_after=None
)
```

## Execution Methods:

### **Synchronous Execution**
```python
# Basic invocation
result = runnable.invoke({
    "user_input": "Hello World",
    "result": ""
})

print(result["result"])  # Output: Processed: HELLO WORLD
```

### **Asynchronous Execution**
```python
import asyncio

async def run_async():
    result = await runnable.ainvoke({
        "user_input": "Hello World",
        "result": ""
    })
    return result

# Execute asynchronously
result = asyncio.run(run_async())
```

### **Streaming Execution**
```python
# Stream intermediate results
for chunk in runnable.stream({
    "user_input": "Hello World",
    "result": ""
}):
    print(f"Step: {chunk}")
    # Output:
    # Step: {'process': {'processed_data': 'HELLO WORLD'}}
    # Step: {'generate': {'result': 'Processed: HELLO WORLD'}}
```

### **Batch Execution**
```python
# Process multiple inputs
inputs = [
    {"user_input": "Hello", "result": ""},
    {"user_input": "World", "result": ""},
    {"user_input": "LangGraph", "result": ""}
]

results = runnable.batch(inputs)
for result in results:
    print(result["result"])
```

## Advanced Execution Patterns:

### **Streaming with Configuration**
```python
# Stream with specific configuration
for chunk in runnable.stream(
    {"user_input": "Hello World", "result": ""},
    config={"recursion_limit": 50}
):
    print(f"Chunk: {chunk}")
```

### **Async Streaming**
```python
async def stream_async():
    async for chunk in runnable.astream({
        "user_input": "Hello World",
        "result": ""
    }):
        print(f"Async chunk: {chunk}")

# Run async streaming
asyncio.run(stream_async())
```

### **Execution with Callbacks**
```python
from langchain_core.callbacks import BaseCallbackHandler

class MyCallbackHandler(BaseCallbackHandler):
    def on_chain_start(self, serialized, inputs, **kwargs):
        print(f"Starting execution with inputs: {inputs}")
    
    def on_chain_end(self, outputs, **kwargs):
        print(f"Execution completed with outputs: {outputs}")

# Execute with callbacks
result = runnable.invoke(
    {"user_input": "Hello World", "result": ""},
    config={"callbacks": [MyCallbackHandler()]}
)
```

## Runnable State Management:

### **State Inspection**
```python
# Get current state during execution
def inspect_state(state):
    print(f"Current state: {state}")
    return state

# Add inspection node
graph.add_node("inspect", inspect_state)
```

### **State Persistence**
```python
# Create runnable with state persistence
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
runnable = graph.compile(checkpointer=checkpointer)

# Execute with thread ID for state persistence
result = runnable.invoke(
    {"user_input": "Hello World", "result": ""},
    config={"configurable": {"thread_id": "user-123"}}
)
```

### **State Updates**
```python
# Update state during execution
def update_state(state):
    return {
        "step_count": state.get("step_count", 0) + 1,
        "last_updated": datetime.now().isoformat()
    }
```

## Error Handling:

### **Execution Error Handling**
```python
try:
    result = runnable.invoke({
        "user_input": "Hello World",
        "result": ""
    })
    print("Execution successful:", result)
except Exception as e:
    print(f"Execution failed: {e}")
```

### **Streaming Error Handling**
```python
try:
    for chunk in runnable.stream({
        "user_input": "Hello World",
        "result": ""
    }):
        print(f"Chunk: {chunk}")
except Exception as e:
    print(f"Streaming failed: {e}")
```

### **Async Error Handling**
```python
async def safe_execution():
    try:
        result = await runnable.ainvoke({
            "user_input": "Hello World",
            "result": ""
        })
        return result
    except Exception as e:
        print(f"Async execution failed: {e}")
        return None
```

## Runnable Configuration:

### **Execution Configuration**
```python
# Configure execution parameters
config = {
    "recursion_limit": 100,
    "max_concurrency": 10,
    "timeout": 30,
    "callbacks": [MyCallbackHandler()]
}

result = runnable.invoke(
    {"user_input": "Hello World", "result": ""},
    config=config
)
```

### **Thread Configuration**
```python
# Execute with thread-specific configuration
result = runnable.invoke(
    {"user_input": "Hello World", "result": ""},
    config={
        "configurable": {
            "thread_id": "user-123",
            "checkpoint_id": "checkpoint-456"
        }
    }
)
```

## Runnable Methods:

### **Invoke Methods**
```python
# Synchronous invoke
result = runnable.invoke(input_data)

# Asynchronous invoke
result = await runnable.ainvoke(input_data)

# Batch invoke
results = runnable.batch([input1, input2, input3])
```

### **Stream Methods**
```python
# Synchronous stream
for chunk in runnable.stream(input_data):
    process_chunk(chunk)

# Asynchronous stream
async for chunk in runnable.astream(input_data):
    await process_chunk_async(chunk)
```

### **Batch Methods**
```python
# Batch processing
results = runnable.batch(inputs)

# Async batch processing
results = await runnable.abatch(inputs)
```

## Runnable Properties:

### **Graph Information**
```python
# Get graph structure
print("Nodes:", runnable.nodes)
print("Edges:", runnable.edges)
print("Entry point:", runnable.entry_point)
```

### **Execution Metadata**
```python
# Get execution information
print("Graph name:", runnable.name)
print("Graph description:", runnable.description)
print("Graph version:", runnable.version)
```

## Best Practices:

### **1. Execution Management**
- Use appropriate execution method for your use case
- Handle errors gracefully at execution level
- Monitor execution performance
- Use streaming for long-running processes

### **2. State Management**
- Design state structure for your execution needs
- Use state persistence for multi-step processes
- Validate state before execution
- Handle state transitions properly

### **3. Error Handling**
- Always wrap execution in try-catch blocks
- Provide meaningful error messages
- Implement fallback mechanisms
- Log execution errors

### **4. Performance**
- Use async execution for I/O operations
- Implement proper timeout handling
- Monitor memory usage during execution
- Use batch processing for multiple inputs

## Common Use Cases:

### **API Endpoints**
```python
# Use runnable in API endpoint
@app.post("/process")
async def process_input(request: Request):
    result = await runnable.ainvoke({
        "user_input": request.json["input"],
        "result": ""
    })
    return {"result": result["result"]}
```

### **Background Tasks**
```python
# Use runnable in background task
async def background_processing():
    result = await runnable.ainvoke({
        "user_input": "Background task",
        "result": ""
    })
    return result
```

### **Interactive Applications**
```python
# Use runnable in interactive app
def interactive_chat():
    while True:
        user_input = input("You: ")
        result = runnable.invoke({
            "user_input": user_input,
            "result": ""
        })
        print(f"Bot: {result['result']}")
```

## Benefits:
- **Flexible Execution**: Multiple execution modes
- **State Management**: Built-in state handling
- **Error Handling**: Robust error management
- **Performance**: Optimized execution
- **Integration**: Easy integration with other systems

## Tips for Success:
- Choose the right execution method for your needs
- Handle errors at the execution level
- Use streaming for long-running processes
- Implement proper state management
- Monitor execution performance
- Test with various input scenarios
- Use async execution for I/O operations
- Implement proper timeout handling
