## LangGraph 多智能体协作中文指南
在单个领域中，通常一个智能体能够有效地使用一些工具，但即使是使用强大的模型（例如 GPT-4），它在使用大量工具时效果可能会有所降低。

一种解决复杂任务的方法是采用“分而治之”的方式：为每个任务或领域创建一个专门的智能体，并将任务路由到正确的“专家”。

本指南灵感来自 Wu 等人的论文《AutoGen: 通过多智能体对话实现下一代 LLM 应用》 展示了使用 LangGraph 进行多智能体协作的一种方法。

### 工作流程概述
工作流程清晰地展示了多智能体协作的核心步骤，便于理解 LangGraph 的实现方法。

1. 定义工具：为每个智能体提供专用的工具，例如 Tavily 搜索工具和 Python REPL 工具，用于执行特定任务。
2. 定义辅助函数：agent_node：将每个智能体与对应任务进行关联，定义图中的智能体节点，使其能够处理特定任务。
3. 定义辅助函数：create_agent：为每个任务创建独立的智能体，例如研究智能体、图表生成器智能体等。每个智能体使用独立的语言模型和工具。
4. 定义研究智能体及节点: Researcher: 研究智能体使用 Tavily 搜索工具，回应用户提问。
5. 定义图表生成器智能体及节点: Chart_Generator: 根据提供的数据，在沙盒环境执行 Python 代码生成图表。
6. 导入预构建的工具节点: ToolNode: 将2中定义的 Tavily 搜索工具和 Python REPL 工具作为一个工具节点，这样可以方便地在工作流中使用这些工具。
7. 建立智能体节点间通信: AgentState：通过 LangGraph 实现智能体间通信，智能体能够共享状态并相互协作完成复杂任务。
8. 定义工作流（状态图)：创建状态图以管理多智能体协作的流程，包含任务路由和边逻辑，确保正确的智能体按顺序执行。
9. 执行工作流：根据状态图执行多智能体任务，通过工具调用和智能体协作，完成目标任务并生成最终输出。

In [36]:
%%capture --no-stderr
%pip install langchain langgraph langchain-ollama tavily-python

In [None]:
import os

# from dotenv import load_dotenv
# load_dotenv()

1. 定义工具
接下来我们定义一些未来智能体将使用的工具。

注释说明：
- tavily_tool: 定义了一个 Tavily 搜索工具，可以搜索最多 5 条结果。
- repl: 定义了一个 Python REPL 工具，用于执行 Python 代码块。
- python_repl 函数：这是一个装饰的工具函数，接受 Python 代码作为输入，并通过 PythonREPL 环境执行代码。成功执行后返回执行的代码和输出。如果发生错误，则捕获并返回错误信息。

In [38]:
from typing import Annotated
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.tools import tool
from langchain_experimental.utilities import PythonREPL

# Tavily 搜索工具，用于搜索
tavily_tool = TavilySearchResults(max_results=5)

# Python REPL 工具，用于执行 Python 代码
repl = PythonREPL()

@tool
def python_repl(code: Annotated[str, "The python code to execute to generate your chart."]):
    """Use this to execute python code. If you want to see the output of a value,
    you should print it out with `print(...)`. This is visible to the user."""
    try:
        result = repl.run(code)
    except BaseException as e:
        return f"Failed to execute. Error: {repr(e)}"
    return f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"



### 2. 辅助函数：智能体节点
下面我们定义智能体节点函数（agent_node)，然后使用它分别定义2个智能体节点：

- Researcher
- Chart_Generator

#### 注释说明：
- agent_node 函数是一个辅助函数，用于创建一个智能体节点。它接受当前的 state（状态）、agent（智能体） 和 name（智能体的名称），并返回一个新的状态字典，包含消息和发送者。
- research_agent: 使用 create_agent 函数创建了一个研究智能体，使用 research_llm 作为语言模型，并且绑定了 tavily_tool 搜索工具。
- chart_agent: 同样使用 create_agent 创建了图表生成器智能体，使用 chart_llm 作为语言模型，并绑定了 python_repl 代码执行工具。
- functools.partial: 用于创建特定名称的智能体节点，例如 "Researcher" 和 "Chart_Generator"，并与各自的智能体绑定。

In [39]:
from langchain_core.messages import AIMessage, ToolMessage

# 辅助函数：为智能体创建一个节点
def agent_node(state, agent, name):
    # 修正名称格式，移除空格并确保只包含合法字符
    name = name.replace(" ", "_").replace("-", "_")  # 确保符合正则表达式要求

    # 调用智能体，获取结果
    result = agent.invoke(state)
    
    # 将智能体的输出转换为适合追加到全局状态的格式
    if isinstance(result, ToolMessage):
        pass  # 如果是工具消息，跳过处理
    else:
        # 将结果转换为 AIMessage，并排除部分字段
        result = AIMessage(**result.dict(exclude={"type", "name"}), name=name)
    
    # 返回更新后的状态，包括消息和发送者
    return {
        "messages": [result],  # 包含新生成的消息
        # 我们使用严格的工作流程，通过记录发送者来知道接下来传递给谁
        "sender": name,
    }

### 关于 AIMessage 构造
AIMessage 是 LangChain 中用于表示 AI 模型回复的类，它封装了 AI 生成的文本或内容。为了让 Python 初学者更好地理解，我们可以从以下几个方面详细说明 AIMessage 的构造方法及其相关概念。

AIMessage 构造方法简介
在代码中，AIMessage(**result.dict(exclude={"type", "name"}), name=name) 这段代码使用了 AIMessage 的构造方法。AIMessage 的目的是将 AI 生成的消息封装起来，方便后续处理和传递。这里的构造方法通过传递字典参数创建 AIMessage 对象。

AIMessage 类的常见构造参数：
- content: 这是消息的主要部分，通常是 AI 模型生成的文本内容。例如，一个简单的对话模型可能会生成一个包含回答问题的字符串。
- name: 可选参数，用于标识发送消息的 AI 模型或智能体的名称。在你的代码中，name=name 表示为智能体分配一个名称（如 "Researcher" 或 "Chart_Generator"），以便在不同智能体之间进行区分。
- additional_metadata: 有时候，消息不仅仅包含文本内容，还可能附加其他元数据，如调用的工具、时间戳等。
深入理解构造方法中的步骤：
- result.dict(): 这一部分将 result 对象转换为字典。字典是一种键值对的结构，便于存储和管理数据。Python 中的 dict() 方法会把 result 对象的所有属性转换成字典的形式，方便在构造 AIMessage 时传递这些数据。

- exclude={"type", "name"}: 在构造 AIMessage 时，使用了 exclude 参数来排除某些不必要的字段。type 和 name 这两个字段不会被传递给 AIMessage，这是因为它们可能不是 AI 消息本身的必要部分或已经在其他地方定义过。

- name=name: 这里的 name 参数表示智能体的名称，它是在 agent_node 函数中作为参数传递的。在构造 AIMessage 时，通过这个参数来标识消息的来源智能体是谁，比如 "Researcher" 或 "Chart_Generator"。

### 3. 辅助函数：创建智能体
以下助手函数将帮助我们创建智能体。这些智能体将成为图中的节点。

注释说明：
该函数 create_agent 用于创建一个智能体，通过为该智能体提供系统消息和可以使用的工具来指定其行为。
ChatPromptTemplate.from_messages 是用于构建该智能体的对话提示模板，系统消息告诉智能体它是如何与其他智能体协作的。
提示模板通过 partial 函数插入了系统消息和工具名称，使得智能体能够根据提供的工具执行任务。
最终，智能体被绑定到所提供的 LLM（大型语言模型）和工具列表中，构成一个完整的智能体逻辑。

In [40]:
from langchain_core.messages import BaseMessage, HumanMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import END, StateGraph, START


# 创建智能体的函数，绑定 LLM（大型语言模型） 和工具
def create_agent(llm, tools, tool_message: str, custom_notice: str=""):
    """创建一个智能体。"""
    # 定义智能体的提示模板，包含系统消息和工具信息
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a helpful AI assistant, collaborating with other assistants."
                " Use the provided tools to progress towards answering the question."
                " If you are unable to fully answer, that's OK, another assistant with different tools "
                " will help where you left off. Execute what you can to make progress."
                " If you or any of the other assistants have the final answer or deliverable,"
                " prefix your response with FINAL ANSWER so the team knows to stop."
                "\n{custom_notice}\n"
                " You have access to the following tools: {tool_names}.\n{tool_message}\n\n",
            ),
            MessagesPlaceholder(variable_name="messages"),  # 用于替换的消息占位符

        ]
    )

    # 将系统消息部分和工具名称插入到提示模板中
    prompt = prompt.partial(tool_message=tool_message, custom_notice=custom_notice)
    prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))
      
    # 将提示模板与语言模型和工具绑定
    return prompt | llm.bind_tools(tools)

In [41]:
from langchain_openai import ChatOpenAI
import functools
# 为 Agent 配置各自的大模型
research_llm = ChatOpenAI(model="gpt-4o-mini")
chart_llm = ChatOpenAI(model="gpt-4o-mini")
table_llm = ChatOpenAI(model="gpt-4o-mini")

# 研究智能体及其节点
research_agent = create_agent(
    research_llm,  # 使用 research_llm 作为研究智能体的语言模型
    [tavily_tool],  # 研究智能体使用 Tavily 搜索工具
    tool_message=(
        "Before using the search engine, carefully think through and clarify the query."
        " Then, conduct a single search that addresses all aspects of the query in one go",
    ),
    custom_notice=(
        "Notice:\n"
        "Only gather and organize information. Do not generate code or give final conclusions, leave that for other assistants."
    ),
)

# 使用 functools.partial 创建研究智能体的节点，指定该节点的名称为 "Researcher"
research_node = functools.partial(agent_node, agent=research_agent, name="Researcher")

In [42]:
chart_agent = create_agent(
    chart_llm,  # 使用 chart_llm 作为图表生成器智能体的语言模型
    [python_repl],  # 图表生成器智能体使用 Python REPL 工具
    tool_message="Create clear and user-friendly charts based on the provided data.",  # 系统消息，指导智能体如何生成图表
    custom_notice="Notice:\n"
    "If you have completed all tasks, respond with FINAL ANSWER.",
)

# 使用 functools.partial 创建图表生成器智能体的节点，指定该节点的名称为 "Chart_Generator"
chart_node = functools.partial(agent_node, agent=chart_agent, name="Chart_Generator")

In [43]:
# 表格生成器智能体 - 专门的提示词设计
table_agent = create_agent(
    table_llm,
    [python_repl],
    tool_message=(
        "You are a specialized Table Generator. Create professional, well-formatted tables "
        "based on the provided data. Focus on clarity, readability, and appropriate styling."
    ),
    custom_notice="Notice:\n"
    "If you have completed all tasks, respond with FINAL ANSWER.",
)

# 使用 functools.partial 创建图表生成器智能体的节点，指定该节点的名称为 "Table_Generator"
table_node = functools.partial(agent_node, agent=table_agent, name="Table_Generator")

In [44]:
from langgraph.prebuilt import ToolNode

# 定义工具列表，包括 Tavily 搜索工具和 Python REPL 工具
tools = [tavily_tool, python_repl]

# 创建工具节点，负责工具的调用
tool_node = ToolNode(tools)

In [45]:
import operator
from typing import Annotated, Sequence, TypedDict

# 定义图中传递的对象，包含消息和发送者信息
class AgentState(TypedDict):
    # messages 是传递的消息，使用 Annotated 和 Sequence 来标记类型
    messages: Annotated[Sequence[BaseMessage], operator.add]
    # sender 是发送消息的智能体
    sender: str

In [None]:
# 创建一个状态图 workflow，使用 AgentState 来管理状态
workflow = StateGraph(AgentState)

# 将研究智能体节点、图表生成器智能体节点和工具节点添加到状态图中
workflow.add_node("Researcher", research_node)
# workflow.add_node("Chart_Generator", chart_node)
workflow.add_node("Table_Generator", table_node)
workflow.add_node("call_tool", tool_node)

In [47]:
from typing import Literal

# 路由器函数，用于决定下一步是执行工具还是结束任务
def router(state) -> Literal["call_tool", "__end__", "continue"]:
    messages = state["messages"]  # 获取当前状态中的消息列表
    last_message = messages[-1]  # 获取最新的一条消息
    
    # 如果最新消息包含工具调用，则返回 "call_tool"，指示执行工具
    if last_message.tool_calls:
        return "call_tool"
    
    # 如果最新消息中包含 "FINAL ANSWER"，表示任务已完成，返回 "__end__" 结束工作流
    if "FINAL ANSWER" in last_message.content:
        return "__end__"
    
    # 如果既没有工具调用也没有完成任务，继续流程，返回 "continue"
    return "continue"

In [None]:
# 为 "Researcher" 智能体节点添加条件边，根据 router 函数的返回值进行分支
workflow.add_conditional_edges(
    "Researcher",
    router,  # 路由器函数决定下一步
    {
        # "continue": "Chart_Generator",  # 如果 router 返回 "continue"，则传递到 Chart_Generator
        "continue": "Table_Generator",  # 如果 router 返回 "continue"，则传递到 Chart_Generator
        "call_tool": "call_tool",  # 如果 router 返回 "call_tool"，则调用工具
        "__end__": END  # 如果 router 返回 "__end__"，则结束工作流
    },
)

# 为 "Chart_Generator" 智能体节点添加条件边
workflow.add_conditional_edges(
    # "Chart_Generator",
    "Table_Generator",
    router,  # 同样使用 router 函数决定下一步
    {
        "continue": "Researcher",  # 如果 router 返回 "continue"，则回到 Researcher
        "call_tool": "call_tool",  # 如果 router 返回 "call_tool"，则调用工具
        "__end__": END  # 如果 router 返回 "__end__"，则结束工作流
    },
)

# 为 "call_tool" 工具节点添加条件边，基于“sender”字段决定下一个节点
# 工具调用节点不更新 sender 字段，这意味着边将返回给调用工具的智能体
workflow.add_conditional_edges(
    "call_tool",
    lambda x: x["sender"],  # 根据 sender 字段判断调用工具的是哪个智能体
    {
        "Researcher": "Researcher",  # 如果 sender 是 Researcher，则返回给 Researcher
        "Table_Generator": "Table_Generator",  # 如果 sender 是 Chart_Generator，则返回给 Chart_Generator
    },
)

In [49]:
# 添加开始节点，将流程从 START 节点连接到 Researcher 节点
workflow.add_edge(START, "Researcher")

# 编译状态图以便后续使用
graph = workflow.compile()

In [None]:
# 可视化图
from IPython.display import Image, display

try:
    display(
        Image(
            graph.get_graph(xray=True).draw_mermaid_png()
        )
    )
except Exception as e:
    print(f"Error generating graph: {e}")

In [None]:
events = graph.stream(
    {
        "messages": [
            HumanMessage(
                content="搜索2023年全球主要科技公司的营收数据"
            "and then plot a table with Python. End the task after generating the table。"
            )
        ],
    },
    # 设置最大递归限制
    {"recursion_limit": 20},
    stream_mode="values"
)

for event in events:
    if "messages" in event:
        event["messages"][-1].pretty_print()  # 打印消息内容