# MCP Hub 测试

本笔记本用于测试 `src/autocoder/common/mcp_hub.py` 的功能。

In [None]:
# 导入必要的模块
import os
import sys
import asyncio
from pathlib import Path

# 将项目根目录添加到 Python 路径
project_root = str(Path(os.getcwd()).parent.parent)
if project_root not in sys.path:
    sys.path.append(project_root)

# 导入 MCP Hub
from src.autocoder.common.mcp_hub import McpHub

In [None]:
# 初始化 MCP Hub
settings_path = "/tmp/mcp_settings.json"
mcp_hub = McpHub(settings_path)

In [None]:
# 定义测试服务器配置
test_servers = {
    "test_server": {
        "command": "python",
        "args": ["-m", "mcp.server.example"],
        "env": {
            "TEST_ENV": "true"
        }
    }
}

# 更新服务器连接
async def test_connections():
    await mcp_hub.update_server_connections(test_servers)
    servers = mcp_hub.get_servers()
    print(f"Connected servers: {[s.name for s in servers]}")

await test_connections()

In [None]:
# 测试工具调用
async def test_tool_calls():
    try:
        result = await mcp_hub.call_tool("test_server", "test_tool", {"param": "value"})
        print(f"Tool call result: {result}")
    except Exception as e:
        print(f"Tool call failed: {str(e)}")

await test_tool_calls()

In [None]:
# 测试资源读取
async def test_resource_read():
    try:
        resource = await mcp_hub.read_resource("test_server", "test://resource")
        print(f"Resource content: {resource}")
    except Exception as e:
        print(f"Resource read failed: {str(e)}")

await test_resource_read()

In [None]:
# 关闭所有连接
async def shutdown():
    await mcp_hub.shutdown()
    print("All MCP connections closed")

await shutdown()

## 测试说明

1. 首先初始化 MCP Hub 并设置测试服务器配置
2. 测试服务器连接功能
3. 测试工具调用功能
4. 测试资源读取功能
5. 最后关闭所有连接

注意：实际测试时需要根据具体的 MCP 服务器实现调整测试参数。