## Prerequisites

To execute this tutorial you will need:
* Python 3.10+
* AWS credentials configured
* Amazon Bedrock AgentCore SDK
* MCP (Model Context Protocol) library
* Docker running

In [None]:
!pip install --force-reinstall -U -r requirements.txt --quiet

## Creating MCP Server

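Now let's create our MCP server. It exposes 24 tools that simulate an O-RAN O2 IMS interface (inventory, subscriptions, and alarms) backed by in-memory data. The server uses FastMCP with `stateless_http=True`, which is required for AgentCore Runtime compatibility.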

In [None]:
%%writefile mcp_server.py
from mcp.server.fastmcp import FastMCP
from typing import Dict, List, Optional
import uuid
import base64
from datetime import datetime

# Shared FastMCP instance that every @mcp.tool() function below registers with.
# host="0.0.0.0" listens on all interfaces; stateless_http=True is the setting
# this tutorial states is required for AgentCore Runtime compatibility.
mcp = FastMCP(host="0.0.0.0", stateless_http=True)

# In-memory storage simulating INF platform.
# Identifiers that more than one record refers to are named once here so the
# cross-references between the tables below cannot drift apart.
_OCLOUD_ID = "f078a1d3-56df-46c2-88a2-dd659aa3f6bd"
_RESOURCE_POOL_ID = "f078a1d3-56df-46c2-88a2-dd659aa3f6bd"
_DEPLOYMENT_MANAGER_ID = "c765516a-a84e-30c9-b954-9c3031bf71c8"
_PSERVER_TYPE_ID = "60cba7be-e2cd-3b8c-a7ff-16e0f10573f9"
_CPU_TYPE_ID = "a45983bb-199a-30ec-b7a1-eab2455f333c"
_CONTROLLER_RESOURCE_ID = "5b3a2da8-17da-466c-b5f7-972590c7baf2"

# The single O-Cloud instance served by this mock.
ocloud_db = {
    "oCloudId": _OCLOUD_ID,
    "globalCloudId": "10a07219-4201-4b3e-a52d-81ab6a755d8a",
    "name": "INF O-Cloud Platform",
    "description": "O-RAN Infrastructure O-Cloud instance",
    "serviceUri": "https://128.224.115.51:30205",
}

# Resource pools, keyed by resourcePoolId.
resource_pools_db: Dict[str, dict] = {
    _RESOURCE_POOL_ID: {
        "resourcePoolId": _RESOURCE_POOL_ID,
        "name": "RegionOne",
        "description": "INF Platform Resource Pool",
        "oCloudId": _OCLOUD_ID,
        "location": "Edge Site 1",
    },
}

# Deployment managers, keyed by deploymentManagerId.
deployment_managers_db: Dict[str, dict] = {
    _DEPLOYMENT_MANAGER_ID: {
        "deploymentManagerId": _DEPLOYMENT_MANAGER_ID,
        "name": "kubernetes-cluster",
        "description": "INF Kubernetes DMS",
        "oCloudId": _OCLOUD_ID,
        "serviceUri": "https://128.224.115.51:6443",
        "profileSupportList": ["native_k8sapi"],
        "capabilities": {"OS": "low_latency"},
        "capacity": {
            "cpu": "32",
            "hugepages-2Mi": "2048",
            "hugepages-1Gi": "2048",
        },
    },
}

# Resource types, keyed by resourceTypeId.
resource_types_db: Dict[str, dict] = {
    _PSERVER_TYPE_ID: {
        "resourceTypeId": _PSERVER_TYPE_ID,
        "name": "pserver",
        "description": "Physical Server resource type",
        "vendor": "Dell",
        "model": "PowerEdge R740",
    },
    _CPU_TYPE_ID: {
        "resourceTypeId": _CPU_TYPE_ID,
        "name": "cpu",
        "description": "CPU resource type",
        "vendor": "Intel",
        "model": "Xeon E5-2670 v2",
    },
}

# Resources, keyed by resourceId; each points at its type and its pool.
resources_db: Dict[str, dict] = {
    _CONTROLLER_RESOURCE_ID: {
        "resourceId": _CONTROLLER_RESOURCE_ID,
        "resourceTypeId": _PSERVER_TYPE_ID,
        "resourcePoolId": _RESOURCE_POOL_ID,
        "description": "controller-0;hostname:controller-0;personality:controller;administrative:unlocked;operational:enabled",
    },
}

# Tables that start empty and are filled by the create_* tools at runtime.
subscriptions_db: Dict[str, dict] = {}
alarm_subscriptions_db: Dict[str, dict] = {}
alarms_db: Dict[str, dict] = {}

# Static discovery tool: reports the URI prefix and the single API version of
# the simulated inventory service. Takes no input and reads no shared state.
@mcp.tool()
def get_inventory_api_versions() -> dict:
    """Get O2 IMS inventory API versions"""
    inventory_prefix = "https://128.224.115.36:30205/o2ims-infrastructureInventory"
    supported_versions = [{"version": "1.0.0"}]
    return {"uriPrefix": inventory_prefix, "apiVersions": supported_versions}

# Static discovery tool: reports the URI prefix and the single API version of
# the simulated monitoring service. Takes no input and reads no shared state.
@mcp.tool()
def get_monitoring_api_versions() -> dict:
    """Get O2 IMS monitoring API versions"""
    monitoring_prefix = "https://128.224.115.36:30205/o2ims-infrastructureMonitoring"
    supported_versions = [{"version": "1.0.0"}]
    return {"uriPrefix": monitoring_prefix, "apiVersions": supported_versions}

# Returns the O-Cloud record, optionally narrowed by attribute selectors.
#
# fields:         comma-separated attribute names to keep; when given, only
#                 these keys are returned (unknown names are ignored).
# exclude_fields: comma-separated attribute names to drop from the result.
# With neither argument the full record is returned, as before.
#
# The result is always a fresh dict, so nothing downstream can modify the
# shared ocloud_db table through it.
@mcp.tool()
def get_ocloud_info(fields: str = None, exclude_fields: str = None) -> dict:
    """Get O-Cloud information with field filtering support"""
    info = dict(ocloud_db)

    if fields:
        # Tolerate spaces and stray commas: "name, description," selects two keys.
        wanted = {name.strip() for name in fields.split(",") if name.strip()}
        info = {key: value for key, value in info.items() if key in wanted}

    if exclude_fields:
        unwanted = {name.strip() for name in exclude_fields.split(",") if name.strip()}
        info = {key: value for key, value in info.items() if key not in unwanted}

    return info

# Lists every deployment manager record in table order.
# filter_criteria and fields are accepted but not applied by this mock.
@mcp.tool()
def get_deployment_managers(filter_criteria: str = None, fields: str = None) -> List[dict]:
    """Get deployment manager list from INF platform"""
    all_managers = [*deployment_managers_db.values()]
    return all_managers

# Fetches one deployment manager by ID.
# Unknown IDs produce {"error": ...} rather than an exception.
# When profile == "native_k8sapi" the returned copy gains an "extensions"
# entry with mock cluster access data; any other profile value adds nothing.
@mcp.tool()
def get_deployment_manager(deployment_manager_id: str, profile: str = None) -> dict:
    """Get deployment manager with optional Kubernetes profile"""
    stored = deployment_managers_db.get(deployment_manager_id)
    if stored is None:
        return {"error": "Deployment manager not found"}

    # Shallow copy: the extensions are attached to the response only, never to the table.
    dm = dict(stored)
    if profile != "native_k8sapi":
        return dm

    def _encode(raw: bytes) -> str:
        # Base64 text form of a mock credential blob.
        return base64.b64encode(raw).decode()

    profile_data = {
        "cluster_api_endpoint": "https://128.224.115.51:6443",
        "cluster_ca_cert": _encode(b"mock-ca-cert-data"),
        "admin_user": "kubernetes-admin",
        "admin_client_cert": _encode(b"mock-client-cert-data"),
        "admin_client_key": _encode(b"mock-client-key-data"),
        "helmcli_host_with_port": "128.224.115.34:30022",
        "helmcli_username": "helm",
        "helmcli_password": "password",
        "helmcli_kubeconfig": "/share/kubeconfig_c765516a.config",
    }
    dm["extensions"] = {"profileName": "native_k8sapi", "profileData": profile_data}
    return dm

# Lists every resource pool record in table order.
# filter_criteria is accepted but not applied by this mock.
@mcp.tool()
def get_resource_pools(filter_criteria: str = None) -> List[dict]:
    """Get resource pools from INF platform"""
    all_pools = [*resource_pools_db.values()]
    return all_pools

# Fetches one resource pool by ID; an unknown ID yields {"error": ...}.
# The stored record itself is returned, not a copy.
@mcp.tool()
def get_resource_pool(resource_pool_id: str) -> dict:
    """Get specific resource pool"""
    try:
        return resource_pools_db[resource_pool_id]
    except KeyError:
        return {"error": "Resource pool not found"}

# Lists the resources that belong to one pool.
# An unknown pool yields an empty list rather than an error dict.
# filter_criteria is accepted but not applied by this mock.
@mcp.tool()
def get_resources(resource_pool_id: str, filter_criteria: str = None) -> List[dict]:
    """Get resources in a resource pool"""
    members: List[dict] = []
    if resource_pool_id in resource_pools_db:
        for record in resources_db.values():
            if record["resourcePoolId"] == resource_pool_id:
                members.append(record)
    return members

# Fetches one resource, checking that it lives in the pool the caller named.
# Error dicts are returned for an unknown resource and for a pool mismatch.
# Resources of the pserver type get one mock child element (a CPU) attached
# to the returned copy; the table entry is left untouched.
@mcp.tool()
def get_resource(resource_pool_id: str, resource_id: str) -> dict:
    """Get specific resource with hierarchical elements"""
    stored = resources_db.get(resource_id)
    if stored is None:
        return {"error": "Resource not found"}
    if stored["resourcePoolId"] != resource_pool_id:
        return {"error": "Resource not found in specified pool"}

    resource = dict(stored)

    # Add mock hierarchical elements for pserver
    pserver_type_id = "60cba7be-e2cd-3b8c-a7ff-16e0f10573f9"
    if resource["resourceTypeId"] == pserver_type_id:
        cpu_element = {
            "resourceId": "eee8b101-6b7f-4f0a-b54b-89adc0f3f906",
            "resourceTypeId": "a45983bb-199a-30ec-b7a1-eab2455f333c",
            "resourcePoolId": resource_pool_id,
            "parentId": resource_id,
            "description": "cpu:0;core:0;thread:0;cpu_family:6;allocated_function:Platform",
        }
        resource["elements"] = [cpu_element]

    return resource

# Lists every resource type record in table order.
# filter_criteria is accepted but not applied by this mock.
@mcp.tool()
def get_resource_types(filter_criteria: str = None) -> List[dict]:
    """Get resource types available in INF platform"""
    all_types = [*resource_types_db.values()]
    return all_types

# Fetches one resource type by ID; an unknown ID yields {"error": ...}.
# Only the pserver type carries an alarm dictionary in this mock, and it is
# attached to the returned copy rather than written into the table.
@mcp.tool()
def get_resource_type(resource_type_id: str) -> dict:
    """Get specific resource type with alarm dictionary"""
    stored = resource_types_db.get(resource_type_id)
    if stored is None:
        return {"error": "Resource type not found"}

    resource_type = dict(stored)

    # Add alarm dictionary for pserver type
    pserver_type_id = "60cba7be-e2cd-3b8c-a7ff-16e0f10573f9"
    if resource_type_id == pserver_type_id:
        resource_type["alarmDictionary"] = dict(
            id="7e1e59c3-c99e-3d1c-9934-21548a3a699a",
            alarmDictionaryVersion="0.1",
            entityType="pserver",
            vendor="Dell",
            managementInterfaceId="O2IMS",
        )

    return resource_type

# Registers a new inventory subscription under a freshly generated UUID and
# returns the stored record. A missing or empty consumer_subscription_id is
# replaced with another generated UUID.
@mcp.tool()
def create_subscription(callback: str, consumer_subscription_id: str = None, filter_criteria: str = "") -> dict:
    """Create inventory subscription for SMO notifications"""
    subscription_id = str(uuid.uuid4())
    if not consumer_subscription_id:
        consumer_subscription_id = str(uuid.uuid4())

    subscription = dict(
        subscriptionId=subscription_id,
        callback=callback,
        consumerSubscriptionId=consumer_subscription_id,
        filter=filter_criteria,
    )
    subscriptions_db[subscription_id] = subscription
    return subscription

# Lists every inventory subscription created so far, in creation order.
@mcp.tool()
def get_subscriptions() -> List[dict]:
    """Get all inventory subscriptions"""
    current = [*subscriptions_db.values()]
    return current

# Fetches one inventory subscription by ID; an unknown ID yields {"error": ...}.
@mcp.tool()
def get_subscription(subscription_id: str) -> dict:
    """Get specific subscription"""
    try:
        return subscriptions_db[subscription_id]
    except KeyError:
        return {"error": "Subscription not found"}

# Removes one inventory subscription; an unknown ID yields {"error": ...}
# and leaves the table unchanged.
@mcp.tool()
def delete_subscription(subscription_id: str) -> dict:
    """Delete inventory subscription"""
    # Stored values are always dicts, so None can only mean "no such key".
    removed = subscriptions_db.pop(subscription_id, None)
    if removed is None:
        return {"error": "Subscription not found"}
    return {"message": "Subscription deleted"}

# Registers a new alarm subscription under a freshly generated UUID and
# returns the stored record. A missing or empty consumer_subscription_id is
# replaced with another generated UUID.
@mcp.tool()
def create_alarm_subscription(callback: str, consumer_subscription_id: str = None, filter_criteria: str = "") -> dict:
    """Create alarm subscription for SMO alarm notifications"""
    alarm_subscription_id = str(uuid.uuid4())
    if not consumer_subscription_id:
        consumer_subscription_id = str(uuid.uuid4())

    subscription = dict(
        alarmSubscriptionId=alarm_subscription_id,
        callback=callback,
        consumerSubscriptionId=consumer_subscription_id,
        filter=filter_criteria,
    )
    alarm_subscriptions_db[alarm_subscription_id] = subscription
    return subscription

# Lists every alarm subscription created so far, in creation order.
@mcp.tool()
def get_alarm_subscriptions() -> List[dict]:
    """Get all alarm subscriptions"""
    current = [*alarm_subscriptions_db.values()]
    return current

# Fetches one alarm subscription by ID; an unknown ID yields {"error": ...}.
@mcp.tool()
def get_alarm_subscription(alarm_subscription_id: str) -> dict:
    """Get specific alarm subscription"""
    try:
        return alarm_subscriptions_db[alarm_subscription_id]
    except KeyError:
        return {"error": "Alarm subscription not found"}

# Removes one alarm subscription; an unknown ID yields {"error": ...}
# and leaves the table unchanged.
@mcp.tool()
def delete_alarm_subscription(alarm_subscription_id: str) -> dict:
    """Delete alarm subscription"""
    # Stored values are always dicts, so None can only mean "no such key".
    removed = alarm_subscriptions_db.pop(alarm_subscription_id, None)
    if removed is None:
        return {"error": "Alarm subscription not found"}
    return {"message": "Alarm subscription deleted"}

# Lists every alarm event record created so far, in creation order.
# filter_criteria is accepted but not applied by this mock.
@mcp.tool()
def get_alarms(filter_criteria: str = None) -> List[dict]:
    """Get alarm event records from INF platform"""
    current = [*alarms_db.values()]
    return current

# Fetches one alarm event record by ID; an unknown ID yields {"error": ...}.
@mcp.tool()
def get_alarm(alarm_event_record_id: str) -> dict:
    """Get specific alarm event record"""
    try:
        return alarms_db[alarm_event_record_id]
    except KeyError:
        return {"error": "Alarm event record not found"}

# Applies one or both of the two supported modifications to an alarm record:
#   * alarm_acknowledged: stored as given, with alarmAcknowledgeTime stamped.
#   * perceived_severity: only "5" (CLEARED) is accepted; it is stored and
#     alarmChangedTime is stamped.
#
# Returns {"message": ...} on success and {"error": ...} when the alarm is
# unknown, when perceived_severity is anything other than "5", or when the
# call carries no change at all. All validation happens before the record is
# touched, so a rejected call never leaves a half-applied update behind.
@mcp.tool()
def patch_alarm(alarm_event_record_id: str, alarm_acknowledged: bool = None, perceived_severity: str = None) -> dict:
    """Patch alarm event record (acknowledge or clear)"""
    if alarm_event_record_id not in alarms_db:
        return {"error": "Alarm event record not found"}

    # Previously any other severity was dropped silently while the caller was
    # still told the alarm had been updated.
    if perceived_severity is not None and perceived_severity != "5":
        return {"error": "perceivedSeverity can only be patched to 5 (CLEARED)"}
    if alarm_acknowledged is None and perceived_severity is None:
        return {"error": "No update requested: provide alarm_acknowledged or perceived_severity"}

    alarm = alarms_db[alarm_event_record_id]
    # One timestamp for the whole patch keeps both time fields consistent.
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    if alarm_acknowledged is not None:
        alarm["alarmAcknowledged"] = alarm_acknowledged
        alarm["alarmAcknowledgeTime"] = now
    if perceived_severity == "5":  # CLEARED
        alarm["perceivedSeverity"] = "5"
        alarm["alarmChangedTime"] = now

    return {"message": "Alarm updated successfully"}

# Creates an unacknowledged alarm record under a freshly generated UUID and
# stores it in alarms_db. resource_id is recorded as given (it is not checked
# against resources_db) and the resource type is always the pserver type.
# severity codes: 0=CRITICAL, 1=MAJOR, 2=MINOR, 3=WARNING, 4=INDETERMINATE, 5=CLEARED
@mcp.tool()
def create_test_alarm(resource_id: str = "5b3a2da8-17da-466c-b5f7-972590c7baf2", severity: str = "1") -> dict:
    """Create test alarm for INF platform resource"""
    alarm_id = str(uuid.uuid4())
    raised_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    alarms_db[alarm_id] = dict(
        alarmEventRecordId=alarm_id,
        resourceTypeId="60cba7be-e2cd-3b8c-a7ff-16e0f10573f9",
        resourceId=resource_id,
        alarmDefinitionId="1197f463-b3d4-3aa3-9c14-faa493baa069",
        probableCauseId="f52054c9-6f3c-39a0-aab8-e00e01d8c4d3",
        alarmRaisedTime=raised_at,
        alarmAcknowledged=False,
        perceivedSeverity=severity,
    )
    return {"alarmEventRecordId": alarm_id, "message": "Test alarm created for INF platform"}

# Pretends to register with an SMO: nothing is contacted or stored, the
# inputs are simply echoed back together with the current local time.
@mcp.tool()
def simulate_smo_registration(smo_register_url: str, ocloud_global_id: str) -> dict:
    """Simulate O2 service registration with SMO"""
    registered_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    confirmation = f"O2 service registered with SMO at {smo_register_url}"
    return {
        "message": confirmation,
        "oCloudGlobalId": ocloud_global_id,
        "registrationTime": registered_at,
    }

# Start the server only when this file is run as a script. streamable-http is
# the transport the tutorial's test clients connect with.
if __name__ == "__main__":
    mcp.run(transport="streamable-http")


### What This Code Does

* **FastMCP**: Creates an MCP server that can host your tools
* **@mcp.tool()**: Decorator that turns your Python functions into MCP tools
* **stateless_http=True**: Required for AgentCore Runtime compatibility
* **Tools**: 24 tools that simulate an O-RAN O2 IMS interface — inventory queries, subscription management, and alarm handling — backed by in-memory data

## Creating Local Testing Client

Before deploying to AgentCore Runtime, let's create a client to test our MCP server locally:

In [None]:
%%writefile my_mcp_client.py
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def main():
    """Connect to the locally running MCP server and print each tool it exposes."""
    endpoint = "http://localhost:8000/mcp"
    no_headers = {}

    # The local server needs no authentication, hence the empty header dict.
    transport = streamablehttp_client(endpoint, no_headers, timeout=120, terminate_on_close=False)
    async with transport as (reader, writer, _):
        async with ClientSession(reader, writer) as session:
            await session.initialize()
            listing = await session.list_tools()
            print("Available tools:")
            for entry in listing.tools:
                print(f"  - {entry.name}: {entry.description}")

# Run the async client to completion when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

### Testing Locally

To test your MCP server locally:

1. **Terminal 1**: Start the MCP server
   ```bash
   python mcp_server.py
   ```
   
2. **Terminal 2**: Run the test client
   ```bash
   python my_mcp_client.py
   ```

You should see all of the server's O2 IMS tools listed in the output, each with its description.

## Setting up Amazon Cognito for Authentication

AgentCore Runtime requires authentication. We'll use Amazon Cognito to provide JWT tokens for accessing our deployed MCP server.

In [None]:
import sys
import os

# Work out which directory to add to the import path for the helper module
# imported right after this block.
# NOTE(review): '__file__' is a quoted string here, not the __file__ variable,
# so the path is always derived from the working directory. Which directory
# results depends on whether __file__ happens to be defined. Confirm this
# matches where the helper module lives before changing it.
_path_anchor = '__file__' if '__file__' in globals() else '.'
current_dir = os.path.dirname(os.path.abspath(_path_anchor))

# The helpers are expected one level above current_dir.
utils_dir = os.path.abspath(os.path.join(current_dir, '..'))

# Put that directory first so it wins over any same-named module elsewhere.
sys.path.insert(0, utils_dir)
print("sys.path[0]:", sys.path[0])

from cognito_utils import create_agentcore_role, setup_cognito_user_pool

In [None]:
# Create the Cognito user pool and keep its settings in cognito_config, which
# later cells read for the JWT authorizer and for storing credentials.
print("Setting up Amazon Cognito user pool...")
cognito_config = setup_cognito_user_pool()
# The check mark below was previously printed as mis-encoded characters.
print("Cognito setup completed ✓")
# .get() with a fallback keeps the summary printable even if a key is absent.
print(f"User Pool ID: {cognito_config.get('user_pool_id', 'N/A')}")
print(f"Client ID: {cognito_config.get('client_id', 'N/A')}")

## Creating IAM Execution Role

Before starting, let's create an IAM role for our AgentCore Runtime. This role provides the necessary permissions for the runtime to operate.

In [None]:
# tool_name is reused later as the AgentCore agent name, so the role and the
# runtime share one identifier.
tool_name = "mcp_server_o2"
print(f"Creating IAM role for {tool_name}...")
agentcore_iam_role = create_agentcore_role(agent_name=tool_name)
# The check mark below was previously printed as mis-encoded characters.
print("IAM role created ✓")
print(f"Role ARN: {agentcore_iam_role['Role']['Arn']}")

## Configuring AgentCore Runtime Deployment

Next we will use our starter toolkit to configure the AgentCore Runtime deployment with an entrypoint, the execution role we just created and a requirements file. We will also configure the starter kit to auto create the Amazon ECR repository on launch.

In [None]:
from bedrock_agentcore_starter_toolkit import Runtime
from boto3.session import Session
import time

# Region comes from the default boto3 session (profile or environment).
boto_session = Session()
region = boto_session.region_name
print(f"Using AWS region: {region}")

# Stop before configuring if either file the deployment is built from is missing.
required_files = ['mcp_server.py', 'requirements.txt']
for file in required_files:
    if not os.path.exists(file):
        raise FileNotFoundError(f"Required file {file} not found")
# The check marks in this cell were previously printed as mis-encoded characters.
print("All required files found ✓")

agentcore_runtime = Runtime()

# Inbound JWT authorization: only tokens issued for the Cognito app client
# created earlier are allowed, validated against the pool's discovery URL.
auth_config = {
    "customJWTAuthorizer": {
        "allowedClients": [
            cognito_config['client_id']
        ],
        "discoveryUrl": cognito_config['discovery_url'],
    }
}

print("Configuring AgentCore Runtime...")
response = agentcore_runtime.configure(
    entrypoint="mcp_server.py",
    execution_role=agentcore_iam_role['Role']['Arn'],
    auto_create_ecr=True,
    requirements_file="requirements.txt",
    region=region,
    authorizer_configuration=auth_config,
    protocol="MCP",
    agent_name=tool_name
)
print("Configuration completed ✓")

In [None]:
agentcore_iam_role['Role']['Arn']

In [None]:
auth_config

## Launching MCP Server to AgentCore Runtime

Now that we've got a Dockerfile, let's launch the MCP server to the AgentCore Runtime. This will create the Amazon ECR repository and the AgentCore Runtime.

In [None]:
# Build and deploy the server; launch_result carries the agent ARN and ID
# that later cells store and use to build the invocation URL.
print("Launching MCP server to AgentCore Runtime...")
print("This may take several minutes...")
# auto_update_on_conflict=True is passed so re-running the cell goes through
# the toolkit's update path — NOTE(review): confirm exact semantics in the
# starter toolkit documentation.
launch_result = agentcore_runtime.launch(auto_update_on_conflict=True)
# The check mark below was previously printed as mis-encoded characters.
print("Launch completed ✓")
print(f"Agent ARN: {launch_result.agent_arn}")
print(f"Agent ID: {launch_result.agent_id}")

## Verify Deployment

To verify your MCP server was successfully deployed:

1. Go to the **AWS Console** search bar
2. Search for **AgentCore**
3. In the left navigation bar, click **AgentCore Runtime**
4. Look for a runtime named **`mcp_server_o2`** in the list
5. The status should show **CREATING** initially, then **READY** when deployment is complete

:::alert{type="info"}
**Note:** Deployment typically takes 3-5 minutes to complete.
:::

Let's also verify the deployment programmatically and wait for it to be ready.

## Checking AgentCore Runtime Status Programmatically

By now, the runtime should be ready. Let's verify the status programmatically:

In [None]:
# Imported here so the cell also runs on its own after a kernel restart.
import time

print("Checking AgentCore Runtime status...")
status_response = agentcore_runtime.status()
status = status_response.endpoint['status']
print(f"Initial status: {status}")

# Statuses after which polling is pointless: success or a definitive failure.
end_status = ['READY', 'CREATE_FAILED', 'DELETE_FAILED', 'UPDATE_FAILED']

# Bound the wait so a runtime stuck in a non-terminal state cannot hang the
# notebook forever. 15 minutes is well above the 3-5 minutes the tutorial
# quotes for a normal deployment.
poll_interval_seconds = 10
max_wait_seconds = 15 * 60
deadline = time.monotonic() + max_wait_seconds

while status not in end_status and time.monotonic() < deadline:
    print(f"Status: {status} - waiting...")
    time.sleep(poll_interval_seconds)
    status_response = agentcore_runtime.status()
    status = status_response.endpoint['status']

# The symbols below were previously printed as mis-encoded characters.
if status == 'READY':
    print("✓ AgentCore Runtime is READY!")
elif status in end_status:
    print(f"⚠ AgentCore Runtime status: {status}")
else:
    print(f"⚠ Timed out after {max_wait_seconds}s waiting for AgentCore Runtime; status: {status}")

print(f"Final status: {status}")

## Storing Configuration for Remote Access

Before we can invoke our deployed MCP server, let's store the Agent ARN and Cognito configuration in AWS Systems Manager Parameter Store and AWS Secrets Manager for easy retrieval:

In [None]:
import boto3
import json

ssm_client = boto3.client('ssm', region_name=region)
secrets_client = boto3.client('secretsmanager', region_name=region)

# Store the whole Cognito configuration as one secret. If the secret already
# exists (for example when the notebook is re-run), overwrite its value.
# The check marks in this cell were previously printed as mis-encoded characters.
try:
    cognito_credentials_response = secrets_client.create_secret(
        Name='mcp_server/o2/cognito/credentials',
        Description='Cognito credentials for MCP server',
        SecretString=json.dumps(cognito_config)
    )
    print("✓ Cognito credentials stored in Secrets Manager")
except secrets_client.exceptions.ResourceExistsException:
    secrets_client.update_secret(
        SecretId='mcp_server/o2/cognito/credentials',
        SecretString=json.dumps(cognito_config)
    )
    print("✓ Cognito credentials updated in Secrets Manager")

# The remote clients read the agent ARN from this parameter to build the
# invocation URL. Overwrite=True makes the cell safe to re-run.
agent_arn_response = ssm_client.put_parameter(
    Name='/mcp_server/o2/runtime/agent_arn',
    Value=launch_result.agent_arn,
    Type='String',
    Description='Agent ARN for MCP server',
    Overwrite=True
)
print("✓ Agent ARN stored in Parameter Store")

# The remote clients read the Cognito app client ID from this parameter when
# requesting a token.
client_id_response = ssm_client.put_parameter(
    Name='/mcp_server/o2/runtime/client_id',
    Value=cognito_config["client_id"],
    Type='String',
    Description='Client ID for auth',
    Overwrite=True
)

print("\nConfiguration stored successfully!")
print(f"Agent ARN: {launch_result.agent_arn}")
# NOTE(review): the trailing expression echoes the full Cognito configuration
# into the notebook output; confirm it holds nothing sensitive before sharing
# the notebook.
cognito_config

## Creating Remote Testing Client

Now let's create a client to test our deployed MCP server. This client will retrieve the necessary credentials from AWS and connect to the deployed server:

In [None]:
%%writefile my_mcp_client_remote.py
import asyncio
import boto3
import json
import sys
from boto3.session import Session

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def main():
    """List the tools of the MCP server deployed on AgentCore Runtime.

    Reads the agent ARN and the Cognito app client ID from Parameter Store,
    obtains an access token from Cognito, connects to the runtime's
    invocation endpoint over streamable HTTP, and prints every tool with its
    description and parameter names.

    Exits the process with status 1 if the credentials cannot be retrieved
    or the MCP connection fails.
    """
    boto_session = Session()
    region = boto_session.region_name
    try:
        ssm_client = boto3.client('ssm', region_name=region)
        agent_arn_response = ssm_client.get_parameter(Name='/mcp_server/o2/runtime/agent_arn')
        agent_arn = agent_arn_response['Parameter']['Value']
        print(f"Retrieved Agent ARN: {agent_arn}")

        # NOTE(review): the parsed secret is not used below. The lookup is kept
        # so a missing secret still fails fast; confirm whether the username
        # and password should be read from it instead of being hard-coded.
        secrets_client = boto3.client('secretsmanager', region_name=region)
        response = secrets_client.get_secret_value(SecretId='mcp_server/o2/cognito/credentials')
        secret_value = response['SecretString']
        parsed_secret = json.loads(secret_value)

        client_id = ssm_client.get_parameter(Name='/mcp_server/o2/runtime/client_id')["Parameter"]["Value"]
        cognito_client = boto3.client('cognito-idp', region_name=region)
        auth_response = cognito_client.initiate_auth(
            ClientId=client_id,
            AuthFlow='USER_PASSWORD_AUTH',
            AuthParameters={
                'USERNAME': 'testuser',
                'PASSWORD': 'MyPassword123!'
            }
        )
        bearer_token = auth_response['AuthenticationResult']['AccessToken']
        # The token is issued by Cognito; the old message wrongly named Secrets Manager.
        print("✓ Retrieved bearer token from Cognito")

    except Exception as e:
        print(f"Error retrieving credentials: {e}")
        sys.exit(1)

    if not agent_arn or not bearer_token:
        print("Error: AGENT_ARN or BEARER_TOKEN not retrieved properly")
        sys.exit(1)

    # The ARN is embedded in the URL path, so its ':' and '/' must be percent-encoded.
    encoded_arn = agent_arn.replace(':', '%3A').replace('/', '%2F')
    mcp_url = f"https://bedrock-agentcore.{region}.amazonaws.com/runtimes/{encoded_arn}/invocations?qualifier=DEFAULT"
    headers = {
        "authorization": f"Bearer {bearer_token}",
        "Content-Type": "application/json"
    }
    # Never echo the live token: notebook output is routinely saved and shared.
    printable_headers = {**headers, "authorization": "Bearer <redacted>"}
    print("HEADERS ARE:\n")
    print(printable_headers)

    print(f"\nConnecting to: {mcp_url}")
    print("Headers configured ✓")

    try:
        async with streamablehttp_client(mcp_url, headers, timeout=120, terminate_on_close=False) as (
            read_stream,
            write_stream,
            _,
        ):
            async with ClientSession(read_stream, write_stream) as session:
                print("\n🔄 Initializing MCP session...")
                await session.initialize()
                print("✓ MCP session initialized")

                print("\n🔄 Listing available tools...")
                tool_result = await session.list_tools()

                print("\n📋 Available MCP Tools:")
                print("=" * 50)
                for tool in tool_result.tools:
                    print(f"🔧 {tool.name}")
                    print(f"   Description: {tool.description}")
                    if hasattr(tool, 'inputSchema') and tool.inputSchema:
                        properties = tool.inputSchema.get('properties', {})
                        if properties:
                            print(f"   Parameters: {list(properties.keys())}")
                    print()

                print("✅ Successfully connected to MCP server!")
                print(f"Found {len(tool_result.tools)} tools available.")

    except Exception as e:
        print(f"❌ Error connecting to MCP server: {e}")
        sys.exit(1)

# Run the async client to completion when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

## Testing Your Deployed MCP Server

Let's test our deployed MCP server using the remote client:

In [None]:
print("Testing deployed MCP server...")
print("=" * 50)
!python my_mcp_client_remote.py

## Invoking MCP Tools Remotely

Now let's create an enhanced client that not only lists tools but also invokes them to demonstrate the full MCP functionality:

In [None]:
%%writefile invoke_mcp_tools.py
import asyncio
import boto3
import json
import sys
from boto3.session import Session

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def main():
    """List the deployed MCP server's tools and invoke one of them.

    Reads the agent ARN and the Cognito app client ID from Parameter Store,
    obtains an access token from Cognito, connects to the runtime's
    invocation endpoint over streamable HTTP, prints the available tools, and
    then calls ``get_resource_pools`` and prints what it returns.

    Exits the process with status 1 if the credentials cannot be retrieved
    or the MCP connection fails. A failure of the tool call itself is printed
    and does not abort the run.
    """
    boto_session = Session()
    region = boto_session.region_name

    print(f"Using AWS region: {region}")

    try:
        ssm_client = boto3.client('ssm', region_name=region)
        agent_arn_response = ssm_client.get_parameter(Name='/mcp_server/o2/runtime/agent_arn')
        agent_arn = agent_arn_response['Parameter']['Value']
        print(f"Retrieved Agent ARN: {agent_arn}")

        # NOTE(review): the parsed secret is not used below. The lookup is kept
        # so a missing secret still fails fast; confirm whether the username
        # and password should be read from it instead of being hard-coded.
        secrets_client = boto3.client('secretsmanager', region_name=region)
        response = secrets_client.get_secret_value(SecretId='mcp_server/o2/cognito/credentials')
        secret_value = response['SecretString']
        parsed_secret = json.loads(secret_value)

        client_id = ssm_client.get_parameter(Name='/mcp_server/o2/runtime/client_id')["Parameter"]["Value"]
        cognito_client = boto3.client('cognito-idp', region_name=region)
        auth_response = cognito_client.initiate_auth(
            ClientId=client_id,
            AuthFlow='USER_PASSWORD_AUTH',
            AuthParameters={
                'USERNAME': 'testuser',
                'PASSWORD': 'MyPassword123!'
            }
        )
        bearer_token = auth_response['AuthenticationResult']['AccessToken']
        # The token is issued by Cognito; the old message wrongly named Secrets Manager.
        print("✓ Retrieved bearer token from Cognito")

    except Exception as e:
        print(f"Error retrieving credentials: {e}")
        sys.exit(1)

    # The ARN is embedded in the URL path, so its ':' and '/' must be percent-encoded.
    encoded_arn = agent_arn.replace(':', '%3A').replace('/', '%2F')
    mcp_url = f"https://bedrock-agentcore.{region}.amazonaws.com/runtimes/{encoded_arn}/invocations?qualifier=DEFAULT"
    headers = {
        "authorization": f"Bearer {bearer_token}",
        "Content-Type": "application/json"
    }

    print(f"\nConnecting to: {mcp_url}")

    try:
        async with streamablehttp_client(mcp_url, headers, timeout=120, terminate_on_close=False) as (
            read_stream,
            write_stream,
            _,
        ):
            async with ClientSession(read_stream, write_stream) as session:
                print("\n🔄 Initializing MCP session...")
                await session.initialize()
                print("✓ MCP session initialized")

                print("\n🔄 Listing available tools...")
                tool_result = await session.list_tools()

                print("\n📋 Available MCP Tools:")
                print("=" * 50)
                for tool in tool_result.tools:
                    print(f"🔧 {tool.name}: {tool.description}")

                print("\n🧪 Testing MCP Tools:")
                print("=" * 50)

                try:
                    print("\n👋 Testing O2 tool...")
                    # get_resource_pools declares only an optional
                    # filter_criteria parameter; the "question" argument sent
                    # before is not part of its signature. Call it with no
                    # arguments to list every pool.
                    search_result = await session.call_tool(
                        name="get_resource_pools",
                        arguments={}
                    )
                    # A result may carry zero or several content items, so do
                    # not assume content[0] exists or that it has .text.
                    if search_result.content:
                        for item in search_result.content:
                            print(f"   Result: {getattr(item, 'text', item)}")
                    else:
                        print("   Result: (no content returned)")
                except Exception as e:
                    print(f"   Error: {e}")

                print("\n✅ MCP tool testing completed!")

    except Exception as e:
        print(f"❌ Error connecting to MCP server: {e}")
        sys.exit(1)

# Run the async client to completion when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

## Test Tool Invocation

Let's test our MCP tools by actually invoking them:

In [None]:
print("Testing MCP tool invocation...")
print("=" * 50)
!python invoke_mcp_tools.py

## Next Steps

Now that you have successfully deployed an MCP server to AgentCore Runtime, you can:

1. **Add More Tools**: Extend your MCP server with additional tools
2. **Custom Authentication**: Implement custom JWT authorizers
3. **Integration**: Integrate with other AgentCore services