# Supervisor Multi Agent - Human-In-The-Loop

![Supervisor HITL Interrupt/Resume](images/supervisor_hitl_interrupt_resume.png)

In [None]:
def print_tool_approval(payload):
    tool = payload.get("awaiting", "unknown_tool")
    args = payload.get("args", {})

    print("—-- Approval needed —--")
    print(f"Tool: {tool}")

    if isinstance(args, dict) and args:
        print("Parameters:")
        for k, v in args.items():
            print(f"  - {k}: {v}")
    elif args:
        print(f"Parameters: {args}")
    else:
        print("No parameters.")

### research_agent

In [None]:
from typing import List, Dict
from langchain_core.tools import tool
from langchain_tavily import TavilySearch
from langgraph.prebuilt import create_react_agent
from IPython.display import Image, display
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langgraph.types import interrupt


research_system_message = """
You are a Research Agent specializing in identifying one promising company for potential investment based on the user’s request.

Responsibilities:
- Interpret the user’s theme or sector (e.g., AI, renewable energy, EVs) and propose ONE company that best fits.
- Use the available tools to discover, verify, and cross-check information.
- Prefer recent, credible information and avoid speculation.

Behavior:
- Do NOT place or simulate trades. Your job ends at recommending a company.
- Keep research tight (aim for 2–3 tool calls); refine queries if results are noisy.
- Ask a brief clarifying question only if the request is too vague to proceed.
- Do not fabricate numbers or facts. Be concise, neutral, and risk-aware.
- Consider the current date when discussing recency or momentum.

Outputs:
- 1–2 sentences explaining why the company fits the request (clear, plain language).
- Final line: CHOSEN_COMPANY: <Company Name>
"""

FORBIDDEN_KEYWORDS = {
    "403 forbidden", "access denied", "captcha",
    "has been denied", "not authorized", "verify you are a human"
}

@tool
def web_search(query: str, max_results: int = 5) -> Dict[str, List[Dict[str, str]]]:
    """
    General-purpose web search.

    Use when you need recent or broader information from the web to answer the user's request
    (e.g., discover relevant entities, find supporting context, or gather up-to-date references).

    Parameters:
    - query (str): The search query in plain language.
    - max_results (int): Number of results to return (default 5, max 10).

    Returns:
    - {"results": [{"title": str, "url": str, "snippet": str}, ...]}

    Example:
    - query: "emerging AI hardware companies"
    """
    max_results = max(1, min(max_results, 10))
    tavily = TavilySearch(max_results=max_results)
    raw = tavily.invoke({"query": query})

    results = [
        {k: v for k, v in page.items() if k != "raw_content"}  # drop heavy field
        for page in raw["results"]
        if not any(
            k in ((page.get("content") or "").lower())
            for k in FORBIDDEN_KEYWORDS
        )
    ]

    return {"results": results}


from langchain_community.document_loaders import WikipediaLoader

@tool
def wiki_search(topic: str, max_results: int = 5) -> dict:
    """
    Fetch a concise encyclopedic summary for a single entity or topic.

    When to use:
      - You need neutral background about a company, product, person, or concept.

    How to format `topic` (VERY IMPORTANT):
      - Pass a short, Wikipedia-friendly title or entity name.
      - Avoid questions or long queries. Prefer canonical forms.
      - If you have noisy text, reduce it to the key noun phrase.

    Good examples:
      - "NVIDIA", "OpenAI", "Large language model", "Electric vehicle"
    Avoid:
      - "What is NVIDIA and why is it important?", "tell me about AI chips 2025"

    Parameters:
      - topic (str): Canonical page title or concise entity/topic.
    """
    max_results = max(1, min(max_results, 10))
    wiki = WikipediaLoader(query=topic, load_max_docs=max_results)
    raw = wiki.load()

    results = [
      {
        "title": doc.metadata["title"],
        "summary": doc.metadata["summary"],
        "source": doc.metadata["source"]
      }
      for doc in raw
    ]

    return {"results": results}


research_agent = create_react_agent(
    model="openai:gpt-4o-mini",
    tools=[web_search, wiki_search],
    prompt=research_system_message,

    name="research_agent"
)

display(Image(research_agent.get_graph().draw_mermaid_png()))

### trading_agent with interruption

In [None]:
import requests
import yfinance as yf
from pprint import pformat
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, ToolMessage
from IPython.display import Image, display
from langgraph.types import interrupt

@tool("lookup_stock")
def lookup_stock_symbol(company_name: str) -> str:
    """
    Converts a company name to its stock symbol using a financial API.

    Parameters:
        company_name (str): The full company name (e.g., 'Tesla').

    Returns:
        str: The stock symbol (e.g., 'TSLA') or an error message.
    """
    api_url = "https://www.alphavantage.co/query"
    params = {
        "function": "SYMBOL_SEARCH",
        "keywords": company_name,
        "apikey": "your_alphavantage_api_key"
    }
    
    response = requests.get(api_url, params=params)
    data = response.json()
    
    if "bestMatches" in data and data["bestMatches"]:
        return data["bestMatches"][0]["1. symbol"]
    else:
        return f"Symbol not found for {company_name}."


@tool("fetch_stock_data")
def fetch_stock_data_raw(stock_symbol: str) -> dict:
    """
    Fetches comprehensive stock data for a given symbol and returns it as a combined dictionary.

    Parameters:
        stock_symbol (str): The stock ticker symbol (e.g., 'TSLA').
        period (str): The period to analyze (e.g., '1mo', '3mo', '1y').

    Returns:
        dict: A dictionary combining general stock info and historical market data.
    """
    period = "1mo"
    try:
        stock = yf.Ticker(stock_symbol)

        # Retrieve general stock info and historical market data
        stock_info = stock.info  # Basic company and stock data
        stock_history = stock.history(period=period).to_dict()  # Historical OHLCV data

        # Combine both into a single dictionary
        combined_data = {
            "stock_symbol": stock_symbol,
            "info": stock_info,
            "history": stock_history
        }

        return pformat(combined_data)

    except Exception as e:
        return {"error": f"Error fetching stock data for {stock_symbol}: {str(e)}"}

trading_system_message = """
You are a financial advisor assistant. Use the provided tools to ground your answers
in up-to-date market data. Be concise, factual, and risk-aware.

Be decisive: when you have sufficient information to act, proceed with tool calls without
asking for confirmation. Only if information is missing or uncertain, ask a concise 
clarifying question.

When preparing or describing actions, include appropriate parameters (e.g., symbol, shares,
limit price, budgets) based on available data. Do not fabricate numbers or facts.
"""

@tool
def place_order(
    symbol: str,
    action: str,
    shares: int,
    limit_price: float,
    order_type: str = "limit",
) -> dict:
    """
    Execute a stock order.

    Parameters:
    - symbol: Ticker
    - action: "buy" or "sell"
    - shares: Number of shares to trade (pre-computed by the agent)
    - limit_price: Limit price per share
    - order_type: Order type, default "limit"

    Returns:
    - status: Execution result (simulated)
    - symbol
    - shares
    - limit_price
    - total_spent
    - type: Order type used
    - action
    """
    total_spent = round(int(shares) * limit_price, 2)
    return {
        "status": "filled",
        "symbol": symbol,
        "shares": int(shares),
        "limit_price": limit_price,
        "total_spent": total_spent,
        "type": order_type,
        "action": action,
    }


RISKY_TOOLS = {"place_order"}

def halt_on_risky_tools(state):
    last = state["messages"][-1]
    if isinstance(last, AIMessage) and getattr(last, "tool_calls", None):
        for tc in last.tool_calls:
            if tc.get("name") in RISKY_TOOLS:
                decision = interrupt({"awaiting": tc["name"], "args": tc.get("args", {})})

                # tool approved
                if isinstance(decision, dict) and decision.get("approved"):
                    return {}

                # tool rejected
                tool_msg = ToolMessage(
                    content=f"Cancelled by human. Continue without executing that tool and provide next steps.",
                    tool_call_id=tc["id"],
                    name=tc["name"]
                )
                return {"messages": [tool_msg]}

    return {}


trading_agent = create_react_agent(
    model="openai:gpt-4o-mini",
    tools=[lookup_stock_symbol, fetch_stock_data_raw, place_order],
    prompt=trading_system_message,

    version="v2",
    post_model_hook=halt_on_risky_tools,

    name="trading_agent"
)

display(Image(trading_agent.get_graph().draw_mermaid_png()))

### supervisor with checkpointer

In [None]:
from IPython.display import Image, display
from langgraph_supervisor import create_supervisor
from langchain_openai import ChatOpenAI
from datetime import datetime
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver

@tool
def current_timestamp() -> dict:
    """
    Return the current local timestamp.

    Returns:
    - {"iso": str, "epoch": int, "tz": str}
      where:
      - iso: ISO 8601 string with timezone offset
      - epoch: Unix epoch seconds
      - tz: timezone name/offset
    """
    now = datetime.now().astimezone()
    return {
        "iso": now.isoformat(),
        "epoch": int(now.timestamp()),
        "tz": str(now.tzinfo),
    }

supervisor_system_message = """
You are a Supervisor coordinating two specialists:

- research_agent: finds ONE suitable company matching the user’s request and explains why.
- trading_agent: given a company and a budget/action, determines the ticker, checks market data, sizes the order, and places a trade.

Your goal is to satisfy the user’s intent with minimal steps.

Clocking / Context:
- If the conversation does not already contain a “NOW” context, first obtain it by calling the tool `current_timestamp`.
- After obtaining it, post a single one-line note into the thread so it’s available to all subsequent steps, e.g.:
  "System context — NOW: {iso} ({tz}); epoch={epoch}"
- Use this “NOW” as the reference for recency. Do not call `current_timestamp` again unless the prior “NOW” is missing or clearly stale.


Routing:
- If the request is thematic or ambiguous, ask research_agent.
- If the request already names a company and includes an action/budget, use trading_agent.
- If a single key detail is missing (e.g., budget), ask once, then proceed.

Handoff:
- From research_agent expect: one company name + 1–2 sentence rationale.
- Pass that company (and any provided budget/action) to trading_agent.

Guardrails:
- Don’t invent data. Don’t place trades without explicit user budget/action.
- Prefer the current date context when judging recency.

Output:
- Briefly state what you delegated and the result. If blocked, ask only for what’s needed to proceed.
"""

supervisor = create_supervisor(
    agents=[research_agent, trading_agent],
    tools=[current_timestamp],
    model=ChatOpenAI(model="gpt-4o-mini"),
    prompt=supervisor_system_message,
    
    output_mode="full_history"
).compile(checkpointer=InMemorySaver())

display(Image(supervisor.get_graph(xray=1).draw_mermaid_png()))

### testing positive

In [None]:
import uuid
config = {"configurable": {"thread_id": str(uuid.uuid4())}}

response = supervisor.invoke({"messages": [
    HumanMessage(content="""I want you to invest $1,000 into the most promising company in the AI sector.  
    Please research the options, pick the best candidate, and then go ahead and place a buy order for me.
    """)
]}, config)
for message in response['messages']:
    message.pretty_print()

In [None]:
"__interrupt__" in response

In [None]:
interrupts = response["__interrupt__"]
print_tool_approval(interrupts[0].value)

In [None]:
from langgraph.types import Command

response = supervisor.invoke(Command(resume={"approved": True}), config=config)
for message in response['messages']:
    message.pretty_print()

### testing negative

In [None]:
import uuid
config = {"configurable": {"thread_id": str(uuid.uuid4())}}

response = supervisor.invoke({"messages": [
    HumanMessage(content="""I want you to invest $1,000 into the most promising company in the AI sector.  
    Please research the options, pick the best candidate, and then go ahead and place a buy order for me.
    """)
]}, config)
for message in response['messages']:
    message.pretty_print()

In [None]:
"__interrupt__" in response

In [None]:
interrupts = response["__interrupt__"]
print_tool_approval(interrupts[0].value)

In [None]:
from langgraph.types import Command

response = supervisor.invoke(Command(resume={"approved": False}), config=config)
for message in response['messages']:
    message.pretty_print()

### updating request

In [None]:
response = supervisor.invoke({"messages": [
    HumanMessage(content="""
    Let's buy only 3 shares of NVIDIA!
    """)
]}, config)
for message in response['messages']:
    message.pretty_print()

In [None]:
"__interrupt__" in response

In [None]:
interrupts = response["__interrupt__"]
print_tool_approval(interrupts[0].value)

In [None]:
from langgraph.types import Command

response = supervisor.invoke(Command(resume={"approved": True}), config=config)
for message in response['messages']:
    message.pretty_print()

### new session

In [None]:
response = supervisor.invoke({"messages": [
    HumanMessage(content="""
    Can you tell me how much money I still have?
    """)
]}, config)
for message in response['messages']:
    message.pretty_print()

In [None]:
response = supervisor.invoke({"messages": [
    HumanMessage(content="""
    Ok then let's invest the money I still have into another promissing company in the AI sector.
    """)
]}, config)
for message in response['messages']:
    message.pretty_print()

In [None]:
"__interrupt__" in response

In [None]:
interrupts = response["__interrupt__"]
print_tool_approval(interrupts[0].value)

## Reference Links

**1. LangGraph Graph API: Command + HITL**

https://langchain-ai.github.io/langgraph/concepts/low_level/

→ Overview of core Graph API primitives (Send, Command), Command vs conditional edges, navigating to parent, and HITL with interrupt()/Command(resume=...).

**2. Add human intervention (HITL) How‑To**

https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/add-human-in-the-loop/

→ Practical guide to pausing with interrupt() and resuming with Command(resume=...), including common patterns and key considerations.

**3. Discussion: Resuming interrupted tool in multi‑agent architecture (#4341)**

https://github.com/langchain-ai/langgraph/discussions/4341

→ Clarifies that with a parent-owned checkpointer, child interrupts bubble to the parent and resumption must be invoked on the parent (same thread_id).