# Bonus Session: Advanced LangGraph Patterns

## 🎯 Mastering Complex Workflows

You've learned:
- ✅ Session 1: LangChain fundamentals
- ✅ Session 2: Agents and tools
- ✅ Session 3: RAG and knowledge bases
- ✅ Session 4: Deployment and optimization

**Now:** Advanced patterns for production systems!

### Today's Advanced Patterns:

1. **Human-in-the-Loop**: Approval workflows
2. **Persistent State**: Long-running conversations
3. **Dynamic Graphs**: Self-modifying workflows
4. **Parallel Branching**: Multiple paths simultaneously
5. **Error Recovery**: Automatic retry and fallback
6. **Streaming State**: Real-time updates
7. **Subgraphs**: Modular, reusable components

### Real-World Scenarios:

- 🏥 Healthcare: Multi-step patient diagnosis
- 💼 Finance: Complex approval workflows
- 🛒 E-commerce: Order processing pipelines
- 🎓 Education: Adaptive learning paths

Let's build! 🚀

In [None]:
!pip install -q langgraph langchain langchain-openai langgraph-checkpoint-sqlite
!pip install -q python-dotenv

In [None]:
import os
from typing import TypedDict, Annotated, Literal
from dotenv import load_dotenv

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import HumanMessage, AIMessage

load_dotenv()

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

print("✅ Environment ready for advanced patterns!")

## 1. Human-in-the-Loop Pattern

Critical decisions require human approval.

In [None]:
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

class OrderState(TypedDict):
    customer_id: str
    order_id: str
    product: str
    quantity: int
    total_amount: float
    requires_approval: bool
    approved: bool
    status: str

def validate_order(state: OrderState) -> OrderState:
    """Validate order and determine if approval needed"""
    print(f"\n📋 Validating order {state['order_id']}...")
    
    # Large orders require approval
    if state['total_amount'] > 100:
        state['requires_approval'] = True
        state['status'] = 'pending_approval'
        print(f"   ⚠️  High value order (${state['total_amount']}) - requires approval")
    else:
        state['requires_approval'] = False
        state['approved'] = True
        state['status'] = 'approved'
        print(f"   ✅ Order auto-approved (${state['total_amount']})")
    
    return state

def human_approval_node(state: OrderState) -> OrderState:
    """Record the outcome of the human review"""
    print(f"\n👤 HUMAN APPROVAL STEP")
    print(f"   Order: {state['product']} x {state['quantity']}")
    print(f"   Total: ${state['total_amount']}")
    print(f"   Approved: {state['approved']}")
    
    # This node does not block by itself. Pausing is handled by the interrupt
    # configured at compile time; here we only record the current decision.
    state['status'] = 'approved' if state['approved'] else 'awaiting_approval'
    return state

def process_order(state: OrderState) -> OrderState:
    """Process approved order"""
    print(f"\n✅ Processing order {state['order_id']}...")
    state['status'] = 'processing'
    return state

def should_get_approval(state: OrderState) -> Literal["approval", "process"]:
    """Route based on approval requirement"""
    if state['requires_approval'] and not state['approved']:
        return "approval"
    return "process"

# Build graph with interruption
workflow = StateGraph(OrderState)

workflow.add_node("validate", validate_order)
workflow.add_node("human_approval", human_approval_node)
workflow.add_node("process", process_order)

workflow.set_entry_point("validate")

workflow.add_conditional_edges(
    "validate",
    should_get_approval,
    {
        "approval": "human_approval",
        "process": "process"
    }
)

# After review, continue to processing only if the order was approved
workflow.add_conditional_edges(
    "human_approval",
    lambda state: "process" if state["approved"] else "rejected",
    {
        "process": "process",
        "rejected": END
    }
)
workflow.add_edge("process", END)

# The checkpointer saves state so the run can pause before the approval step
memory = MemorySaver()
approval_app = workflow.compile(checkpointer=memory, interrupt_before=["human_approval"])

print("✅ Human-in-the-loop workflow created!")

In [None]:
# Test human-in-the-loop
print("🧪 Test 1: Small order (auto-approved)")
print("="*60)

small_order = {
    "customer_id": "C001",
    "order_id": "ORD001",
    "product": "Chocolate Cake",
    "quantity": 1,
    "total_amount": 45.0,
    "requires_approval": False,
    "approved": False,
    "status": "pending"
}

config = {"configurable": {"thread_id": "order_001"}}
result = approval_app.invoke(small_order, config)
print(f"\n✅ Final status: {result['status']}")

# Test large order
print("\n\n🧪 Test 2: Large order (requires approval)")
print("="*60)

large_order = {
    "customer_id": "C002",
    "order_id": "ORD002",
    "product": "Wedding Cake",
    "quantity": 5,
    "total_amount": 500.0,
    "requires_approval": False,
    "approved": False,
    "status": "pending"
}

config2 = {"configurable": {"thread_id": "order_002"}}
result2 = approval_app.invoke(large_order, config2)
print(f"\n⏸️  Workflow paused: {result2['status']}")
print(f"   Next node: {approval_app.get_state(config2).next}")
print("   Waiting for human approval...")

# Simulate approval by writing the decision into the checkpointed state
print("\n👤 Manager approves order...")
approval_app.update_state(config2, {"approved": True})

# Resume workflow: passing None as input continues from the saved checkpoint
resumed = approval_app.invoke(None, config2)
print(f"\n✅ Final status: {resumed['status']}")

print("\n💡 Human-in-the-loop allows critical decision points!")
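
To make the pause-and-resume idea concrete, here is a toy model in plain Python. It is only a sketch of the concept, not LangGraph's implementation: `STEPS`, `checkpoints`, and `run_until_interrupt` are hypothetical names, and the pause rule is hard-coded for this example.

```python
from copy import deepcopy

# Toy model of "checkpoint + interrupt". Hypothetical names throughout;
# this is NOT how LangGraph is implemented internally.

def validate(state):
    state["requires_approval"] = state["total_amount"] > 100
    state["status"] = "pending_approval" if state["requires_approval"] else "approved"
    return state

def process(state):
    state["status"] = "processing"
    return state

STEPS = [("validate", validate), ("process", process)]
checkpoints = {}  # thread_id -> (saved state, index of the next step to run)

def run_until_interrupt(thread_id, state=None, interrupt_before=("process",)):
    """Run steps in order; pause before an interrupting step that lacks approval."""
    if state is None:
        # Resuming: load the saved state and position for this thread
        state, index = deepcopy(checkpoints[thread_id])
    else:
        index = 0
    while index < len(STEPS):
        name, step = STEPS[index]
        needs_pause = state.get("requires_approval") and not state.get("approved")
        if name in interrupt_before and needs_pause:
            checkpoints[thread_id] = (deepcopy(state), index)
            return state  # paused: nothing after this point has run
        state = step(state)
        index += 1
    checkpoints[thread_id] = (deepcopy(state), index)
    return state

# First run stops before "process" because the order needs approval
paused = run_until_interrupt("order_002", {"total_amount": 500.0, "approved": False})

# A reviewer approves by editing the saved state, then the run resumes
saved_state, _ = checkpoints["order_002"]
saved_state["approved"] = True
final = run_until_interrupt("order_002")

print(paused["status"], "->", final["status"])
```

The key point is that the second call passes no new input: it picks up the saved state and position for the same thread, which is the role `thread_id` plays in the config above.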

## 2. Persistent Checkpointing

Save state across sessions for long-running workflows.

In [None]:
from langgraph.checkpoint.sqlite import SqliteSaver

class ConversationState(TypedDict):
    messages: list
    customer_id: str
    session_count: int
    last_topic: str

def chat_node(state: ConversationState) -> ConversationState:
    """Process chat message"""
    last_msg = state['messages'][-1] if state['messages'] else "Hello"
    
    response = f"Session {state['session_count']}: Responding to '{last_msg[:30]}...'"
    state['messages'].append(response)
    state['session_count'] += 1
    
    return state

# Create persistent workflow
persistent_workflow = StateGraph(ConversationState)
persistent_workflow.add_node("chat", chat_node)
persistent_workflow.set_entry_point("chat")
persistent_workflow.add_edge("chat", END)

# Use SQLite for checkpoints. ":memory:" keeps the demo self-contained but is
# lost when the process exits; pass a file path (e.g. "checkpoints.db") to keep
# state across restarts.
with SqliteSaver.from_conn_string(":memory:") as checkpointer:
    persistent_app = persistent_workflow.compile(checkpointer=checkpointer)
    
    # Session 1
    print("🔵 Session 1:")
    state1 = {
        "messages": ["What cakes do you have?"],
        "customer_id": "C123",
        "session_count": 1,
        "last_topic": "products"
    }
    config = {"configurable": {"thread_id": "customer_123"}}
    result1 = persistent_app.invoke(state1, config)
    print(f"   Messages: {len(result1['messages'])} total")
    
    # Session 2: load the saved state for this thread instead of reusing result1
    print("\n🔵 Session 2 (continuing):")
    saved = persistent_app.get_state(config).values
    state2 = {
        "messages": saved['messages'] + ["Tell me about chocolate cake"],
        "customer_id": saved['customer_id'],
        "session_count": saved['session_count'],
        "last_topic": "chocolate"
    }
    result2 = persistent_app.invoke(state2, config)
    print(f"   Messages: {len(result2['messages'])} total")
    print(f"   State persisted across sessions!")

print("\n✅ Checkpointing enables long-running conversations!")
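
The difference between an in-memory and a file-backed store is easy to see with the standard library alone. The following is a minimal sketch, assuming a hypothetical one-table layout and helper names (`save_state`, `load_state`); it is not the schema `SqliteSaver` uses.

```python
import json
import os
import sqlite3
import tempfile
from contextlib import closing

# Hypothetical helpers and table layout, for illustration only.

def _connect(db_path):
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS states (thread_id TEXT PRIMARY KEY, state TEXT)"
    )
    return conn

def save_state(db_path, thread_id, state):
    """Store the state as JSON, keyed by thread id."""
    with closing(_connect(db_path)) as conn:
        conn.execute(
            "INSERT OR REPLACE INTO states VALUES (?, ?)",
            (thread_id, json.dumps(state)),
        )
        conn.commit()

def load_state(db_path, thread_id):
    """Return the saved state for a thread, or None if there is none."""
    with closing(_connect(db_path)) as conn:
        row = conn.execute(
            "SELECT state FROM states WHERE thread_id = ?", (thread_id,)
        ).fetchone()
    return json.loads(row[0]) if row else None

# A file on disk outlives each connection (and the process)
db_path = os.path.join(tempfile.mkdtemp(), "checkpoints.db")
save_state(db_path, "customer_123",
           {"messages": ["What cakes do you have?"], "session_count": 1})

# A later session opens a new connection and still finds the state
restored = load_state(db_path, "customer_123")
restored["messages"].append("Tell me about chocolate cake")
restored["session_count"] += 1
save_state(db_path, "customer_123", restored)

print(load_state(db_path, "customer_123"))
```

Each call opens and closes its own connection, so the second session only works because the data lives in a file. With `":memory:"` the database disappears as soon as its connection is closed.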

## 3. Dynamic Graph Modification

The graph structure is fixed once compiled; what adapts at runtime is the path taken through it, selected by conditional edges.

In [None]:
class DynamicState(TypedDict):
    request_type: str
    complexity: str
    steps_taken: list
    result: str

def analyze_request(state: DynamicState) -> DynamicState:
    """Analyze and classify request"""
    print(f"\n🔍 Analyzing request: {state['request_type']}")
    
    # Determine complexity
    if "simple" in state['request_type'].lower():
        state['complexity'] = "simple"
    else:
        state['complexity'] = "complex"
    
    state['steps_taken'].append("analyze")
    print(f"   Complexity: {state['complexity']}")
    return state

def simple_handler(state: DynamicState) -> DynamicState:
    """Handle simple requests"""
    print("   ⚡ Using simple handler (fast path)")
    state['steps_taken'].append("simple_handler")
    state['result'] = "Quick answer"
    return state

def complex_step1(state: DynamicState) -> DynamicState:
    """First step of complex processing"""
    print("   🔧 Complex step 1: Data gathering")
    state['steps_taken'].append("complex_step1")
    return state

def complex_step2(state: DynamicState) -> DynamicState:
    """Second step of complex processing"""
    print("   🔧 Complex step 2: Analysis")
    state['steps_taken'].append("complex_step2")
    return state

def complex_step3(state: DynamicState) -> DynamicState:
    """Third step of complex processing"""
    print("   🔧 Complex step 3: Synthesis")
    state['steps_taken'].append("complex_step3")
    state['result'] = "Detailed answer"
    return state

def route_by_complexity(state: DynamicState) -> str:
    """Route based on complexity"""
    return state['complexity']

# Build dynamic graph
dynamic_graph = StateGraph(DynamicState)

dynamic_graph.add_node("analyze", analyze_request)
dynamic_graph.add_node("simple", simple_handler)
dynamic_graph.add_node("complex1", complex_step1)
dynamic_graph.add_node("complex2", complex_step2)
dynamic_graph.add_node("complex3", complex_step3)

dynamic_graph.set_entry_point("analyze")

dynamic_graph.add_conditional_edges(
    "analyze",
    route_by_complexity,
    {
        "simple": "simple",
        "complex": "complex1"
    }
)

dynamic_graph.add_edge("simple", END)
dynamic_graph.add_edge("complex1", "complex2")
dynamic_graph.add_edge("complex2", "complex3")
dynamic_graph.add_edge("complex3", END)

dynamic_app = dynamic_graph.compile()

print("✅ Dynamic graph created!")

In [None]:
# Test dynamic routing
print("\n🧪 Test 1: Simple request")
print("="*60)
simple_req = {
    "request_type": "simple product info",
    "complexity": "",
    "steps_taken": [],
    "result": ""
}
result1 = dynamic_app.invoke(simple_req)
print(f"\n✅ Steps: {' → '.join(result1['steps_taken'])}")
print(f"   Result: {result1['result']}")

print("\n\n🧪 Test 2: Complex request")
print("="*60)
complex_req = {
    "request_type": "complex analysis",
    "complexity": "",
    "steps_taken": [],
    "result": ""
}
result2 = dynamic_app.invoke(complex_req)
print(f"\n✅ Steps: {' → '.join(result2['steps_taken'])}")
print(f"   Result: {result2['result']}")

print("\n💡 Workflow adapts based on input complexity!")
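
A routing function must return a key that exists in the edge mapping. In the example above the label is always `"simple"` or `"complex"`, but if it came from an LLM or user input it could be anything. A minimal sketch of a defensive router follows; `safe_route` and `VALID_ROUTES` are hypothetical names, and falling back to the complex path is an assumption about what is safest.

```python
from typing import Literal

Route = Literal["simple", "complex"]
VALID_ROUTES = ("simple", "complex")

def safe_route(state: dict, default: Route = "complex") -> Route:
    """Return a routing key that is guaranteed to exist in the edge mapping."""
    # Normalize whitespace and case before checking the label
    label = str(state.get("complexity", "")).strip().lower()
    return label if label in VALID_ROUTES else default

# Unknown or empty labels fall back to the default path
routes = [safe_route({"complexity": c}) for c in ["simple", " Complex ", "", "urgent"]]
print(routes)
```

A function like this could be passed to `add_conditional_edges` in place of `route_by_complexity` without changing the mapping.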

## 4. Parallel Execution with Map-Reduce

Process multiple items concurrently in a map step, then combine the results in a reduce step.

In [None]:
import asyncio
from typing import List

class ParallelState(TypedDict):
    products: List[str]
    inventory_results: List[dict]
    price_results: List[dict]
    final_report: str

async def check_inventory_parallel(product: str) -> dict:
    """Check inventory for one product"""
    await asyncio.sleep(0.3)  # Simulate API call
    import random
    stock = random.randint(0, 20)
    return {"product": product, "stock": stock, "available": stock > 0}

async def check_price_parallel(product: str) -> dict:
    """Check price for one product"""
    await asyncio.sleep(0.2)  # Simulate database query
    prices = {"Chocolate Cake": 45, "Vanilla Cake": 40, "Red Velvet": 50}
    price = prices.get(product, 45)
    return {"product": product, "price": price}

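def map_products(state: ParallelState) -> ParallelState:
    """Map: Process each product in parallel"""
    print(f"\n📊 MAP: Processing {len(state['products'])} products in parallel...")
    
    async def gather_all(products):
        # asyncio.gather must be awaited inside a coroutine
        inventory = await asyncio.gather(*[check_inventory_parallel(p) for p in products])
        prices = await asyncio.gather(*[check_price_parallel(p) for p in products])
        return inventory, prices
    
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running (plain script): run the coroutine directly
        inventory_results, price_results = asyncio.run(gather_all(state['products']))
    else:
        # A loop is already running (e.g. Jupyter): use a worker thread with its own loop
        from concurrent.futures import ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=1) as pool:
            inventory_results, price_results = pool.submit(
                asyncio.run, gather_all(state['products'])
            ).result()
    
    state['inventory_results'] = inventory_results
    state['price_results'] = price_results
    
    print("   ✅ All parallel operations complete")
    return state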

def reduce_results(state: ParallelState) -> ParallelState:
    """Reduce: Combine all results"""
    print("\n📋 REDUCE: Combining results...")
    
    report_lines = ["Product Availability Report:", "="*40]
    
    for inv, price in zip(state['inventory_results'], state['price_results']):
        product = inv['product']
        stock = inv['stock']
        price_val = price['price']
        status = "✅ Available" if inv['available'] else "❌ Out of Stock"
        
        report_lines.append(
            f"{product}: {status} (Stock: {stock}, Price: ${price_val})"
        )
    
    state['final_report'] = "\n".join(report_lines)
    print("   ✅ Report generated")
    return state

# Build parallel workflow
parallel_workflow = StateGraph(ParallelState)
parallel_workflow.add_node("map", map_products)
parallel_workflow.add_node("reduce", reduce_results)

parallel_workflow.set_entry_point("map")
parallel_workflow.add_edge("map", "reduce")
parallel_workflow.add_edge("reduce", END)

parallel_app = parallel_workflow.compile()

print("✅ Parallel map-reduce workflow created!")

In [None]:
# Test parallel processing
import time

products = ["Chocolate Cake", "Vanilla Cake", "Red Velvet", "Carrot Cake", "Lemon Cake"]

print("🧪 Testing parallel map-reduce")
print("="*60)

start = time.time()
result = parallel_app.invoke({
    "products": products,
    "inventory_results": [],
    "price_results": [],
    "final_report": ""
})
elapsed = time.time() - start

print(f"\n{result['final_report']}")
print(f"\n⚡ Total time: {elapsed:.2f}s (parallel execution)")
print(f"💡 Sequential would take: ~{len(products) * 0.5:.1f}s")
print(f"   Speed improvement: {(len(products) * 0.5 / elapsed):.1f}x faster!")
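
The speedup comes from `asyncio.gather` overlapping the waits. Here is a small standalone comparison, using a hypothetical `fetch_stock` coroutine with a simulated 50 ms delay. In a notebook, replace `asyncio.run(main())` with `await main()`, since a loop is already running there.

```python
import asyncio
import time

async def fetch_stock(product: str) -> dict:
    """Hypothetical I/O call, simulated with a short sleep."""
    await asyncio.sleep(0.05)
    return {"product": product, "stock": len(product)}

async def run_sequential(products):
    # Each call waits for the previous one to finish
    return [await fetch_stock(p) for p in products]

async def run_concurrent(products):
    # All calls are started together; results keep the input order
    return await asyncio.gather(*(fetch_stock(p) for p in products))

async def timed(coro):
    start = time.perf_counter()
    result = await coro
    return result, time.perf_counter() - start

async def main():
    products = ["Chocolate Cake", "Vanilla Cake", "Red Velvet", "Carrot Cake"]
    seq_result, seq_time = await timed(run_sequential(products))
    con_result, con_time = await timed(run_concurrent(products))
    return seq_result, seq_time, con_result, con_time

seq_result, seq_time, con_result, con_time = asyncio.run(main())
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

The gain only applies to I/O-bound work where tasks spend their time waiting. CPU-bound work in coroutines still runs one task at a time.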

## 5. Error Recovery and Retry

Automatically handle failures.

In [None]:
class RetryState(TypedDict):
    task: str
    attempts: int
    max_attempts: int
    success: bool
    result: str
    errors: List[str]

def unreliable_operation(state: RetryState) -> RetryState:
    """Operation that sometimes fails"""
    import random
    
    state['attempts'] += 1
    print(f"\n🔄 Attempt {state['attempts']}/{state['max_attempts']}...")
    
    # 60% failure rate
    if random.random() < 0.6:
        error = f"Attempt {state['attempts']}: Network timeout"
        state['errors'].append(error)
        state['success'] = False
        print(f"   ❌ {error}")
    else:
        state['success'] = True
        state['result'] = f"Success on attempt {state['attempts']}"
        print(f"   ✅ Operation succeeded!")
    
    return state

def should_retry(state: RetryState) -> Literal["retry", "success", "failed"]:
    """Decide whether to retry"""
    if state['success']:
        return "success"
    elif state['attempts'] < state['max_attempts']:
        print("   🔁 Retrying...")
        return "retry"
    else:
        return "failed"

def handle_success(state: RetryState) -> RetryState:
    print(f"\n✅ FINAL SUCCESS: {state['result']}")
    return state

def handle_failure(state: RetryState) -> RetryState:
    print(f"\n❌ FINAL FAILURE after {state['attempts']} attempts")
    print(f"   Errors: {len(state['errors'])}")
    state['result'] = "Operation failed - please contact support"
    return state

# Build retry workflow
retry_workflow = StateGraph(RetryState)

retry_workflow.add_node("attempt", unreliable_operation)
retry_workflow.add_node("success", handle_success)
retry_workflow.add_node("failure", handle_failure)

retry_workflow.set_entry_point("attempt")

retry_workflow.add_conditional_edges(
    "attempt",
    should_retry,
    {
        "retry": "attempt",
        "success": "success",
        "failed": "failure"
    }
)

retry_workflow.add_edge("success", END)
retry_workflow.add_edge("failure", END)

retry_app = retry_workflow.compile()

print("✅ Retry workflow with error recovery created!")

In [None]:
# Test retry logic
print("🧪 Testing automatic retry")
print("="*60)

result = retry_app.invoke({
    "task": "Process payment",
    "attempts": 0,
    "max_attempts": 5,
    "success": False,
    "result": "",
    "errors": []
})

print(f"\n📊 Final State:")
print(f"   Success: {result['success']}")
print(f"   Attempts: {result['attempts']}")
print(f"   Result: {result['result']}")

print("\n💡 Automatic retry increases reliability!")
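
The workflow above retries immediately after each failure. A common refinement is to wait longer after every failed attempt (exponential backoff), which gives a struggling service time to recover. The sketch below is one way to do that in plain Python; `backoff_delay` and `retry_with_backoff` are hypothetical helpers, and the base delay, cap, and jitter values are assumptions to tune.

```python
import random
import time

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 8.0) -> float:
    """Delay after failed attempt `attempt` (1-based): base * 2^(attempt-1), capped."""
    return min(cap, base * 2 ** (attempt - 1))

def retry_with_backoff(operation, max_attempts=5, sleep=time.sleep, jitter=0.0):
    """Call `operation` until it succeeds or attempts run out.

    Returns (result, attempts_used). Re-raises the last error on final failure.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return operation(), attempt
        except Exception:
            if attempt == max_attempts:
                raise
            # Optional random jitter spreads out retries from many clients
            sleep(backoff_delay(attempt) + random.uniform(0, jitter))

# Example: fail twice, then succeed. Delays are recorded instead of slept.
calls = {"count": 0}

def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("Network timeout")
    return "ok"

waits = []
result, attempts = retry_with_backoff(flaky, sleep=waits.append)
print(result, attempts, waits)
```

Inside a graph, the same delay could be applied in the `attempt` node before it retries, using `state['attempts']` as the attempt number.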

## Summary: Advanced Patterns

### ✅ Session Achievements:

1. **Human-in-the-Loop**: Approval workflows with interruption
2. **Persistent State**: SQLite checkpointing for long conversations
3. **Dynamic Graphs**: Runtime adaptation based on complexity
4. **Parallel Processing**: Map-reduce for simultaneous operations
5. **Error Recovery**: Automatic retry up to a maximum number of attempts

### 🎯 Production Patterns:

**Use Human-in-the-Loop when:**
- High-value transactions
- Critical decisions
- Compliance requirements
- Quality assurance

**Use Parallel Processing when:**
- Multiple independent operations
- Batch processing
- Performance critical
- I/O bound tasks

**Use Error Recovery when:**
- Unreliable external services
- Network operations
- Critical workflows
- User-facing operations