# Open in Colab
<a target="_blank" href="https://colab.research.google.com/github/Nicolepcx/ai-agents-the-definitive-guide/blob/main/CH02/ch02_HITL.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>


# About this notebook

This notebook is a hands-on tour of Human-in-the-Loop (HITL) patterns for LangGraph. It shows how to add precise human control to agent workflows, from lightweight approval gates to full interactive editing, while keeping the code compact and production-minded.

## What you will learn

1. How to wire LangGraph interrupts to pause a run, collect human input, and resume deterministically.
2. How to checkpoint state with `InMemorySaver` so a run can stop and continue without losing context.
3. How to wrap tools so a human can accept, edit, or override a call before it executes.
4. How to drive simple ReAct-style loops that honor human review of tool calls.
5. How to implement parallel interrupts and resume them with a single map.

## Models and configuration

The notebook selects the model automatically at runtime.

* If `NEBIUS_API_KEY` is present, it calls `meta-llama/Llama-3.3-70B-Instruct-fast` through the Nebius endpoint using `ChatOpenAI` with a custom `base_url`.
* Otherwise it defaults to `gpt-4o-mini` through `langchain_openai`.
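
The selection rule above can be sketched as a small pure function. This is only an illustration: `select_model_config` is a hypothetical helper that does not appear in the notebook, and it returns a plain dict of settings instead of constructing `ChatOpenAI`, so it runs without any API key.

```python
from typing import Mapping


def select_model_config(env: Mapping[str, str]) -> dict:
    """Hypothetical helper: mirror the notebook's rule for picking a model."""
    nebius_key = env.get("NEBIUS_API_KEY")
    if nebius_key:
        # A Nebius key is present: use the Llama model through the custom base_url
        return {
            "model": "meta-llama/Llama-3.3-70B-Instruct-fast",
            "api_key": nebius_key,
            "base_url": "https://api.studio.nebius.ai/v1/",
        }
    # No Nebius key (missing or empty): fall back to the OpenAI default
    return {"model": "gpt-4o-mini"}


print(select_model_config({"NEBIUS_API_KEY": "demo-key"})["model"])
print(select_model_config({})["model"])
```

Passing the environment in as a mapping, rather than reading `os.environ` inside the function, keeps the rule easy to test.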

Environment variables are loaded from `.env` via `python-dotenv`:

* `OPENAI_API_KEY` for OpenAI models
* `NEBIUS_API_KEY` for Nebius Studio

## The patterns showcased

**Pattern A — Human feedback loop on content generation**
A simple write-review loop for LinkedIn posts. The model drafts. The human provides iterative feedback via `interrupt`. The graph loops until the human types `done` (`quit` and `exit` also end the loop). Useful for any short-form content workflow that benefits from iterative refinement.
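
The control flow of this loop can be sketched without LangGraph or an LLM. The names `feedback_loop` and `fake_draft` are hypothetical stand-ins, not notebook functions; in the notebook the same roles are played by graph nodes, with `interrupt` supplying each piece of feedback.

```python
from typing import Callable, Iterable, List, Optional, Tuple

# Same stop words the notebook's review node checks for
STOP_WORDS = {"done", "quit", "exit"}


def feedback_loop(
    draft: Callable[[Optional[str]], str],
    feedback_source: Iterable[str],
) -> Tuple[str, List[str]]:
    """Redraft after each piece of feedback until a stop word arrives."""
    trail: List[str] = []
    post = draft(None)  # first draft, no feedback yet
    for feedback in feedback_source:
        if feedback.strip().lower() in STOP_WORDS:
            trail.append("Finalised")
            break
        trail.append(feedback)
        post = draft(feedback)  # redraft using the most recent feedback
    return post, trail


def fake_draft(feedback: Optional[str]) -> str:
    # Stand-in for the LLM call
    return f"Draft (feedback: {feedback or 'none'})"


post, trail = feedback_loop(fake_draft, ["make it shorter", "done"])
print(post)
print(trail)
```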

**Pattern B — Approval gate before a critical call**
The model proposes an HTTP request as JSON. A human inspects it and can approve or revise it before the code performs the external call. This is a minimal safety interlock for network- or finance-critical actions.
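
The two pure steps around the gate, parsing the proposal and applying the reviewer's decision, can be sketched as follows. `extract_request` and `apply_decision` are hypothetical helper names for this illustration, and the `"call"` / `"gate"` labels are assumed placeholders for "perform the request" and "ask the human again".

```python
import copy
import json
import re
from typing import Any, Dict, Tuple

FALLBACK = {"url": "https://httpbin.org/get", "params": {"q": "fallback", "limit": 1}}


def extract_request(text: str) -> Dict[str, Any]:
    """Parse the span from the first '{' to the last '}'; use the fallback if that fails."""
    match = re.search(r"\{.*\}", text, re.S)
    if match:
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            pass
    return copy.deepcopy(FALLBACK)


def apply_decision(request: Dict[str, Any], decision: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return (next_step, request); the next step is 'call' only on approval."""
    if decision.get("action") == "approve":
        return "call", request
    update = decision.get("update") or {}
    # Copy so the request under review is not mutated in place
    revised = {**request, "params": dict(request.get("params", {}))}
    if "url" in update:
        revised["url"] = update["url"]
    if isinstance(update.get("params"), dict):
        revised["params"].update(update["params"])
    return "gate", revised


req = extract_request('Sure! {"url": "https://httpbin.org/get", "params": {"q": "agents", "limit": 5}}')
step, req = apply_decision(req, {"action": "revise", "update": {"params": {"limit": 2}}})
print(step, req["params"])
step, req = apply_decision(req, {"action": "approve"})
print(step)
```

Anything other than an explicit approval sends the request back for another review, so the external call cannot happen by default.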

**Pattern C — Review and edit state**
The model writes a short summary. The human edits the text in place. The edited text becomes the new state. This pattern is ideal for compliance or brand voice checks.

**Pattern D — Parallel interrupts with a single resume map**
Two independent interrupts fire at once. The runner prints both payloads and collects a resume value for each, then resumes the graph in one step. This is a template for multi-item review tasks.
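
A resume map is just a dict from interrupt id to the value that interrupt should receive. The sketch below builds one from plain data; `build_resume_map` is a hypothetical helper, and the ids `id-1` and `id-2` are placeholders for the ids LangGraph assigns to pending interrupts.

```python
from typing import Any, Dict, List, Tuple


def build_resume_map(
    pending: List[Tuple[str, Any]],
    answers: Dict[str, Any],
) -> Dict[str, Any]:
    """Map each pending interrupt id to the human's answer.

    If no answer was given for an id, echo the original payload back.
    """
    return {interrupt_id: answers.get(interrupt_id, payload) for interrupt_id, payload in pending}


# Placeholder ids; real ids come from the pending interrupt objects
pending = [
    ("id-1", {"text_to_revise": "alpha"}),
    ("id-2", {"text_to_revise": "beta"}),
]
resume_map = build_resume_map(pending, {"id-1": "ALPHA"})
print(resume_map)
```

In the notebook, the finished map is passed to the graph as `Command(resume=resume_map)`.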

**Pattern E — Tool call review inside a tiny ReAct loop**
A tool is wrapped with `add_hitl`. Before execution, a human can accept the call, edit its arguments, or respond with a stub result instead of running the tool. The loop continues until the model no longer requests tools. This is a minimal example of supervised tool use.
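
The three review outcomes reduce to a small dispatch on the response's `type` field. The sketch below shows that dispatch without LangGraph; `apply_review` is a hypothetical name, and `echo_search` here is a plain function standing in for the wrapped tool.

```python
from typing import Any, Callable, Dict


def apply_review(response: Dict[str, Any], run_tool: Callable[..., Any], args: Dict[str, Any]) -> Any:
    """Decide what a reviewed tool call returns."""
    kind = response.get("type")
    if kind == "accept":
        # Run the tool with the arguments the model proposed
        return run_tool(**args)
    if kind == "edit":
        # Run the tool with the arguments the human supplied instead
        return run_tool(**response["args"]["args"])
    if kind == "response":
        # Skip the tool and hand the human's text back as the result
        return response["args"]
    raise ValueError(f"Unsupported review type: {kind!r}")


def echo_search(query: str) -> str:
    return f"Search results for: {query}"


proposed = {"query": "weather in San Francisco"}
print(apply_review({"type": "accept"}, echo_search, proposed))
print(apply_review({"type": "edit", "args": {"args": {"query": "weather in Zurich"}}}, echo_search, proposed))
print(apply_review({"type": "response", "args": "Skip for now"}, echo_search, proposed))
```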

**Pattern F — Static interrupts for debugging**
Graph-level interrupts are registered before and after specific nodes to create deterministic breakpoints. This is useful for stepwise debugging and unit-style tests.

## How the interactive runner works

* Each demo builds a compiled graph and starts a new thread id for clean state.
* When an interrupt occurs, the terminal prints the payload and waits for input.
* You can paste raw strings or JSON. For the tool wrapper in Pattern E you can provide:

  * `{"type": "accept"}`
  * `{"type": "edit", "args": {"args": {"query": "weather in Zurich"}}}`
  * `{"type": "response", "args": "Skip for now"}`

For parallel interrupts, the runner prints a numbered list, collects one resume value per interrupt id, and then resumes once with a single `Command(resume=...)`.
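
The input handling described above can be sketched as one function: blank input echoes the original payload, valid JSON is decoded, and anything else is kept as a raw string. `parse_resume_input` is a hypothetical name for this illustration; the notebook inlines the same logic in its runner.

```python
import json
from typing import Any


def parse_resume_input(raw: str, original: Any) -> Any:
    """Turn one line of terminal input into a resume value."""
    raw = raw.strip()
    if not raw:
        # Blank input: echo the interrupt's original payload
        return original
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Not JSON: pass the text through unchanged
        return raw


print(parse_resume_input('{"type": "accept"}', None))
print(parse_resume_input("done", None))
print(parse_resume_input("", {"text_to_revise": "alpha"}))
```

One consequence of trying JSON first is that input such as `42` or `true` is decoded to a number or boolean rather than kept as text.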

## Dependencies

* `langgraph` for graphs, interrupts, checkpoints, and tasks
* `langchain_openai` for LLM bindings
* `python-dotenv` for environment loading
* `requests` used in Pattern B for a real HTTP GET

## How to run

1. Create a `.env` file with your keys. At least one of `OPENAI_API_KEY` or `NEBIUS_API_KEY` must be set.
2. Run the notebook and choose a demo from the menu printed by `main()`.
3. Follow the terminal prompts to provide feedback or approvals.

## Why this matters

Real systems in finance, research, and operations often need both autonomy and control. These patterns show how to add precise human control without fighting the agent architecture. Each pattern composes cleanly, so you can start small, measure impact, and expand to richer supervision where it adds the most value.


In [None]:
!pip install -q langgraph==0.6.7 langchain-openai==0.3.33 python-dotenv==1.1.1

In [None]:
# --- API Key Setup ---
# Option 1 (preferred): create a `.env` file in your project folder with:
# OPENAI_API_KEY=your_openai_key_here
# NEBIUS_API_KEY=your_nebius_key_here
#
# Option 2: set it directly in the notebook with magic:
# %env OPENAI_API_KEY=your_openai_key_here
# %env NEBIUS_API_KEY=your_nebius_key_here

from dotenv import load_dotenv
import os

# Load from .env if available
load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
NEBIUS_API_KEY = os.getenv("NEBIUS_API_KEY")
serp_api_key = os.getenv("SERPAPI_API_KEY")  # optional; none of the demos below use it

# Fallback: at least one of the two model keys is required, so ask only if both are missing
if not (OPENAI_API_KEY or NEBIUS_API_KEY):
    print("⚠️ Neither OPENAI_API_KEY nor NEBIUS_API_KEY found. You can set one with `%env` in the notebook or enter it below.")
    OPENAI_API_KEY = input("Enter your OPENAI_API_KEY: ").strip()
    # Export the key so later cells, which read the environment, can see it
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

print("✅ API keys loaded successfully!")



# Imports

In [None]:
from __future__ import annotations
import os, uuid, json, re, sys
from typing import Any, Dict, List, Optional, TypedDict, Literal

from dotenv import load_dotenv

from langgraph.graph import StateGraph
from langgraph.constants import START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.graph.message import add_messages

from langchain_openai import ChatOpenAI
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool, BaseTool


# LLM setup (Nebius if available, else OpenAI)

In [None]:
NEBIUS_API_KEY = os.getenv("NEBIUS_API_KEY")
if NEBIUS_API_KEY:
    LLM = ChatOpenAI(
        model="meta-llama/Llama-3.3-70B-Instruct-fast",
        temperature=0,
        max_tokens=None,
        timeout=None,
        max_retries=2,
        api_key=NEBIUS_API_KEY,
        base_url="https://api.studio.nebius.ai/v1/",
    )
else:
    LLM = ChatOpenAI(model="gpt-4o-mini", temperature=0)

CHECKPOINTER = InMemorySaver()

def jdump(x):
    try:
        return json.dumps(x, indent=2, ensure_ascii=False, default=str)
    except Exception:
        return str(x)



# Pattern A: Human feedback loop on content generation

In [None]:
class AState(TypedDict, total=False):
    linkedin_topic: str
    generated_post: str
    human_feedback: List[str]

def a_model(state: AState) -> AState:
    topic = state["linkedin_topic"]
    fb = state.get("human_feedback", [])
    prompt = f"""
LinkedIn Topic: {topic}
Most recent human feedback: {fb[-1] if fb else "No feedback yet"}

Write a concise LinkedIn post. Consider feedback if present.
"""
    resp = LLM.invoke(prompt).content
    print("\n[model] Draft:\n" + resp + "\n")
    return {"generated_post": resp, "human_feedback": fb}

def a_human(state: AState):
    print("\n[human] awaiting feedback. Type done to finish")
    payload = {
        "generated_post": state["generated_post"],
        "message": "Provide feedback or type done"
    }
    feedback = interrupt(payload)
    print("[human] feedback:", feedback)
    if isinstance(feedback, str) and feedback.strip().lower() in {"done", "quit", "exit"}:
        return Command(goto="a_end", update={"human_feedback": state.get("human_feedback", []) + ["Finalised"]})
    return Command(goto="a_model", update={"human_feedback": state.get("human_feedback", []) + [str(feedback)]})

def a_end(state: AState) -> AState:
    print("\n[end] Final post:\n" + state["generated_post"])
    print("[end] Feedback trail:", state.get("human_feedback", []))
    return state

def build_graph_A():
    g = StateGraph(AState)
    g.add_node("a_model", a_model)
    g.add_node("a_human", a_human)
    g.add_node("a_end", a_end)
    g.set_entry_point("a_model")
    g.add_edge("a_model", "a_human")
    g.add_edge("a_end", END)
    return g.compile(checkpointer=CHECKPOINTER)


# Pattern B: Approval gate before critical call

In [None]:
class BState(TypedDict, total=False):
    proposed_request: Dict[str, Any]
    api_result: Dict[str, Any]
    decision: str

def b_propose(state: BState) -> BState:
    prompt = "Return only JSON with keys url and params for GET to https://httpbin.org/get using q and limit."
    text = LLM.invoke(prompt).content
    m = re.search(r"\{.*\}", text, re.S)
    data = {"url": "https://httpbin.org/get", "params": {"q": "fallback", "limit": 1}}
    if m:
        try:
            data = json.loads(m.group(0))
        except Exception:
            pass
    return {"proposed_request": data}

def b_gate(state: BState) -> Command[Literal["b_call", "b_gate"]]:
    v = interrupt({
        "question": "Approve or revise request",
        "proposed_request": state["proposed_request"],
        "schema": {
            "type": "object",
            "properties": {
                "action": {"enum": ["approve", "revise"]},
                "update": {"type": "object"}
            },
            "required": ["action"]
        }
    })
    # Accept a bare value such as "approve" as well as the JSON object described by the schema
    if not isinstance(v, dict):
        v = {"action": str(v).strip().lower()}
    action = v.get("action")
    if action == "approve":
        return Command(goto="b_call", update={"decision": "approved"})
    upd = v.get("update") or {}
    # Copy the request and its params so the checkpointed state is not mutated in place
    new_req = dict(state["proposed_request"])
    new_req["params"] = dict(new_req.get("params") or {})
    if "url" in upd:
        new_req["url"] = upd["url"]
    if isinstance(upd.get("params"), dict):
        new_req["params"].update(upd["params"])
    # Not approved: store the revision and return to the gate for another review
    return Command(goto="b_gate", update={"proposed_request": new_req, "decision": "revised"})

def b_call(state: BState) -> BState:
    import requests
    r = requests.get(state["proposed_request"]["url"], params=state["proposed_request"]["params"], timeout=10)
    return {"api_result": {"status_code": r.status_code, "url": r.url}}

def build_graph_B():
    g = StateGraph(BState)
    g.add_node("b_propose", b_propose)
    g.add_node("b_gate", b_gate)
    g.add_node("b_call", b_call)
    g.set_entry_point("b_propose")
    g.add_edge("b_propose", "b_gate")
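    # No static edge out of b_gate: its Command(goto=...) chooses the next node, and a static edge to b_call would run the call even on "revise"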
    g.add_edge("b_call", END)
    return g.compile(checkpointer=CHECKPOINTER)

# Pattern C: Review and edit state

In [None]:
class CState(TypedDict, total=False):
    summary: str

def c_write(state: CState) -> CState:
    text = LLM.invoke("Write 2 sentences about why human in the loop matters for agents").content
    return {"summary": text}

def c_edit(state: CState) -> CState:
    res = interrupt({
        "task": "Edit the summary text",
        "summary": state["summary"],
        "schema": {
            "type": "object",
            "properties": {"edited_text": {"type": "string"}},
            "required": ["edited_text"]
        }
    })
    return {"summary": res["edited_text"]}

def build_graph_C():
    g = StateGraph(CState)
    g.add_node("c_write", c_write)
    g.add_node("c_edit", c_edit)
    g.set_entry_point("c_write")
    g.add_edge("c_write", "c_edit")
    g.add_edge("c_edit", END)
    return g.compile(checkpointer=CHECKPOINTER)

# Pattern D: Parallel interrupts with single resume map

In [None]:
class DState(TypedDict, total=False):
    text_1: str
    text_2: str

def d_h1(state: DState):
    v = interrupt({"text_to_revise": state["text_1"]})
    return {"text_1": v}

def d_h2(state: DState):
    v = interrupt({"text_to_revise": state["text_2"]})
    return {"text_2": v}

def build_graph_D():
    g = StateGraph(DState)
    g.add_node("d_h1", d_h1)
    g.add_node("d_h2", d_h2)
    g.add_edge(START, "d_h1")
    g.add_edge(START, "d_h2")
    g.add_edge("d_h1", END)
    g.add_edge("d_h2", END)
    return g.compile(checkpointer=CHECKPOINTER)

# Pattern E: Tool call review in a tiny ReAct loop

In [None]:
def add_hitl(tool_obj: BaseTool | Any) -> BaseTool:
    """Wrap a tool so each call pauses for human review before it runs."""
    if not isinstance(tool_obj, BaseTool):
        tool_obj = tool(tool_obj)

    @tool(tool_obj.name, description=tool_obj.description, args_schema=tool_obj.args_schema)
    def wrapped(**tool_input):
        request = [{
            "action_request": {"action": tool_obj.name, "args": tool_input},
            "config": {"allow_accept": True, "allow_edit": True, "allow_respond": True},
            "description": "Review this tool call"
        }]
        response = interrupt(request)
        # The runner resumes with a bare dict such as {"type": "accept"}; a one-item list is also accepted
        if isinstance(response, list):
            response = response[0]
        kind = response.get("type") if isinstance(response, dict) else None
        if kind == "accept":
            return tool_obj.invoke(tool_input)
        if kind == "edit":
            new_args = response["args"]["args"]
            return tool_obj.invoke(new_args)
        if kind == "response":
            return response["args"]
        raise ValueError("Unsupported interrupt response type")
    return wrapped

@tool("echo_search")
def echo_search(query: str) -> str:
    """Demo tool that simulates a search by echoing the query."""
    return f"Search results for: {query}"

WRAPPED_SEARCH = add_hitl(echo_search)

@task
def e_call_model(messages: List[Dict[str, Any]]):
    return LLM.bind_tools([WRAPPED_SEARCH]).invoke(messages)

@task
def e_call_tool(tool_call: Dict[str, Any]) -> ToolMessage:
    obs = WRAPPED_SEARCH.invoke(tool_call["args"])
    return ToolMessage(content=obs, tool_call_id=tool_call["id"])

# `entrypoint` is already imported in the imports cell, so no alias is needed
@entrypoint(checkpointer=CHECKPOINTER)
def e_agent(messages: List[Dict[str, Any]], previous: Optional[List[Dict[str, Any]]] = None):
    # Continue from the saved conversation if this thread has run before
    if previous is not None:
        messages = add_messages(previous, messages)
    llm_msg = e_call_model(messages).result()
    # Keep executing (human-reviewed) tool calls until the model stops requesting them
    while True:
        tcs = getattr(llm_msg, "tool_calls", None) or []
        if not tcs:
            break
        tool_results = [e_call_tool(tc).result() for tc in tcs]
        messages = add_messages(messages, [llm_msg, *tool_results])
        llm_msg = e_call_model(messages).result()
    messages = add_messages(messages, llm_msg)
    # Return the last model message to the caller; checkpoint the full history
    return entrypoint.final(value=llm_msg, save=messages)

# Pattern F: Static interrupts for debugging

In [None]:
def build_graph_F():
    class S(TypedDict, total=False):
        x: int

    def a(state: S) -> S:
        return {"x": 1}

    def b(state: S) -> S:
        return {"x": state["x"] + 1}

    g = StateGraph(S)
    g.add_node("a", a)
    g.add_node("b", b)
    g.set_entry_point("a")
    g.add_edge("a", "b")
    g.add_edge("b", END)
    return g.compile(
        checkpointer=CHECKPOINTER,
        interrupt_before=["a"],
        interrupt_after=["b"]
    )



# Interactive runner

In [None]:
def wait_for_interrupt_and_prompt(app, cfg):
    """
    Drive interrupts from the terminal.
    Supports single payloads and lists used by wrapped tools.
    Also supports parallel interrupts by auto building a resume map.
    """
    state = app.get_state(cfg)
    ints = getattr(state, "interrupts", []) or []
    if not ints:
        print("No interrupts pending")
        return None

    if len(ints) > 1:
        print("\nMultiple interrupts pending:")
        for i, it in enumerate(ints, 1):
            print(f"[{i}] id={it.id} value={jdump(it.value)}")
        print("Enter values per interrupt. Leave blank to echo original.")
        resume_map = {}
        for it in ints:
            val = input(f"Value for {it.id}: ").strip()
            if val:
                # Try JSON, else raw string
                try:
                    resume_map[it.id] = json.loads(val)
                except Exception:
                    resume_map[it.id] = val
            else:
                resume_map[it.id] = it.value
        # One Command resumes every pending interrupt, keyed by interrupt id
        return Command(resume=resume_map)

    # Single interrupt path
    it = ints[0]
    print("\nInterrupt payload:")
    print(jdump(it.value))
    print("Enter resume value. For tool wrapper use examples like")
    print("  {\"type\": \"accept\"}")
    print("  {\"type\": \"edit\", \"args\": {\"args\": {\"query\": \"weather in NY\"}}}")
    print("  {\"type\": \"response\", \"args\": \"Skip tool right now\"}")
    raw = input("resume> ").strip()
    if not raw:
        val = it.value
    else:
        try:
            val = json.loads(raw)
        except Exception:
            val = raw
    return Command(resume=val)

def run_pattern_A():
    app = build_graph_A()
    cfg = {"configurable": {"thread_id": f"A-{uuid.uuid4()}"}}
    topic = input("Enter LinkedIn topic: ").strip() or "Human in the loop for agents"
    stream = app.stream({"linkedin_topic": topic, "human_feedback": []}, config=cfg)
    while True:
        try:
            step = next(stream)
        except StopIteration:
            break
        if "__interrupt__" in step:
            cmd = wait_for_interrupt_and_prompt(app, cfg)
            stream = app.stream(cmd, cfg)
    print("Done A")

def run_pattern_B():
    app = build_graph_B()
    cfg = {"configurable": {"thread_id": f"B-{uuid.uuid4()}"}}
    _ = app.invoke({}, config=cfg)
    while True:
        cmd = wait_for_interrupt_and_prompt(app, cfg)
        _ = app.invoke(cmd, config=cfg)
        state = app.get_state(cfg)
        if not state.interrupts:
            print("Final state:", state.values)
            break
    print("Done B")

def run_pattern_C():
    app = build_graph_C()
    cfg = {"configurable": {"thread_id": f"C-{uuid.uuid4()}"}}
    _ = app.invoke({}, config=cfg)
    cmd = wait_for_interrupt_and_prompt(app, cfg)
    final = app.invoke(cmd, config=cfg)
    print("Final C:", final)

def run_pattern_D():
    app = build_graph_D()
    cfg = {"configurable": {"thread_id": f"D-{uuid.uuid4()}"}}
    _ = app.invoke({"text_1": "alpha", "text_2": "beta"}, config=cfg)
    cmd = wait_for_interrupt_and_prompt(app, cfg)
    final = app.invoke(cmd, config=cfg)
    print("Final D:", final)

def run_pattern_E():
    cfg = {"configurable": {"thread_id": f"E-{uuid.uuid4()}"}}
    user_msg = {"role": "user", "content": "Search for current weather in San Francisco"}
    stream = e_agent.stream([user_msg], cfg)
    while True:
        try:
            step = next(stream)
        except StopIteration:
            break
        if "__interrupt__" in step:
            cmd = wait_for_interrupt_and_prompt(e_agent, cfg)
            stream = e_agent.stream(cmd, cfg)
        else:
            print(step)
    print("Done E")

def run_pattern_F():
    app = build_graph_F()
    cfg = {"configurable": {"thread_id": f"F-{uuid.uuid4()}"}}
    _ = app.invoke({}, config=cfg)
    print("Breakpoint before a recorded. Resuming")
    _ = app.invoke(None, config=cfg)
    print("Breakpoint after b recorded. Resuming")
    final = app.invoke(None, config=cfg)
    print("Final F:", final)

def main():
    menu = """
Pick a demo
1. Human feedback loop for writing
2. Approval gate before API call
3. Review and edit state
4. Parallel interrupts with resume map
5. Tool call review in a tiny ReAct loop
6. Static interrupts for debugging
q. Quit
> """
    while True:
        choice = input(menu).strip().lower()
        if choice == "1":
            run_pattern_A()
        elif choice == "2":
            run_pattern_B()
        elif choice == "3":
            run_pattern_C()
        elif choice == "4":
            run_pattern_D()
        elif choice == "5":
            run_pattern_E()
        elif choice == "6":
            run_pattern_F()
        elif choice in {"q", "quit", "exit"}:
            # Return instead of calling sys.exit(0), which raises SystemExit inside a notebook kernel
            return
        else:
            print("Unknown choice")

if __name__ == "__main__":
    main()



Pick a demo
1. Human feedback loop for writing
2. Approval gate before API call
3. Review and edit state
4. Parallel interrupts with resume map
5. Tool call review in a tiny ReAct loop
6. Static interrupts for debugging
q. Quit
> 1
Enter LinkedIn topic: Ai agents

[model] Draft:
"As we continue to push the boundaries of innovation, AI agents are revolutionizing the way we work and interact. From virtual assistants to intelligent chatbots, these agents are streamlining processes, enhancing customer experiences, and unlocking new possibilities. What are your thoughts on the future of AI agents? How do you see them transforming your industry? Let's start the conversation! #AI #ArtificialIntelligence #Innovation"


[human] awaiting feedback. Type done to finish

Interrupt payload:
{
  "generated_post": "\"As we continue to push the boundaries of innovation, AI agents are revolutionizing the way we work and interact. From virtual assistants to intelligent chatbots, these agents are streaml

