# LangGraph 体验

## Chapter 3: LangGraph with Human

Human in the loop 是指在 Agent 的工作流中，插入人类的参与。这样的设计将便于更好地控制工作流的走向，迎着用户更加满意的方向。

其实前文改变、更新、操纵工作流的过程已经充分体现了 Human in the loop 的设计思想，都是借助了外部的因素来控制工作流的走向。但为了更加方便的实现用户在 Agent 决策过程中的作用，我们还能够让人类能够更加有选择性地参与到工作流中。

正如我们一直所说的，State 类是整个工作流中的关键变量，我们可以通过定制化的 State 类来更好实现 Human in the loop 的设计。

In [None]:
from dotenv import load_dotenv
import os
load_dotenv()

LLM_NAME = os.getenv("ZHIPU_LLM_NAME") or None
API_BASE = os.getenv("ZHIPU_API_BASE") or None
API_KEY = os.getenv("ZHIPU_API_KEY") or None

### 1. 自定义 State 类 —— State 与个工作流节点的有机结合

State 类对于整个有向图的构建是至关重要的，下文我们将构建一个含人的调用状态的 State 类，以便更好地实现 Human in the loop 的设计。

假定当前有一个业务，用户问了一个非常非常专业的问题，但是 LLM 发现自己不太能回答，则此时需要另一个专家来帮忙回答。则此时被找的专家即为我们要实现的 Human in the loop 中的人类！

In [None]:
from typing import Annotated
from typing_extensions import TypedDict

from langgraph.graph.message import add_messages


class State(TypedDict):
    messages: Annotated[list, add_messages]
    # This flag is new
    ask_human: bool

随后，我们定义一个 Pydantic 类，用于定义一个请求专家帮助的请求。

官方文档提示我们要用 pydantic v2 而不是 v1

In [None]:
from pydantic import BaseModel

class RequestAssistance(BaseModel):
    """Escalate the conversation to an expert. Use this if you are unable to assist directly or if the user requires support beyond your permissions.

    To use this function, relay the user's 'request' so the expert can provide the right guidance.
    """

    request: str

再次定义一个 chatbot 节点，但这次在两处有改动。

- 一是多绑定了一个工具，RequestAssistance，用于接受用户的请求并将其转发给专家。在 LangChain 中，工具的定义可以是一个 Pydantic 类，也可以是一个继承了 BaseTool 的类。
- 二是在 chatbot 函数中，当 LLM 认为自己不能回答的时候，会将 ask_human 设为 True，更新 state，以便后续的工作流能够更好地处理这种情况。

In [None]:
from langchain_openai import ChatOpenAI
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
from langchain_community.tools import DuckDuckGoSearchRun

wrapper = DuckDuckGoSearchAPIWrapper(max_results=5)
tool = DuckDuckGoSearchRun(api_wrapper=wrapper)
tools = [tool]
llm = ChatOpenAI(model=LLM_NAME, openai_api_key=API_KEY, openai_api_base=API_BASE)
llm_with_tools = llm.bind_tools(tools + [RequestAssistance])

def chatbot(state: State):
    response = llm_with_tools.invoke(state["messages"])
    ask_human = False
    if (
        response.tool_calls
        and response.tool_calls[0]["name"] == RequestAssistance.__name__
    ):
        ask_human = True
    return {"messages": [response], "ask_human": ask_human}

随后就是很普通的构建工具、构建图的过程了。

In [None]:
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph

graph_builder = StateGraph(State)

graph_builder.add_node("chatbot", chatbot)
graph_builder.add_node("tools", ToolNode(tools=[tool]))

In [None]:
from langchain_core.messages import AIMessage, ToolMessage

# 此处定义了一个工具函数，用于根据 AI 消息的工具请求消息，构建一个工具回答消息
def create_response(response: str, ai_message: AIMessage):
    return ToolMessage(
        content=response,
        tool_call_id=ai_message.tool_calls[0]["id"],
    )


def human_node(state: State):
    new_messages = []
    if not isinstance(state["messages"][-1], ToolMessage):
        # Typically, the user will have updated the state during the interrupt.
        # If they choose not to, we will include a placeholder ToolMessage to
        # let the LLM continue.
        new_messages.append(
            create_response("No response from human.", state["messages"][-1])
        )
    return {
        # Append the new messages
        "messages": new_messages,
        # Unset the flag
        "ask_human": False,
    }


graph_builder.add_node("human", human_node)

前文一直反反复复的调整反转 state 对象中 `ask_human` 的值，目的就是为了控制他是否能够选择 human 工具节点。有一种思想是在工具内部，查看 `state.ask_human` 的值，但是更为精准的做法则是使用条件边。

条件边的用法可见 ch1。

我们将上文定义的内容都组装成一个完整的有向图，然后使用 MemorySaver 来保存中间状态。如下文。

In [None]:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START
from langgraph.prebuilt import tools_condition


def select_next_node(state: State):
    if state["ask_human"]:
        return "human"
    # Otherwise, we can route as before
    return tools_condition(state)


graph_builder.add_conditional_edges(
    "chatbot",
    select_next_node,
    {"human": "human", "tools": "tools", "__end__": "__end__"},
)

# The rest is the same
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge("human", "chatbot")
graph_builder.add_edge(START, "chatbot")
memory = MemorySaver()
graph = graph_builder.compile(
    checkpointer=memory,
    # We interrupt before 'human' here instead.
    interrupt_before=["human"],
)

让我们尝试问一个专业问题，引导 LLM 选取 human 节点，看看会发生什么吧！

In [None]:
user_input = "I need some expert guidance for building this AI agent. Could you request assistance for me?"
config = {"configurable": {"thread_id": "1"}}
# The config is the **second positional argument** to stream() or invoke()!
events = graph.stream(
    {"messages": [("user", user_input)]}, config, stream_mode="values"
)
for event in events:
    if "messages" in event:
        event["messages"][-1].pretty_print()

In [None]:
snapshot = graph.get_state(config)
snapshot.next

通过查看用户问询结束后的 graph 状态，我们可以看到下一个节点是 human 节点。

如果我们就这样推进下去，graph 就会进入 human 节点了，但是这个程序在设计 human 节点时，默认用户在 human 节点中获取不到任何信息，则我们此时可以通过人为添加对话来模拟 human 的回答。

In [None]:
ai_message = snapshot.values["messages"][-1]
human_response = (
    "We, the experts are here to help! We'd recommend you check out LangGraph to build your agent."
    " It's much more reliable and extensible than simple autonomous agents."
)
tool_message = create_response(human_response, ai_message)
graph.update_state(config, {"messages": [tool_message]})
for message_pieces in graph.get_state(config).values["messages"]:
    message_pieces.pretty_print()

在上面的案例中，我们又伪造了一个来自 human 的回答，最后再交给 LLM 看看他会怎么处理，最后我们再回顾一下这次对话的全部内容吧！

复习一下，再次调用 stream() 方法，第一个参数给 None，将会从断点处继续推进 graph 的状态。

In [None]:
events = graph.stream(None, config, stream_mode="values")
for message_pieces in graph.get_state(config).values["messages"]:
    message_pieces.pretty_print()

### 2. 倒带 （做不出😭😭😭）

若在某些对话过程当中，用户想要修改其中某句话的表述，想要回到某个特定的 LLM 上下文状态怎么办？

这种情况下就要用到 LangGraph 中 graph 对象的 `get_state_history` 了！

承接上文，让我们来构造一次普通的问答请求。为了避免前文消息的干扰，我们重新构造一个两轮对话，令 `thread_id` 为 3。

In [None]:
config = {"configurable": {"thread_id": "3"}}
graph.stream(
    {
        "messages": [
            ("user", "I'm learning LangGraph. Could you do some web research on it for me?")
        ]
    },
    config,
    stream_mode="values",
)

events = graph.stream(
    {
        "messages": [
            ("user", "Ya that's helpful. Maybe I'll build an autonomous agent with it!")
        ]
    },
    config,
    stream_mode="values",
)
for event in events:
    if "messages" in event:
        event["messages"][-1].pretty_print()

现在再写一个代码，来查看 graph 的状态历史吧。

In [None]:
to_replay = None
for state in graph.get_state_history(config):
    print("Num Messages: ", len(state.values["messages"]), "Next: ", state.next)
    print("-" * 80)
    if len(state.values["messages"]) == 6:
        # We are somewhat arbitrarily selecting a specific state based on the number of chat messages in the state.
        to_replay = state   # 这个 to_replay 定为 6，即第二轮对话结束后的状态——最新状态，有什么用？？

通过上面的代码，我们获取到了一个目标的状态点，打印一下看看？

In [None]:
print(to_replay.next)
print(to_replay.config)

我们发现，我们刚拿到的这个状态点下一步没事干（`to_replay.next` 内容为空），而且 config 也没什么特别的。但这并不重要，重点是我们可以根据这样一个检查点，重新推进对话。

In [None]:
# The `checkpoint_id` in the `to_replay.config` corresponds to a state we've persisted to our checkpointer.
for event in graph.stream(None, to_replay.config, stream_mode="values"):
    if "messages" in event:
        event["messages"][-1].pretty_print()

难道这个检查点的意义就局限于此？就这么无聊一个应用？

下文将介绍如何更新其中的某一个步骤，并且重新从那一个步骤开始推进对话。

（好烦，这个倒带做不出，不知道哪里出问题了😭😭😭）

In [None]:
to_replay_again = None
for state in graph.get_state_history(config):
    print("Num Messages: ", len(state.values["messages"]), "Next: ", state.next)
    print("-" * 80)
    # 这样写 if 是因为到 START 节点跟到上一轮的 END 节点的 messages 长度是一样的（都是4，可以观察之前的 state 陈列结果）。要倒带肯定是倒到有实际 message 的地方（即 START 处），所以这里的 if 语句是为了找到 START 节点，怕跟上一轮的 END 节点混淆。
    # 总结：为了找到 START 节点，规避掉上一轮的 END 节点。
    if len(state.values["messages"]) == 4:  
        # We are somewhat arbitrarily selecting a specific state based on the number of chat messages in the state.
        to_replay_again = state

print("\n\n replay node: ")
print(to_replay_again.metadata)
print(to_replay_again.next)
print(to_replay_again.config)

for event in graph.stream(None, to_replay_again.config, stream_mode="values"):
    if "messages" in event:
        event["messages"][-1].pretty_print()

In [None]:
last_message = to_replay_again.values["messages"][-1]

# 修改第二轮对话的用户输入
last_message.content = "Thanks you! I will take your information and build a chatbot with LangGraph! Before that, I also wanted to know about LangChain, could you help me with that?"

branch_config = graph.update_state(
    to_replay_again.config,
    {"messages": [last_message]},
)
print(branch_config)

In [None]:
graph.stream(None, branch_config, stream_mode="values")

In [None]:
for message_pieces in graph.get_state(config).values["messages"]:
    message_pieces.pretty_print()