In [11]:
# Workshop Setup
import sys
sys.path.insert(0, '..')

from setup_llm import verify_setup, get_chat_model

verify_setup()

print("\nüìö Session 2, Module 2: Advanced Human-in-the-Loop")
print("=" * 60)
print("\nThis module covers:")
print("\nPart 1: Dynamic Interrupts (15 min)")
print("  ‚Ä¢ interrupt() vs compile-time breakpoints")
print("  ‚Ä¢ Custom payloads and return values")
print("  ‚Ä¢ Conditional interrupts at runtime")
print("\nPart 2: Approve/Edit/Reject Patterns (15 min)")
print("  ‚Ä¢ Structured human responses")
print("  ‚Ä¢ Command(resume=...) for continuation")
print("  ‚Ä¢ Multi-step approval workflows")
print("\nPart 3: Durable Execution with @task (10 min)")
print("  ‚Ä¢ Fault-tolerant long operations")
print("  ‚Ä¢ Automatic checkpointing")
print("  ‚Ä¢ RetryPolicy for resilience")
print("\nüéØ Build production-ready human checkpoints!")
print("\n‚ö†Ô∏è  Prerequisites: Module 1 (Memory Architecture)")

üîç Checking LLM Configuration...
üì° Provider: DIAL (Azure OpenAI via EPAM AI Proxy)
‚úÖ DIAL_API_KEY is set

üìã Configuration:
   AZURE_OPENAI_ENDPOINT: https://ai-proxy.lab.epam.com
   AZURE_OPENAI_API_VERSION: 2024-02-01
   AZURE_OPENAI_DEPLOYMENT_NAME: gpt-4

‚úÖ DIAL setup verified successfully!

üìö Session 2, Module 2: Advanced Human-in-the-Loop

This module covers:

Part 1: Dynamic Interrupts (15 min)
  ‚Ä¢ interrupt() vs compile-time breakpoints
  ‚Ä¢ Custom payloads and return values
  ‚Ä¢ Conditional interrupts at runtime

Part 2: Approve/Edit/Reject Patterns (15 min)
  ‚Ä¢ Structured human responses
  ‚Ä¢ Command(resume=...) for continuation
  ‚Ä¢ Multi-step approval workflows

Part 3: Durable Execution with @task (10 min)
  ‚Ä¢ Fault-tolerant long operations
  ‚Ä¢ Automatic checkpointing
  ‚Ä¢ RetryPolicy for resilience

üéØ Build production-ready human checkpoints!

‚ö†Ô∏è  Prerequisites: Module 1 (Memory Architecture)


## Session 1 Recap: Basic HITL

In **Session 1**, you learned static breakpoints:

```python
# Static - defined at compile time
app = graph.compile(
    checkpointer=memory,
    interrupt_before=["node_name"]  # Always pauses here
)

# Resume with None
result = app.invoke(None, config)
```

**Limitations**:
- Breakpoints are fixed at compile time
- No custom payload when pausing
- Can't conditionally interrupt
- No structured response from human

---

## Part 1: Dynamic Interrupts with `interrupt()`

LangGraph v1 introduces **dynamic interrupts** - call anywhere in code:

```python
from langgraph.types import interrupt

def my_node(state):
    # Dynamic - call anywhere, with payload
    response = interrupt({
        "question": "Should I proceed?",
        "data": state["data"]
    })
    return {"approved": response == "yes"}
```

### Comparison

| Feature | `interrupt_before` (S1) | `interrupt()` (S2) |
|---------|------------------------|--------------------|
| When defined | Compile time | Runtime |
| Custom payload | ‚ùå No | ‚úÖ Yes |
| Conditional | ‚ùå No | ‚úÖ Yes |
| Return value | ‚ùå No | ‚úÖ Yes |

In [12]:
# Demo: Dynamic interrupt with payload
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command

class ExpenseState(TypedDict):
    amount: float
    description: str
    status: Literal["pending", "approved", "rejected"]
    approver_note: str

def submit_expense(state: ExpenseState) -> dict:
    """Employee submits expense"""
    print(f"üìù Expense submitted: ${state['amount']:.2f} - {state['description']}")
    return {"status": "pending"}

def request_approval(state: ExpenseState) -> dict:
    """
    Request manager approval using dynamic interrupt.
    Only interrupt if amount > $100 (conditional!)
    """
    if state["amount"] > 100:
        print(f"\n‚è∏Ô∏è  Amount ${state['amount']:.2f} > $100 - Manager approval needed")
        
        # Dynamic interrupt with custom payload
        decision = interrupt({
            "type": "expense_approval",
            "amount": state["amount"],
            "description": state["description"],
            "options": ["approve", "reject"]
        })
        
        # After resume, decision contains the response
        return {
            "status": decision.get("decision", "rejected"),
            "approver_note": decision.get("note", "")
        }
    else:
        print(f"‚úÖ Amount ${state['amount']:.2f} <= $100 - Auto-approved")
        return {"status": "approved", "approver_note": "Auto-approved (under limit)"}

def finalize(state: ExpenseState) -> dict:
    """Process the final decision"""
    if state["status"] == "approved":
        print(f"\n‚úÖ APPROVED: ${state['amount']:.2f}")
    else:
        print(f"\n‚ùå REJECTED: {state['approver_note']}")
    return {}

# Build graph
graph = StateGraph(ExpenseState)
graph.add_node("submit", submit_expense)
graph.add_node("approve", request_approval)
graph.add_node("finalize", finalize)

graph.add_edge(START, "submit")
graph.add_edge("submit", "approve")
graph.add_edge("approve", "finalize")
graph.add_edge("finalize", END)

app = graph.compile(checkpointer=MemorySaver())
print("‚úÖ Expense approval workflow ready")

‚úÖ Expense approval workflow ready


In [13]:
# Test 1: Small expense (auto-approved)
print("=" * 50)
print("TEST 1: Small Expense (Auto-Approved)")
print("=" * 50)

result = app.invoke(
    {"amount": 50.0, "description": "Office supplies", "status": "pending", "approver_note": ""},
    {"configurable": {"thread_id": "expense_001"}}
)
print(f"Final status: {result['status']}")

TEST 1: Small Expense (Auto-Approved)
üìù Expense submitted: $50.00 - Office supplies
‚úÖ Amount $50.00 <= $100 - Auto-approved

‚úÖ APPROVED: $50.00
Final status: approved


In [14]:
# Test 2: Large expense (needs approval - hits interrupt)
print("\n" + "=" * 50)
print("TEST 2: Large Expense (Needs Approval)")
print("=" * 50)

config = {"configurable": {"thread_id": "expense_002"}}

result = app.invoke(
    {"amount": 500.0, "description": "Conference ticket", "status": "pending", "approver_note": ""},
    config
)

# Check if interrupted
if "__interrupt__" in result:
    payload = result["__interrupt__"][0].value
    print(f"\nüõë WORKFLOW PAUSED")
    print(f"   Amount: ${payload['amount']}")
    print(f"   Description: {payload['description']}")
    print(f"   Waiting for manager decision...")


TEST 2: Large Expense (Needs Approval)
üìù Expense submitted: $500.00 - Conference ticket

‚è∏Ô∏è  Amount $500.00 > $100 - Manager approval needed

üõë WORKFLOW PAUSED
   Amount: $500.0
   Description: Conference ticket
   Waiting for manager decision...


---

## Part 2: Resuming with `Command(resume=...)`

The new way to resume with structured data:

```python
from langgraph.types import Command

# Resume with structured response
result = app.invoke(
    Command(resume={"decision": "approved", "note": "Looks good"}),
    config
)
```

The value passed to `Command(resume=...)` becomes the return value of `interrupt()`.

In [15]:
# Resume with approval
print("\n" + "=" * 50)
print("MANAGER APPROVES")
print("=" * 50)

final_result = app.invoke(
    Command(resume={"decision": "approved", "note": "Approved for Q4 conference"}),
    config  # Same thread_id to resume
)

print(f"\nFinal status: {final_result['status']}")
print(f"Note: {final_result['approver_note']}")


MANAGER APPROVES

‚è∏Ô∏è  Amount $500.00 > $100 - Manager approval needed

‚úÖ APPROVED: $500.00

Final status: approved
Note: Approved for Q4 conference


In [16]:
# Test 3: Rejection scenario
print("\n" + "=" * 50)
print("TEST 3: Expense Rejected")
print("=" * 50)

config3 = {"configurable": {"thread_id": "expense_003"}}

# Submit large expense
result = app.invoke(
    {"amount": 2000.0, "description": "New laptop", "status": "pending", "approver_note": ""},
    config3
)

# Manager rejects
if "__interrupt__" in result:
    final = app.invoke(
        Command(resume={"decision": "rejected", "note": "Use IT procurement process instead"}),
        config3
    )
    print(f"Final status: {final['status']}")
    print(f"Note: {final['approver_note']}")


TEST 3: Expense Rejected
üìù Expense submitted: $2000.00 - New laptop

‚è∏Ô∏è  Amount $2000.00 > $100 - Manager approval needed

‚è∏Ô∏è  Amount $2000.00 > $100 - Manager approval needed

‚ùå REJECTED: Use IT procurement process instead
Final status: rejected
Note: Use IT procurement process instead


---

## Part 3: Durable Execution with `@task`

### The Problem

If a workflow fails mid-execution and restarts:

```python
def my_node(state):
    order_id = uuid.uuid4()  # Generates NEW id on retry!
    call_payment_api(order_id)  # Runs AGAIN on retry!
```

### The Solution: `@task`

Wrap non-deterministic operations to cache their results:

```python
from langgraph.func import task

@task
def call_payment_api(order_id):
    """Result is cached - won't re-run on resume"""
    return payment_gateway.charge(order_id)
```

In [17]:
# Demo: @task for durable execution
from langgraph.func import task
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict
import random

@task
def generate_order_id():
    """Cached - same ID even if workflow retries"""
    order_id = f"ORD-{random.randint(10000, 99999)}"
    print(f"   Generated: {order_id}")
    return order_id

@task
def process_payment(order_id: str, amount: float):
    """Cached - won't double-charge on retry"""
    print(f"   üí≥ Charging ${amount} for {order_id}")
    return {"status": "success", "transaction_id": f"TXN-{random.randint(1000, 9999)}"}

class OrderState(TypedDict):
    amount: float
    order_id: str
    payment_result: dict

def process_order(state: OrderState) -> dict:
    """Process order with durable tasks"""
    print("\nüõí Processing order...")
    
    # These are cached - safe to retry
    order_id = generate_order_id().result()
    payment = process_payment(order_id, state["amount"]).result()
    
    return {"order_id": order_id, "payment_result": payment}

# Build graph
order_graph = StateGraph(OrderState)
order_graph.add_node("process", process_order)
order_graph.add_edge(START, "process")
order_graph.add_edge("process", END)

order_app = order_graph.compile(checkpointer=MemorySaver())
print("‚úÖ Durable order workflow ready")

‚úÖ Durable order workflow ready


In [18]:
# Run durable workflow
result = order_app.invoke(
    {"amount": 99.99, "order_id": "", "payment_result": {}},
    {"configurable": {"thread_id": "order_001"}}
)

print(f"\n‚úÖ Order completed!")
print(f"   Order ID: {result['order_id']}")
print(f"   Payment: {result['payment_result']}")


üõí Processing order...
   Generated: ORD-77880
   üí≥ Charging $99.99 for ORD-77880

‚úÖ Order completed!
   Order ID: ORD-77880
   Payment: {'status': 'success', 'transaction_id': 'TXN-8664'}


---

## Part 4: Fault Tolerance with RetryPolicy

Handle transient failures automatically:

```python
from langgraph.pregel import RetryPolicy

# Retry up to 3 times with exponential backoff
app = graph.compile(
    checkpointer=checkpointer,
    retry_policy=RetryPolicy(max_attempts=3)
)
```

### RetryPolicy Options

| Parameter | Default | Description |
|-----------|---------|-------------|
| `max_attempts` | 3 | Maximum retry attempts |
| `initial_interval` | 0.5s | Initial wait between retries |
| `backoff_factor` | 2 | Multiply interval each retry |
| `max_interval` | 128s | Maximum wait time |

In [19]:
# Reference: RetryPolicy configuration
print("RetryPolicy Example:")
print("""
from langgraph.pregel import RetryPolicy

# Configure retry behavior
retry = RetryPolicy(
    max_attempts=3,           # Try up to 3 times
    initial_interval=0.5,     # Wait 0.5s after first failure
    backoff_factor=2,         # Double wait time each retry
    max_interval=10,          # Never wait more than 10s
    retry_on=Exception        # Retry on any exception
)

app = graph.compile(
    checkpointer=checkpointer,
    retry_policy=retry
)

# Timeline: fail ‚Üí 0.5s ‚Üí retry ‚Üí fail ‚Üí 1s ‚Üí retry ‚Üí fail ‚Üí 2s ‚Üí retry
""")

RetryPolicy Example:

from langgraph.pregel import RetryPolicy

# Configure retry behavior
retry = RetryPolicy(
    max_attempts=3,           # Try up to 3 times
    initial_interval=0.5,     # Wait 0.5s after first failure
    backoff_factor=2,         # Double wait time each retry
    max_interval=10,          # Never wait more than 10s
    retry_on=Exception        # Retry on any exception
)

app = graph.compile(
    checkpointer=checkpointer,
    retry_policy=retry
)

# Timeline: fail ‚Üí 0.5s ‚Üí retry ‚Üí fail ‚Üí 1s ‚Üí retry ‚Üí fail ‚Üí 2s ‚Üí retry



---

## Key Takeaways

### Dynamic Interrupts
‚úÖ `interrupt(payload)` - pause anywhere with custom data  
‚úÖ Conditional interrupts - only pause when needed  
‚úÖ Returns structured response from human  

### Command(resume=...)
‚úÖ Structured input when resuming  
‚úÖ Type-safe human responses  
‚úÖ Replaces `invoke(None, config)`  

### Durable Execution
‚úÖ `@task` caches results across retries  
‚úÖ Prevents duplicate side effects  
‚úÖ Essential for payments, emails, API calls  

### Fault Tolerance
‚úÖ `RetryPolicy` handles transient failures  
‚úÖ Exponential backoff built-in  
‚úÖ Combine with `@task` for full durability  

---

## Next Steps

Continue to **Module 3: Self-Reflection & Quality** where you'll learn:
- Generate ‚Üí Critique ‚Üí Revise loops
- LLM-as-Judge pattern for automated evaluation
- Quality gates and thresholds
- Feedback-based improvement

Open: `module-3-self-reflection.ipynb`