# LangGraph Crash Course: From State Machines to Legal AI Agents

**Interview Prep for LegalOn Technologies**  
*Building on your game dev state machine & orchestrator experience*

---

## Setup

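First, install the required packages. The `uv sync` cell below failed here because `uv` is not installed in this environment; the `%pip install` cell that follows is the one that worked: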

In [2]:
uv sync

Note: you may need to restart the kernel to use updated packages.


c:\Python314\python.exe: No module named uv


In [3]:
# Run this first!
%pip install langgraph langchain-anthropic langchain-core python-dotenv

Defaulting to user installation because normal site-packages is not writeable
Collecting langgraph
  Downloading langgraph-1.0.1-py3-none-any.whl.metadata (7.4 kB)
Collecting langchain-anthropic
  Downloading langchain_anthropic-1.0.0-py3-none-any.whl.metadata (1.9 kB)
Collecting langchain-core
  Downloading langchain_core-1.0.0-py3-none-any.whl.metadata (3.4 kB)
Collecting python-dotenv
  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Collecting langgraph-checkpoint<4.0.0,>=2.1.0 (from langgraph)
  Downloading langgraph_checkpoint-3.0.0-py3-none-any.whl.metadata (4.2 kB)
Collecting langgraph-prebuilt<1.1.0,>=1.0.0 (from langgraph)
  Downloading langgraph_prebuilt-1.0.1-py3-none-any.whl.metadata (5.0 kB)
Collecting langgraph-sdk<0.3.0,>=0.2.2 (from langgraph)
  Downloading langgraph_sdk-0.2.9-py3-none-any.whl.metadata (1.5 kB)
Collecting pydantic>=2.7.4 (from langgraph)
  Downloading pydantic-2.12.3-py3-none-any.whl.metadata (87 kB)
Collecting xxhash>=3.5.0 (from lan

In [4]:
import os
from dotenv import load_dotenv

# Load API key
load_dotenv()
# Make sure you have ANTHROPIC_API_KEY in your .env file
# or set it here:
# os.environ["ANTHROPIC_API_KEY"] = "your-key-here"

True
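
A `True` from `load_dotenv()` does not confirm that `ANTHROPIC_API_KEY` itself is defined, so a quick check before the first LLM call can save a confusing failure later. This is a minimal sketch; the helper name `require_env` is hypothetical and not part of any library:

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or fail with a clear message."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set; add it to .env or export it first")
    return value


# Intended usage in the notebook, once the key is configured:
# require_env("ANTHROPIC_API_KEY")
```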

---

## Concept 1: StateGraph Basics - Your Game State, but for Agents

In game programming, you had:
```python
class NPCState:
    position: Vector3
    health: int
    target: Enemy
```

LangGraph commonly uses a `TypedDict` as the state schema. The type hints describe the shape of the state that flows through the graph; they are not validated at runtime:

In [5]:
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

# Define your state schema
class AgentState(TypedDict):
    message: str
    steps_taken: int
    is_complete: bool

# Nodes are just functions that take state and return state updates
def process_node(state: AgentState) -> dict:
    print(f"Processing: {state['message']}")
    return {
        "message": state["message"].upper(),
        "steps_taken": state["steps_taken"] + 1
    }

def finalize_node(state: AgentState) -> dict:
    print(f"Finalizing after {state['steps_taken']} steps")
    return {"is_complete": True}

# Build the graph
graph = StateGraph(AgentState)
graph.add_node("process", process_node)
graph.add_node("finalize", finalize_node)

# Define flow: START -> process -> finalize -> END
graph.add_edge(START, "process")
graph.add_edge("process", "finalize")
graph.add_edge("finalize", END)

# Compile and run
app = graph.compile()

# Invoke with initial state
result = app.invoke({
    "message": "hello langgraph",
    "steps_taken": 0,
    "is_complete": False
})

print("\nFinal state:")
print(result)



Processing: hello langgraph
Finalizing after 1 steps

Final state:
{'message': 'HELLO LANGGRAPH', 'steps_taken': 1, 'is_complete': True}


**Key Insight**: 
- Nodes return **partial updates** (only the fields they changed)
- LangGraph **merges** each update into the state: by default a returned key overwrites the old value, and keys a node omits are left untouched
- Like your game loop updating only changed properties
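
As a mental model only (this is not LangGraph's actual implementation), the default merge behaves like a dictionary update. The helper name `merge_update` is made up for illustration:

```python
def merge_update(state: dict, update: dict) -> dict:
    """Return a new state where returned keys overwrite and omitted keys persist."""
    return {**state, **update}


state = {"message": "hello langgraph", "steps_taken": 0, "is_complete": False}

# process_node returned only two of the three keys
state = merge_update(state, {"message": "HELLO LANGGRAPH", "steps_taken": 1})
# finalize_node returned only one key
state = merge_update(state, {"is_complete": True})

print(state)
# {'message': 'HELLO LANGGRAPH', 'steps_taken': 1, 'is_complete': True}
```

Keys declared with a reducer (introduced in Concept 3) replace the "overwrite" step with a custom combine function.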

---

## Concept 2: Conditional Edges - Animation State Transitions

Remember animation state machines?
```python
if health < 20:
    transition_to("flee")
elif enemy_in_range:
    transition_to("attack")
```

LangGraph uses **conditional edges** for dynamic routing:

In [6]:
from typing import Literal

class CounterState(TypedDict):
    count: int
    max_count: int

def increment_node(state: CounterState) -> dict:
    new_count = state["count"] + 1
    print(f"Count: {new_count}")
    return {"count": new_count}

# Conditional routing function (like your game state machine)
def should_continue(state: CounterState) -> Literal["continue", "stop"]:
    if state["count"] >= state["max_count"]:
        return "stop"
    return "continue"

# Build graph with conditional routing
graph = StateGraph(CounterState)
graph.add_node("increment", increment_node)

graph.add_edge(START, "increment")

# Conditional edge: route based on state
graph.add_conditional_edges(
    "increment",
    should_continue,
    {
        "continue": "increment",  # Loop back to self
        "stop": END
    }
)

app = graph.compile()

result = app.invoke({"count": 0, "max_count": 5})
print(f"\nFinal count: {result['count']}")

Count: 1
Count: 2
Count: 3
Count: 4
Count: 5

Final count: 5


**Key Insight**: 
- Conditional edges enable **loops** (self-edges)
- The routing function decides the next node
- This is how agents decide to continue or finish

---

## Concept 3: ReAct Pattern - The Classic Agent Loop

This is the bread and butter of autonomous agents. Like your Claude Code orchestrator:
1. **Think** (reason about what to do)
2. **Act** (execute tool/action)
3. **Observe** (get result)
4. **Repeat** until done

In [7]:
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from typing import Annotated
from operator import add

# State with append-only messages (critical!)
class ReActState(TypedDict):
    messages: Annotated[list, add]  # 'add' means append, not replace
    next_action: str

# Initialize Claude
llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0)

# Reasoning node: LLM decides what to do
def reasoning_node(state: ReActState) -> dict:
    print("\n🧠 REASONING...")
    
    # Add system prompt
    messages = [
        SystemMessage(content="""You are an AI assistant. Decide if you need to:
        - 'search': Look up information
        - 'calculate': Perform a calculation
        - 'answer': Provide final answer
        
        Respond with ONLY the action name.""")
    ] + state["messages"]
    
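    response = llm.invoke(messages)
    # NOTE: despite the prompt, the model may add reasoning text, so `action`
    # can be a multi-line string (see the output below). Routing still works
    # only because the later checks use substring matching.
    action = response.content.strip().lower()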
    
    print(f"   Decision: {action}")
    
    return {
        "messages": [response],
        "next_action": action
    }

# Action node: Execute the decided action
def action_node(state: ReActState) -> dict:
    print(f"\n🔧 EXECUTING ACTION: {state['next_action']}")
    
    # Simulate tool execution
    if "search" in state["next_action"]:
        result = "[Search Result] LangGraph is a framework for building stateful LLM agents."
    elif "calculate" in state["next_action"]:
        result = "[Calculation] 2 + 2 = 4"
    else:
        result = "Action completed."
    
    print(f"   Result: {result}")
    
    return {
        "messages": [HumanMessage(content=f"Tool result: {result}")]
    }

# Router: Decide if we're done or need another loop
def should_continue(state: ReActState) -> Literal["action", "end"]:
    if "answer" in state["next_action"]:
        return "end"
    return "action"

# Build the ReAct graph
graph = StateGraph(ReActState)
graph.add_node("reasoning", reasoning_node)
graph.add_node("action", action_node)

graph.add_edge(START, "reasoning")
graph.add_edge("action", "reasoning")  # Loop back after action

graph.add_conditional_edges(
    "reasoning",
    should_continue,
    {
        "action": "action",
        "end": END
    }
)

app = graph.compile()

# Test it
result = app.invoke({
    "messages": [HumanMessage(content="What is 2+2? Then tell me what LangGraph is.")],
    "next_action": ""
})

print("\n" + "="*60)
print("FINAL MESSAGES:")
for msg in result["messages"]:
    print(f"  {msg.__class__.__name__}: {msg.content[:100]}...")


🧠 REASONING...
   Decision: i need to break this down into two parts:

1. first part: "what is 2+2?" - this is a simple calculation
2. second part: "then tell me what langgraph is" - this requires looking up information

let me start with the first action:

calculate

🔧 EXECUTING ACTION: i need to break this down into two parts:

1. first part: "what is 2+2?" - this is a simple calculation
2. second part: "then tell me what langgraph is" - this requires looking up information

let me start with the first action:

calculate
   Result: [Calculation] 2 + 2 = 4

🧠 REASONING...
   Decision: now i need to search for information about langgraph:

search

🔧 EXECUTING ACTION: now i need to search for information about langgraph:

search
   Result: [Search Result] LangGraph is a framework for building stateful LLM agents.

🧠 REASONING...
   Decision: answer

FINAL MESSAGES:
  HumanMessage: What is 2+2? Then tell me what LangGraph is....
  AIMessage: I need to break this down into two parts:

1.

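**Key Insights**:
- `Annotated[list, add]` registers `operator.add` as a reducer, so each node's returned list is concatenated onto the existing messages instead of replacing them (similar to a Redux reducer)
- The loop continues until the LLM's reply contains `answer`
- Most tool-using agents follow some variant of this reason/act/observe loop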
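
In the run above, `next_action` ends up holding the model's whole multi-line reply, and a reply whose reasoning happens to mention "answer" would end the loop early. One hedged alternative is to normalise the reply to a known action before storing it. `parse_action` and `VALID_ACTIONS` are hypothetical names, and falling back to `"answer"` is an assumption chosen so that a malformed reply ends the loop rather than spinning:

```python
VALID_ACTIONS = ("search", "calculate", "answer")


def parse_action(raw_reply: str, default: str = "answer") -> str:
    """Pick the action from the last non-empty line of the model's reply."""
    lines = [line.strip().lower() for line in raw_reply.splitlines() if line.strip()]
    if not lines:
        return default
    # Drop quotes, backticks, and trailing punctuation around the action word
    last_line = lines[-1].strip("'\"`.*: ")
    for action in VALID_ACTIONS:
        if last_line == action:
            return action
    return default


verbose_reply = "I need to break this down into two parts:\n\nLet me start with the first action:\n\ncalculate"
print(parse_action(verbose_reply))  # calculate
```

Inside `reasoning_node`, this would replace the `response.content.strip().lower()` line, after which the router can compare with `==` instead of `in`.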

---

## Concept 4: Checkpointing - Time Travel for Agents

This is LangGraph's superpower for legal AI. Imagine you could save/load your game state at any point.

**Use Cases**:
- Pause agent for lawyer approval
- Resume from where you left off
- Debug by rewinding state
- A/B test different paths

In [8]:
from langgraph.checkpoint.memory import MemorySaver

class ContractState(TypedDict):
    contract_text: str
    risk_level: str
    requires_review: bool
    approved: bool

def analyze_contract(state: ContractState) -> dict:
    print("📄 Analyzing contract...")
    # Simulate risk analysis
    risk = "HIGH" if "indemnify" in state["contract_text"].lower() else "LOW"
    requires_review = risk == "HIGH"
    
    print(f"   Risk Level: {risk}")
    return {
        "risk_level": risk,
        "requires_review": requires_review
    }

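def await_approval(state: ContractState) -> dict:
    # With interrupt_before, the graph pauses BEFORE this node runs, so this
    # body (including the print) only executes after the run is resumed.
    print("⏸️  PAUSED: Waiting for lawyer approval...")
    return {}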

def finalize_contract(state: ContractState) -> dict:
    print("✅ Contract finalized!")
    return {"approved": True}

def should_pause_for_review(state: ContractState) -> Literal["await_approval", "finalize"]:
    if state["requires_review"]:
        return "await_approval"
    return "finalize"

# Build graph with checkpointing
graph = StateGraph(ContractState)
graph.add_node("analyze", analyze_contract)
graph.add_node("await_approval", await_approval)
graph.add_node("finalize", finalize_contract)

graph.add_edge(START, "analyze")
graph.add_conditional_edges(
    "analyze",
    should_pause_for_review,
    {
        "await_approval": "await_approval",
        "finalize": "finalize"
    }
)
graph.add_edge("await_approval", "finalize")
graph.add_edge("finalize", END)

# Add checkpointing (this enables pause/resume)
checkpointer = MemorySaver()
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["await_approval"]  # Pause before this node
)

# Test with high-risk contract
thread_config = {"configurable": {"thread_id": "contract_123"}}

print("\n" + "="*60)
print("EXECUTION 1: Running until interrupt...")
print("="*60)

result = app.invoke(
    {
        "contract_text": "Party A shall indemnify Party B...",
        "risk_level": "",
        "requires_review": False,
        "approved": False
    },
    config=thread_config
)

print(f"\nState after pause: {result}")
print("\n(Lawyer reviews and approves...)")

print("\n" + "="*60)
print("EXECUTION 2: Resuming from checkpoint...")
print("="*60)

# Resume from checkpoint (same thread_id)
result = app.invoke(None, config=thread_config)

print(f"\nFinal state: {result}")


EXECUTION 1: Running until interrupt...
📄 Analyzing contract...
   Risk Level: HIGH

State after pause: {'contract_text': 'Party A shall indemnify Party B...', 'risk_level': 'HIGH', 'requires_review': True, 'approved': False}

(Lawyer reviews and approves...)

EXECUTION 2: Resuming from checkpoint...
⏸️  PAUSED: Waiting for lawyer approval...
✅ Contract finalized!

Final state: {'contract_text': 'Party A shall indemnify Party B...', 'risk_level': 'HIGH', 'requires_review': True, 'approved': True}


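**Key Insights**:
- `interrupt_before` pauses execution before a node runs; the node body executes only after resuming
- `thread_id` identifies a unique execution instance
- Resume by invoking with `None` and the same `thread_id`
- `MemorySaver` keeps checkpoints in process memory only, so a paused run does not survive a restart
- Critical for legal workflows: human-in-the-loop approval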

---

## Concept 5: Subgraphs - Your Manager-Worker Pattern

You built orchestrators that managed sub-agents. LangGraph makes this first-class with **subgraphs**.

Example: Contract review with specialized analyzers

In [9]:
# Worker subgraph: Clause analyzer
class ClauseState(TypedDict):
    clause_text: str
    clause_type: str
    risk_score: int

def identify_clause_type(state: ClauseState) -> dict:
    print(f"   🔍 Identifying clause type: {state['clause_text'][:50]}...")
    
    # Simple heuristic
    text = state["clause_text"].lower()
    if "liability" in text or "indemnif" in text:
        clause_type = "LIABILITY"
    elif "payment" in text or "fee" in text:
        clause_type = "PAYMENT"
    else:
        clause_type = "GENERAL"
    
    return {"clause_type": clause_type}

def assess_risk(state: ClauseState) -> dict:
    print(f"   📊 Assessing risk for {state['clause_type']} clause")
    
    risk_score = 8 if state["clause_type"] == "LIABILITY" else 3
    return {"risk_score": risk_score}

# Build worker subgraph
clause_graph = StateGraph(ClauseState)
clause_graph.add_node("identify", identify_clause_type)
clause_graph.add_node("assess", assess_risk)
clause_graph.add_edge(START, "identify")
clause_graph.add_edge("identify", "assess")
clause_graph.add_edge("assess", END)

clause_analyzer = clause_graph.compile()

# Manager graph: Orchestrates multiple clause analyses
class ContractManagerState(TypedDict):
    clauses: list[str]
    analysis_results: list[dict]
    overall_risk: str

def extract_clauses(state: ContractManagerState) -> dict:
    print("\n📋 Manager: Extracting clauses...")
    # Simulate clause extraction
    clauses = [
        "Party A shall indemnify Party B against all claims.",
        "Payment terms: Net 30 days from invoice date.",
        "This agreement is governed by the laws of California."
    ]
    return {"clauses": clauses}

def analyze_all_clauses(state: ContractManagerState) -> dict:
    print("\n🤖 Manager: Delegating to clause analyzers...")
    
    results = []
    for i, clause in enumerate(state["clauses"], 1):
        print(f"\n  Analyzing clause {i}/{len(state['clauses'])}")
        
        # Invoke the subgraph (worker)
        analysis = clause_analyzer.invoke({
            "clause_text": clause,
            "clause_type": "",
            "risk_score": 0
        })
        
        results.append(analysis)
        print(f"     Result: {analysis['clause_type']} (Risk: {analysis['risk_score']}/10)")
    
    return {"analysis_results": results}

def determine_overall_risk(state: ContractManagerState) -> dict:
    print("\n📊 Manager: Determining overall risk...")
    
    avg_risk = sum(r["risk_score"] for r in state["analysis_results"]) / len(state["analysis_results"])
    overall = "HIGH" if avg_risk > 6 else "MEDIUM" if avg_risk > 3 else "LOW"
    
    print(f"   Average risk score: {avg_risk:.1f}/10")
    print(f"   Overall risk: {overall}")
    
    return {"overall_risk": overall}

# Build manager graph
manager_graph = StateGraph(ContractManagerState)
manager_graph.add_node("extract", extract_clauses)
manager_graph.add_node("analyze", analyze_all_clauses)
manager_graph.add_node("aggregate", determine_overall_risk)

manager_graph.add_edge(START, "extract")
manager_graph.add_edge("extract", "analyze")
manager_graph.add_edge("analyze", "aggregate")
manager_graph.add_edge("aggregate", END)

manager_app = manager_graph.compile()

# Run the manager
print("="*60)
print("MANAGER-WORKER PATTERN EXECUTION")
print("="*60)

result = manager_app.invoke({
    "clauses": [],
    "analysis_results": [],
    "overall_risk": ""
})

print("\n" + "="*60)
print("FINAL RESULT")
print("="*60)
print(f"Overall Risk: {result['overall_risk']}")
print(f"Analyzed {len(result['analysis_results'])} clauses")

MANAGER-WORKER PATTERN EXECUTION

📋 Manager: Extracting clauses...

🤖 Manager: Delegating to clause analyzers...

  Analyzing clause 1/3
   🔍 Identifying clause type: Party A shall indemnify Party B against all claims...
   📊 Assessing risk for LIABILITY clause
     Result: LIABILITY (Risk: 8/10)

  Analyzing clause 2/3
   🔍 Identifying clause type: Payment terms: Net 30 days from invoice date....
   📊 Assessing risk for PAYMENT clause
     Result: PAYMENT (Risk: 3/10)

  Analyzing clause 3/3
   🔍 Identifying clause type: This agreement is governed by the laws of Californ...
   📊 Assessing risk for GENERAL clause
     Result: GENERAL (Risk: 3/10)

📊 Manager: Determining overall risk...
   Average risk score: 4.7/10
   Overall risk: MEDIUM

FINAL RESULT
Overall Risk: MEDIUM
Analyzed 3 clauses


**Key Insights**:
- A subgraph is just a compiled graph; here the manager calls it from inside a node function via `.invoke()`
- A compiled graph can also be added directly as a node when parent and child share state keys
- Each subgraph can have its own state schema
- This is how you build modular, scalable agent systems

---

## Concept 6: Parallel Execution - Your Multi-Agent Pattern

You ran multiple agents in parallel. LangGraph supports this with the `Send` API for dynamic fan-out.

In [10]:
from langgraph.types import Send
import time

class ParallelState(TypedDict):
    contract_sections: list[str]
    analyses: list[dict]

class SectionState(TypedDict):
    section: str
    section_id: int
    analysis: str

def split_contract(state: ParallelState) -> dict:
    print("📄 Splitting contract into sections...")
    sections = [
        "Definitions and Interpretations",
        "Scope of Services",
        "Payment Terms",
        "Liability and Indemnification",
        "Termination Clause"
    ]
    return {"contract_sections": sections}

# This is the magic: fan out to parallel workers
def start_parallel_analysis(state: ParallelState):
    print(f"\n🚀 Launching {len(state['contract_sections'])} parallel analyses...\n")
    
    # Send() creates a message to a specific node for each section
    return [
        Send(
            "analyze_section",
            {"section": section, "section_id": i, "analysis": ""}
        )
        for i, section in enumerate(state["contract_sections"])
    ]

def analyze_section(state: SectionState) -> dict:
    """This runs in parallel for each section"""
    print(f"   Worker {state['section_id']}: Analyzing '{state['section']}'...")
    time.sleep(0.5)  # Simulate work
    
    analysis = f"Section '{state['section']}' reviewed. Risk: MEDIUM"
    print(f"   Worker {state['section_id']}: ✅ Complete")
    
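    # NOTE: every parallel worker writes this same plain (non-reducer) key in
    # one step, which is what raises the InvalidUpdateError in the output
    # below. Returning a one-element list for a reducer-backed key avoids it.
    return {"analysis": analysis}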

def aggregate_results(state: ParallelState) -> dict:
    print("\n📊 Aggregating parallel results...")
    # All parallel analyses are now complete
    return {}

# Build parallel graph
parallel_graph = StateGraph(ParallelState)
parallel_graph.add_node("split", split_contract)
parallel_graph.add_node("analyze_section", analyze_section)
parallel_graph.add_node("aggregate", aggregate_results)

parallel_graph.add_edge(START, "split")
# Conditional edge returns a list of Send() objects
parallel_graph.add_conditional_edges("split", start_parallel_analysis)
parallel_graph.add_edge("analyze_section", "aggregate")
parallel_graph.add_edge("aggregate", END)

parallel_app = parallel_graph.compile()

# Run it
print("="*60)
print("PARALLEL EXECUTION DEMO")
print("="*60)

start_time = time.time()
result = parallel_app.invoke({
    "contract_sections": [],
    "analyses": []
})
elapsed = time.time() - start_time

print(f"\n⏱️  Completed in {elapsed:.2f}s (would be ~2.5s if sequential)")
print(f"Analyzed {len(result['contract_sections'])} sections in parallel")

PARALLEL EXECUTION DEMO
📄 Splitting contract into sections...

🚀 Launching 5 parallel analyses...

   Worker 0: Analyzing 'Definitions and Interpretations'...
   Worker 1: Analyzing 'Scope of Services'...
   Worker 2: Analyzing 'Payment Terms'...
   Worker 3: Analyzing 'Liability and Indemnification'...
   Worker 4: Analyzing 'Termination Clause'...
   Worker 0: ✅ Complete
   Worker 1: ✅ Complete
   Worker 4: ✅ Complete
   Worker 3: ✅ Complete
   Worker 2: ✅ Complete


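InvalidUpdateError: At key 'analysis': Can receive only one value per step. Use an Annotated key to handle multiple values.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_CONCURRENT_GRAPH_UPDATE

**Why this fails**: all five `analyze_section` workers finish in the same step and each returns the key `analysis`, which has no reducer, so LangGraph cannot decide which value to keep. The workers themselves ran fine; the error is raised when their updates are merged.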

**Key Insights**:
- `Send(node_name, input_state)` creates a parallel execution path
- Return a list of `Send` objects from a conditional edge for fan-out
- LangGraph waits for all branches of a step to finish before running the next node
- Any key written by more than one branch in the same step needs a reducer such as `Annotated[list, add]`
- This is how you scale agent systems for large documents

---

## Concept 7: Streaming - Real-Time Updates

LangGraph can stream state updates as the graph executes. Critical for UX when agents take time.

In [11]:
import time

class StreamState(TypedDict):
    progress: str
    step: int

def step1(state: StreamState) -> dict:
    time.sleep(1)
    return {"progress": "Extracting clauses", "step": 1}

def step2(state: StreamState) -> dict:
    time.sleep(1)
    return {"progress": "Analyzing risks", "step": 2}

def step3(state: StreamState) -> dict:
    time.sleep(1)
    return {"progress": "Generating report", "step": 3}

# Build graph
stream_graph = StateGraph(StreamState)
stream_graph.add_node("step1", step1)
stream_graph.add_node("step2", step2)
stream_graph.add_node("step3", step3)

stream_graph.add_edge(START, "step1")
stream_graph.add_edge("step1", "step2")
stream_graph.add_edge("step2", "step3")
stream_graph.add_edge("step3", END)

stream_app = stream_graph.compile()

# Stream the execution
print("="*60)
print("STREAMING EXECUTION")
print("="*60)

for chunk in stream_app.stream({"progress": "Starting", "step": 0}):
    print(f"\n📡 Update: {chunk}")

print("\n✅ Streaming complete!")

STREAMING EXECUTION

📡 Update: {'step1': {'progress': 'Extracting clauses', 'step': 1}}

📡 Update: {'step2': {'progress': 'Analyzing risks', 'step': 2}}

📡 Update: {'step3': {'progress': 'Generating report', 'step': 3}}

✅ Streaming complete!


**Key Insights**:
- Use `.stream()` instead of `.invoke()` for real-time updates
- In the default `updates` mode, each chunk is a dict keyed by the node that just finished and holds only that node's update
- Critical for user feedback on long-running agents
- LegalOn likely streams contract analysis progress to lawyers

---

## Concept 8: Putting It All Together - Legal Contract Reviewer

Let's build a mini version of what LegalOn might use:
1. Upload contract
2. Parallel analysis of sections
3. Risk scoring
4. Human review checkpoint
5. Final report generation

In [12]:
from typing import Annotated
from operator import add
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Send

# Main contract state
class LegalReviewState(TypedDict):
    contract_text: str
    sections: list[str]
    section_analyses: Annotated[list[dict], add]  # Append results
    overall_risk: str
    requires_lawyer_review: bool
    lawyer_approved: bool
    final_report: str

# Section analysis state (for parallel workers)
class SectionAnalysisState(TypedDict):
    section_text: str
    section_id: int
    risk_level: str
    issues: list[str]

# Node 1: Parse contract into sections
def parse_contract(state: LegalReviewState) -> dict:
    print("\n📄 Parsing contract into sections...")
    
    # Simulate section extraction
    sections = [
        "DEFINITIONS: Party A (Provider), Party B (Client)...",
        "SCOPE: Provider shall deliver services as outlined...",
        "PAYMENT: Client agrees to pay $10,000 per month...",
        "LIABILITY: Provider shall indemnify Client against ALL claims...",
        "TERMINATION: Either party may terminate with 30 days notice..."
    ]
    
    print(f"   Found {len(sections)} sections")
    return {"sections": sections}

# Node 2: Fan out to parallel section analyzers
def route_to_analyzers(state: LegalReviewState):
    print("\n🚀 Routing sections to parallel analyzers...")
    return [
        Send(
            "analyze_section_worker",
            {
                "section_text": section,
                "section_id": i,
                "risk_level": "",
                "issues": []
            }
        )
        for i, section in enumerate(state["sections"])
    ]

# Worker: Analyze individual section
def analyze_section_worker(state: SectionAnalysisState) -> dict:
    print(f"   🔍 Worker {state['section_id']}: Analyzing section...")
    
    # Simulate risk analysis
    text = state["section_text"].lower()
    
    if "indemnify" in text and "all" in text:
        risk = "CRITICAL"
        issues = ["Unlimited indemnification clause", "No cap on liability"]
    elif "termination" in text:
        risk = "MEDIUM"
        issues = ["Short notice period"]
    else:
        risk = "LOW"
        issues = []
    
    print(f"   🔍 Worker {state['section_id']}: Risk={risk}, Issues={len(issues)}")
    
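    # NOTE: `risk_level` and `issues` are plain (non-reducer) keys, and every
    # parallel worker writes them in the same step. That is what raises the
    # InvalidUpdateError in the output below; returning only the
    # reducer-backed `section_analyses` key avoids the collision.
    return {
        "risk_level": risk,
        "issues": issues,
        "section_analyses": [{
            "section_id": state["section_id"],
            "risk": risk,
            "issues": issues
        }]
    }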

# Node 3: Aggregate parallel results
def aggregate_risks(state: LegalReviewState) -> dict:
    print("\n📊 Aggregating risk analysis...")
    
    risk_levels = [a["risk"] for a in state["section_analyses"]]
    has_critical = "CRITICAL" in risk_levels
    
    overall_risk = "HIGH" if has_critical else "MEDIUM" if "MEDIUM" in risk_levels else "LOW"
    requires_review = has_critical
    
    print(f"   Overall Risk: {overall_risk}")
    print(f"   Requires Lawyer Review: {requires_review}")
    
    return {
        "overall_risk": overall_risk,
        "requires_lawyer_review": requires_review
    }

# Node 4: Human review checkpoint
def await_lawyer_approval(state: LegalReviewState) -> dict:
    print("\n⚖️  CHECKPOINT: Awaiting lawyer approval...")
    print("   (In production, this would notify the legal team)")
    return {}

# Node 5: Generate final report
def generate_report(state: LegalReviewState) -> dict:
    print("\n📝 Generating final report...")
    
    report = f"""\n
    CONTRACT REVIEW REPORT
    =====================
    
    Overall Risk Level: {state['overall_risk']}
    Sections Analyzed: {len(state['section_analyses'])}
    
    CRITICAL ISSUES:
    """
    
    for analysis in state["section_analyses"]:
        if analysis["risk"] == "CRITICAL":
            report += f"\n    - Section {analysis['section_id']}: {', '.join(analysis['issues'])}"
    
    report += "\n\n    Lawyer Approved: " + ("✅ YES" if state.get("lawyer_approved") else "⏳ PENDING")
    
    print(report)
    return {"final_report": report}

# Router: Should we pause for lawyer?
def check_review_needed(state: LegalReviewState) -> Literal["await_approval", "generate_report"]:
    if state["requires_lawyer_review"]:
        return "await_approval"
    return "generate_report"

# Build the full legal review graph
legal_graph = StateGraph(LegalReviewState)

legal_graph.add_node("parse", parse_contract)
legal_graph.add_node("analyze_section_worker", analyze_section_worker)
legal_graph.add_node("aggregate", aggregate_risks)
legal_graph.add_node("await_approval", await_lawyer_approval)
legal_graph.add_node("generate_report", generate_report)

# Build the flow
legal_graph.add_edge(START, "parse")
legal_graph.add_conditional_edges("parse", route_to_analyzers)
legal_graph.add_edge("analyze_section_worker", "aggregate")
legal_graph.add_conditional_edges(
    "aggregate",
    check_review_needed,
    {
        "await_approval": "await_approval",
        "generate_report": "generate_report"
    }
)
legal_graph.add_edge("await_approval", "generate_report")
legal_graph.add_edge("generate_report", END)

# Compile with checkpointing
checkpointer = MemorySaver()
legal_app = legal_graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["await_approval"]
)

# Execute the full pipeline
print("="*60)
print("LEGAL CONTRACT REVIEW SYSTEM - FULL DEMO")
print("="*60)

thread_config = {"configurable": {"thread_id": "contract_789"}}

# Phase 1: Run until lawyer checkpoint
result = legal_app.invoke(
    {
        "contract_text": "Sample contract...",
        "sections": [],
        "section_analyses": [],
        "overall_risk": "",
        "requires_lawyer_review": False,
        "lawyer_approved": False,
        "final_report": ""
    },
    config=thread_config
)

print("\n" + "="*60)
print("(Lawyer reviews and approves the contract...)")
print("="*60)

# Simulate lawyer approval
import time
time.sleep(2)

# Phase 2: Resume from checkpoint
result = legal_app.invoke(None, config=thread_config)

print("\n" + "="*60)
print("✅ PIPELINE COMPLETE")
print("="*60)

LEGAL CONTRACT REVIEW SYSTEM - FULL DEMO

📄 Parsing contract into sections...
   Found 5 sections

🚀 Routing sections to parallel analyzers...
   🔍 Worker 0: Analyzing section...
   🔍 Worker 0: Risk=LOW, Issues=0
   🔍 Worker 3: Analyzing section...
   🔍 Worker 3: Risk=CRITICAL, Issues=2
   🔍 Worker 1: Analyzing section...
   🔍 Worker 1: Risk=LOW, Issues=0
   🔍 Worker 2: Analyzing section...
   🔍 Worker 2: Risk=LOW, Issues=0
   🔍 Worker 4: Analyzing section...
   🔍 Worker 4: Risk=MEDIUM, Issues=1


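InvalidUpdateError: At key 'risk_level': Can receive only one value per step. Use an Annotated key to handle multiple values.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_CONCURRENT_GRAPH_UPDATE

**Why this fails**: `section_analyses` has a reducer, but each worker also returns `risk_level` and `issues`, which do not. Five workers writing those keys in one step triggers the error before `aggregate` ever runs.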

---

## Interview Prep Summary

### Key Talking Points

1. **StateGraph = Your Game State Machines**
   - TypedDict for type safety
   - Nodes modify state, edges define flow
   - Like animation state machines but for agent logic

2. **Conditional Edges = Dynamic Routing**
   - Functions that inspect state and decide next node
   - Enable loops (self-edges) for agent iterations
   - The core of the ReAct pattern

3. **Checkpointing = Time Travel**
   - Human-in-the-loop approval (critical for legal)
   - Pause/resume execution
   - Debug and A/B test different paths

4. **Subgraphs = Your Manager-Worker Pattern**
   - Compiled graphs used as nodes
   - Modular, reusable agent components
   - Manager orchestrates specialized workers

5. **Parallelization = Scale**
   - `Send` API for dynamic fan-out
   - Analyze multiple contract sections simultaneously
   - Automatic synchronization, with reducers to merge results written by parallel branches

6. **Streaming = UX**
   - Real-time progress updates
   - Critical for long-running legal analysis
   - Better user experience for lawyers

### What They're Looking For

**Technical Skills:**
- Understanding of stateful agent architectures
- Experience with orchestration patterns
- Knowledge of LLM agent frameworks

**Legal Domain:**
- Human-in-the-loop workflows
- Risk assessment pipelines
- Document processing at scale

**Your Strengths to Highlight:**
- Game dev state machine expertise → LangGraph state graphs
- Claude Code orchestrator experience → Manager-worker pattern
- Multi-agent parallel execution → Send() API

### Questions to Ask Them

1. "How do you handle version control for agent prompts across your Legal Document Graph?"
2. "What's your strategy for evaluating agent performance on legal accuracy?"
3. "How do you integrate human lawyer feedback back into the training loop?"
4. "What's the most challenging aspect of the OpenAI partnership integration?"

---

## Next Steps

Before Tuesday:
1. ✅ Run all cells in this notebook
2. 📚 Read LangGraph docs on:
   - [ReAct Agent](https://langchain-ai.github.io/langgraph/tutorials/introduction/)
   - [Human-in-the-loop](https://langchain-ai.github.io/langgraph/how-tos/human-in-the-loop/)
   - [Subgraphs](https://langchain-ai.github.io/langgraph/how-tos/subgraph/)
3. 🔧 Build a mini contract analyzer with real Claude calls
4. 💪 Practice explaining patterns using your game dev analogies

**You've got this! Your orchestrator experience maps perfectly to LangGraph.**