# 使用 Amazon Bedrock AgentCore 部署 MCP Server 示例

## 概述

本教程将展示如何使用 Amazon Bedrock AgentCore Python SDK 将现有的 MCP server 工具部署到 AgentCore Runtime。我们将使用项目中 `server/app.py` 里的 MCP server 工具，包括：

- `search_website`: 网站搜索工具
- `count_s3_buckets`: S3 存储桶计数工具
- `generate_image_with_context`: 图像生成工具
- `get_comfyui_config`: ComfyUI 配置获取工具
- `generate_video_with_context`: 视频生成工具

### 教程详情

| 信息 | 详情 |
|:--------------------|:----------------------------------------------------------|
| 教程类型 | 工具托管 |
| 工具类型 | MCP server |
| 教程组件 | 在 AgentCore Runtime 上托管 MCP server |
| 示例复杂度 | 中等 |
| 使用的 SDK | Amazon BedrockAgentCore Python SDK 和 MCP |

### 主要特性

* 使用现有的 MCP server 工具
* 配置 OAuth 认证（Cognito）
* 部署到 Amazon Bedrock AgentCore Runtime
* 直接测试已部署的 MCP server


## 前提条件

执行本教程需要：
* Python 3.10+
* 已配置的 AWS 凭证
* Amazon Bedrock AgentCore SDK
* MCP (Model Context Protocol) 库
* Docker 运行环境
* SerpAPI API Key（用于网站搜索功能，可选）

In [None]:
# 安装必要的依赖
%pip install mcp bedrock-agentcore-starter-toolkit boto3 requests

## 创建适配的 MCP Server

我们需要创建一个适配 AgentCore Runtime 的 MCP server，它使用 FastMCP 并支持 stateless HTTP。
这个 MCP server 包含了原 `server/app.py` 中的所有工具功能。

In [None]:
%%writefile agentcore_mcp_server.py
from mcp.server.fastmcp import FastMCP
import boto3
import os
import requests
from typing import Dict
import base64
import json

# 创建 FastMCP 实例，配置为 stateless HTTP
mcp = FastMCP(host="0.0.0.0", stateless_http=True)

# 从环境变量获取 API 密钥
SERPAPI_API_KEY = os.environ.get('SERPAPI_API_KEY', 'your_serpapi_key_here')

@mcp.tool()
def search_website(search_term: str) -> Dict:
    """查询网站获取最新信息
    Args:
        search_term: 用户查询文本
        
    Returns:
        搜索结果字典
    """
    params = {
        "api_key": SERPAPI_API_KEY,
        "engine": "google",
        "q": search_term,
        "google_domain": "google.com",
        "gl": "us",
        "hl": "en"
    }

    url = "https://serpapi.com/search"
    try:
        response = requests.get(url, params=params, timeout=30)
        if response.status_code == 200:
            results = response.json()
            organic_results = results.get('organic_results', [])
            return {"search_result": organic_results}
        else:
            return {"search_result": f"Error: {response.status_code} - {response.text}"}
    except Exception as e:
        return {"search_result": f"Error: {str(e)}"}

@mcp.tool()
def count_s3_buckets() -> int:
    """计算 S3 存储桶的数量"""
    try:
        s3 = boto3.client('s3')
        response = s3.list_buckets()
        return len(response['Buckets'])
    except Exception as e:
        return f"Error counting S3 buckets: {str(e)}"

@mcp.tool()
def generate_image_with_context(
    prompt: str, 
    context_image_base64: str = None, 
    workflow_type: str = "text_to_image", 
    width: int = 1024, 
    height: int = 1024, 
    steps: int = 20, 
    cfg_scale: float = 7.0, 
    seed: int = -1
) -> Dict:
    """使用 ComfyUI 生成图像，支持可选的上下文图像

    Args:
        prompt: 图像生成的文本描述
        context_image_base64: Base64 编码的上下文图像（可选）
        workflow_type: 工作流类型 (text_to_image, image_to_image, inpainting)
        width: 图像宽度（默认：1024）
        height: 图像高度（默认：1024）
        steps: 采样步数（默认：20）
        cfg_scale: CFG 引导比例（默认：7.0）
        seed: 随机种子（-1 为随机，默认：-1）

    Returns:
        生成的图像数据和元数据
    """
    # 模拟图像生成响应（实际实现需要 ComfyUI 服务）
    return {
        "status": "success",
        "message": f"Mock image generated for prompt: {prompt}",
        "workflow_type": workflow_type,
        "dimensions": f"{width}x{height}",
        "parameters": {
            "steps": steps,
            "cfg_scale": cfg_scale,
            "seed": seed if seed != -1 else "random"
        },
        "has_context_image": context_image_base64 is not None
    }

@mcp.tool()
def get_comfyui_config() -> Dict:
    """获取 ComfyUI 配置和可用工作流

    Returns:
        配置信息，包括可用工作流和预设
    """
    return {
        'server_url': 'http://localhost:8188',
        'available_workflows': ['text_to_image', 'image_to_image', 'text_to_video', 'image_to_video'],
        'workflow_presets': {
            'fast': {'steps': 10, 'cfg_scale': 5.0},
            'balanced': {'steps': 20, 'cfg_scale': 7.0},
            'quality': {'steps': 30, 'cfg_scale': 9.0}
        },
        'optimal_dimensions': {
            'square': (1024, 1024),
            'portrait': (768, 1024),
            'landscape': (1024, 768),
            'wide': (1536, 768),
            'tall': (768, 1536)
        },
        'config': {
            'timeout': 300,
            'poll_interval': 1,
            'max_retries': 3,
            'enable_fallback': True
        }
    }

@mcp.tool()
def generate_video_with_context(
    prompt: str, 
    context_image_base64: str = None, 
    workflow_type: str = "text_to_video", 
    steps: int = 15, 
    cfg_scale: float = 6.0, 
    seed: int = -1, 
    frame_rate: int = 16
) -> Dict:
    """使用 ComfyUI 生成视频，支持可选的上下文图像

    Args:
        prompt: 视频生成的文本描述
        context_image_base64: Base64 编码的上下文图像（image_to_video 需要）
        workflow_type: 工作流类型 (text_to_video, image_to_video)
        steps: 采样步数（默认：15）
        cfg_scale: CFG 引导比例（默认：6.0）
        seed: 随机种子（-1 为随机，默认：-1）
        frame_rate: 视频帧率（默认：16）

    Returns:
        生成的视频数据和元数据
    """
    if workflow_type == "image_to_video" and not context_image_base64:
        return {"error": "image_to_video 工作流需要 context_image_base64 参数"}
    
    # 模拟视频生成响应（实际实现需要 ComfyUI 服务）
    return {
        "status": "success",
        "message": f"Mock video generated for prompt: {prompt}",
        "workflow_type": workflow_type,
        "parameters": {
            "steps": steps,
            "cfg_scale": cfg_scale,
            "seed": seed if seed != -1 else "random",
            "frame_rate": frame_rate
        },
        "has_context_image": context_image_base64 is not None
    }

if __name__ == "__main__":
    mcp.run(transport="streamable-http")

## 创建依赖文件

创建 requirements.txt 文件，包含所有必要的依赖：

In [None]:
%%writefile requirements.txt
mcp
boto3
requests
fastapi
uvicorn
starlette

## 设置 Amazon Cognito 认证

AgentCore Runtime 需要 OAuth 认证。我们将使用 Amazon Cognito 提供 JWT 令牌来访问已部署的 MCP server。

In [None]:
import boto3
import json
import time
from boto3.session import Session

def setup_cognito_user_pool():
    """设置 Cognito 用户池和客户端"""
    cognito_client = boto3.client('cognito-idp')
    
    try:
        # 创建用户池
        print("创建 Cognito 用户池...")
        user_pool_response = cognito_client.create_user_pool(
            PoolName='MCPServerUserPool',
            Policies={
                'PasswordPolicy': {
                    'MinimumLength': 8,
                    'RequireUppercase': False,
                    'RequireLowercase': False,
                    'RequireNumbers': False,
                    'RequireSymbols': False
                }
            }
        )
        
        user_pool_id = user_pool_response['UserPool']['Id']
        print(f"✓ 用户池创建成功: {user_pool_id}")
        
        # 创建应用客户端
        print("创建应用客户端...")
        client_response = cognito_client.create_user_pool_client(
            UserPoolId=user_pool_id,
            ClientName='MCPServerClient',
            GenerateSecret=False,
            ExplicitAuthFlows=[
                'ALLOW_USER_PASSWORD_AUTH',
                'ALLOW_REFRESH_TOKEN_AUTH'
            ]
        )
        
        client_id = client_response['UserPoolClient']['ClientId']
        print(f"✓ 应用客户端创建成功: {client_id}")
        
        # 创建测试用户
        print("创建测试用户...")
        cognito_client.admin_create_user(
            UserPoolId=user_pool_id,
            Username='testuser',
            TemporaryPassword='TempPass123!',
            MessageAction='SUPPRESS'
        )
        
        # 设置永久密码
        cognito_client.admin_set_user_password(
            UserPoolId=user_pool_id,
            Username='testuser',
            Password='MyPassword123!',
            Permanent=True
        )
        print("✓ 测试用户创建成功")
        
        # 获取访问令牌
        print("获取访问令牌...")
        auth_response = cognito_client.initiate_auth(
            ClientId=client_id,
            AuthFlow='USER_PASSWORD_AUTH',
            AuthParameters={
                'USERNAME': 'testuser',
                'PASSWORD': 'MyPassword123!'
            }
        )
        
        access_token = auth_response['AuthenticationResult']['AccessToken']
        print("✓ 访问令牌获取成功")
        
        # 获取区域
        session = Session()
        region = session.region_name or 'us-east-1'
        
        discovery_url = f"https://cognito-idp.{region}.amazonaws.com/{user_pool_id}/.well-known/openid-configuration"
        
        return {
            'user_pool_id': user_pool_id,
            'client_id': client_id,
            'discovery_url': discovery_url,
            'bearer_token': access_token,
            'region': region
        }
        
    except Exception as e:
        print(f"❌ Cognito 设置失败: {e}")
        raise

def create_agentcore_role(agent_name: str):
    """创建 AgentCore IAM 执行角色"""
    iam_client = boto3.client('iam')
    
    role_name = f"AgentCore-{agent_name}-ExecutionRole"
    
    # 信任策略
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "bedrock-agentcore.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
    
    try:
        # 创建角色
        role_response = iam_client.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy),
            Description=f"Execution role for AgentCore {agent_name}"
        )
        
        # 附加基本执行策略
        policy_document = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "logs:CreateLogGroup",
                        "logs:CreateLogStream",
                        "logs:PutLogEvents",
                        "s3:ListAllMyBuckets",
                        "s3:GetBucketLocation"
                    ],
                    "Resource": "*"
                }
            ]
        }
        
        iam_client.put_role_policy(
            RoleName=role_name,
            PolicyName=f"{agent_name}-ExecutionPolicy",
            PolicyDocument=json.dumps(policy_document)
        )
        
        print(f"✓ IAM 角色创建成功: {role_name}")
        return role_response
        
    except Exception as e:
        print(f"❌ IAM 角色创建失败: {e}")
        raise

In [None]:
print("设置 Amazon Cognito 用户池...")
cognito_config = setup_cognito_user_pool()
print("\n✅ Cognito 设置完成")
print(f"用户池 ID: {cognito_config.get('user_pool_id', 'N/A')}")
print(f"客户端 ID: {cognito_config.get('client_id', 'N/A')}")
print(f"发现 URL: {cognito_config.get('discovery_url', 'N/A')}")

## 创建 IAM 执行角色

为 AgentCore Runtime 创建必要的 IAM 角色：

In [None]:
tool_name = "mcp_server_sample"
print(f"为 {tool_name} 创建 IAM 角色...")
agentcore_iam_role = create_agentcore_role(agent_name=tool_name)
print(f"\n✅ IAM 角色创建完成")
print(f"角色 ARN: {agentcore_iam_role['Role']['Arn']}")

## 配置 AgentCore Runtime 部署

使用 AgentCore SDK 配置部署，包括入口点、执行角色和认证配置：

In [None]:
from bedrock_agentcore_starter_toolkit import Runtime
import os

# 获取当前区域
boto_session = Session()
region = boto_session.region_name or 'us-east-1'
print(f"使用 AWS 区域: {region}")

# 检查必需文件
required_files = ['agentcore_mcp_server.py', 'requirements.txt']
for file in required_files:
    if not os.path.exists(file):
        raise FileNotFoundError(f"必需文件 {file} 未找到")
print("✓ 所有必需文件已找到")

# 创建 AgentCore Runtime 实例
agentcore_runtime = Runtime()

# 配置认证
auth_config = {
    "customJWTAuthorizer": {
        "allowedClients": [
            cognito_config['client_id']
        ],
        "discoveryUrl": cognito_config['discovery_url'],
    }
}

print("配置 AgentCore Runtime...")
response = agentcore_runtime.configure(
    entrypoint="agentcore_mcp_server.py",
    execution_role=agentcore_iam_role['Role']['Arn'],
    auto_create_ecr=True,
    requirements_file="requirements.txt",
    region=region,
    authorizer_configuration=auth_config,
    protocol="MCP",
    agent_name=tool_name
)
print("✅ 配置完成")

## 启动 MCP Server 到 AgentCore Runtime

现在将 MCP server 部署到 AgentCore Runtime：

In [None]:
print("启动 MCP server 到 AgentCore Runtime...")
print("这可能需要几分钟时间...")
launch_result = agentcore_runtime.launch()
print("✅ 启动完成")
print(f"Agent ARN: {launch_result.agent_arn}")
print(f"Agent ID: {launch_result.agent_id}")

## 检查 AgentCore Runtime 状态

检查部署状态并等待就绪：

In [None]:
import time

print("检查 AgentCore Runtime 状态...")
status_response = agentcore_runtime.status()
status = status_response.endpoint['status']
print(f"初始状态: {status}")

end_status = ['READY', 'CREATE_FAILED', 'DELETE_FAILED', 'UPDATE_FAILED']
while status not in end_status:
    print(f"状态: {status} - 等待中...")
    time.sleep(10)
    status_response = agentcore_runtime.status()
    status = status_response.endpoint['status']

if status == 'READY':
    print("✅ AgentCore Runtime 已就绪！")
else:
    print(f"⚠ AgentCore Runtime 状态: {status}")
    
print(f"最终状态: {status}")

## 存储配置信息

将 Agent ARN 和 Cognito 配置存储到 AWS Systems Manager 和 Secrets Manager：

In [None]:
ssm_client = boto3.client('ssm', region_name=region)
secrets_client = boto3.client('secretsmanager', region_name=region)

try:
    # 存储 Cognito 凭证到 Secrets Manager
    try:
        cognito_credentials_response = secrets_client.create_secret(
            Name='mcp_server_sample/cognito/credentials',
            Description='MCP server 的 Cognito 凭证',
            SecretString=json.dumps(cognito_config)
        )
        print("✓ Cognito 凭证已存储到 Secrets Manager")
    except secrets_client.exceptions.ResourceExistsException:
        secrets_client.update_secret(
            SecretId='mcp_server_sample/cognito/credentials',
            SecretString=json.dumps(cognito_config)
        )
        print("✓ Cognito 凭证已更新到 Secrets Manager")

    # 存储 Agent ARN 到 Parameter Store
    agent_arn_response = ssm_client.put_parameter(
        Name='/mcp_server_sample/runtime/agent_arn',
        Value=launch_result.agent_arn,
        Type='String',
        Description='MCP server 的 Agent ARN',
        Overwrite=True
    )
    print("✓ Agent ARN 已存储到 Parameter Store")

    print("\n✅ 配置信息存储成功！")
    print(f"Agent ARN: {launch_result.agent_arn}")
    
except Exception as e:
    print(f"❌ 存储配置时出错: {e}")

## 创建远程测试客户端

创建一个客户端来测试已部署的 MCP server：

In [None]:
# 直接在 notebook 中测试已部署的 MCP server
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def test_deployed_mcp_server():
    """测试已部署的 MCP server"""
    try:
        # 从存储的配置获取信息
        ssm_client = boto3.client('ssm', region_name=region)
        agent_arn_response = ssm_client.get_parameter(Name='/mcp_server_sample/runtime/agent_arn')
        agent_arn = agent_arn_response['Parameter']['Value']
        print(f"Agent ARN: {agent_arn}")

        secrets_client = boto3.client('secretsmanager', region_name=region)
        response = secrets_client.get_secret_value(SecretId='mcp_server_sample/cognito/credentials')
        secret_value = response['SecretString']
        parsed_secret = json.loads(secret_value)
        bearer_token = parsed_secret['bearer_token']
        print("✓ 获取到认证信息")
        
        # 构建 MCP URL
        encoded_arn = agent_arn.replace(':', '%3A').replace('/', '%2F')
        mcp_url = f"https://bedrock-agentcore.{region}.amazonaws.com/runtimes/{encoded_arn}/invocations?qualifier=DEFAULT"
        headers = {
            "authorization": f"Bearer {bearer_token}",
            "Content-Type": "application/json"
        }
        
        print(f"\n连接到 AgentCore Runtime...")
        
        async with streamablehttp_client(mcp_url, headers, timeout=120, terminate_on_close=False) as (
            read_stream,
            write_stream,
            _,
        ):
            async with ClientSession(read_stream, write_stream) as session:
                print("🔄 初始化 MCP 会话...")
                await session.initialize()
                print("✓ MCP 会话初始化成功")
                
                print("\n📋 获取可用工具列表...")
                tool_result = await session.list_tools()
                
                print("\n可用的 MCP 工具:")
                print("=" * 40)
                for tool in tool_result.tools:
                    print(f"🔧 {tool.name}: {tool.description}")
                
                print(f"\n✅ 成功！找到 {len(tool_result.tools)} 个工具")
                
                # 测试一个简单的工具
                print("\n🧪 测试 count_s3_buckets 工具...")
                try:
                    result = await session.call_tool(
                        name="count_s3_buckets",
                        arguments={}
                    )
                    print(f"S3 存储桶数量: {result.content[0].text}")
                except Exception as e:
                    print(f"测试错误: {e}")
                
                # 测试图像生成工具
                print("\n🎨 测试 generate_image_with_context 工具...")
                try:
                    result = await session.call_tool(
                        name="generate_image_with_context",
                        arguments={"prompt": "一只可爱的小猫"}
                    )
                    response_data = json.loads(result.content[0].text)
                    print(f"生成状态: {response_data.get('status', 'N/A')}")
                    print(f"消息: {response_data.get('message', 'N/A')}")
                except Exception as e:
                    print(f"测试错误: {e}")
                
                print("\n🎉 MCP server 测试完成！")
                
    except Exception as e:
        print(f"❌ 测试失败: {e}")

# 运行测试
await test_deployed_mcp_server()

## 测试已部署的 MCP Server

上面的代码单元格将直接测试已部署到 AgentCore Runtime 的 MCP server。

## 下一步

现在您已经成功将 MCP server 部署到 AgentCore Runtime，您可以：

1. **添加更多工具**: 扩展您的 MCP server 以包含更多工具
2. **自定义认证**: 实现自定义 JWT 授权器
3. **集成**: 与其他 AgentCore 服务集成
4. **监控**: 设置日志和监控
5. **优化**: 根据使用情况优化性能

## 清理资源（可选）

如果您想清理本教程中创建的资源，请运行以下单元格：

In [None]:
import boto3

def cleanup_resources():
    """清理创建的 AWS 资源"""
    print("🗑️ 开始清理过程...")
    
    try:
        # 删除 AgentCore Runtime
        print("删除 AgentCore Runtime...")
        agentcore_control_client = boto3.client('bedrock-agentcore-control', region_name=region)
        runtime_delete_response = agentcore_control_client.delete_agent_runtime(
            agentRuntimeId=launch_result.agent_id,
        )
        print("✓ AgentCore Runtime 删除已启动")

        # 删除 ECR 仓库
        print("删除 ECR 仓库...")
        ecr_client = boto3.client('ecr', region_name=region)
        ecr_repo_name = launch_result.ecr_uri.split('/')[1]
        ecr_client.delete_repository(
            repositoryName=ecr_repo_name,
            force=True
        )
        print("✓ ECR 仓库已删除")

        # 删除 IAM 角色
        print("删除 IAM 角色策略...")
        iam_client = boto3.client('iam')
        role_name = agentcore_iam_role['Role']['RoleName']
        
        # 删除角色策略
        policies = iam_client.list_role_policies(
            RoleName=role_name,
            MaxItems=100
        )

        for policy_name in policies['PolicyNames']:
            iam_client.delete_role_policy(
                RoleName=role_name,
                PolicyName=policy_name
            )
        
        # 删除角色
        iam_client.delete_role(RoleName=role_name)
        print("✓ IAM 角色已删除")

        # 删除 Parameter Store 参数
        try:
            ssm_client.delete_parameter(Name='/mcp_server_sample/runtime/agent_arn')
            print("✓ Parameter Store 参数已删除")
        except ssm_client.exceptions.ParameterNotFound:
            print("ℹ️ Parameter Store 参数未找到")

        # 删除 Secrets Manager 密钥
        try:
            secrets_client.delete_secret(
                SecretId='mcp_server_sample/cognito/credentials',
                ForceDeleteWithoutRecovery=True
            )
            print("✓ Secrets Manager 密钥已删除")
        except secrets_client.exceptions.ResourceNotFoundException:
            print("ℹ️ Secrets Manager 密钥未找到")

        # 删除 Cognito 用户池
        try:
            cognito_client = boto3.client('cognito-idp')
            cognito_client.delete_user_pool(UserPoolId=cognito_config['user_pool_id'])
            print("✓ Cognito 用户池已删除")
        except Exception as e:
            print(f"ℹ️ 删除 Cognito 用户池时出错: {e}")

        print("\n✅ 清理完成！")
        
    except Exception as e:
        print(f"❌ 清理过程中出错: {e}")
        print("您可能需要手动清理一些资源。")

# 取消注释下面的行来执行清理
# cleanup_resources()

# 🎉 恭喜！

您已经成功：

✅ **创建了适配的 MCP server** 包含现有的工具  
✅ **配置了认证** 使用 Amazon Cognito  
✅ **部署到 AWS** 使用 AgentCore Runtime  
✅ **远程调用** 具有适当的认证  
✅ **学习了 MCP 概念** 和最佳实践  

您的 MCP server 现在在 Amazon Bedrock AgentCore Runtime 上运行，可以用于生产环境！

## 总结

在本教程中，您学习了如何：
- 将现有的 Lambda MCP server 工具适配到 FastMCP
- 为 AgentCore 兼容性配置 stateless HTTP 传输
- 使用 Amazon Cognito 设置 JWT 认证
- 在 AWS 上部署和管理 MCP server
- 直接测试已部署的 MCP server

已部署的 MCP server 现在可以集成到更大的 AI 应用程序和工作流中！

## 工具功能说明

本示例包含的工具：

1. **search_website**: 使用 SerpAPI 进行网站搜索
2. **count_s3_buckets**: 计算 AWS S3 存储桶数量
3. **generate_image_with_context**: 图像生成工具（模拟 ComfyUI）
4. **get_comfyui_config**: 获取 ComfyUI 配置信息
5. **generate_video_with_context**: 视频生成工具（模拟 ComfyUI）

这些工具展示了不同类型的功能：API 调用、AWS 服务集成、AI 生成任务等。