# LangGraph Agentic Workflows: Interactive Implementation

**Framework:** LangGraph by LangChain  
**Focus:** Graph-based agent orchestration and stateful workflows  
**Format:** Interactive Jupyter Notebook with executable examples

## Overview

LangGraph enables building stateful, multi-actor applications with LLMs using graph-based workflows. This notebook demonstrates:

1. **Core Concepts** - StateGraph, nodes, edges, conditional routing
2. **Workflow Patterns** - All 6 patterns from Anthropic implemented in LangGraph
3. **Multi-Agent Systems** - Supervisor-worker and collaborative patterns
4. **Advanced Features** - Persistence, human-in-the-loop, memory
5. **Production Patterns** - Error handling, monitoring, deployment

## Why LangGraph?

**Advantages over plain API calls:**
- **State Management:** Built-in state tracking across workflow steps
- **Graph-based Flow:** Cyclical workflows, conditional routing
- **Persistence:** Save/resume workflows, human-in-the-loop
- **Debugging:** Visual graph inspection, step-by-step execution
- **Streaming:** Real-time output streaming

## Setup

In [None]:
# Install required packages
!pip install langgraph langchain langchain-anthropic langchain-openai python-dotenv

In [None]:
import os
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END, START
from langgraph.graph.message import add_messages
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from dotenv import load_dotenv
import operator

# Load API key
load_dotenv()
os.environ["ANTHROPIC_API_KEY"] = os.getenv("ANTHROPIC_API_KEY")

# Initialize LLM
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022", temperature=0.7)

print("✓ LangGraph setup complete!")

---

## Part 1: LangGraph Fundamentals

### Core Concepts

**StateGraph:** Defines the workflow structure
- **State:** Typed dictionary tracking workflow data
- **Nodes:** Functions that process state
- **Edges:** Connections between nodes (sequential, conditional, parallel)

### Basic State Definition

## LangGraph Architecture: Core Concepts

### The Graph Structure

```mermaid
graph LR
    START([START]) --> N1[Node 1]
    N1 --> N2[Node 2]
    N2 --> D{Decision}
    D -->|Condition A| N3[Node 3]
    D -->|Condition B| N4[Node 4]
    N3 --> END([END])
    N4 --> END
    
    style START fill:#c8e6c9
    style END fill:#c8e6c9
    style N1 fill:#e1f5ff
    style N2 fill:#e1f5ff
    style D fill:#fff9c4
    style N3 fill:#e1f5ff
    style N4 fill:#e1f5ff
```

### State Flow

```mermaid
sequenceDiagram
    participant App
    participant Graph
    participant N1 as Node 1
    participant N2 as Node 2
    participant N3 as Node 3

    App->>Graph: invoke(initial_state)
    Graph->>N1: state_v1
    N1-->>Graph: updated_state_v2
    Graph->>N2: state_v2
    N2-->>Graph: updated_state_v3
    Graph->>N3: state_v3
    N3-->>Graph: final_state
    Graph-->>App: return final_state
    
    Note over Graph,N3: State immutably<br/>transformed at each node
```

### LangGraph vs Traditional Approaches

#### Comparison Table

| Aspect | Traditional Functions | LangGraph |
|--------|----------------------|------------|
| **State Management** | Manual variable passing | Built-in StateGraph |
| **Conditional Logic** | if/else statements | Conditional edges |
| **Loops** | while loops, recursion | Cycles in graph |
| **Persistence** | Manual save/load | Checkpointer |
| **Visualization** | None | Graph rendering |
| **Debugging** | print(), logging | Step-by-step execution |
| **Human-in-Loop** | Complex implementation | Built-in interrupt |
| **Scalability** | Hard to maintain | Declarative structure |

#### Example: Conditional Workflow

**Traditional Approach:**
```python
def process(data):
    result = step1(data)
    category = classify(result)
    
    if category == 'A':
        output = handle_a(result)
    elif category == 'B':
        output = handle_b(result)
    else:
        output = handle_default(result)
    
    return finalize(output)
```

**LangGraph Approach:**
```python
workflow = StateGraph(State)
workflow.add_node('step1', step1_fn)
workflow.add_node('classify', classify_fn)
workflow.add_node('handle_a', handle_a_fn)
workflow.add_node('handle_b', handle_b_fn)
workflow.add_node('handle_default', default_fn)
workflow.add_node('finalize', finalize_fn)

workflow.add_edge(START, 'step1')
workflow.add_edge('step1', 'classify')
workflow.add_conditional_edges('classify', router_fn, {
    'A': 'handle_a',
    'B': 'handle_b',
    'default': 'handle_default'
})
workflow.add_edge('handle_a', 'finalize')
workflow.add_edge('handle_b', 'finalize')
workflow.add_edge('handle_default', 'finalize')
workflow.add_edge('finalize', END)

app = workflow.compile()
```

**Benefits of LangGraph:**
- ✅ Declarative and visual
- ✅ Easy to modify routing
- ✅ Can add persistence
- ✅ Supports streaming
- ✅ Built-in state management

In [None]:
# Define workflow state
class BasicState(TypedDict):
    """
    State tracks data flowing through the workflow
    """
    input: str
    output: str
    step_count: int

# Simple node function
def process_node(state: BasicState) -> BasicState:
    """Node function: receives state, returns updated state"""
    print(f"Processing: {state['input']}")
    return {
        **state,
        "output": f"Processed: {state['input']}",
        "step_count": state.get("step_count", 0) + 1
    }

# Create simple graph
def create_simple_graph():
    workflow = StateGraph(BasicState)
    
    # Add node
    workflow.add_node("process", process_node)
    
    # Define edges
    workflow.add_edge(START, "process")
    workflow.add_edge("process", END)
    
    # Compile
    return workflow.compile()

# Test simple graph
simple_app = create_simple_graph()

result = simple_app.invoke({
    "input": "Hello LangGraph",
    "step_count": 0
})

print(f"\nResult: {result['output']}")
print(f"Steps: {result['step_count']}")

---

## Part 2: Pattern 1 - Prompt Chaining with LangGraph

Sequential processing where each step builds on the previous output.

In [None]:
# State for prompt chaining
class ChainState(TypedDict):
    product_description: str
    key_features: str
    marketing_copy: str
    translation: str
    current_step: str

# Step 1: Extract features
def extract_features(state: ChainState) -> ChainState:
    print("[Step 1] Extracting features...")
    
    messages = [HumanMessage(content=f"""Extract 3-5 key features from this product:
    
{state['product_description']}

List as bullet points.""")]
    
    response = llm.invoke(messages)
    
    return {
        **state,
        "key_features": response.content,
        "current_step": "extract"
    }

# Step 2: Generate marketing copy
def generate_copy(state: ChainState) -> ChainState:
    print("[Step 2] Generating marketing copy...")
    
    messages = [HumanMessage(content=f"""Create compelling marketing copy from these features:
    
{state['key_features']}

Write 2-3 engaging paragraphs.""")]
    
    response = llm.invoke(messages)
    
    return {
        **state,
        "marketing_copy": response.content,
        "current_step": "generate"
    }

# Step 3: Translate
def translate_copy(state: ChainState) -> ChainState:
    print("[Step 3] Translating to Spanish...")
    
    messages = [HumanMessage(content=f"""Translate this marketing copy to Spanish:
    
{state['marketing_copy']}

Maintain tone and style.""")]
    
    response = llm.invoke(messages)
    
    return {
        **state,
        "translation": response.content,
        "current_step": "translate"
    }

# Build prompt chaining graph
def create_chain_graph():
    workflow = StateGraph(ChainState)
    
    # Add nodes in sequence
    workflow.add_node("extract", extract_features)
    workflow.add_node("generate", generate_copy)
    workflow.add_node("translate", translate_copy)
    
    # Sequential edges
    workflow.add_edge(START, "extract")
    workflow.add_edge("extract", "generate")
    workflow.add_edge("generate", "translate")
    workflow.add_edge("translate", END)
    
    return workflow.compile()

# Test prompt chaining
print("=" * 60)
print("PROMPT CHAINING WITH LANGGRAPH")
print("=" * 60 + "\n")

chain_app = create_chain_graph()

result = chain_app.invoke({
    "product_description": """SmartWatch Pro X: Advanced fitness tracker with 7-day battery,
    heart rate monitoring, GPS, water-resistant to 50m, smartphone integration.""",
    "key_features": "",
    "marketing_copy": "",
    "translation": "",
    "current_step": ""
})

print("\n" + "=" * 60)
print("RESULTS")
print("=" * 60)
print(f"\n📝 Features:\n{result['key_features']}")
print(f"\n📢 Marketing Copy:\n{result['marketing_copy']}")
print(f"\n🌐 Spanish Translation:\n{result['translation']}")

---

## Part 3: Pattern 2 - Routing with Conditional Edges

Dynamic routing based on classification. This is where LangGraph shines!

In [None]:
# State for routing
class RoutingState(TypedDict):
    query: str
    category: str
    response: str

# Router node
def router(state: RoutingState) -> RoutingState:
    """Classify query into category"""
    print(f"[Router] Classifying: {state['query']}")
    
    messages = [HumanMessage(content=f"""Classify this query into ONE category:
    
Query: {state['query']}

Categories:
- technical: Product issues, bugs, setup
- billing: Payment, invoices, subscriptions
- general: General questions, info

Respond with ONLY the category name.""")]
    
    response = llm.invoke(messages)
    category = response.content.strip().lower()
    
    print(f"[Router] Category: {category}")
    
    return {
        **state,
        "category": category
    }

# Conditional edge function
def route_query(state: RoutingState) -> Literal["technical", "billing", "general"]:
    """Determine next node based on category"""
    category = state["category"]
    
    # Map to valid node names
    if "technical" in category:
        return "technical"
    elif "billing" in category:
        return "billing"
    else:
        return "general"

# Handler nodes
def handle_technical(state: RoutingState) -> RoutingState:
    print("[Technical Handler] Processing...")
    
    messages = [
        SystemMessage(content="You are a technical support specialist. Provide detailed troubleshooting."),
        HumanMessage(content=state['query'])
    ]
    
    response = llm.invoke(messages)
    
    return {
        **state,
        "response": response.content
    }

def handle_billing(state: RoutingState) -> RoutingState:
    print("[Billing Handler] Processing...")
    
    messages = [
        SystemMessage(content="You are a billing specialist. Explain charges clearly."),
        HumanMessage(content=state['query'])
    ]
    
    response = llm.invoke(messages)
    
    return {
        **state,
        "response": response.content
    }

def handle_general(state: RoutingState) -> RoutingState:
    print("[General Handler] Processing...")
    
    messages = [
        SystemMessage(content="You are a helpful customer service rep. Be friendly and concise."),
        HumanMessage(content=state['query'])
    ]
    
    response = llm.invoke(messages)
    
    return {
        **state,
        "response": response.content
    }

# Build routing graph
def create_routing_graph():
    workflow = StateGraph(RoutingState)
    
    # Add nodes
    workflow.add_node("router", router)
    workflow.add_node("technical", handle_technical)
    workflow.add_node("billing", handle_billing)
    workflow.add_node("general", handle_general)
    
    # Start -> router
    workflow.add_edge(START, "router")
    
    # Conditional routing from router
    workflow.add_conditional_edges(
        "router",
        route_query,
        {
            "technical": "technical",
            "billing": "billing",
            "general": "general"
        }
    )
    
    # All handlers -> END
    workflow.add_edge("technical", END)
    workflow.add_edge("billing", END)
    workflow.add_edge("general", END)
    
    return workflow.compile()

# Test routing
print("\n" + "=" * 60)
print("ROUTING WITH CONDITIONAL EDGES")
print("=" * 60 + "\n")

routing_app = create_routing_graph()

test_queries = [
    "My app keeps crashing when I export data",
    "I was charged twice this month",
    "What are your business hours?"
]

for query in test_queries:
    print(f"\nQuery: {query}")
    print("-" * 60)
    
    result = routing_app.invoke({
        "query": query,
        "category": "",
        "response": ""
    })
    
    print(f"\nResponse:\n{result['response'][:300]}...\n")
    print("=" * 60)

---

## Part 4: Pattern 3 - Orchestrator-Workers with Dynamic Dispatch

Using LangGraph's `Send` API for dynamic worker creation.

In [None]:
from langgraph.constants import Send
import json

# State definitions
class OrchestratorState(TypedDict):
    task: str
    subtasks: list
    results: Annotated[list, operator.add]  # Accumulate results
    final_output: str

class WorkerState(TypedDict):
    subtask: dict
    result: str

# Orchestrator: Plan subtasks
def orchestrator_plan(state: OrchestratorState) -> OrchestratorState:
    print("[Orchestrator] Planning subtasks...")
    
    messages = [HumanMessage(content=f"""Break down this task into 3-4 independent subtasks:
    
Task: {state['task']}

For each subtask provide:
- id (number)
- description
- expertise_needed

Format as JSON array.""")]
    
    response = llm.invoke(messages)
    content = response.content
    
    # Extract JSON
    if "```json" in content:
        content = content.split("```json")[1].split("```")[0]
    elif "```" in content:
        content = content.split("```")[1].split("```")[0]
    
    subtasks = json.loads(content.strip())
    
    print(f"[Orchestrator] Created {len(subtasks)} subtasks")
    for st in subtasks:
        print(f"  {st['id']}. {st['description']}")
    
    return {
        **state,
        "subtasks": subtasks,
        "results": []
    }

# Dynamic dispatch to workers
def dispatch_workers(state: OrchestratorState):
    """Send each subtask to a worker"""
    return [Send("worker", {"subtask": st}) for st in state["subtasks"]]

# Worker: Execute subtask
def worker_execute(state: WorkerState) -> dict:
    subtask = state["subtask"]
    print(f"[Worker {subtask['id']}] Executing...")
    
    messages = [HumanMessage(content=f"""You are a specialist in: {subtask['expertise_needed']}
    
Execute this subtask:
{subtask['description']}

Provide detailed, high-quality work.""")]
    
    response = llm.invoke(messages)
    
    return {
        "results": [{
            "id": subtask['id'],
            "description": subtask['description'],
            "result": response.content
        }]
    }

# Synthesizer: Combine results
def synthesize_results(state: OrchestratorState) -> OrchestratorState:
    print("[Orchestrator] Synthesizing results...")
    
    results_text = "\n\n".join([
        f"Subtask {r['id']}: {r['description']}\nResult:\n{r['result']}"
        for r in state['results']
    ])
    
    messages = [HumanMessage(content=f"""Combine these results into a cohesive response:
    
Original Task: {state['task']}

Results:
{results_text}

Create a unified, well-structured response.""")]
    
    response = llm.invoke(messages)
    
    return {
        **state,
        "final_output": response.content
    }

# Build orchestrator-worker graph
def create_orchestrator_graph():
    workflow = StateGraph(OrchestratorState)
    
    # Add nodes
    workflow.add_node("plan", orchestrator_plan)
    workflow.add_node("worker", worker_execute)
    workflow.add_node("synthesize", synthesize_results)
    
    # Edges
    workflow.add_edge(START, "plan")
    
    # Dynamic dispatch to workers
    workflow.add_conditional_edges("plan", dispatch_workers, ["worker"])
    
    # Workers -> synthesize
    workflow.add_edge("worker", "synthesize")
    workflow.add_edge("synthesize", END)
    
    return workflow.compile()

# Test orchestrator-worker
print("\n" + "=" * 60)
print("ORCHESTRATOR-WORKERS WITH DYNAMIC DISPATCH")
print("=" * 60 + "\n")

orchestrator_app = create_orchestrator_graph()

result = orchestrator_app.invoke({
    "task": """Create a comprehensive marketing strategy for a new AI productivity app 
    targeting remote teams. Include competitive analysis, pricing, and channels.""",
    "subtasks": [],
    "results": [],
    "final_output": ""
})

print("\n" + "=" * 60)
print("FINAL OUTPUT")
print("=" * 60)
print(result['final_output'])

---

## Part 5: Multi-Agent Collaboration (Supervisor Pattern)

Multiple specialized agents coordinated by a supervisor.

In [None]:
# Multi-agent state
class MultiAgentState(TypedDict):
    messages: Annotated[list, add_messages]
    next_agent: str
    final_report: str

# Agent nodes
def researcher_agent(state: MultiAgentState) -> MultiAgentState:
    print("[Researcher] Gathering information...")
    
    messages = [
        SystemMessage(content="You are a research specialist. Gather and organize information."),
        *state['messages']
    ]
    
    response = llm.invoke(messages)
    
    return {
        "messages": [AIMessage(content=f"[Researcher] {response.content}", name="researcher")]
    }

def writer_agent(state: MultiAgentState) -> MultiAgentState:
    print("[Writer] Drafting content...")
    
    messages = [
        SystemMessage(content="You are a professional writer. Create clear, engaging content."),
        *state['messages']
    ]
    
    response = llm.invoke(messages)
    
    return {
        "messages": [AIMessage(content=f"[Writer] {response.content}", name="writer")]
    }

def critic_agent(state: MultiAgentState) -> MultiAgentState:
    print("[Critic] Reviewing quality...")
    
    messages = [
        SystemMessage(content="You are a critical editor. Provide constructive feedback."),
        *state['messages']
    ]
    
    response = llm.invoke(messages)
    
    return {
        "messages": [AIMessage(content=f"[Critic] {response.content}", name="critic")]
    }

# Supervisor decides next agent
def supervisor(state: MultiAgentState) -> MultiAgentState:
    print("[Supervisor] Deciding next step...")
    
    messages = [
        SystemMessage(content="""You are a supervisor coordinating agents: researcher, writer, critic.
        
Based on the conversation, decide who should act next.
Respond with ONLY: researcher, writer, critic, or FINISH

Typical flow: researcher -> writer -> critic -> FINISH"""),
        *state['messages']
    ]
    
    response = llm.invoke(messages)
    next_agent = response.content.strip().lower()
    
    print(f"[Supervisor] Next: {next_agent}")
    
    return {
        "next_agent": next_agent
    }

def route_supervisor(state: MultiAgentState) -> str:
    """Route based on supervisor decision"""
    next_agent = state.get("next_agent", "").lower()
    
    if "researcher" in next_agent:
        return "researcher"
    elif "writer" in next_agent:
        return "writer"
    elif "critic" in next_agent:
        return "critic"
    else:
        return "finish"

def finalize(state: MultiAgentState) -> MultiAgentState:
    print("[Finalizing] Compilation complete")
    
    # Extract final content from messages
    final = "\n\n".join([m.content for m in state['messages'] if isinstance(m, AIMessage)])
    
    return {
        "final_report": final
    }

# Build multi-agent graph
def create_multiagent_graph():
    workflow = StateGraph(MultiAgentState)
    
    # Add agents
    workflow.add_node("supervisor", supervisor)
    workflow.add_node("researcher", researcher_agent)
    workflow.add_node("writer", writer_agent)
    workflow.add_node("critic", critic_agent)
    workflow.add_node("finish", finalize)
    
    # Start -> supervisor
    workflow.add_edge(START, "supervisor")
    
    # Supervisor routes to agents
    workflow.add_conditional_edges(
        "supervisor",
        route_supervisor,
        {
            "researcher": "researcher",
            "writer": "writer",
            "critic": "critic",
            "finish": "finish"
        }
    )
    
    # Agents back to supervisor
    workflow.add_edge("researcher", "supervisor")
    workflow.add_edge("writer", "supervisor")
    workflow.add_edge("critic", "supervisor")
    
    # Finish -> END
    workflow.add_edge("finish", END)
    
    return workflow.compile()

# Test multi-agent system
print("\n" + "=" * 60)
print("MULTI-AGENT COLLABORATION (Supervisor Pattern)")
print("=" * 60 + "\n")

multiagent_app = create_multiagent_graph()

result = multiagent_app.invoke({
    "messages": [HumanMessage(content="""Write a 300-word article about the benefits 
    of AI agents in enterprise software development.""")],
    "next_agent": "",
    "final_report": ""
})

print("\n" + "=" * 60)
print("FINAL REPORT")
print("=" * 60)
print(result['final_report'][:1000] + "...")

---

## Part 6: Human-in-the-Loop & Persistence

One of LangGraph's most powerful features.

In [None]:
from langgraph.checkpoint.memory import MemorySaver

# State with approval tracking
class ApprovalState(TypedDict):
    content: str
    needs_approval: bool
    approved: bool
    feedback: str

def generate_content(state: ApprovalState) -> ApprovalState:
    print("[Generator] Creating content...")
    
    messages = [HumanMessage(content="Write a professional email announcing a product delay.")]
    response = llm.invoke(messages)
    
    return {
        **state,
        "content": response.content,
        "needs_approval": True
    }

def approval_gate(state: ApprovalState) -> ApprovalState:
    """Pause here for human approval"""
    print("\n" + "=" * 60)
    print("APPROVAL REQUIRED")
    print("=" * 60)
    print(f"\nContent:\n{state['content']}\n")
    print("Waiting for approval...")
    print("(In production, this would interrupt and wait for human input)")
    
    # In real implementation, this would pause execution
    # User would resume with: app.invoke(None, config={"configurable": {"thread_id": "1"}})
    
    return state

def check_approval(state: ApprovalState) -> str:
    """Route based on approval status"""
    if state.get("approved", False):
        return "send"
    else:
        # In real implementation, would check user input
        # For demo, auto-approve
        return "send"

def send_email(state: ApprovalState) -> ApprovalState:
    print("\n[Sender] Email sent!")
    return state

# Build graph with checkpoints
def create_approval_graph():
    # Memory saver enables persistence
    memory = MemorySaver()
    
    workflow = StateGraph(ApprovalState)
    
    workflow.add_node("generate", generate_content)
    workflow.add_node("approve", approval_gate)
    workflow.add_node("send", send_email)
    
    workflow.add_edge(START, "generate")
    workflow.add_edge("generate", "approve")
    
    # Conditional: approved -> send, else -> regenerate
    workflow.add_conditional_edges(
        "approve",
        check_approval,
        {"send": "send"}
    )
    
    workflow.add_edge("send", END)
    
    # Compile with checkpointer
    return workflow.compile(checkpointer=memory)

# Test with persistence
print("\n" + "=" * 60)
print("HUMAN-IN-THE-LOOP WITH PERSISTENCE")
print("=" * 60)

approval_app = create_approval_graph()

# Execute with thread ID for persistence
config = {"configurable": {"thread_id": "demo-1"}}

result = approval_app.invoke({
    "content": "",
    "needs_approval": False,
    "approved": True,  # Auto-approve for demo
    "feedback": ""
}, config=config)

print("\nWorkflow completed with persistence support!")

---

## Part 7: Streaming & Real-time Output

Stream outputs as they're generated.

In [None]:
# Test streaming
print("\n" + "=" * 60)
print("STREAMING WORKFLOW EXECUTION")
print("=" * 60 + "\n")

# Use the chain graph from earlier
chain_app = create_chain_graph()

# Stream execution
for event in chain_app.stream({
    "product_description": "AI-powered noise-cancelling headphones with 40-hour battery",
    "key_features": "",
    "marketing_copy": "",
    "translation": "",
    "current_step": ""
}):
    # Each event shows the node that executed and its output
    for node_name, node_output in event.items():
        print(f"\n📍 Node: {node_name}")
        if "current_step" in node_output:
            print(f"   Step: {node_output['current_step']}")
        print("   Output updated")
        print("-" * 60)

print("\n✓ Streaming complete!")

---

## Summary: LangGraph vs Plain API Calls

### Advantages of LangGraph

| Feature | Plain API | LangGraph |
|---------|-----------|----------|
| **State Management** | Manual dict passing | Built-in StateGraph |
| **Conditional Flow** | If/else logic | Conditional edges |
| **Cycles/Loops** | Complex code | Native support |
| **Persistence** | Custom implementation | Built-in checkpointer |
| **Human-in-Loop** | Difficult | Native interrupt support |
| **Visualization** | None | Graph rendering |
| **Streaming** | Manual | Built-in streaming |
| **Debugging** | Print statements | Step-by-step inspection |

### When to Use LangGraph

**Use LangGraph when:**
- Complex workflows with conditional routing
- Need to support cycles/loops
- Human-in-the-loop required
- Long-running workflows needing persistence
- Multiple agents coordinating

**Use Plain API calls when:**
- Simple linear workflows
- Minimal state tracking
- No conditional logic
- Quick prototypes

### Key Concepts Recap

1. **StateGraph:** Workflow definition with typed state
2. **Nodes:** Functions that process and update state
3. **Edges:** Connections (sequential, conditional, dynamic)
4. **Checkpointer:** Persistence layer for interrupting/resuming
5. **Send API:** Dynamic worker dispatch
6. **Streaming:** Real-time output as nodes execute

---

## Production Best Practices

### 1. Error Handling

In [None]:
# Robust node with error handling
def robust_node(state: dict) -> dict:
    try:
        # Your node logic
        response = llm.invoke([HumanMessage(content=state['input'])])
        
        return {
            **state,
            "output": response.content,
            "error": None
        }
    
    except Exception as e:
        print(f"Error in node: {e}")
        
        return {
            **state,
            "output": "",
            "error": str(e)
        }

# Conditional routing based on errors
def check_for_errors(state: dict) -> str:
    if state.get("error"):
        return "error_handler"
    else:
        return "next_step"

### 2. Monitoring & Logging

In [None]:
import time
import logging

class MonitoredNode:
    """Wrapper for monitoring node execution"""
    
    def __init__(self, node_func, node_name: str):
        self.node_func = node_func
        self.node_name = node_name
        self.metrics = []
    
    def __call__(self, state):
        start_time = time.time()
        
        try:
            result = self.node_func(state)
            latency = time.time() - start_time
            
            self.metrics.append({
                "node": self.node_name,
                "latency": latency,
                "status": "success"
            })
            
            logging.info(f"{self.node_name} completed in {latency:.2f}s")
            
            return result
            
        except Exception as e:
            latency = time.time() - start_time
            
            self.metrics.append({
                "node": self.node_name,
                "latency": latency,
                "status": "error",
                "error": str(e)
            })
            
            logging.error(f"{self.node_name} failed after {latency:.2f}s: {e}")
            raise
    
    def get_metrics(self):
        return self.metrics

# Usage
# monitored_extract = MonitoredNode(extract_features, "extract")
# workflow.add_node("extract", monitored_extract)

---

## Interview Preparation

### Common Questions

**Q1: Why use LangGraph instead of plain API calls?**

**Answer:**
- **State management:** Built-in state tracking vs manual dict passing
- **Conditional routing:** Native conditional edges vs complex if/else
- **Cycles:** Supports loops natively (e.g., evaluator-optimizer)
- **Persistence:** Built-in checkpointing for long-running workflows
- **Human-in-loop:** Native interrupt support
- **Visualization:** Can render workflow graphs
- **Debugging:** Step-by-step execution inspection

**Q2: Explain StateGraph vs regular function composition?**

**Answer:**
- **StateGraph:** Declarative workflow definition with typed state, nodes, and edges. Supports conditional routing and cycles.
- **Function composition:** Imperative, linear flow. Complex logic becomes hard to maintain.
- **Example:** Multi-agent system with dynamic routing is simple in StateGraph but requires complex orchestration code otherwise.

**Q3: How do you handle errors in LangGraph workflows?**

**Answer:**
1. **Try-catch in nodes:** Catch exceptions, return error state
2. **Conditional routing:** Route to error handler based on error flag
3. **Retry logic:** Add retry nodes for transient failures
4. **Fallback paths:** Define alternative routes when primary fails
5. **Monitoring:** Wrap nodes with monitoring for tracking

**Q4: When would you use Send API for dynamic workers?**

**Answer:**
- **Orchestrator-worker pattern:** Dynamic number of subtasks
- **Parallel processing:** Process list items independently
- **Fan-out/fan-in:** Dispatch to multiple workers, aggregate results
- **Example:** Code review where number of files unknown upfront

**Q5: How do you implement human-in-the-loop?**

**Answer:**
1. Use **checkpointer** (MemorySaver or database)
2. Add **approval gate node** that requires human input
3. Execute with **thread_id** for persistence
4. Workflow **pauses** at gate
5. Human reviews, provides input
6. **Resume** workflow with updated state
7. Conditional routing based on approval

---

## Next Steps

This notebook covered LangGraph fundamentals and all major patterns. Next notebooks:

1. **LangChain Agents** - ReAct, tool calling, OpenAI functions
2. **Multi-Agent Systems** - CrewAI, AutoGen deep dive
3. **Research Agents** - Deep reasoning, synthesis, citation
4. **Agentic RAG** - Agent-enhanced retrieval patterns
5. **Framework Comparison** - When to use what

### Resources

- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/)
- [LangGraph Tutorials](https://langchain-ai.github.io/langgraph/tutorials/)
- [LangGraph Multi-Agent](https://blog.langchain.com/langgraph-multi-agent-workflows/)
- [LangGraph GitHub](https://github.com/langchain-ai/langgraph)