# 第6天：MCP（Model Context Protocol）入门

## 学习目标

通过本节课程，你将：
1. 理解MCP的核心概念和架构
2. 掌握MCP Server的开发方法
3. 学会使用MCP Client调用服务
4. 了解MCP与现有AI技术栈的关系


## 1. MCP是什么？

### 1.1 背景和动机

在AI应用开发中，我们经常需要：
- 让LLM访问外部数据（数据库、API、文件）
- 让LLM使用工具（搜索、计算、执行代码）
- 在不同应用间共享这些能力

**问题是**：每个框架（LangChain、CrewAI、AutoGen）都有自己的工具格式，导致：
- 重复开发相同的集成
- 工具无法跨框架使用
- 缺乏标准化

**MCP的解决方案**：提供一个开放的、标准化的协议，让数据源和工具可以一次开发，到处使用。

### 1.2 MCP架构

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Host App  │     │ MCP Client  │     │ MCP Server  │
│ (Claude,    │────▶│  (协议层)   │────▶│ (资源/工具) │
│  LangChain) │     │             │     │             │
└─────────────┘     └─────────────┘     └─────────────┘
       ↑                    ↑                    ↑
   应用层              标准化协议            数据/工具层
```

**三个核心组件**：
1. **Host App（宿主应用）**：使用MCP的应用，如Claude Desktop、自定义Agent
2. **MCP Client（客户端）**：处理协议通信的客户端库
3. **MCP Server（服务器）**：暴露资源和工具的独立服务

### 1.3 核心概念

MCP定义了三种主要能力：

1. **Resources（资源）**
   - 结构化数据的只读访问
   - 例如：文件内容、数据库记录、API数据

2. **Tools（工具）**
   - LLM可以调用的函数
   - 例如：搜索、计算、文件操作

3. **Prompts（提示）**
   - 预定义的提示模板
   - 帮助用户更好地使用服务

## 2. 安装和环境准备

首先安装MCP SDK：

In [None]:
# 安装MCP Python SDK
!pip install mcp

## 3. 创建第一个MCP Server

让我们从一个简单的例子开始：创建一个提供数学计算工具的MCP Server。

In [None]:
# 注意：以下代码应该保存为独立的Python文件运行
# 这里展示代码结构，实际运行请参考 src/day6_mcp_server_simple.py

import asyncio
import json
from typing import Any, Dict

from mcp.server import Server, stdio_server
from mcp.server.models import InitializationOptions
from mcp.types import (
    Tool,
    TextContent,
    CallToolRequest,
    CallToolResult,
    ListToolsResult
)

# 创建MCP服务器实例
server = Server("math-tools")

# 定义可用的工具
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """
    列出所有可用的工具
    这个方法告诉客户端服务器提供哪些工具
    """
    return [
        Tool(
            name="add",
            description="将两个数字相加",
            inputSchema={
                "type": "object",
                "properties": {
                    "a": {"type": "number", "description": "第一个数"},
                    "b": {"type": "number", "description": "第二个数"}
                },
                "required": ["a", "b"]
            }
        ),
        Tool(
            name="multiply",
            description="将两个数字相乘",
            inputSchema={
                "type": "object",
                "properties": {
                    "a": {"type": "number", "description": "第一个数"},
                    "b": {"type": "number", "description": "第二个数"}
                },
                "required": ["a", "b"]
            }
        )
    ]

# 实现工具调用处理
@server.call_tool()
async def handle_call_tool(
    name: str,
    arguments: Dict[str, Any]
) -> list[TextContent]:
    """
    处理工具调用请求
    根据工具名称执行相应的操作
    """
    if name == "add":
        result = arguments["a"] + arguments["b"]
        return [TextContent(
            type="text",
            text=f"{arguments['a']} + {arguments['b']} = {result}"
        )]
    
    elif name == "multiply":
        result = arguments["a"] * arguments["b"]
        return [TextContent(
            type="text",
            text=f"{arguments['a']} × {arguments['b']} = {result}"
        )]
    
    else:
        raise ValueError(f"未知的工具: {name}")

# 运行服务器
async def main():
    """启动MCP服务器"""
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions()
        )

# 这段代码应该在独立脚本中运行
# if __name__ == "__main__":
#     asyncio.run(main())

### 代码解析

1. **Server创建**：`Server("math-tools")` 创建一个命名的MCP服务器

2. **工具定义**：通过 `@server.list_tools()` 装饰器定义可用工具列表
   - 每个工具都有名称、描述和输入模式
   - 输入模式使用JSON Schema定义参数

3. **工具实现**：`@server.call_tool()` 装饰器处理实际的工具调用
   - 接收工具名称和参数
   - 返回TextContent结果

4. **服务器运行**：使用 `stdio_server` 通过标准输入输出通信

## 4. 创建MCP Client调用服务

现在让我们创建一个客户端来调用MCP服务器：

In [None]:
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def test_math_tools():
    """
    测试数学工具MCP服务器
    """
    # 定义服务器连接参数
    server_params = StdioServerParameters(
        command="python",
        args=["src/day6_mcp_server_simple.py"],
        env=None
    )
    
    # 连接到MCP服务器
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # 初始化连接
            await session.initialize()
            
            # 列出可用工具
            print("\n可用工具：")
            tools = await session.list_tools()
            for tool in tools.tools:
                print(f"- {tool.name}: {tool.description}")
            
            # 测试加法工具
            print("\n测试加法工具：")
            result = await session.call_tool(
                "add",
                arguments={"a": 10, "b": 25}
            )
            print(f"结果: {result.content[0].text}")
            
            # 测试乘法工具
            print("\n测试乘法工具：")
            result = await session.call_tool(
                "multiply",
                arguments={"a": 7, "b": 8}
            )
            print(f"结果: {result.content[0].text}")

# 在Jupyter中运行异步函数
# await test_math_tools()  # 在异步环境中运行

# 或者使用以下方式在同步环境中运行
# asyncio.run(test_math_tools())

## 5. 暴露资源的MCP Server

除了工具，MCP还可以暴露资源（只读数据）：

In [None]:
from mcp.types import Resource, ResourceContents, TextResourceContents
import os
from datetime import datetime

# 扩展服务器以提供资源
@server.list_resources()
async def handle_list_resources() -> list[Resource]:
    """
    列出可用的资源
    资源是只读的数据，客户端可以请求查看
    """
    return [
        Resource(
            uri="file:///project/README.md",
            name="项目说明文档",
            description="项目的README文件内容",
            mimeType="text/markdown"
        ),
        Resource(
            uri="system://info",
            name="系统信息",
            description="当前系统的基本信息",
            mimeType="text/plain"
        )
    ]

@server.read_resource()
async def handle_read_resource(uri: str) -> ResourceContents:
    """
    读取资源内容
    根据URI返回相应的资源数据
    """
    if uri == "file:///project/README.md":
        # 读取实际文件
        try:
            with open("README.md", "r", encoding="utf-8") as f:
                content = f.read()
        except FileNotFoundError:
            content = "# 项目说明\n\n这是一个AI学习项目。"
        
        return TextResourceContents(
            uri=uri,
            mimeType="text/markdown",
            text=content
        )
    
    elif uri == "system://info":
        # 返回系统信息
        info = f"""
系统信息:
- 操作系统: {os.name}
- Python版本: {os.sys.version.split()[0]}
- 当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- 工作目录: {os.getcwd()}
"""
        return TextResourceContents(
            uri=uri,
            mimeType="text/plain",
            text=info.strip()
        )
    
    else:
        raise ValueError(f"未知的资源URI: {uri}")

## 6. 集成现有工具到MCP

让我们将之前课程中的一些工具改造为MCP格式：

In [None]:
# 将Day 1的搜索工具改造为MCP工具

import aiohttp
from typing import Optional

class SearchToolMCPServer:
    """
    将搜索功能封装为MCP服务器
    """
    
    def __init__(self):
        self.server = Server("search-tools")
        self._setup_handlers()
    
    def _setup_handlers(self):
        """设置MCP处理器"""
        
        @self.server.list_tools()
        async def handle_list_tools() -> list[Tool]:
            return [
                Tool(
                    name="web_search",
                    description="搜索网络信息",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "搜索查询词"
                            },
                            "max_results": {
                                "type": "integer",
                                "description": "最大结果数",
                                "default": 5
                            }
                        },
                        "required": ["query"]
                    }
                ),
                Tool(
                    name="wikipedia_search",
                    description="搜索维基百科",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "topic": {
                                "type": "string",
                                "description": "要搜索的主题"
                            }
                        },
                        "required": ["topic"]
                    }
                )
            ]
        
        @self.server.call_tool()
        async def handle_call_tool(
            name: str,
            arguments: Dict[str, Any]
        ) -> list[TextContent]:
            
            if name == "web_search":
                results = await self._web_search(
                    arguments["query"],
                    arguments.get("max_results", 5)
                )
                return [TextContent(type="text", text=results)]
            
            elif name == "wikipedia_search":
                result = await self._wikipedia_search(arguments["topic"])
                return [TextContent(type="text", text=result)]
            
            else:
                raise ValueError(f"未知的工具: {name}")
    
    async def _web_search(self, query: str, max_results: int) -> str:
        """模拟网络搜索"""
        # 在实际应用中，这里会调用真实的搜索API
        results = [
            f"1. {query}相关结果1: 这是关于{query}的详细信息...",
            f"2. {query}相关结果2: 更多关于{query}的内容...",
            f"3. {query}相关结果3: {query}的最新发展..."
        ]
        return "\n".join(results[:max_results])
    
    async def _wikipedia_search(self, topic: str) -> str:
        """模拟维基百科搜索"""
        # 实际应用中会调用Wikipedia API
        return f"""
维基百科 - {topic}

{topic}是一个重要的概念。以下是关于{topic}的主要信息：

1. 定义：{topic}的基本定义和概述
2. 历史：{topic}的发展历程
3. 应用：{topic}在现实中的应用
4. 相关概念：与{topic}相关的其他概念
"""
    
    async def run(self):
        """运行MCP服务器"""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions()
            )

## 7. MCP与LangChain集成

让我们看看如何在LangChain中使用MCP服务：

In [None]:
from langchain.tools import BaseTool
from typing import Optional, Type
from pydantic import BaseModel, Field

class MCPToolWrapper(BaseTool):
    """
    将MCP工具包装为LangChain工具
    这样可以在LangChain Agent中使用MCP服务
    """
    name: str = "mcp_tool"
    description: str = "通过MCP调用外部工具"
    mcp_client: Any = Field(exclude=True)  # MCP客户端实例
    tool_name: str  # MCP工具名称
    
    class Config:
        arbitrary_types_allowed = True
    
    def _run(self, query: str) -> str:
        """同步运行（LangChain要求）"""
        # 在实际应用中，需要处理异步到同步的转换
        import asyncio
        return asyncio.run(self._arun(query))
    
    async def _arun(self, query: str) -> str:
        """异步运行MCP工具"""
        # 解析输入参数
        try:
            import json
            arguments = json.loads(query)
        except:
            arguments = {"query": query}  # 简单包装
        
        # 调用MCP工具
        result = await self.mcp_client.call_tool(
            self.tool_name,
            arguments=arguments
        )
        
        # 提取文本结果
        return result.content[0].text

# 使用示例
async def create_langchain_agent_with_mcp():
    """
    创建一个使用MCP工具的LangChain Agent
    """
    from langchain.agents import AgentExecutor, create_openai_tools_agent
    from langchain_openai import ChatOpenAI
    from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
    
    # 连接到MCP服务器
    server_params = StdioServerParameters(
        command="python",
        args=["src/day6_mcp_search_server.py"]
    )
    
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as mcp_session:
            await mcp_session.initialize()
            
            # 获取MCP工具列表
            mcp_tools = await mcp_session.list_tools()
            
            # 将MCP工具转换为LangChain工具
            langchain_tools = []
            for tool in mcp_tools.tools:
                wrapped_tool = MCPToolWrapper(
                    name=tool.name,
                    description=tool.description,
                    mcp_client=mcp_session,
                    tool_name=tool.name
                )
                langchain_tools.append(wrapped_tool)
            
            # 创建Agent
            llm = ChatOpenAI(model="gpt-3.5-turbo")
            
            prompt = ChatPromptTemplate.from_messages([
                ("system", "你是一个有帮助的助手，可以使用工具来回答问题。"),
                ("user", "{input}"),
                MessagesPlaceholder(variable_name="agent_scratchpad"),
            ])
            
            agent = create_openai_tools_agent(llm, langchain_tools, prompt)
            agent_executor = AgentExecutor(
                agent=agent,
                tools=langchain_tools,
                verbose=True
            )
            
            # 测试Agent
            result = await agent_executor.ainvoke({
                "input": "搜索关于MCP协议的信息"
            })
            
            print(f"Agent结果: {result['output']}")

## 8. MCP配置文件

MCP支持通过配置文件管理服务器连接：

In [None]:
# MCP配置文件示例 (mcp_config.json)
mcp_config = {
    "mcpServers": {
        "math-tools": {
            "command": "python",
            "args": ["src/day6_mcp_server_simple.py"],
            "env": {
                "PYTHONPATH": "."
            }
        },
        "search-tools": {
            "command": "python",
            "args": ["src/day6_mcp_search_server.py"],
            "env": {
                "PYTHONPATH": ".",
                "SEARCH_API_KEY": "your-api-key"  # 可以传递环境变量
            }
        },
        "database-tools": {
            "command": "python",
            "args": ["src/day6_mcp_database_server.py"],
            "env": {
                "DATABASE_URL": "postgresql://localhost/mydb"
            }
        }
    }
}

# 保存配置
import json
with open('mcp_config.json', 'w', encoding='utf-8') as f:
    json.dump(mcp_config, f, indent=2, ensure_ascii=False)

print("MCP配置文件已创建")

## 9. MCP最佳实践

### 9.1 设计原则

1. **单一职责**：每个MCP服务器应该专注于一个领域
2. **清晰的接口**：工具和资源的命名要直观
3. **错误处理**：提供有意义的错误信息
4. **异步优先**：充分利用异步特性提高性能

### 9.2 安全考虑

1. **输入验证**：始终验证工具参数
2. **权限控制**：限制敏感操作
3. **资源限制**：防止资源耗尽
4. **审计日志**：记录所有操作

## 10. 实战练习：构建文件管理MCP服务器

让我们创建一个更实用的例子 - 文件管理服务器：

In [None]:
import os
import aiofiles
from pathlib import Path
from typing import List

class FileManagerMCPServer:
    """
    文件管理MCP服务器
    提供文件浏览、读取和简单操作功能
    """
    
    def __init__(self, base_path: str = "."):
        self.base_path = Path(base_path).resolve()
        self.server = Server("file-manager")
        self._setup_handlers()
    
    def _setup_handlers(self):
        @self.server.list_tools()
        async def handle_list_tools() -> list[Tool]:
            return [
                Tool(
                    name="list_files",
                    description="列出目录中的文件",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "目录路径（相对于基础路径）",
                                "default": "."
                            },
                            "pattern": {
                                "type": "string",
                                "description": "文件匹配模式（如 *.py）",
                                "default": "*"
                            }
                        }
                    }
                ),
                Tool(
                    name="read_file",
                    description="读取文件内容",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "文件路径"
                            },
                            "encoding": {
                                "type": "string",
                                "description": "文件编码",
                                "default": "utf-8"
                            }
                        },
                        "required": ["path"]
                    }
                ),
                Tool(
                    name="file_info",
                    description="获取文件信息",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "文件路径"
                            }
                        },
                        "required": ["path"]
                    }
                )
            ]
        
        @self.server.list_resources()
        async def handle_list_resources() -> list[Resource]:
            """列出重要文件作为资源"""
            resources = []
            
            # 查找项目中的重要文件
            important_files = [
                "README.md",
                "requirements.txt",
                ".env.example"
            ]
            
            for filename in important_files:
                file_path = self.base_path / filename
                if file_path.exists():
                    resources.append(
                        Resource(
                            uri=f"file:///{filename}",
                            name=filename,
                            description=f"项目文件: {filename}",
                            mimeType=self._get_mime_type(filename)
                        )
                    )
            
            return resources
        
        @self.server.call_tool()
        async def handle_call_tool(
            name: str,
            arguments: Dict[str, Any]
        ) -> list[TextContent]:
            
            if name == "list_files":
                result = await self._list_files(
                    arguments.get("path", "."),
                    arguments.get("pattern", "*")
                )
                return [TextContent(type="text", text=result)]
            
            elif name == "read_file":
                result = await self._read_file(
                    arguments["path"],
                    arguments.get("encoding", "utf-8")
                )
                return [TextContent(type="text", text=result)]
            
            elif name == "file_info":
                result = await self._file_info(arguments["path"])
                return [TextContent(type="text", text=result)]
            
            else:
                raise ValueError(f"未知的工具: {name}")
    
    async def _list_files(self, path: str, pattern: str) -> str:
        """列出文件"""
        try:
            target_path = (self.base_path / path).resolve()
            
            # 安全检查：确保路径在基础路径内
            if not str(target_path).startswith(str(self.base_path)):
                return "错误：访问被拒绝（路径超出允许范围）"
            
            files = list(target_path.glob(pattern))
            
            if not files:
                return f"在 {path} 中没有找到匹配 {pattern} 的文件"
            
            result = f"目录 {path} 中的文件:\n\n"
            for f in sorted(files):
                if f.is_file():
                    size = f.stat().st_size
                    result += f"📄 {f.name} ({size} bytes)\n"
                elif f.is_dir():
                    result += f"📁 {f.name}/\n"
            
            return result
            
        except Exception as e:
            return f"错误：{e}"
    
    async def _read_file(self, path: str, encoding: str) -> str:
        """读取文件内容"""
        try:
            file_path = (self.base_path / path).resolve()
            
            # 安全检查
            if not str(file_path).startswith(str(self.base_path)):
                return "错误：访问被拒绝"
            
            if not file_path.exists():
                return f"错误：文件不存在 - {path}"
            
            # 限制文件大小
            if file_path.stat().st_size > 1024 * 1024:  # 1MB
                return "错误：文件太大（超过1MB）"
            
            async with aiofiles.open(file_path, 'r', encoding=encoding) as f:
                content = await f.read()
            
            return f"文件内容 ({path}):\n\n{content}"
            
        except Exception as e:
            return f"错误：{e}"
    
    async def _file_info(self, path: str) -> str:
        """获取文件信息"""
        try:
            file_path = (self.base_path / path).resolve()
            
            if not str(file_path).startswith(str(self.base_path)):
                return "错误：访问被拒绝"
            
            if not file_path.exists():
                return f"错误：文件不存在 - {path}"
            
            stat = file_path.stat()
            
            info = f"""
文件信息: {path}
- 大小: {stat.st_size} bytes
- 修改时间: {datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')}
- 创建时间: {datetime.fromtimestamp(stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S')}
- 是否为目录: {'是' if file_path.is_dir() else '否'}
- 是否为文件: {'是' if file_path.is_file() else '否'}
- 文件扩展名: {file_path.suffix or '无'}
"""
            return info.strip()
            
        except Exception as e:
            return f"错误：{e}"
    
    def _get_mime_type(self, filename: str) -> str:
        """根据文件扩展名返回MIME类型"""
        ext = Path(filename).suffix.lower()
        mime_types = {
            ".txt": "text/plain",
            ".md": "text/markdown",
            ".py": "text/x-python",
            ".json": "application/json",
            ".yaml": "text/yaml",
            ".yml": "text/yaml",
            ".html": "text/html",
            ".css": "text/css",
            ".js": "text/javascript"
        }
        return mime_types.get(ext, "text/plain")

## 总结

通过今天的学习，我们掌握了：

1. **MCP的核心概念**
   - 标准化的工具和资源协议
   - Server/Client架构
   - Tools、Resources、Prompts三大组件

2. **MCP开发实践**
   - 创建MCP服务器
   - 实现工具和资源处理
   - 客户端调用方法

3. **集成方案**
   - 将现有工具改造为MCP格式
   - 与LangChain等框架集成
   - 配置管理和部署

4. **实战案例**
   - 数学计算工具
   - 搜索服务
   - 文件管理系统

### 下一步

明天（Day 7）我们将深入探讨：
- MCP在企业环境中的应用
- 复杂的集成场景
- 安全和权限管理
- 性能优化和监控

### 作业

1. 运行本节的所有示例代码
2. 尝试创建一个自己的MCP服务器（如：天气查询、数据库操作等）
3. 将Day 3的RAG系统改造为MCP服务
4. 思考：MCP如何改善你现有的AI应用架构？