代码实现路由需定义可能路径及决策逻辑。LangChain 和 LangGraph 等框架提供了专用组件和结构,LangGraph 的状态图结构尤其适合路由逻辑的可视化和实现。

以下代码演示了使用 LangChain构建的简单智能体系统。
系统设置一个“协调者”,根据请求意图(预订、信息、或不明确)将用户请求路由到不同的“子智能体”处理器。系统利用语言模型分类请求,并委托给相应处理函数,模拟多智能体架构中的基本委托模式。

In [1]:
import os
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableBranch, RunnablePassthrough
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    api_key=os.getenv("API_KEY"),
    base_url=os.getenv("BASE_URL"),
    model="deepseek-chat",
    temperature=0,
)


# 定义模拟子智能体处理器
def booking_handler(request: str) -> str:
    """模拟预订智能体处理请求"""
    print("\n --- 委托给预订处理器 ---")
    return f"预订处理器已处理请求:'{request}'。结果:模拟预订动作。"


def info_handler(request: str) -> str:
    """模拟信息智能体处理请求。"""
    print("\n‐‐‐ 委托给信息处理器 ‐‐‐")
    return f"信息处理器已处理请求:'{request}'。结果:模拟信息检索。"


def unclear_handler(request: str) -> str:
    """处理无法委托的请求。"""
    print("\n‐‐‐ 处理不明确请求 ‐‐‐")
    return f"协调者无法委托请求:'{request}'。请补充说明。"


# ‐‐‐ 定义协调者路由链(等同于 ADK 协调者指令)‐‐‐
coordinator_router_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", """分析用户请求,判断应由哪个专属处理器处理。
        ‐ 若请求涉及预订机票或酒店,输出 'booker'。
        ‐ 其他一般信息问题,输出 'info'。    
        ‐ 若请求不明确或不属于上述类别,输出 'unclear'。
        只输出一个词:'booker'、'info' 或 'unclear'。"""),
        ("user", "{request}"),
    ]
)

coordinator_router_chain = coordinator_router_prompt | llm | StrOutputParser()

# 定义委托逻辑
branches = {
    "booker": RunnablePassthrough.assign(output=lambda x: booking_handler(x["request"]["request"])),
    "info": RunnablePassthrough.assign(output=lambda x: info_handler(x["request"]["request"])),
    "unclear": RunnablePassthrough.assign(output=lambda x: unclear_handler(x["request"]["request"])),
}

delegation_branch = RunnableBranch(
    (lambda x: x['decision'].strip() == 'booker', branches["booker"]),
    (lambda x: x['decision'].strip() == 'info', branches["info"]),
    branches["unclear"],  # 默认分支
)

coordinator_agent = {
    "decision": coordinator_router_chain,
    "request": RunnablePassthrough()
} | delegation_branch | (lambda x: x['output'])

 # ‐‐‐ 示例用法 ‐‐‐
def main():
    if not llm:
        print("\n 因 LLM 初始化失败,跳过执行。")
        return
    
    print("‐‐‐ 预订请求示例 ‐‐‐")
    request_a = "帮我预订飞往伦敦的机票。"
    result_a = coordinator_agent.invoke({"request": request_a})
    print(f"最终结果 A: {result_a}")
    
    print("\n‐‐‐ 信息请求示例 ‐‐‐")
    request_b = "意大利的首都是哪里?"
    result_b = coordinator_agent.invoke({"request": request_b})
    print(f"最终结果 B: {result_b}")

    print("\n‐‐‐ 不明确请求示例 ‐‐‐")
    request_c = "讲讲量子物理。"
    result_c = coordinator_agent.invoke({"request": request_c})
    print(f"最终结果 C: {result_c}")


if __name__ == "__main__":
    main()



‐‐‐ 预订请求示例 ‐‐‐

 --- 委托给预订处理器 ---
最终结果 A: 预订处理器已处理请求:'帮我预订飞往伦敦的机票。'。结果:模拟预订动作。

‐‐‐ 信息请求示例 ‐‐‐

‐‐‐ 委托给信息处理器 ‐‐‐
最终结果 B: 信息处理器已处理请求:'意大利的首都是哪里?'。结果:模拟信息检索。

‐‐‐ 不明确请求示例 ‐‐‐

‐‐‐ 委托给信息处理器 ‐‐‐
最终结果 C: 信息处理器已处理请求:'讲讲量子物理。'。结果:模拟信息检索。


In [3]:
# 可循环调用

import os
from typing import Dict, Any

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

# ============================================
# 1. 配置 LLM
# ============================================

llm = ChatOpenAI(
    api_key=os.getenv("API_KEY"),
    base_url=os.getenv("BASE_URL"),
    model="deepseek-chat",
    temperature=0,
)

# ============================================
# 2. 定义子智能体 / 工具（这里先用简单的模拟函数）
# ============================================

def booking_handler(request: str) -> str:
    """预订处理器：模拟机票/酒店预订逻辑"""
    print("\n--- 预订处理器 ---")
    return f"【预订结果】已处理: {request}"

def info_handler(request: str) -> str:
    """信息处理器：模拟信息查询逻辑"""
    print("\n--- 信息处理器 ---")
    return f"【信息结果】已处理: {request}"

def unclear_handler(request: str) -> str:
    """兜底处理器：当 route 未知或不支持时使用"""
    print("\n--- 不明确处理器 ---")
    return f"【未能处理】请求不明确或路由未知: {request}"

# 把所有可用的“子智能体/工具”放进一个路由表里，方便按 route 名称调用
HANDLERS: Dict[str, Any] = {
    "booker": booking_handler,
    "info": info_handler,
}


# ============================================
# 3. Planner：用 LLM 把“用户的大任务”拆成多个子任务列表
#
#   输入：用户的一句话（str）
#   输出：一个 JSON 数组，每个元素是一个子任务:
#       { "route": "booker" | "info", "content": "交给该处理器的指令" }
#
#   这里是“可循环调用”的关键：把任务变成 List[task]
# ============================================

planner_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            f"""你是任务规划器。
用户会给你一个综合性请求，你需要把它拆成若干个可以按顺序执行的子任务列表。

每个子任务是一个 JSON 对象，必须包含字段:
- "route": 使用哪个处理器，只能是 "booker" 或 "info"
- "content": 交给该处理器的中文指令

要求输出：
- 只输出一个合法的 JSON 数组字符串。
- JSON 数组中的每个元素都是一个对象，包含两个字段：
  - route：字符串，只能是 "booker" 或 "info"
  - content：字符串，表示要发给该处理器的中文指令

其他要求：
- 不要在 JSON 外输出任何解释或额外文本。
- 如果用户请求很简单，只需要一个子任务也可以。
- 如果既要预订又要查询信息，就拆成两个及以上子任务。
"""),
        # 这里使用 {input}，使得整个链可以直接用 .invoke("用户输入") 调用
        ("user", "{input}"),
    ]
)

# JsonOutputParser 会把 LLM 输出的 JSON 字符串解析成 Python 对象（通常是 list[dict]）
planner_chain = (
    RunnableLambda(lambda x: {"input": x}) # 把字符串输入转成 dict
    | planner_prompt 
    | llm 
    | JsonOutputParser()
)
# 输入: str
# 输出: List[{"route": ..., "content": ...}]


# ============================================
# 4. Executor：对每个子任务分发到对应的子智能体（循环调用）
#
#   - dispatch_one_task: 单个任务 -> 单个结果
#   - dispatch_runnable.map(): 对列表中的每个任务都调用一次 dispatch_one_task
# ============================================

def dispatch_one_task(task: Dict[str, Any]) -> str:
    """
    根据 task["route"] 字段选择对应的处理器，
    再把 task["content"] 交给处理器执行。
    """
    route = task.get("route")
    content = task.get("content", "")

    # 根据 route 找到对应的 handler，找不到就走 unclear_handler
    handler = HANDLERS.get(route, unclear_handler)
    return handler(content)

# RunnableLambda 可以把一个普通 Python 函数包装成 LCEL 的 Runnable
dispatch_runnable = RunnableLambda(dispatch_one_task)

# .map()：表示“对列表里的每一项都跑一遍 dispatch_runnable”
# multi_tool_executor 的输入是 List[task]，输出是 List[result]
multi_tool_executor = dispatch_runnable.map()

# 组合成一个完整的“多工具执行链”：
#   输入: 用户原始请求（str）
#   1) planner_chain:         str -> List[task]
#   2) multi_tool_executor:   List[task] -> List[result]
#   3) RunnableLambda:        List[result] -> str（拼成一段文本）
multi_tool_chain = (
    planner_chain
    | multi_tool_executor
    | RunnableLambda(lambda results: "\n".join(results))  # 把每个子任务结果换行拼接
)


# ============================================
# 5. Aggregator：用 LLM 把“原始请求 + 子任务结果”整合成一段最终回复
# ============================================

aggregator_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """你是总协调者。
你会拿到：
- 用户的原始请求
- 各个子任务执行后的结果汇总（多行文本）

请根据这些信息，用自然的中文给用户一段清晰的最终答复：
- 可以分点说明每一步做了什么、结果如何
- 不要输出 JSON
- 保持语气专业、简洁
"""
        ),
        (
            "user",
            "用户原始请求:\n{input}\n\n子任务执行结果:\n{results}"
        ),
    ]
)

aggregator_chain = aggregator_prompt | llm | StrOutputParser()
# 输入: {"input": str, "results": str}
# 输出: str


# ============================================
# 6. 最终协调智能体：Coordinator
#
#   输入：用户一句话（str）
#   内部流程：
#       1) multi_tool_chain 根据用户输入拆分任务并循环调用各子智能体
#       2) aggregator_chain 用 LLM 整合原始请求 + 子任务结果
#   输出：最终一段自然语言答复（str）
# ============================================

coordinator_agent = (
    {
        # 把用户原始输入传给 "input" 字段
        "input": RunnablePassthrough(),
        # 同时把同样的输入送给 multi_tool_chain，得到 "results"
        "results": multi_tool_chain,
    }
    | aggregator_chain
)
# 类型：Runnable[str, str]


# ============================================
# 7. 示例：直接运行本文件时做一个简单测试
# ============================================

if __name__ == "__main__":
    # 例子：既要预订又要查信息，测试“循环调用多个子智能体”的效果
    user_input = "帮我预订明天去上海的高铁票，并顺便查一下上海明天天气。"

    print("====== 用户请求 ======")
    print(user_input)

    print("\n====== 协调智能体执行中 ======\n")
    final_answer = coordinator_agent.invoke(user_input)

    print("\n====== 最终答复 ======")
    print(final_answer)


帮我预订明天去上海的高铁票，并顺便查一下上海明天天气。



--- 预订处理器 ---

--- 信息处理器 ---

根据您的需求，我已经完成了以下两项任务：

1. **高铁票预订**：已为您成功预订明天前往上海的高铁票。预订信息已确认，请及时查看相关车次和座位详情。

2. **天气查询**：已查询上海明天的天气情况。具体预报信息已获取，建议您出行前查看以做好相应准备。

两项任务均已完成，如有其他需要，请随时告知。


In [4]:
import os
from typing import Dict, Any

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import (
    RunnablePassthrough,
    RunnableBranch,
    RunnableLambda,
)

# ============================================
# 1. 配置 DeepSeek LLM
# ============================================

llm = ChatOpenAI(
    api_key=os.getenv("API_KEY"),
    base_url=os.getenv("BASE_URL"),
    model="deepseek-chat",
    temperature=0,
)

# ============================================
# 2. 定义“子智能体”的处理函数（模拟工具/子 Agent）
# ============================================

def booking_handler(request: str) -> str:
    """
    处理机票、酒店等预订请求。
    这里用 print + 返回字符串模拟实际动作。
    """
    print("\n---------- 委托给【预订处理器】 ----------")
    return f"预订处理器已处理请求: '{request}'。结果: 模拟预订动作。"


def info_handler(request: str) -> str:
    """
    处理一般信息/问答请求。
    """
    print("\n---------- 委托给【信息处理器】 ----------")
    return f"信息处理器已处理请求: '{request}'。结果: 模拟信息检索。"


def unclear_handler(request: str) -> str:
    """
    当协调者判断无法明确路由时的兜底处理。
    """
    print("\n---------- 处理【不明确请求】 ----------")
    return f"协调者无法委托请求: '{request}'。请补充说明。"


# ============================================
# 3. 定义“协调者 Router”Prompt + 决策链
#
#   功能：分析用户请求 → 只输出 'booker' / 'info' / 'unclear'
#   注意：模板变量只有 {request}，避免出现 KeyError
# ============================================

coordinator_router_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """你是一个路由协调智能体。
你的任务是分析用户的请求，并判断应该由哪个专属处理器来处理。

规则：
- 若请求涉及预订机票或酒店（比如预订航班、酒店、住宿等），输出：booker
- 若是其他一般信息、知识问答、解释说明等问题，输出：info
- 若请求不明确或不属于上述类别，输出：unclear

只输出一个单词，不要额外解释：
- booker
- info
- unclear
""",
        ),
        ("user", "{request}"),
    ]
)

# ChatPromptTemplate 期望传入 dict，所以用一个小包装把 str → {"request": str}
coordinator_router_chain = (
    RunnableLambda(lambda x: {"request": x})
    | coordinator_router_prompt
    | llm
    | StrOutputParser()
)
# 现在：
#   输入: str (用户请求)
#   输出: str ("booker" / "info" / "unclear")


# ============================================
# 4. 定义 LCEL 分支逻辑（相当于“协调者委托给子智能体”）
#
#   输入: {"decision": str, "request": str}
#   输出: {"decision": str, "request": str, "output": str}
# ============================================

branches: Dict[str, Any] = {
    "booker": RunnablePassthrough.assign(
        output=lambda x: booking_handler(x["request"])
    ),
    "info": RunnablePassthrough.assign(
        output=lambda x: info_handler(x["request"])
    ),
    "unclear": RunnablePassthrough.assign(
        output=lambda x: unclear_handler(x["request"])
    ),
}

# RunnableBranch 会根据条件选择一个分支执行
delegation_branch = RunnableBranch(
    (lambda x: x["decision"].strip() == "booker", branches["booker"]),
    (lambda x: x["decision"].strip() == "info", branches["info"]),
    branches["unclear"],  # 默认兜底分支
)

# ============================================
# 5. 组合成一个“协调智能体” Coordinator Agent
#
#   输入: str（用户请求）
#   流程：
#       1) coordinator_router_chain: str -> "booker"/"info"/"unclear"
#       2) RunnablePassthrough:      str -> 保留原始 request
#       3) delegation_branch:        按 decision 选择对应 handler
#       4) lambda 只取 x["output"] 作为最终输出
# ============================================

coordinator_agent = (
    {
        # "decision"：给 router 链用
        "decision": coordinator_router_chain,
        # "request"：原始文本透传给后面的 handler
        "request": RunnablePassthrough(),  # 这里接收的是 str
    }
    | delegation_branch
    | (lambda x: x["output"])  # 最终只保留 output 字段
)
# 类型：Runnable[str, str]


# ============================================
# 6. 示例调用
# ============================================

def main():
    print("\n====== DeepSeek 协调者路由示例 ======\n")

    test_queries = [
        "帮我预订明天去上海的高铁票和酒店。",
        "世界最高的山峰是哪一座？",
        "随便说一个有趣的事实。",
        "查找下个月飞往东京的航班。",
    ]

    for i, q in enumerate(test_queries, start=1):
        print(f"\n>>> 用例 {i}：{q}")
        result = coordinator_agent.invoke(q)  # 直接传 str
        print(f"<<< 最终结果 {i}: {result}")


if __name__ == "__main__":
    main()





>>> 用例 1：帮我预订明天去上海的高铁票和酒店。

---------- 委托给【预订处理器】 ----------
<<< 最终结果 1: 预订处理器已处理请求: '帮我预订明天去上海的高铁票和酒店。'。结果: 模拟预订动作。

>>> 用例 2：世界最高的山峰是哪一座？

---------- 委托给【信息处理器】 ----------
<<< 最终结果 2: 信息处理器已处理请求: '世界最高的山峰是哪一座？'。结果: 模拟信息检索。

>>> 用例 3：随便说一个有趣的事实。

---------- 委托给【信息处理器】 ----------
<<< 最终结果 3: 信息处理器已处理请求: '随便说一个有趣的事实。'。结果: 模拟信息检索。

>>> 用例 4：查找下个月飞往东京的航班。

---------- 委托给【预订处理器】 ----------
<<< 最终结果 4: 预订处理器已处理请求: '查找下个月飞往东京的航班。'。结果: 模拟预订动作。
