# LangGraph Essentials

**Duration**: ~2-2.5 hours

## What You'll Learn

This notebook covers LangGraph - the framework for building stateful, agentic workflows:

1. What is LangGraph and when to use it
2. LangChain vs LangGraph decision guide
3. Core concepts: nodes, edges, state
4. State management with TypedDict and reducers
5. Building your first graph
6. **Tools and Function Calling** (NEW!)
7. Conditional routing and cycles
8. Multi-agent supervisor pattern
9. Human-in-the-loop workflows
10. Checkpointing and persistence
11. Production patterns and best practices

## Prerequisites

- Completed LangChain Essentials notebook (notebook 03)
- Understanding of LCEL pipe syntax
- OpenAI API key

## What is LangGraph?

**LangGraph** builds on LangChain to add:
- **State management**: Shared data across steps
- **Cycles**: Loops and iterative processes
- **Conditional logic**: Dynamic routing
- **Human-in-the-loop**: Pause for human input

### When to Use LangGraph

**Use LangGraph when you need**:
- Cyclical workflows (loops, retries)
- Complex state management
- Multi-agent systems
- Human approval workflows
- Conditional decision points

**Don't use LangGraph for**:
- Simple linear workflows → use LangChain LCEL
- Stateless operations → use direct API calls
- Quick prototypes → start simple first

### Visual Comparison

```
LangChain: [Question] → [Retrieve] → [LLM] → [Answer]
           (Linear, stateless)

LangGraph: [Question] → [Analyze] → [Research?] ─Yes→ [Research] ─┐
                                         │                        │
                                         No                       │
                                         ↓                        │
                                     [Answer] ←───────────────────┘
           (Cyclical, stateful, conditional)
```

---

## LangChain vs LangGraph Decision Guide

Use this flowchart to decide which framework to use:

```
Do you need loops/retries?
├─ YES → LangGraph
└─ NO
    │
    Do you need complex shared state?
    ├─ YES → LangGraph
    └─ NO
        │
        Do you need multiple agents?
        ├─ YES → LangGraph
        └─ NO
            │
            Do you need human-in-the-loop?
            ├─ YES → LangGraph
            └─ NO → LangChain (LCEL)
```
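
The flowchart can also be written as a small function. This is only a sketch of the decision order above; the function name and its parameters are illustrative and not part of either library.

```python
def choose_framework(
    needs_loops: bool = False,
    needs_shared_state: bool = False,
    needs_multiple_agents: bool = False,
    needs_human_in_the_loop: bool = False,
) -> str:
    """Mirror the flowchart: any YES answer selects LangGraph."""
    if needs_loops or needs_shared_state or needs_multiple_agents or needs_human_in_the_loop:
        return "LangGraph"
    return "LangChain (LCEL)"


print(choose_framework())                  # simple linear pipeline
print(choose_framework(needs_loops=True))  # retry logic
```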

### Examples

**Use LangChain (LCEL)**:
- Simple RAG chatbot
- Document summarization
- Q&A pipeline

**Use LangGraph**:
- Research agent that verifies facts (loops)
- Code generator with testing (retry logic)
- Multi-agent content team (supervisor pattern)
- Approval workflows (human-in-the-loop)

### Quick Decision Table

| Use LangChain | Use LangGraph |
|---------------|---------------|
| Linear workflows (A → B → C) | Cyclical workflows (loops, retries) |
| Simple RAG chatbot | Complex agents with tools |
| Stateless operations | Stateful operations |
| Single decision path | Multiple decision points |
| Quick prototypes | Production agents |

### Key Insight

**LangGraph = LangChain + State + Cycles + Agents**

You'll often use both in the same application!

---

## Section 1: Package Installation

### Learning Objectives
- Install LangGraph and dependencies
- Configure OpenAI API key
- Verify setup

In [None]:
# Install required packages
!pip install -qU \
    langchain \
    langchain-openai \
    langchain-community \
    langgraph \
    langgraph-checkpoint-sqlite

# Show installed versions
!pip list | grep -E "langchain|langgraph"

print("\n✅ All packages installed!")

In [None]:
# Setup API key
import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key: ")

print("✅ API key configured!")

---

## Section 2: Core Concepts

### Learning Objectives
- Understand nodes, edges, and state
- Learn StateGraph basics
- Build a simple graph

### Core Concepts

1. **Nodes**: Functions that process state (like LLM calls, tools, etc.)
2. **Edges**: Connections between nodes (can be conditional)
3. **State**: Shared data structure (TypedDict) passed between nodes
4. **Cycles**: Support for loops - nodes can revisit earlier nodes

### Architecture

```
Input → [Node A] ←→ [Node B] → Decide → [Node C] → Output
              ↑                  ↓
              └──── Loop back ───┘
(Cyclical, stateful, conditional)
```

### Simple Example

The next cells introduce each building block, then assemble them into a runnable graph.

## Understanding StateGraph Fundamentals

Before building our first graph, let's understand the core LangGraph concepts.

### What is StateGraph?

**StateGraph** is the container for building LangGraph workflows. Think of it as a workflow builder.

```python
workflow = StateGraph(SimpleState)
```

**Key differences from LangChain**:
- LangChain chains: Sequential data transformations (immutable)
- LangGraph StateGraph: Stateful workflows with cycles (mutable state)

**StateGraph enables**:
- Cyclic workflows (nodes can loop back)
- Conditional routing (dynamic paths based on state)
- Persistent state across steps
- Human-in-the-loop workflows

### What are Nodes?

**Nodes** are Python functions that process and update state.

```python
def greet(state: SimpleState):
    return {"message": f"Hello, {state['message']}!"}
```

**Critical rules**:
1. Node receives current state as input (dictionary)
2. Node returns PARTIAL state updates (only fields it modifies)
3. Other fields in state remain unchanged
4. Multiple nodes' returns are merged using reducers
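
Rules 2 and 3 can be pictured with plain dictionaries. The sketch below is not LangGraph's implementation; `apply_partial_update` is a hypothetical helper that shows the default behavior for fields without a reducer: the returned keys replace the old values and everything else is left untouched.

```python
from typing import TypedDict


class SimpleState(TypedDict):
    message: str
    count: int


def greet(state: SimpleState) -> dict:
    # Returns only the field it changes
    return {"message": f"Hello, {state['message']}!"}


def apply_partial_update(state: dict, update: dict) -> dict:
    """Hypothetical helper: overwrite returned keys, keep the rest."""
    return {**state, **update}


state = {"message": "World", "count": 3}
new_state = apply_partial_update(state, greet(state))
print(new_state)  # {'message': 'Hello, World!', 'count': 3}
```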

**Adding nodes**:
```python
workflow.add_node("greet", greet)  # name (string), function
```

Node names are arbitrary - use descriptive names for clarity.

### What are Edges?

**Edges** define connections between nodes (control flow).

**Basic edges** (always go to next node):
```python
workflow.add_edge("greet", "increment")  # greet → increment
```

**END sentinel** (terminate workflow):
```python
workflow.add_edge("increment", END)  # increment → END
```

**Key points**:
- END is imported from `langgraph.graph`
- Every workflow path must eventually reach END
- Multiple edges can exist (conditional routing - covered later)

### Entry Points

**Entry point** specifies where execution starts.

**Modern syntax** (recommended - used throughout this notebook):
```python
workflow.add_edge(START, "greet")  # START from langgraph.graph
```

**Older syntax** (equivalent and still supported):
```python
workflow.set_entry_point("greet")  # Older approach
```

**Required** - workflow won't compile without an entry point.

### Compilation

**compile()** validates and prepares the workflow for execution.

```python
app = workflow.compile()  # Returns executable app
```

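**What compilation does**:
- ✅ Runs basic structural checks (for example, a missing entry point or an edge that references an unknown node)
- ✅ Attaches runtime options such as checkpointers and interrupts
- ✅ Returns a runnable object that supports `invoke()` and `stream()`

Compilation does not detect infinite loops. A cycle that never reaches an exit is stopped at runtime instead: LangGraph raises `GraphRecursionError` once the configured `recursion_limit` is exceeded.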

**When to add parameters**:
```python
app = workflow.compile(
    checkpointer=memory,           # For persistence
    interrupt_before=["approve"]   # For human-in-the-loop
)
```

Now let's build our first graph!

---

In [None]:
# Simple LangGraph example
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

# Define state
class SimpleState(TypedDict):
    message: str
    count: int

# Define nodes (functions that modify state)
def greet(state: SimpleState):
    return {"message": f"Hello! Count: {state['count']}"}

def increment(state: SimpleState):
    return {"count": state["count"] + 1}

# Build graph
workflow = StateGraph(SimpleState)

# Add nodes
workflow.add_node("greet", greet)
workflow.add_node("increment", increment)

# Add edges
workflow.add_edge(START, "greet")
workflow.add_edge("greet", "increment")
workflow.add_edge("increment", END)

# Compile
app = workflow.compile()

# Run
result = app.invoke({"message": "", "count": 0})
print("Final state:", result)
print("\n✅ LangGraph executed: greet → increment → END")

### Key Takeaways

- **StateGraph**: Main class for building graphs
- **Nodes**: Processing functions that return state updates
- **Edges**: Connect nodes in sequence
- **END**: Special marker for terminal nodes
- **compile()**: Creates executable graph
- **invoke()**: Runs the graph with initial state

---

## Section 3: State Management

### Learning Objectives
- Understand state reducers (how updates merge)
- Use Annotated types for custom merge logic
- Design effective state schemas

### State Reducers

**Problem**: When multiple nodes update the same field, how do we merge?

**Solution**: Reducer functions

## Understanding State Merging and Reducers

This is the MOST IMPORTANT concept in LangGraph. Understanding state merging is critical.

### The Problem: Multiple Nodes, One State

When multiple nodes update the same state field, how do we merge the updates?

**Example scenario**:
- Node 1 returns: `{"messages": ["Hello"]}`
- Node 2 returns: `{"messages": ["How are you?"]}`
- What should final state be?

**Without reducers**: Second update OVERWRITES first
```python
final_state = {"messages": ["How are you?"]}  # ❌ Lost "Hello"
```

**With reducers**: Updates are MERGED
```python
final_state = {"messages": ["Hello", "How are you?"]}  # ✅ Both preserved
```

### What is a Reducer?

A **reducer** is a function that defines HOW to merge state updates.

**Syntax**:
```python
from typing import Annotated
import operator

class State(TypedDict):
    messages: Annotated[list, operator.add]  # ← Reducer
```

**Annotated** (from `typing`):
- Adds metadata to type hints
- First parameter: type (list, int, str, etc.)
- Second parameter: reducer function

**operator.add**:
- For lists: concatenate (append items)
- For numbers: addition
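
A quick check of both behaviors, plus a look at where the reducer is stored. This uses only the standard library; how LangGraph itself reads the annotation is an implementation detail that is not shown here.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class State(TypedDict):
    messages: Annotated[list, operator.add]


# operator.add is the function form of the + operator
combined = operator.add(["Hello"], ["World"])
total = operator.add(2, 3)
print(combined)  # ['Hello', 'World']
print(total)     # 5

# Annotated keeps its extra arguments in __metadata__
hints = get_type_hints(State, include_extras=True)
reducer = hints["messages"].__metadata__[0]
print(reducer is operator.add)  # True
```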

### Common Reducer Patterns

**Pattern 1: Accumulate messages** (most common)
```python
messages: Annotated[list, operator.add]

# Node 1: return {"messages": ["Hello"]}
# Node 2: return {"messages": ["World"]}
# Result: {"messages": ["Hello", "World"]}
```

**Pattern 2: Keep maximum value**
```python
score: Annotated[int, lambda x, y: max(x, y)]

# Current state: {"score": 10}
# Node returns: {"score": 15}
# Result: {"score": 15}
```

**Pattern 3: Default (no reducer) - replace**
```python
status: str  # No Annotated = replace

# Current state: {"status": "pending"}
# Node returns: {"status": "completed"}
# Result: {"status": "completed"}
```

### How State Merging Works

**Execution flow**:
```
1. Entry node executes → returns {"messages": ["Step 1"]}
2. LangGraph merges with initial state using reducer
3. Next node executes → returns {"messages": ["Step 2"]}
4. LangGraph merges again using reducer
5. Final state: {"messages": ["Step 1", "Step 2"]}
```

**Visual example**:
```python
# Initial state
{"messages": [], "count": 0}

# Node 1 returns
{"messages": ["Hello"]}
# After merge (operator.add for messages)
{"messages": ["Hello"], "count": 0}

# Node 2 returns
{"messages": ["World"], "count": 1}
# After merge
{"messages": ["Hello", "World"], "count": 1}
```
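
The merge steps above can be reproduced with a few lines of plain Python. This is a simplified model for intuition, not LangGraph's internals; `merge_update` and the `REDUCERS` table are hypothetical names.

```python
import operator

# Hypothetical reducer table: fields not listed here are replaced
REDUCERS = {"messages": operator.add}


def merge_update(state: dict, update: dict) -> dict:
    """Apply one node's partial update to the current state."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)  # combine old and new
        else:
            merged[key] = value  # default: replace
    return merged


state = {"messages": [], "count": 0}
state = merge_update(state, {"messages": ["Hello"]})
state = merge_update(state, {"messages": ["World"], "count": 1})
print(state)  # {'messages': ['Hello', 'World'], 'count': 1}
```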

🎯 **Key insight**: Nodes return ONLY the fields they modify. Reducers determine how updates merge.

Now let's see reducers in action!

---

In [None]:
# State with different reducer patterns
from typing import Annotated
import operator

class AdvancedState(TypedDict):
    # List - appends new items (using operator.add)
    messages: Annotated[list, operator.add]
    
    # Replace - overwrites (default behavior)
    current_step: str
    
    # Custom reducer - take maximum
    score: Annotated[int, lambda x, y: max(x, y)]
    
    # Custom reducer - concatenate strings
    notes: Annotated[str, lambda x, y: x + " | " + y]

print("✅ Advanced state schema with reducers defined")

## Understanding Graph Invocation

How do you run a compiled graph? The `invoke()` method.

### Basic Invocation

```python
result = app.invoke({"messages": [], "count": 0})
```

**What invoke() does**:
1. Takes initial state dictionary as input
2. Executes graph from entry point
3. Follows edges until reaching END
4. Returns FULL final state (all fields)

### Initial State Requirements

**Provide every field your nodes read**. An input that omits a field is not rejected up front, but a node that reads the missing key fails:

```python
class State(TypedDict):
    messages: Annotated[list, operator.add]
    count: int

# ✅ Safe - all fields present
app.invoke({"messages": [], "count": 0})

# ❌ Risky - count is missing
app.invoke({"messages": []})  # KeyError if a node reads state["count"]
```

**Tip**: Initialize lists as empty `[]`, numbers as `0`, strings as `""`

### Return Value

**invoke() returns FULL state** after all nodes execute:

```python
result = app.invoke({"messages": [], "count": 0})
print(result)
# {'messages': ['Hello', 'World'], 'count': 2}
```

**Not just last node's return** - it's the accumulated state after all merges.

### Execution Flow

```
app.invoke(initial_state)
         ↓
Entry point node executes
         ↓
Returns partial state → Merge with current state using reducers
         ↓
Follow edges to next node
         ↓
Next node executes → Returns partial state → Merge
         ↓
... repeat until END
         ↓
Return final merged state
```

🎯 **Key insight**: invoke() is blocking (waits for completion) and returns full final state.

Let's test this!

---

In [None]:
# Test reducer behavior
def node1(state: AdvancedState):
    return {
        "messages": ["Message from node1"],
        "current_step": "node1",
        "score": 10,
        "notes": "Node1 executed"
    }

def node2(state: AdvancedState):
    return {
        "messages": ["Message from node2"],
        "current_step": "node2",
        "score": 5,  # Lower than node1, should stay 10
        "notes": "Node2 executed"
    }

# Build graph
workflow = StateGraph(AdvancedState)
workflow.add_node("node1", node1)
workflow.add_node("node2", node2)
workflow.add_edge(START, "node1")
workflow.add_edge("node1", "node2")
workflow.add_edge("node2", END)

app = workflow.compile()

# Run
result = app.invoke({
    "messages": [],
    "current_step": "",
    "score": 0,
    "notes": "Start"
})

print("Final state:")
print(f"  Messages: {result['messages']}")  # Should have 2 messages (appended)
print(f"  Current step: {result['current_step']}")  # Should be 'node2' (replaced)
print(f"  Score: {result['score']}")  # Should be 10 (max of 10 and 5)
print(f"  Notes: {result['notes']}")  # Should be concatenated

### State Design Best Practices

#### What to Track in State

**DO track**:
- User input/question
- Processing results
- Decision points (what path was taken)
- Iteration count (for retry logic)
- Accumulated data (messages, findings)

**DON'T track**:
- Computed values (recalculate instead)
- Temporary variables (use node-local variables)
- Constant configuration (pass as parameters)

#### State Schema Guidelines

1. **Use TypedDict**: Clear schema, type safety
2. **Use reducers**: Control how updates merge
3. **Keep it flat**: Avoid deep nesting
4. **Document fields**: Add comments explaining purpose

#### Example

```python
class WellDesignedState(TypedDict):
    # User input
    query: str  # Original user question
    
    # Processing state
    messages: Annotated[list[str], operator.add]  # Accumulated messages (append)
    
    # Decision tracking
    needs_research: bool  # Whether research is needed
    iteration_count: int  # How many loops executed
    
    # Results
    final_answer: str  # Final response to user
```
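
One way to keep invocations consistent with a schema like this is a small factory that fills in defaults. `make_initial_state` is a hypothetical helper, not a LangGraph API; the schema is repeated so the snippet runs on its own.

```python
import operator
from typing import Annotated, TypedDict


class WellDesignedState(TypedDict):
    query: str
    messages: Annotated[list[str], operator.add]
    needs_research: bool
    iteration_count: int
    final_answer: str


def make_initial_state(query: str) -> WellDesignedState:
    """Hypothetical helper: every field gets an explicit starting value."""
    return {
        "query": query,
        "messages": [],
        "needs_research": False,
        "iteration_count": 0,
        "final_answer": "",
    }


initial = make_initial_state("What is LangGraph?")
print(sorted(initial))
```

Building the initial state in one place means a field added to the schema later only needs a default in one function.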

### Key Takeaways

- **Reducers**: Control how state updates merge
- **operator.add**: Append to lists
- **Custom reducers**: max, concatenate, custom logic
- **Design principles**: Flat, documented, typed schemas

---

## Section 4: Building Your First Graph

### Learning Objectives
- Build a research agent with conditional routing
- Use StateGraph with multiple nodes
- Implement conditional edges

### Use Case: Research Agent

**Goal**: Build an agent that:
1. Analyzes the question
2. Decides if research is needed
3. Either researches (complex question) or answers directly (simple question)

### State Schema

In [None]:
# Setup LLM components first
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Define state schema
class AgentState(TypedDict):
    question: str
    answer: str
    needs_research: bool
    reasoning: str

print("✅ State schema defined")

### Define Nodes

In [None]:
# Node 1: Analyze question
def analyze_question(state: AgentState):
    """Determine if the question needs research"""
    question = state["question"]
    
    # Use LLM to decide
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Determine if this question needs research or can be answered directly. "
                   "Respond with 'RESEARCH' or 'DIRECT'."),
        ("human", "{question}")
    ])
    
    chain = prompt | llm | StrOutputParser()
    decision = chain.invoke({"question": question})
    
    needs_research = "RESEARCH" in decision.upper()
    
    return {
        "needs_research": needs_research,
        "reasoning": f"Decision: {decision}"
    }

# Node 2: Research (for complex questions)
def research(state: AgentState):
    """Simulate research process"""
    question = state["question"]
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a research assistant. Provide a detailed, well-researched answer."),
        ("human", "{question}")
    ])
    
    chain = prompt | llm | StrOutputParser()
    answer = chain.invoke({"question": question})
    
    return {"answer": f"[Researched] {answer}"}

# Node 3: Direct answer (for simple questions)
def direct_answer(state: AgentState):
    """Provide quick answer"""
    question = state["question"]
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Provide a concise, direct answer."),
        ("human", "{question}")
    ])
    
    chain = prompt | llm | StrOutputParser()
    answer = chain.invoke({"question": question})
    
    return {"answer": f"[Direct] {answer}"}

print("✅ All nodes defined")

### Build Graph with Conditional Edges

## Understanding Conditional Routing

So far, edges are static (always go to same node). What if you want dynamic routing based on state?

### Conditional Edges

**add_conditional_edges()** enables dynamic routing.

```python
workflow.add_conditional_edges(
    "analyze",           # Source node
    route_question,      # Router function
    {                    # Edge mapping
        "search": "search_web",
        "generate": "generate_answer"
    }
)
```

**Three parameters**:
1. **Source node**: Which node's output triggers routing decision
2. **Router function**: Function that examines state and returns routing decision
3. **Edge mapping**: Dictionary mapping router return values to target nodes

### Router Functions

**Router** is a Python function that takes state and returns a string.

```python
def route_question(state: AgentState) -> str:
    if "search" in state["question"].lower():
        return "search"
    else:
        return "generate"
```

**Critical rules**:
1. Takes state as argument (receives current state)
2. Returns string matching key in edge mapping
3. Returned value must be in mapping dict keys
4. Can return END to terminate
5. Called AFTER source node completes

### Execution Flow

```
"analyze" node executes
         ↓
State updated via reducer
         ↓
Router function called: route_question(state)
         ↓
Router returns "search"
         ↓
Look up "search" in edge mapping → "search_web"
         ↓
Execute "search_web" node next
```
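
The lookup step can be modeled in plain Python. `resolve_next_node` is a hypothetical helper that mimics the flow above; LangGraph performs this step internally, and its actual error behavior for unknown keys may differ.

```python
def route_question(state: dict) -> str:
    if "search" in state["question"].lower():
        return "search"
    return "generate"


EDGE_MAPPING = {"search": "search_web", "generate": "generate_answer"}


def resolve_next_node(router, mapping: dict, state: dict) -> str:
    """Hypothetical helper: call the router, then look up the target node."""
    key = router(state)
    if key not in mapping:
        raise ValueError(f"Router returned {key!r}, expected one of {sorted(mapping)}")
    return mapping[key]


first = resolve_next_node(route_question, EDGE_MAPPING, {"question": "Search for Python tutorials"})
second = resolve_next_node(route_question, EDGE_MAPPING, {"question": "Explain recursion"})
print(first)   # search_web
print(second)  # generate_answer
```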

### Visual Example

```python
# State after analyze node
{"question": "search for Python tutorials", "answer": ""}

# Router examines state
def route_question(state):
    if "search" in state["question"]:
        return "search"  # ← Returns "search"
    return "generate"

# Edge mapping
{"search": "search_web", "generate": "generate_answer"}

# Result: Go to "search_web" node
```

### Common Patterns

**Pattern 1: Route based on content**
```python
def route(state):
    if "urgent" in state["message"]:
        return "priority_handler"
    return "normal_handler"
```

**Pattern 2: Route based on flags**
```python
def route(state):
    if state["needs_approval"]:
        return "approval"
    return END
```

**Pattern 3: Multi-way routing**
```python
def route(state):
    if state["score"] > 0.8:
        return "high_confidence"
    elif state["score"] > 0.5:
        return "medium_confidence"
    else:
        return "low_confidence"
```
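
Because routers are ordinary functions, they can be unit-tested without compiling a graph. A minimal sketch for the multi-way router above, using illustrative score values:

```python
def route(state: dict) -> str:
    if state["score"] > 0.8:
        return "high_confidence"
    elif state["score"] > 0.5:
        return "medium_confidence"
    else:
        return "low_confidence"


# One value per band, plus both boundaries (the comparisons are strict)
cases = {
    0.95: "high_confidence",
    0.8: "medium_confidence",
    0.5: "low_confidence",
    0.1: "low_confidence",
}
results = {score: route({"score": score}) for score in cases}
for score, expected in cases.items():
    assert results[score] == expected, (score, results[score])
print("All routing cases passed")
```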

🎯 **Key insight**: Router functions enable branching logic based on runtime state.

Now let's build a routing workflow!

---

In [None]:
# Build the graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("analyze", analyze_question)
workflow.add_node("research", research)
workflow.add_node("answer", direct_answer)

# Add conditional edge from analyze
def route_question(state: AgentState):
    """Route to research or direct answer based on analysis"""
    if state["needs_research"]:
        return "research"
    else:
        return "answer"

workflow.add_edge(START, "analyze")
workflow.add_conditional_edges(
    "analyze",
    route_question,
    {
        "research": "research",
        "answer": "answer"
    }
)

# Both paths end
workflow.add_edge("research", END)
workflow.add_edge("answer", END)

# Compile
research_agent = workflow.compile()

print("✅ Research agent compiled!")

### Test the Agent

In [None]:
# Test with simple question
simple_question = "What is 2 + 2?"

result = research_agent.invoke({
    "question": simple_question,
    "answer": "",
    "needs_research": False,
    "reasoning": ""
})

print("Simple Question:", simple_question)
print("Reasoning:", result["reasoning"])
print("Answer:", result["answer"])
print("Path taken:", "RESEARCH" if result["needs_research"] else "DIRECT")

In [None]:
# Test with complex question
complex_question = "What are the implications of quantum computing on cryptography?"

result = research_agent.invoke({
    "question": complex_question,
    "answer": "",
    "needs_research": False,
    "reasoning": ""
})

print("Complex Question:", complex_question)
print("Reasoning:", result["reasoning"])
print("Answer:", result["answer"][:200] + "...")
print("Path taken:", "RESEARCH" if result["needs_research"] else "DIRECT")

### Key Takeaways

- **StateGraph**: Define state schema with TypedDict
- **Nodes**: Functions that modify state
- **Conditional edges**: Dynamic routing with `add_conditional_edges`
- **Router function**: Returns node name based on state
- **Compile & invoke**: Call `compile()` once, then `invoke()` the result the same way you invoke an LCEL chain

---

## Section 4.5: Tools and Function Calling

### Learning Objectives
- Understand what tools are and when to use them
- Define tools using @tool decorator
- Use ToolNode component for tool execution
- Use tools_condition for automatic routing
- Build a complete tool-enabled agent

### What Are Tools?

**Tools** are functions that LLMs can call to get external data or perform actions they can't do on their own.

**Examples of tools**:
- 🔍 Web search (Tavily, Google)
- 🧮 Calculator (math operations)
- 📚 Database queries (SQL, vector stores)
- 🌐 API calls (weather, stock prices, etc.)
- 📝 File operations (read, write, delete)

**When to use tools vs custom nodes**:
- **Use tools when**: LLM should decide what action to take based on user input
- **Use custom nodes when**: You always perform the same deterministic operation

---

In [None]:
# Subsection 1: Defining a Simple Tool
from langchain.tools import tool

@tool
def get_word_length(word: str) -> int:
    """Returns the length of a word."""
    return len(word)

# Test the tool
print(f"Tool name: {get_word_length.name}")
print(f"Tool description: {get_word_length.description}")
print(f"Test: get_word_length('hello') = {get_word_length.invoke('hello')}")

print("\n✅ Simple tool defined!")

### Using Tools with LLM

How do LLMs know about tools? We **bind tools** to the LLM using `bind_tools()`.

When the LLM receives a question, it can:
1. **Respond directly** (if it knows the answer)
2. **Call a tool** (if it needs external data)

The LLM returns **tool_calls** in its response when it wants to use a tool.

In [None]:
# Subsection 2: LLM with Tools
from langchain_core.messages import HumanMessage

# Define multiple tools
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    return a * b

@tool
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b

# Bind tools to LLM
tools = [multiply, add]
llm_with_tools = llm.bind_tools(tools)

# LLM decides which tool to call
response = llm_with_tools.invoke([HumanMessage(content="What is 3 times 4?")])

print("LLM Response:")
print(f"  Content: {response.content}")
print(f"  Tool calls: {response.tool_calls}")

print("\n✅ LLM decided to call the 'multiply' tool!")

### ToolNode - Executing Tools Automatically

**Problem**: The LLM returns tool_calls, but doesn't actually execute them.

**Solution**: `ToolNode` is a pre-built component that:
1. Takes messages with tool_calls
2. Executes the tools
3. Returns ToolMessage with results

**Why use it?**
- ✅ Handles tool execution automatically
- ✅ Error handling built-in
- ✅ Properly formats results as ToolMessage

**Important**: ToolNode is designed to work within compiled graphs where config and state are managed automatically. You'll see it in action in Subsection 5 where we build a complete tool-enabled agent.
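
Conceptually, executing a tool call is a name lookup followed by a function call. The sketch below uses plain functions and dictionaries shaped like the `tool_calls` entries shown in this section; `execute_tool_calls` is a hypothetical stand-in and does not reproduce `ToolNode`'s actual implementation or error handling.

```python
def multiply(a: int, b: int) -> int:
    return a * b


def add(a: int, b: int) -> int:
    return a + b


TOOLS_BY_NAME = {"multiply": multiply, "add": add}


def execute_tool_calls(tool_calls: list[dict]) -> list[dict]:
    """Hypothetical helper: run each requested tool and pair the result with its call id."""
    results = []
    for call in tool_calls:
        tool_fn = TOOLS_BY_NAME.get(call["name"])
        if tool_fn is None:
            content = f"Error: unknown tool {call['name']!r}"
        else:
            content = str(tool_fn(**call["args"]))
        results.append({"tool_call_id": call["id"], "content": content})
    return results


outputs = execute_tool_calls([{"name": "multiply", "args": {"a": 3, "b": 4}, "id": "call_1"}])
print(outputs)  # [{'tool_call_id': 'call_1', 'content': '12'}]
```

Carrying the call id through to the result is what lets the model match each answer to the request it made.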

In [None]:
# Subsection 3: Understanding ToolNode Conceptually

# ToolNode is a pre-built component that executes tool calls
# It's designed to work within compiled graphs, not standalone

# What ToolNode does:
# 1. Takes messages with tool_calls (from LLM)
# 2. Executes the requested tools
# 3. Returns ToolMessage objects with results

# Example of what messages look like:
from langchain_core.messages import AIMessage, ToolMessage

# Step 1: LLM returns AIMessage with tool_calls
ai_message_example = AIMessage(
    content="",
    tool_calls=[{
        'name': 'multiply',
        'args': {'a': 3, 'b': 4},
        'id': 'call_1'
    }]
)

print("📋 What the LLM returns:")
print(f"  Tool to call: {ai_message_example.tool_calls[0]['name']}")
print(f"  Arguments: {ai_message_example.tool_calls[0]['args']}")

# Step 2: ToolNode would execute multiply(3, 4) and return:
tool_message_example = ToolMessage(
    content="12",  # Result of multiply(3, 4)
    tool_call_id="call_1"
)

print(f"\n🔧 What ToolNode returns:")
print(f"  Result: {tool_message_example.content}")
print(f"  Message type: {type(tool_message_example).__name__}")

print("\n✅ ToolNode automates this execution within graphs!")
print("💡 See the full working example in Subsection 5 below.")

### tools_condition - Automatic Routing

**Problem**: How do we know when to route to ToolNode vs END?

**Solution**: `tools_condition` is a pre-built router function that:
- Returns `"tools"` if the last message has tool_calls
- Returns `END` if no tool_calls exist

**This replaces** writing custom router functions for tools!
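
The routing rule is simple enough to sketch by hand. `route_on_tool_calls` below is a simplified stand-in written for illustration. It operates on plain dictionaries rather than message objects, and the string `"__end__"` is used as a placeholder for `END`.

```python
END_MARKER = "__end__"  # placeholder for langgraph's END in this sketch


def route_on_tool_calls(state: dict) -> str:
    """Simplified stand-in: inspect only the last message."""
    last_message = state["messages"][-1]
    if last_message.get("tool_calls"):
        return "tools"
    return END_MARKER


with_calls = {
    "messages": [
        {"content": "", "tool_calls": [{"name": "multiply", "args": {"a": 3, "b": 4}, "id": "call_1"}]}
    ]
}
without_calls = {"messages": [{"content": "The answer is 12"}]}

print(route_on_tool_calls(with_calls))     # tools
print(route_on_tool_calls(without_calls))  # __end__
```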

In [None]:
# Subsection 4: tools_condition
from langgraph.prebuilt import tools_condition

# Example state with tool calls
state_with_tools = {
    "messages": [AIMessage(
        content="",
        tool_calls=[{'name': 'multiply', 'args': {'a': 3, 'b': 4}, 'id': 'call_1'}]
    )]
}

# Example state without tool calls
state_without_tools = {
    "messages": [AIMessage(content="The answer is 12")]
}

# tools_condition checks for tool_calls
print("Routing decisions:")
print(f"  State WITH tool_calls: {tools_condition(state_with_tools)}")
print(f"  State WITHOUT tool_calls: {tools_condition(state_without_tools)}")

print("\n✅ tools_condition automatically determines routing!")

### Building a Complete Tool-Enabled Agent

Now let's put it all together! We'll build an agent that:
1. Receives user questions
2. Decides whether to use tools
3. Executes tools if needed
4. Processes results and responds

**Key components**:
- `AgentState`: State with messages
- `agent_node`: LLM with bound tools
- `ToolNode`: Executes tools
- `tools_condition`: Routes to tools or END

In [None]:
# Subsection 5: Complete Tool-Enabled Agent
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.messages import BaseMessage
import operator

# State schema
class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], operator.add]

# Define tools
@tool
def search_web(query: str) -> str:
    """Search the web for information (simulated)."""
    return f"Search results for '{query}': LangGraph is a framework for building stateful, multi-actor applications with LLMs."

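@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression and return the result as a string."""
    try:
        # WARNING: eval() executes arbitrary code. Demo only; use a real expression parser in production.
        result = eval(expression)
        return str(result)
    except Exception as exc:  # A bare except would also swallow KeyboardInterrupt
        return f"Error in calculation: {exc}"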

# Tools list
agent_tools = [search_web, calculate]

# Node 1: Agent (LLM with tools)
def agent_node(state: AgentState):
    """LLM decides whether to use tools or respond."""
    llm_with_tools = llm.bind_tools(agent_tools)
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

# Build graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("agent", agent_node)
workflow.add_node("tools", ToolNode(agent_tools))  # ToolNode executes tools

# Add edges
workflow.add_edge(START, "agent")

# Conditional routing: if LLM calls tools → execute them, otherwise END
workflow.add_conditional_edges(
    "agent",
    tools_condition,  # Pre-built router!
    {
        "tools": "tools",  # If tool_calls exist → go to tools
        END: END           # Otherwise → end
    }
)

# After tools execute, loop back to agent
workflow.add_edge("tools", "agent")

# Compile
agent_with_tools = workflow.compile()

print("✅ Tool-enabled agent created!")
print("\nGraph structure:")
print("  START → agent → tools_condition")
print("             ↓            ↓")
print("            END      tools → agent")

### Testing the Agent

Let's test our agent with different types of questions!

In [None]:
# Test 1: Question that requires search
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage

result = agent_with_tools.invoke({
    "messages": [HumanMessage(content="What is LangGraph?")]
})

print("=== Test 1: Web Search ===")
print("\nConversation:")
for msg in result["messages"]:
    if isinstance(msg, HumanMessage):
        print(f"👤 Human: {msg.content}")
    elif isinstance(msg, AIMessage):
        if msg.tool_calls:
            print(f"🤖 AI: [Calling tool: {msg.tool_calls[0]['name']}]")
        else:
            print(f"🤖 AI: {msg.content}")
    elif isinstance(msg, ToolMessage):
        print(f"🔧 Tool: {msg.content[:80]}...")

In [None]:
# Test 2: Question that requires calculation
result = agent_with_tools.invoke({
    "messages": [HumanMessage(content="What is 25 * 4 + 10?")]
})

print("\n=== Test 2: Calculation ===")
print("\nConversation:")
for msg in result["messages"]:
    if isinstance(msg, HumanMessage):
        print(f"👤 Human: {msg.content}")
    elif isinstance(msg, AIMessage):
        if msg.tool_calls:
            print(f"🤖 AI: [Calling tool: {msg.tool_calls[0]['name']}]")
        else:
            print(f"🤖 AI: {msg.content}")
    elif isinstance(msg, ToolMessage):
        print(f"🔧 Tool: {msg.content}")

### Key Takeaways

**Core Concepts**:
- **Tools**: Functions that LLMs can call to get external data
- **`@tool` decorator**: Define tools with type hints and docstrings
- **`bind_tools()`**: Attach tools to LLM so it knows they're available
- **`ToolNode`**: Pre-built component that executes tool calls
- **`tools_condition`**: Pre-built router (routes to "tools" if tool_calls exist, END otherwise)

**The Pattern**:
```
User → Agent (LLM) → tools_condition
           ↓              ↓
          END        ToolNode → Agent (process results) → Response
```

**When to use tools**:
- ✅ Need external data (web search, APIs, databases)
- ✅ Need calculations, code execution
- ✅ LLM should decide what action to take

**When NOT to use tools**:
- ❌ Simple deterministic logic → use custom nodes
- ❌ Always need same operation → use custom nodes
- ❌ No LLM decision needed → use custom nodes

**Key Benefits of ToolNode + tools_condition**:
- ✅ No manual tool execution code
- ✅ No custom routing logic for tools
- ✅ Built-in error handling
- ✅ Properly formatted ToolMessages
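
To make the routing rule concrete, here is a simplified plain-Python stand-in for the decision `tools_condition` makes. It is a sketch for illustration, not the library's implementation: `FakeAIMessage`, `route_on_tool_calls`, the `"end"` label, and the `calculator` tool name are all hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class FakeAIMessage:
    """Hypothetical stand-in for an AI message; only the fields the router reads."""
    content: str = ""
    tool_calls: list = field(default_factory=list)


def route_on_tool_calls(state: dict) -> str:
    """Return "tools" if the newest message requests a tool call, else "end"."""
    last_message = state["messages"][-1]
    if getattr(last_message, "tool_calls", None):
        return "tools"
    return "end"


# A message that asks for a tool, and one that is already a final answer
wants_tool = {"messages": [FakeAIMessage(tool_calls=[{"name": "calculator", "args": {}}])]}
final_answer = {"messages": [FakeAIMessage(content="The answer is 110.")]}

print(route_on_tool_calls(wants_tool))    # tools
print(route_on_tool_calls(final_answer))  # end
```

The router only inspects the last message, which is why the agent node must append its response to `messages` before the conditional edge runs.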

---

## Section 6: Conditional Routing and Cycles

### Learning Objectives
- Implement loops and retries
- Add cycle prevention
- Build iterative workflows

### Use Case: Code Generator with Testing

**Goal**: Build an agent that:
1. Generates code
2. Tests the code
3. If tests fail, fix and retry (up to 3 times)
4. If tests pass, deliver

### State Schema with Iteration Tracking

In [None]:
# Code generator state
class CodeGenState(TypedDict):
    requirement: str
    code: str
    test_result: str
    iteration: int
    status: str  # "success", "failed", "retry"

# Node 1: Generate code
def generate_code(state: CodeGenState):
    """Generate code based on requirement"""
    requirement = state["requirement"]
    iteration = state.get("iteration", 0)
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a Python code generator. Generate clean, working code."),
        ("human", "Requirement: {requirement}\n\nGenerate Python code.")
    ])
    
    if iteration > 0:
        # Include previous test failure
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a Python code generator. Fix the code based on test failure."),
            ("human", "Requirement: {requirement}\n\nPrevious test result: {test_result}\n\nGenerate fixed code.")
        ])
    
    chain = prompt | llm | StrOutputParser()
    code = chain.invoke({"requirement": requirement, "test_result": state.get("test_result", "")})
    
    return {
        "code": code,
        "iteration": iteration + 1
    }

# Node 2: Test code (simulated)
def test_code(state: CodeGenState):
    """Simulate testing the code"""
    code = state["code"]
    iteration = state["iteration"]
    
    # Simulate: first 2 attempts fail, 3rd succeeds
    if iteration < 3:
        return {
            "test_result": f"Test failed (attempt {iteration}): Syntax error",
            "status": "retry"
        }
    else:
        return {
            "test_result": "All tests passed!",
            "status": "success"
        }

# Router: Decide next step
def route_after_test(state: CodeGenState):
    """Route based on test result and iteration count"""
    if state["status"] == "success":
        return "success"  # Routing key; mapped to a node name in add_conditional_edges
    elif state["iteration"] >= 3:
        return "failed"
    else:
        return "retry"

# Node 3: Deliver
def deliver_code(state: CodeGenState):
    """Deliver successful code"""
    return {"status": "delivered"}

# Node 4: Failed
def mark_failed(state: CodeGenState):
    """Mark as failed after max retries"""
    return {"status": "failed_max_retries"}

print("✅ Code generator nodes defined")

## Understanding Cycles and Loops

LangGraph allows cycles - nodes can route back to themselves or earlier nodes.

### What are Cycles?

**Cycle**: When a node can route back to itself or a previous node in the graph.

```python
workflow.add_conditional_edges(
    "test",
    route_test_result,
    {
        "pass": "deploy",
        "retry": "generate"  # ← Loop back to earlier node!
    }
)
```

**Visual**:
```
generate → test → [pass] → deploy → END
   ↑        │
   └─[retry]┘
```

### Why Cycles?

**Use cases**:
- Retry logic (regenerate until tests pass)
- Iterative refinement (improve until quality threshold met)
- Human feedback loops (revise until approved)
- Multi-turn conversations (continue until user satisfied)

### Preventing Infinite Loops

**Problem**: Cycles can loop forever!

**Solution**: Iteration counter with max check

```python
class State(TypedDict):
    iteration: int
    max_iterations: int
    content: str

def route_test_result(state: State) -> str:
    # Success wins, even on the final allowed attempt
    # (tests_pass is a placeholder for your own check)
    if tests_pass(state["content"]):
        return "pass"

    # Out of attempts: stop looping
    if state["iteration"] >= state["max_iterations"]:
        return "fail"

    return "retry"  # Loop back to generate
```

**Increment counter in node**:
```python
def generate(state: State):
    # Generate content
    new_content = generate_code()

    # Increment iteration counter
    return {
        "content": new_content,
        "iteration": state["iteration"] + 1
    }
```

### Cycle Best Practices

- ✅ **Always have a termination condition** (max iterations, timeout, success criteria)
- ✅ **Track iteration count** in state
- ✅ **Log iteration progress** for debugging
- ✅ **Provide an escape hatch** (manual override, fallback)

- ❌ **Don't assume** the first iteration will succeed
- ❌ **Don't create** cycles without counters
- ❌ **Don't set** max_iterations too high (each extra loop is another paid LLM call)
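
A counter is one termination condition; a wall-clock budget is another. The sketch below combines both in a single router. It is a plain-Python illustration under assumptions: the `tests_passed` and `started_at` state fields, the two budget constants, and the `now` parameter (added so the function is easy to test) are hypothetical, not part of the notebook's state schema.

```python
import time
from typing import Optional

MAX_ITERATIONS = 3   # assumed attempt budget
MAX_SECONDS = 30.0   # assumed wall-clock budget


def route_with_limits(state: dict, now: Optional[float] = None) -> str:
    """Return "pass", "retry", or "fail" using two independent stop conditions."""
    now = time.monotonic() if now is None else now
    if state["tests_passed"]:
        return "pass"
    if state["iteration"] >= MAX_ITERATIONS:
        return "fail"  # attempt budget spent
    if now - state["started_at"] >= MAX_SECONDS:
        return "fail"  # time budget spent
    return "retry"


state = {"iteration": 1, "tests_passed": False, "started_at": 100.0}
print(route_with_limits(state, now=105.0))  # retry
print(route_with_limits(state, now=131.0))  # fail
```

Whichever limit trips first ends the loop, so a slow model cannot keep the cycle alive just because the attempt counter is still low.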

### Example: Retry with Limit

```python
# Initial state
{"iteration": 0, "max_iterations": 3, "content": ""}

# Pass 1: generate (iteration=1) → test fails → retry
# Pass 2: generate (iteration=2) → test fails → retry
# Pass 3: generate (iteration=3) → test fails → [max reached] → fail → END
```
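
Counting loop passes by hand is error-prone, so it can help to simulate the cycle in plain Python before wiring it into a graph. The sketch below is an assumption-laden stand-in: `generate` and `route` mimic the node and router described above, and the tests are hard-coded to fail so that only the counter can stop the loop.

```python
MAX_ITERATIONS = 3


def generate(state: dict) -> dict:
    """Stand-in for the generate node: bump the counter and produce new content."""
    iteration = state["iteration"] + 1
    return {**state, "iteration": iteration, "content": f"attempt {iteration}"}


def route(state: dict) -> str:
    """Tests always fail in this simulation, so the counter is the only exit."""
    return "fail" if state["iteration"] >= MAX_ITERATIONS else "retry"


state = {"iteration": 0, "content": ""}
trace = []
while True:
    state = generate(state)
    decision = route(state)
    trace.append((state["iteration"], decision))
    if decision != "retry":
        break

print(trace)  # [(1, 'retry'), (2, 'retry'), (3, 'fail')]
```

With a limit of 3 the generate step runs exactly three times; the third failure routes to `fail` instead of looping again.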

🎯 **Key insight**: Cycles are powerful but require careful termination logic.

Now let's implement a retry loop!

---

In [None]:
# Build graph with cycle
workflow = StateGraph(CodeGenState)

# Add nodes
workflow.add_node("generate", generate_code)
workflow.add_node("test", test_code)
workflow.add_node("deliver", deliver_code)
workflow.add_node("failed", mark_failed)

# Edges
workflow.add_edge(START, "generate")
workflow.add_edge("generate", "test")

# Conditional routing after test
workflow.add_conditional_edges(
    "test",
    route_after_test,
    {
        "retry": "generate",  # Loop back!
        "success": "deliver",
        "failed": "failed"
    }
)

workflow.add_edge("deliver", END)
workflow.add_edge("failed", END)

code_gen_agent = workflow.compile()

print("✅ Code generator with retry logic compiled!")

In [None]:
# Test the cycle
result = code_gen_agent.invoke({
    "requirement": "Write a function to calculate fibonacci numbers",
    "code": "",
    "test_result": "",
    "iteration": 0,
    "status": ""
})

print(f"Final status: {result['status']}")
print(f"Iterations: {result['iteration']}")
print(f"Test result: {result['test_result']}")
print(f"\n✅ Graph executed with {result['iteration']} iterations!")

### Key Takeaways

- **Cycles**: Nodes can loop back to earlier nodes
- **Iteration tracking**: Use state to count loops
- **Cycle prevention**: Set max iterations to avoid infinite loops
- **Conditional routing**: Route based on state (retry/success/fail)
- **Real-world use**: Code generation, validation workflows, retries

---

## Section 7: Multi-Agent Patterns

### Learning Objectives
- Understand supervisor pattern
- Build multi-agent system with specialized agents
- Implement centralized state management

### Supervisor Pattern

**Architecture**: One supervisor orchestrates multiple specialized agents

```
           ┌─────────────┐
           │ Supervisor  │
           └──────┬──────┘
                  │
       ┌──────────┼──────────┐
       ↓          ↓          ↓
  [Researcher] [Writer] [Critic]
```

### Use Case: Content Creation Team

Build a team with:
1. **Researcher**: Gathers information
2. **Writer**: Creates content
3. **Critic**: Reviews and provides feedback
4. **Supervisor**: Orchestrates the workflow

In [None]:
# Multi-agent state
from typing import Literal

class MultiAgentState(TypedDict):
    messages: Annotated[list[str], operator.add]  # All agent communications
    topic: str  # What to write about
    research: str  # Research findings
    draft: str  # Written content
    feedback: str  # Critic feedback
    next_agent: str  # Who to call next
    final_content: str  # Final approved content

print("✅ Multi-agent state defined")

In [None]:
# Agent 1: Researcher
def researcher_agent(state: MultiAgentState):
    """Research the topic"""
    topic = state["topic"]
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a research specialist. Provide key facts and information about the topic."),
        ("human", "Research: {topic}")
    ])
    
    chain = prompt | llm | StrOutputParser()
    research = chain.invoke({"topic": topic})
    
    return {
        "research": research,
        "messages": [f"Researcher: Completed research on '{topic}'"]
    }

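# Agent 2: Writer
def writer_agent(state: MultiAgentState):
    """Write content based on research, revising if the critic left feedback"""
    topic = state["topic"]
    research = state["research"]
    feedback = state.get("feedback", "")
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a professional writer. Create engaging content based on the research. "
                   "If reviewer feedback is provided, revise to address it."),
        ("human", "Topic: {topic}\n\nResearch:\n{research}\n\nReviewer feedback:\n{feedback}\n\nWrite a short article.")
    ])
    
    chain = prompt | llm | StrOutputParser()
    draft = chain.invoke({"topic": topic, "research": research, "feedback": feedback or "None yet"})
    
    return {
        "draft": draft,
        # Clear stale feedback so the supervisor sends the new draft to the critic
        # instead of routing back to the writer forever
        "feedback": "",
        "messages": ["Writer: Completed revision" if feedback else "Writer: Completed first draft"]
    }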

# Agent 3: Critic
def critic_agent(state: MultiAgentState):
    """Review and provide feedback"""
    draft = state["draft"]
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a content critic. Review the draft and decide if it's APPROVED or needs REVISION. "
                   "If approved, respond with 'APPROVED'. If not, provide specific feedback."),
        ("human", "Draft:\n{draft}")
    ])
    
    chain = prompt | llm | StrOutputParser()
    feedback = chain.invoke({"draft": draft})
    
    return {
        "feedback": feedback,
        "messages": [f"Critic: {feedback[:50]}..."]
    }

# Supervisor: Orchestrate workflow
def supervisor_agent(state: MultiAgentState):
    """Decide which agent to call next"""
    messages = state["messages"]
    
    # Simple routing logic
    if not state.get("research"):
        return {"next_agent": "researcher"}
    elif not state.get("draft"):
        return {"next_agent": "writer"}
    elif not state.get("feedback"):
        return {"next_agent": "critic"}
    elif "APPROVED" in state.get("feedback", "").upper():
        return {
            "next_agent": "END",
            "final_content": state["draft"],
            "messages": ["Supervisor: Content approved!"]
        }
    else:
        # Need revision - go back to writer
        return {
            "next_agent": "writer",
            "messages": ["Supervisor: Requesting revision"]
        }

print("✅ All agents defined")

In [None]:
# Build multi-agent graph
workflow = StateGraph(MultiAgentState)

# Add all agents
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("researcher", researcher_agent)
workflow.add_node("writer", writer_agent)
workflow.add_node("critic", critic_agent)

# Supervisor routes to agents
def route_supervisor(state: MultiAgentState):
    """Route based on supervisor's decision"""
    next_agent = state.get("next_agent", "researcher")
    return next_agent

workflow.add_edge(START, "supervisor")
workflow.add_conditional_edges(
    "supervisor",
    route_supervisor,
    {
        "researcher": "researcher",
        "writer": "writer",
        "critic": "critic",
        "END": END
    }
)

# All agents return to supervisor
workflow.add_edge("researcher", "supervisor")
workflow.add_edge("writer", "supervisor")
workflow.add_edge("critic", "supervisor")

# Compile
multi_agent_system = workflow.compile()

print("✅ Multi-agent system compiled!")

In [None]:
# Test multi-agent system
result = multi_agent_system.invoke({
    "messages": [],
    "topic": "The future of artificial intelligence",
    "research": "",
    "draft": "",
    "feedback": "",
    "next_agent": "",
    "final_content": ""
})

print("Agent Communication Log:")
for msg in result["messages"]:
    print(f"  {msg}")

print("\nFinal Content:")
print(result["final_content"][:300] + "...")

### Key Takeaways

- **Supervisor pattern**: One orchestrator, multiple specialists
- **Centralized state**: All agents communicate via shared state
- **Conditional routing**: Supervisor decides next agent
- **Cycles**: Can loop back (e.g., writer → critic → writer)
- **Production-ready**: Scales to complex multi-agent systems
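
Because the writer/critic loop is a cycle, it benefits from the same kind of cap as the code generator. The sketch below shows one way a supervisor's routing decision could enforce it, written as a plain function so it can be checked without an LLM. It rests on assumptions: `revision_count` is a hypothetical state field (not in `MultiAgentState`), `MAX_REVISIONS` is an arbitrary cap, and the critic is assumed to start its reply with `APPROVED` when it accepts a draft.

```python
MAX_REVISIONS = 2  # assumed cap for this sketch


def next_agent(state: dict) -> str:
    """Pick the next agent, stopping once the critic approves or the cap is hit."""
    if not state.get("research"):
        return "researcher"
    if not state.get("draft"):
        return "writer"
    if not state.get("feedback"):
        return "critic"
    # startswith avoids treating "NOT APPROVED" as an approval
    if state["feedback"].strip().upper().startswith("APPROVED"):
        return "END"
    if state.get("revision_count", 0) >= MAX_REVISIONS:
        return "END"  # give up revising and hand back the latest draft
    return "writer"


base = {"research": "facts", "draft": "text"}
print(next_agent({**base, "feedback": "APPROVED"}))                       # END
print(next_agent({**base, "feedback": "Too long", "revision_count": 0}))  # writer
print(next_agent({**base, "feedback": "Too long", "revision_count": 2}))  # END
print(next_agent({**base, "feedback": ""}))                               # critic
```

For the cap to work in a real graph, the writer node would also need to increment `revision_count` each time it revises.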

---

## Section 8: Human-in-the-Loop Workflows

### Learning Objectives
- Implement interrupt points for human approval
- Resume execution after human input
- Build approval workflows

### Why Human-in-the-Loop?

**Use cases**:
- Approval workflows (e.g., review before publishing)
- Sensitive operations (e.g., confirm before deleting)
- Quality control (e.g., human review of AI output)
- Data collection (e.g., gather additional input mid-workflow)

### Interrupt Mechanisms

LangGraph supports interrupts:
- **Before node execution**: `interrupt_before=["node_name"]`
- **After node execution**: `interrupt_after=["node_name"]`
- **Manual checkpointing**: Save state, resume later

## Understanding Checkpointing and Persistence

Checkpointing saves graph state between invocations. Required for interrupts and resuming workflows.

### What is a Checkpointer?

A **checkpointer** saves state snapshots at each step of execution.

**Without checkpointer**:
```python
app = workflow.compile()  # No persistence
result = app.invoke(state)  # Runs to completion, then state lost
```

**With checkpointer**:
```python
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
result = app.invoke(state, config={"configurable": {"thread_id": "1"}})
# State saved! Can resume later.
```

### Why Checkpointing?

**Enables**:
1. **Interrupts**: Pause execution for human approval
2. **Resuming**: Continue from where you left off
3. **Persistence**: Survive crashes and restarts
4. **Multi-turn**: Maintain state across conversations
5. **Debugging**: Inspect state at each step

**Without checkpointer**: None of these work!

### MemorySaver vs SqliteSaver

**MemorySaver** (in-memory):
```python
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
```
- ✅ Fast
- ✅ Simple setup
- ❌ Lost on restart
- ❌ Not production-ready

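**SqliteSaver** (persistent):
```python
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver  # separate package: langgraph-checkpoint-sqlite
# Recent releases make from_conn_string() a context manager, so build the saver from a connection
saver = SqliteSaver(sqlite3.connect("checkpoints.db", check_same_thread=False))
```
- ✅ Persistent (survives restarts)
- ✅ Good fit for local apps and single-process deployments
- ✅ Can inspect history
- ⚠️ Slightly slower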

**When to use**:
- Notebooks/demos: MemorySaver
- Production: SqliteSaver for small deployments; a database-backed saver such as PostgresSaver for multi-process services

### How Checkpointing Works

**Execution with checkpointer**:
```
invoke(state, config={"configurable": {"thread_id": "chat-1"}})
         ↓
Entry node executes
         ↓
Save checkpoint (thread_id="chat-1", step=1)
         ↓
Next node executes
         ↓
Save checkpoint (thread_id="chat-1", step=2)
         ↓
... continue until END or interrupt
```

### thread_id: Isolating State

**thread_id** identifies separate workflow instances.

```python
# User Alice's workflow
app.invoke(state, config={"configurable": {"thread_id": "alice"}})

# User Bob's workflow (completely separate!)
app.invoke(state, config={"configurable": {"thread_id": "bob"}})
```

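**Different thread_id** = separate saved state, so runs never see each other's data.

**Same thread_id** = the run starts from that thread's last checkpoint: new input is merged into the saved state, and `invoke(None, config)` resumes a paused run.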
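
As a mental model only, a checkpointer can be pictured as a store of state snapshots keyed by `thread_id`. The toy class below is a hypothetical illustration of that idea (it is not how LangGraph's savers are implemented, and `ToyCheckpointer` and its methods are invented for this sketch).

```python
from collections import defaultdict
from copy import deepcopy


class ToyCheckpointer:
    """Illustrative only: keeps one list of state snapshots per thread_id."""

    def __init__(self):
        self._history = defaultdict(list)

    def save(self, thread_id: str, state: dict) -> None:
        # Copy so later mutations of `state` cannot alter the stored snapshot
        self._history[thread_id].append(deepcopy(state))

    def latest(self, thread_id: str):
        snapshots = self._history.get(thread_id)
        return deepcopy(snapshots[-1]) if snapshots else None

    def steps(self, thread_id: str) -> int:
        return len(self._history.get(thread_id, []))


saver = ToyCheckpointer()
saver.save("alice", {"messages": ["hi"]})
saver.save("alice", {"messages": ["hi", "hello Alice"]})
saver.save("bob", {"messages": ["hey"]})

print(saver.steps("alice"))   # 2
print(saver.latest("bob"))    # {'messages': ['hey']}
print(saver.latest("carol"))  # None
```

Alice's two snapshots and Bob's single snapshot never mix, and an unknown thread simply has nothing to resume from.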

🎯 **Key insight**: Checkpointer + thread_id enable multi-user, persistent workflows.

Now let's add checkpointing!

---

In [None]:
# Human-in-the-loop example: Content approval workflow
from langgraph.checkpoint.memory import MemorySaver

class ApprovalState(TypedDict):
    content: str
    approved: bool
    feedback: str

def generate_content(state: ApprovalState):
    """Generate content (simulated)"""
    return {"content": "Draft article: AI is transforming the world..."}

def publish_content(state: ApprovalState):
    """Publish approved content"""
    return {"feedback": f"Published: {state['content'][:50]}..."}

# Build workflow with interrupt
approval_workflow = StateGraph(ApprovalState)
approval_workflow.add_node("generate", generate_content)
approval_workflow.add_node("publish", publish_content)

approval_workflow.add_edge(START, "generate")
approval_workflow.add_edge("generate", "publish")
approval_workflow.add_edge("publish", END)

# Compile with checkpointer (required for interrupts)
memory = MemorySaver()
approval_app = approval_workflow.compile(
    checkpointer=memory,
    interrupt_before=["publish"]  # Pause before publishing
)

print("✅ Approval workflow with interrupt created")

## Understanding Interrupts and Human-in-the-Loop

Interrupts pause execution for human approval or input. Essential for production workflows.

### What are Interrupts?

**Interrupts** pause graph execution at specified nodes.

```python
app = workflow.compile(
    checkpointer=memory,
    interrupt_before=["publish"]  # Pause BEFORE publish node
)
```

**Two types**:
- **interrupt_before**: Pause BEFORE executing specified nodes
- **interrupt_after**: Pause AFTER executing specified nodes

### How Interrupts Work

**Execution flow**:
```
invoke(state, config={"configurable": {"thread_id": "1"}})
         ↓
Nodes execute normally
         ↓
Reach "publish" node → INTERRUPT!
         ↓
Save checkpoint and return current state
         ↓
Wait for human approval...
         ↓
invoke(None, config={"configurable": {"thread_id": "1"}})
         ↓
Load checkpoint and resume from "publish" node
         ↓
Continue to END
```

### Resuming from Interrupt

**Key pattern**: `invoke(None, config)` to resume.

```python
# Step 1: Initial invocation (pauses at interrupt)
result = app.invoke(
    {"content": "Draft blog post"},
    config={"configurable": {"thread_id": "approval-1"}}
)
print(result)  # Shows state at interrupt point

# Step 2: Human reviews and approves
# ... (human decision-making)

# Step 3: Resume execution
result = app.invoke(
    None,  # ← None means "load from checkpoint"
    config={"configurable": {"thread_id": "approval-1"}}  # Same thread_id!
)
print(result)  # Shows final state after completion
```

**Why None?**
- None tells LangGraph "don't use new state, load from checkpoint"
- config with same thread_id specifies which checkpoint to load
- Execution picks up exactly where it left off

### Use Cases

**Approval workflows**:
```python
interrupt_before=["publish"]  # Require approval before publishing
```

**Human feedback**:
```python
interrupt_after=["generate"]  # Let human review and edit generation
```

**Multi-stage approval**:
```python
interrupt_before=["review", "publish"]  # Two approval gates
```

### Requirements

**Interrupts require checkpointer**:
```python
# ❌ This won't work
app = workflow.compile(interrupt_before=["publish"])

# ✅ This works
app = workflow.compile(
    checkpointer=memory,
    interrupt_before=["publish"]
)
```

Without a checkpointer, the graph cannot save state to resume later.

### Checking Interrupt Status

**How to know if interrupted?**
- `invoke()` returns the state as of the interrupt point, not the final state
- `app.get_state(config).next` lists the nodes still waiting to run; it is empty once the graph has finished
- Use graph tracing/logging (covered in advanced topics)
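
A small helper can turn that check into code. The sketch below assumes a graph compiled with a checkpointer, whose `get_state(config)` returns a snapshot with a `next` field naming the nodes still to run. `pending_nodes` and `StubApp` are hypothetical names; the stub mimics only that one method so the example runs without an LLM.

```python
from types import SimpleNamespace


def pending_nodes(app, config) -> tuple:
    """Return the node names a checkpointed graph would run next (empty if finished)."""
    snapshot = app.get_state(config)
    return tuple(snapshot.next)


class StubApp:
    """Hypothetical stand-in that mimics only get_state()."""

    def __init__(self, next_nodes):
        self._next_nodes = next_nodes

    def get_state(self, config):
        return SimpleNamespace(next=self._next_nodes)


config = {"configurable": {"thread_id": "approval-1"}}
print(pending_nodes(StubApp(("publish",)), config))  # ('publish',)
print(pending_nodes(StubApp(()), config))            # ()
```

With the real `approval_app`, a non-empty result after the first `invoke` would indicate the run is paused before those nodes and is waiting for `invoke(None, config)`.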

🎯 **Key insight**: Interrupts + checkpointing enable human-in-the-loop workflows.

Now let's run the approval workflow!

---

In [None]:
# Run until interrupt - ACTUAL human-in-the-loop demo
config = {"configurable": {"thread_id": "approval-1"}}

print("🚀 Starting approval workflow...")
result = approval_app.invoke({
    "content": "",
    "approved": False,
    "feedback": ""
}, config)

print("\n" + "="*60)
print("📄 GENERATED CONTENT:")
print("="*60)
print(result['content'])
print("="*60)

print("\n⏸️  WORKFLOW PAUSED before 'publish' node")
print("\n🔍 Human Review Required!")
print("In a real application, this content would be shown to a reviewer.")

# ACTUALLY WAIT for human input
approval = input("\n👤 Type 'yes' to approve and publish, or 'no' to reject: ").strip().lower()

print("\n" + "-"*60)

if approval == 'yes':
    print("✅ APPROVED by human reviewer!")
    print("📤 Resuming workflow to publish content...\n")
    
    # Resume workflow
    final_result = approval_app.invoke(None, config)
    
    print(f"✅ {final_result['feedback']}")
    print("\n🎉 Workflow completed after human approval!")
    
else:
    print("❌ REJECTED by human reviewer")
    print("🛑 Workflow stopped - content was NOT published.")
    print("\n💡 In production, you could:")
    print("   • Send content back for revision")
    print("   • Log rejection reason")
    print("   • Trigger alternative workflow")

### Key Takeaways

- **Interrupts**: Pause before/after nodes for human input
- **Checkpointing required**: Must use checkpointer for interrupts
- **Resume with None**: Continue from checkpoint
- **Use cases**: Approval workflows, quality control, data collection

---

## Section 9: Checkpointing and Persistence

### Learning Objectives
- Understand checkpointing for state persistence
- Use MemorySaver for in-memory persistence
- Implement conversation threads

### Checkpointing - Persistent State

**Checkpointing** saves state between runs, enabling:
- Resume from failure
- Time-travel debugging
- Multi-turn conversations

In [None]:
# Checkpointing with MemorySaver
from langgraph.checkpoint.memory import MemorySaver

# Define simple state
class ConversationState(TypedDict):
    messages: Annotated[list[str], operator.add]
    score: Annotated[int, lambda x, y: max(x, y)]

def process_turn(state: ConversationState):
    """Process a conversation turn"""
    return {
        "messages": ["Processed turn"],
        "score": 10
    }

# Build graph
workflow = StateGraph(ConversationState)
workflow.add_node("process", process_turn)
workflow.add_edge(START, "process")
workflow.add_edge("process", END)

# Create in-memory checkpointer
memory = MemorySaver()

# Compile graph with checkpointer
app_with_memory = workflow.compile(checkpointer=memory)

print("✅ Graph with checkpointing created")

In [None]:
# Run with thread_id for persistence
config = {"configurable": {"thread_id": "conversation-1"}}

result1 = app_with_memory.invoke({
    "messages": ["Turn 1"],
    "score": 0
}, config)

print("First run completed")
print(f"  Score: {result1['score']}")
print(f"  Messages: {len(result1['messages'])} messages")

# Run again with same thread_id - state persists!
result2 = app_with_memory.invoke({
    "messages": ["Turn 2"],
    "score": 15  # Higher score
}, config)

print("\nSecond run completed")
print(f"  Score: {result2['score']}")  # Should be 15 (max of previous 10 and new 15)
print(f"  Messages: {len(result2['messages'])} messages")  # Should accumulate

print("\n✅ Checkpointing preserves state across runs!")

### Key Takeaways

- **MemorySaver**: In-memory checkpointer for demos
- **thread_id**: Maintain separate conversation threads
- **State persistence**: State accumulates across invocations
- **Production**: Use SqliteSaver or other persistent storage
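
To see why the second run above ends with a score of 15 and four messages, the reducer arithmetic can be replayed in plain Python. This is a simplified model of how updates are merged per key, not LangGraph's implementation; `REDUCERS` and `apply_update` are hypothetical names.

```python
import operator

# One reducer per state key, mirroring ConversationState's annotations
REDUCERS = {"messages": operator.add, "score": max}


def apply_update(state: dict, update: dict) -> dict:
    """Merge an update into state, combining old and new values per key."""
    merged = dict(state)
    for key, value in update.items():
        merged[key] = REDUCERS[key](merged[key], value) if key in merged else value
    return merged


node_output = {"messages": ["Processed turn"], "score": 10}

# Run 1: the input is applied first, then the node's output
state = apply_update({}, {"messages": ["Turn 1"], "score": 0})
state = apply_update(state, node_output)
print(state["score"], len(state["messages"]))  # 10 2

# Run 2 on the same thread: the saved state is the starting point
state = apply_update(state, {"messages": ["Turn 2"], "score": 15})
state = apply_update(state, node_output)
print(state["score"], len(state["messages"]))  # 15 4
```

The input of 15 raises the score, and the node's later output of 10 cannot lower it because `max` keeps the larger value.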

---

## Section 10: Production Patterns and Best Practices

### Learning Objectives
- Implement error handling in nodes
- Add retry logic
- Build fallback mechanisms
- Production best practices

### Error Handling in Nodes

In [None]:
# Production workflow with error handling and retry logic
class ProductionState(TypedDict):
    input: str
    result: str
    error: str
    retry_count: int

def process_with_retry(state: ProductionState):
    """Process input with error handling"""
    try:
        # Simulate processing
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant. Provide thoughtful answers."),
            ("human", "{input}")
        ])
        
        chain = prompt | llm | StrOutputParser()
        result = chain.invoke({"input": state["input"]})
        
        return {
            "result": result,
            "error": ""
        }
    except Exception as e:
        # Increment retry counter
        retry_count = state.get("retry_count", 0) + 1
        
        if retry_count < 3:  # Up to 3 attempts in total
            print(f"⚠️  Error occurred: {e}")
            print(f"🔄 Retrying... (attempt {retry_count + 1}/3)")
            return {
                "error": str(e),
                "retry_count": retry_count
            }
        else:
            print("❌ Max attempts reached. Failing.")
            return {
                "error": f"Failed after 3 attempts: {e}",
                "retry_count": retry_count
            }

def check_error(state: ProductionState):
    """Route based on error state"""
    if state.get("error") and state.get("retry_count", 0) < 3:
        return "retry"
    elif state.get("error"):
        return "failed"
    else:
        return "success"

# Build production graph with error handling
prod_workflow = StateGraph(ProductionState)

# Add nodes
prod_workflow.add_node("process", process_with_retry)

# Add conditional routing for retries
prod_workflow.add_edge(START, "process")
prod_workflow.add_conditional_edges(
    "process",
    check_error,
    {
        "retry": "process",  # Loop back to retry
        "success": END,
        "failed": END
    }
)

# Compile
prod_app = prod_workflow.compile()

print("✅ Production workflow with error handling created!")

In [None]:
# Test production workflow
result = prod_app.invoke({
    "input": "What is the meaning of life?",
    "result": "",
    "error": "",
    "retry_count": 0
})

if result["error"]:
    print(f"❌ Error: {result['error']}")
    print(f"   Retries: {result['retry_count']}")
else:
    print(f"✅ Success: {result['result'][:100]}...")
    print(f"   Retries: {result['retry_count']}")

### Production Best Practices

**1. Observability**
- Use LangSmith for tracing and monitoring
- Log all state transitions
- Track token usage and costs

**2. Error Handling**
- Try-except in every node
- Retry with exponential backoff
- Fallback paths for failures

**3. State Management**
- Explicit schemas with TypedDict
- Immutable state updates: return partial updates from nodes and let reducers merge them
- Checkpointing for reliability

**4. Performance**
- Parallel execution where possible
- Streaming for long-running tasks
- Cache expensive operations

**5. Testing**
- Unit test each node individually
- Integration tests for full graph
- LLM-as-a-judge for quality evaluation
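
The retry loop earlier in this section retries immediately. A minimal sketch of the "exponential backoff" practice is shown below as a generic wrapper that a node could use around its LLM call. All names and defaults here (`call_with_backoff`, `base_delay`, the injectable `sleep`) are assumptions for illustration, not a LangGraph API.

```python
import random
import time


def call_with_backoff(func, max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call func(); on failure wait base_delay * 2**attempt (plus jitter) and retry."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            sleep(delay)


# Demo: a function that fails twice, then succeeds
attempts = {"count": 0}


def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


delays = []  # record delays instead of actually sleeping
print(call_with_backoff(flaky, sleep=delays.append))  # ok
print(len(delays))  # 2
```

Passing `sleep` as a parameter keeps the demo instant and makes the delays easy to assert on in tests; the small random jitter spreads out retries from concurrent callers.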

### Key Takeaways

- **Error handling**: Try-except in all nodes
- **Retry logic**: Loop back on failure, max retries
- **Observability**: LangSmith for production monitoring
- **Checkpointing**: Reliability and resume capability
- **Testing**: Unit and integration tests essential
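
Routers are plain functions of state, which makes them the easiest part of a graph to unit test. The sketch below copies the logic of `check_error` from this section so it runs on its own; the test function name and the sample states are illustrative.

```python
def check_error(state: dict) -> str:
    """Copy of the notebook's router so the test is self-contained."""
    if state.get("error") and state.get("retry_count", 0) < 3:
        return "retry"
    elif state.get("error"):
        return "failed"
    return "success"


def test_check_error():
    # No error: finish successfully
    assert check_error({"error": "", "retry_count": 0}) == "success"
    # Error with attempts left: loop back
    assert check_error({"error": "timeout", "retry_count": 1}) == "retry"
    # Error with the budget spent: give up
    assert check_error({"error": "timeout", "retry_count": 3}) == "failed"


test_check_error()
print("router tests passed")
```

Tests like this need no API key and run in milliseconds, so they can cover every routing branch before any integration test calls a model.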

---

## Exercise: Build an Approval Workflow

**Duration**: ~20 minutes

Build a simple document approval workflow with these requirements:

### Requirements

1. **State**: document_content, approved (bool), reviewer_feedback, revision_count
2. **Nodes**:
   - generate_document: Creates sample document
   - review_document: Human reviews (interrupt point)
   - publish_document: Publishes if approved
   - revise_document: Handles rejections

3. **Flow**:
   - generate → review (interrupt) → approved? → publish OR revise
   - If revised, loop back to review

**Bonus**: Add revision counter, max 3 revisions

### Hints

- Use `interrupt_before=["review_document"]` for human review
- Use conditional edges to route based on approval
- Track revision_count in state
- Use checkpointer (MemorySaver) for interrupts

### Solution Template

In [None]:
# Exercise: Build your approval workflow here

# 1. Define state
class DocumentState(TypedDict):
    document_content: str
    approved: bool
    reviewer_feedback: str
    revision_count: int

# 2. Define nodes
def generate_document(state: DocumentState):
    # TODO: Implement document generation
    pass

def review_document(state: DocumentState):
    # TODO: Implement human review (this is where interrupt happens)
    pass

def publish_document(state: DocumentState):
    # TODO: Implement publishing
    pass

def revise_document(state: DocumentState):
    # TODO: Implement revision
    pass

# 3. Build graph
# TODO: Create StateGraph, add nodes, add edges, compile with interrupt

# 4. Test workflow
# TODO: Invoke workflow and handle human approval

---

## Summary

### What You Learned

**Core Concepts**:
- LangGraph architecture: nodes, edges, state
- State management with TypedDict and reducers
- Conditional routing and cycles

**Patterns**:
- Research agent with decision points
- Code generator with retry logic
- Multi-agent supervisor pattern
- Human-in-the-loop workflows

**Production**:
- Error handling and retry logic
- Checkpointing for persistence
- Best practices for observability

### Key Takeaways

1. **LangGraph = LangChain + State + Cycles** - Use when complexity demands it

2. **State design matters** - Use TypedDict, reducers, and clear schemas

3. **Conditional routing** - Dynamic workflows with decision points

4. **Cycles enable iteration** - Loops, retries, refinement

5. **Multi-agent = specialized expertise** - Supervisor pattern enables complex workflows

6. **Human-in-the-loop** - Interrupts for approval and quality control

7. **Production-ready** - Error handling, observability, and checkpointing are essential

### Resources

**Official Documentation**:
- [LangGraph Documentation](https://www.langchain.com/langgraph)
- [LangChain Documentation](https://python.langchain.com/)
- [LangSmith](https://www.langchain.com/langsmith) - Observability platform

**Learn More**:
- [LangChain Academy](https://academy.langchain.com/) - Free course
- [Building LangGraph Blog](https://blog.langchain.com/building-langgraph/)
- [LangGraph Multi-Agent Workflows](https://blog.langchain.com/langgraph-multi-agent-workflows/)

### Next Steps

1. **Practice**: Complete the exercise above
2. **Explore**: Try different state patterns and reducers
3. **Integrate**: Combine with RAG from previous notebooks
4. **Production**: Add observability with LangSmith
5. **Advanced**: Explore function calling and tool use

### Congratulations!

You've mastered LangGraph essentials! You can now:
- Build stateful workflows with cycles
- Implement multi-agent systems
- Add human-in-the-loop workflows
- Design production-ready graphs

**You're ready for advanced agent patterns!**

---

**End of Notebook**