In [2]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableBranch

import os


llm = ChatOpenAI(base_url='https://api.deepseek.com', api_key=os.getenv('DEEPSEEK_SK'), model='deepseek-chat')
print('大模型初始化成功')

# 预定
def booking_handler(request: str) -> str:
    """模拟预定智能体处理器"""
    print('委托给预定处理器')
    return f"预定处理器已接受请求: {request}, 结果：模拟预定动作"


# 信息检索
def info_handler(request: str) -> str:
    """模拟信息检索智能体处理器"""
    print('委托给信息检索处理器')
    return f"信息检索处理器已接受请求: {request}, 结果：模拟信息检索"

# 兜底
def unclear_handler(request: str) -> str:
    """处理无法委托的智能体处理器"""
    print('处理不明确的请求')
    return f"协调者无法完成委托: {request}, 结果：请补充说明"

# 定义协调者路径
cooperator_prompt = ChatPromptTemplate.from_messages([
    ("system", """
        分析用户请求，判断由哪个执行器进行处理.
        - 如果涉及预定机票或酒店，输出'booker'.
        - 如果涉及一般信息问题，输出'info'.
        - 如果需求不明确或者不属于上述类型, 输出'unclear'.
        只输出一个词: 'booker'、'info'或'unclear'.
    """),
    ("user", "{request}")
])

cooperator_chain = cooperator_prompt | llm | StrOutputParser()

branches = {
    'booker': RunnablePassthrough.assign(output=lambda x:booking_handler(x['request']['request'])),
    'info': RunnablePassthrough.assign(output=lambda x:info_handler(x['request']['request'])),
    'unclear': RunnablePassthrough.assign(output=lambda x:unclear_handler(x['request']['request'])),
}

delegation_branch = RunnableBranch(
    (lambda x: x['decision'].strip() == 'booker', branches['booker']),
    (lambda x: x['decision'].strip() == 'info', branches['info']),
    branches['unclear'],
)

cooperator_agent = {
    'decision': cooperator_chain,
    'request': RunnablePassthrough()
} | delegation_branch | (lambda x: x['output'])


def main():
    request_a = '帮我预定7点去北京的机票'
    result_a = cooperator_agent.invoke({'request': request_a})
    print(f'问题: {request_a}, 结果: {result_a}')

    request_b = '意大利的首都是哪里'
    result_b = cooperator_agent.invoke({'request': request_b})
    print(f'问题: {request_b}, 结果: {result_b}')

    request_c = '讲讲量子物理'
    result_c = cooperator_agent.invoke({'request': request_c})
    print(f'问题: {request_c}, 结果: {result_c}')


if __name__ == '__main__':
    main()

大模型初始化成功
委托给预定处理器
问题: 帮我预定7点去北京的机票, 结果: 预定处理器已接受请求: 帮我预定7点去北京的机票, 结果：模拟预定动作
委托给信息检索处理器
问题: 意大利的首都是哪里, 结果: 信息检索处理器已接受请求: 意大利的首都是哪里, 结果：模拟信息检索
委托给信息检索处理器
问题: 讲讲量子物理, 结果: 信息检索处理器已接受请求: 讲讲量子物理, 结果：模拟信息检索


In [2]:
import uuid

from typing import Dict, Any, Optional
from google.adk.agents import Agent
from google.adk.runners import InMemoryRunner
from google.adk.tools import FunctionTool
from google.genai import types
from google.adk.events import Event

def booking_handler(request: str) -> str:
    """
    处理机票和酒店预定请求
    Args:
        request: 用户的预定请求
    Returns:
        预定处理确认信息
    """
    print('‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑ 预订处理器已调用 ‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑')
    return f'已模拟处理预定请求: {request}'


def info_handler(request: str) -> str:
    """
    处理一般信息请求
    Args:
        request: 用户问题
    Returns:
        信息检索处理结果
    """
    print('‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑ 信息处理器已调用 ‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑')
    return f'信息请求: {request}. 结果: 模拟信息检索'

def unclear_handler(request: str) -> str:
    """处理无法委托的请求。"""
    return f"协调者无法委托请求：'{request}'。请补充说明。"

# 创建工具
booking_tool = FunctionTool(booking_handler)
info_tool = FunctionTool(info_handler)

# 定义配备工具的专用子智能体
booking_agent = Agent(
    name='Booker',
    model='deepseek-chat',
    description='专门处理机票和酒店预订请求，通过 booking tool 实现。',
    tools=[booking_tool]
)

info_agent = Agent(
    name='Info',
    model='deepseek-chat',
    description='专门提供一般信息和答疑，通过 info tool 实现。',
    tools=[info_tool]
)

# 定义父智能体（协调者）
coordinator = Agent(
    name='Coordinator',
    model='deepseek-chat',
    instruction=(
        '''
        你是主协调者，只负责分析用户请求并委托给合适的专用智能体。
        不要直接回答用户。
        任何涉及机票或酒店预定的请求，委托给Booker智能体，
        其他一般信息问题，委托给Info智能体，
        '''
    ),
    description='负责将用户请求路由到正确专用智能体的协调者',
    sub_agents=[booking_agent, info_agent]
)


#  执行逻辑
async def run_coordinator(runner: InMemoryRunner, request: str):
    """用给定的请求，运行协调者智能体，并委托"""
    print(f"\n--- 协调者运行请求: '{request}' ---")
    final_result = ''

    try:
        user_id = "user_123"
        session_id = str(uuid.uuid4())

        await runner.session_service.create_session(
            app_name=runner.app_name,
            user_id=user_id,
            session_id=session_id
        )

        for event in runner.run(
            user_id=user_id,
            session_id=session_id,
            new_message=types.Content(
                role='user',
                parts=[types.Part(text=request)]
            )
        ):
            if event.is_final_response() and event.content:
                if hasattr(event.content, 'text') and event.content.text:
                    final_result = event.content.text
                elif event.content.parts:
                    text_parts = [part.text for part in event.content.parts if part.text]
                    final_result = "".join(text_parts)
                break

        print(f'协调者最终响应: {final_result}')
        return final_result

    except Exception as e:
        print(f'处理请求时发生错误: {e}')
        return f'处理请求时发生错误: {e}'

async def main():
    """主函数，运行ADK示例"""
    runner = InMemoryRunner(coordinator)

    result_a = await run_coordinator(runner, '帮我预定巴黎的酒店')
    print(result_a)

if __name__ == '__main__':
    import nest_asyncio
    import asyncio
    nest_asyncio.apply()
    asyncio.run(main())






Exception in thread Thread-3 (_asyncio_thread_main):
Traceback (most recent call last):
  File "D:\application\Anaconda\software\envs\agent-demo\Lib\threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "D:\application\Anaconda\software\envs\agent-demo\Lib\site-packages\ipykernel\ipkernel.py", line 788, in run_closure
    _threading_Thread_run(self)
  File "D:\application\Anaconda\software\envs\agent-demo\Lib\threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "D:\application\Anaconda\software\envs\agent-demo\Lib\site-packages\google\adk\runners.py", line 305, in _asyncio_thread_main
    asyncio.run(_invoke_run_async())
  File "D:\application\Anaconda\software\envs\agent-demo\Lib\site-packages\nest_asyncio.py", line 30, in run
    return loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\application\Anaconda\software\envs\agent-demo\Lib\site-packages\nest_asyncio.py", line 98, in run_until_complete
    return


--- 协调者运行请求: '帮我预定巴黎的酒店' ---
协调者最终响应: 

