In [69]:
from dotenv import load_dotenv, find_dotenv
import os

In [70]:
_ = load_dotenv(find_dotenv(".env.local"))

model_list = ["kimi", "deepseek", "doubao"]
select_model = "deepseek"

if select_model not in model_list:
    raise Exception("select model is not valid!")
elif select_model == "kimi":
    api_key = os.getenv("KIMI_API_KEY")
    api_url = os.getenv("KIMI_API_URL")
    model_name = os.getenv("KIMI_MODEL")
elif select_model == "deepseek":
    api_key = os.getenv("DEEPSEEK_API_KEY")
    api_url = os.getenv("DEEPSEEK_API_URL")
    model_name = os.getenv("DEEPSEEK_MODEL")
elif select_model == "doubao":
    api_key = os.getenv("ARK_API_KEY")
    api_url = os.getenv("ARK_API_URL")
    model_name = os.getenv("ARK_MODEL")
print(f"current used model is {model_name}")

current used model is deepseek-chat


# 测试mcp server

In [71]:
import asyncio
import httpx
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

In [72]:
async def test_mcp_service():
    """
    测试MCP服务
    """
    # 定义服务URL
    SERVICE_URL= "http://localhost:8000/mcp"

    try:
        # 创建流式client
        transport = StreamableHttpTransport(url=SERVICE_URL)
        # 创建会话
        async with Client(transport) as client:
            print(f"成功连接到MCP服务：{SERVICE_URL}")

            # 发送ping请求，进行测试
            await client.ping()
            print("服务心跳检测成功")

            # 获取服务端注册的所有工具
            tools = await client.list_tools()
            tool_names = [tool.name for tool in tools]
            print(f"可用工具列表：{', '.join(tool_names)}")

            # 调用工具
            # 1. 天气工具
            weather_results = await client.call_tool("weather", {"city": "北京"})
            weather_data = eval(weather_results.content[0].text)
            print(f"北京天气：温度={weather_data['temp']}, 天气={weather_data['condition']}")

            # 2. 股票工具
            stock_results = await client.call_tool("stock", {"code": "600519"})
            stock_data = eval(stock_results.content[0].text)
            print(f"股票查询：名称={stock_data['name']}, 价格={stock_data['price']}")

            # 3.错误测试
            try:
                error_results = await client.call_tool("weather", {"city": "东京"})
                error_data = eval(error_results.content[0].text)
                if error_data and "error" in error_data:
                    print(f"错误处理测试：{error_data['error']} - 符合预期")
            except Exception as e:
                print(f"意外错误：{e}")
    except httpx.ConnectError:
        print(f"无法连接到MCP服务{SERVICE_URL}，请检查服务是否启动")
    except Exception as e:
        print(f"测试失败：{e}")

In [73]:
print("="*50)
print("MCP服务测试开始")
print("="*50)
# notobook直接运行
await test_mcp_service()
# python运行
# asyncio.run(test_mcp_service())

MCP服务测试开始
成功连接到MCP服务：http://localhost:8000/mcp
服务心跳检测成功
可用工具列表：weather, stock
北京天气：温度=25, 天气=晴
股票查询：名称=贵州茅台, 价格=1825.0
错误处理测试：未找到该城市 - 符合预期


# 使用LLM调用MCP

In [74]:
import asyncio
from openai import AsyncOpenAI
from fastmcp import Client

# 定义服务URL
SERVICE_URL= "http://localhost:8000/mcp"

async def query_mcp_tool(tool_name: str, params: dict):
    """
    调用MCP工具
    :param tool_name: 工具名称
    :param params: 工具参数
    :return: 工具返回结果
    """

    async with Client(SERVICE_URL) as client:
        return await client.call_tool(tool_name, params)

async def chat_with_tools():
    """
    使用MCP工具进行对话
    :return: 对话结果
    """
    # 创建OpenAI客户端
    llm_client = AsyncOpenAI(
        api_key=api_key,
        base_url=api_url,
    )

    # 获取MCP服务的工具列表
    async with Client(SERVICE_URL) as mcp_client:
        tools = await mcp_client.list_tools()

        # 转换为OpenAI工具格式
        tool_schemas = []
        for tool in tools:
            tool_schemas.append({
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": {
                            "type": tool.inputSchema.get("type", "object"),
                            "properties": {
                                prop_name: prop_def 
                                for prop_name, prop_def in tool.inputSchema.get("properties", {}).items()
                            },
                            "required": tool.inputSchema.get("required", []),
                        }
                    }
                }
            )

    # 创建对话
    user_query = "查询北京天气和贵族茅台股份"

    # 调用LLM, 模型自动选择MCP工具
    response = await llm_client.chat.completions.create(
        model=model_name,
        messages=[{"role": "user", "content": user_query}],
        tools=tool_schemas,
        tool_choice="auto",
    )

    # 处理工具调用请求
    message = response.choices[0].message
    if message.tool_calls:
        print(f"工具调用请求：\n{message.tool_calls}")
        
        # 工具执行
        tool_results = []
        for call in message.tool_calls:
            print(f"执行工具：{call.function.name}")
            # 调用MCP工具并获取结果
            result = await query_mcp_tool(
                call.function.name,
                eval(call.function.arguments)  # 解析字符串为dict
            )
            tool_results.append(result)
            print(f"工具调用返回结果：{result}")
        
        # 构建工具调用的结果消息
        tool_messages = []
        for i, call in enumerate(message.tool_calls):
            tool_messages.append({
                "role": "tool",
                "tool_call_id": call.id,
                "name": call.function.name,
                "content": tool_results[i].content[0].text
            })

        # 调用LLM，使用工具调用结果更新对话
        final_response = await llm_client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "user", "content": user_query},
                message,  # 包含工具调用的请求消息
                *tool_messages
            ]
        )
        print(f"最终回复：\n{final_response.choices[0].message.content}")
    else:
        print(f"直接回复：\n{response.choices[0].message.content}")


In [75]:
print("="*50)
print("MCP工具调用测试开始")
print("="*50)
await chat_with_tools()

MCP工具调用测试开始
工具调用请求：
[ChatCompletionMessageToolCall(id='call_0_d1fa3e13-49fe-4c07-842f-4254f36d87b2', function=Function(arguments='{"city": "北京"}', name='weather'), type='function', index=0), ChatCompletionMessageToolCall(id='call_1_f2eabb8f-3e46-4a8b-8272-3ea374e360cc', function=Function(arguments='{"code": "600519"}', name='stock'), type='function', index=1)]
执行工具：weather
工具调用返回结果：CallToolResult(content=[TextContent(type='text', text='{\n  "temp": 25,\n  "condition": "晴"\n}', annotations=None, meta=None)], structured_content={'temp': 25, 'condition': '晴'}, data={'temp': 25, 'condition': '晴'}, is_error=False)
执行工具：stock
工具调用返回结果：CallToolResult(content=[TextContent(type='text', text='{\n  "name": "贵州茅台",\n  "price": 1825.0\n}', annotations=None, meta=None)], structured_content={'name': '贵州茅台', 'price': 1825.0}, data={'name': '贵州茅台', 'price': 1825.0}, is_error=False)
最终回复：
北京当前天气为晴，气温25℃；贵州茅台（600519）的股票价格为1825.0元。
