# PoC - Octank Financials


Todo: Insert Image Here

Inbound - JWT via Cognito << any OAuth capable of generating a bearer token

Outbound Targets

1) Lambda: Calculator w/ an inline payload JSON spec
2) OpenAPI: NASA OpenAPI Spec w/ API Key credential
3) OpenAPI: NASA OpenAPI Spec w/ API Key credential OpenAPI Spec via 
3) finCalculation: Lambda's directly consumed using ??? <oauth or iAM>

Targets consumed 

- w/ Semantic Search enabled
- w/o Semantic Search enabled


## Prerequisites

To execute this tutorial you will need:
* Python 3.10+
* AWS credentials
* Amazon Bedrock AgentCore SDK
* Strands Agents

In [13]:
!pip install --force-reinstall -U -r requirements.txt --quiet

Import all the required Python libraries, and load environment variables

In [14]:
# Set AWS credentials if not using Amazon SageMaker notebook
import os
# os.environ['AWS_ACCESS_KEY_ID'] = '' # Set the access key
# os.environ['AWS_SECRET_ACCESS_KEY'] = '' # Set the secret key
os.environ['AWS_DEFAULT_REGION'] = os.environ.get('AWS_REGION', 'us-east-1')

In [93]:
from strands import Agent
from strands.models import BedrockModel
from strands.handlers import null_callback_handler

from strands.tools.mcp.mcp_client import MCPClient, MCPAgentTool

from mcp.client.streamable_http import streamablehttp_client
from mcp.types import Tool as MCPTool

import logging
import time
import json
import boto3
import requests
import utils
import utils2


GATEWAY_NAME = "octank-financials-v2"
GATEWAY_DESCRIPTION = "Gateway for Octank Financials"

Set up a logger

In [16]:
# Configure the root strands logger
logging.getLogger("strands").setLevel(logging.ERROR)  # INFO) #DEBUG) #

# Add a handler to see the logs
logging.basicConfig(
    format="%(levelname)s | %(name)s | %(message)s", handlers=[logging.StreamHandler()]
)

Check our boto3 version

In [17]:
boto3.__version__

'1.40.53'

Get our boto3 client for the AgentCore control plane API.

In [18]:
session = boto3.Session()
agentcore_client = session.client(
    "bedrock-agentcore-control",
)

# Setup Gateway

Calculator API's

In [19]:
with open("./calc/calc-api.json") as f:
    data = json.load(f)[0:3]
print(json.dumps(data, indent=4))

[
    {
        "name": "add_numbers",
        "description": "Adds firstNumber and secondNumber together to get the sum (firstNumber + secondNumber)",
        "inputSchema": {
            "type": "object",
            "properties": {
                "firstNumber": {
                    "type": "number",
                    "description": "first number to add"
                },
                "secondNumber": {
                    "type": "number",
                    "description": "second number to add"
                }
            },
            "required": [
                "firstNumber",
                "secondNumber"
            ]
        }
    },
    {
        "name": "subtract_numbers",
        "description": "Subtracts the subtrahend from the minuend to find the difference (minuend - subtrahend)",
        "inputSchema": {
            "type": "object",
            "properties": {
                "minuend": {
                    "type": "number",
                    "descripti

Lambda implementation of calculator

In [20]:
from IPython.display import display, Code

with open("./calc/lambda_function_code.py", "r") as f:
    code_content = f.read()
display(Code(code_content, language="python"))

In [21]:
#### Create a sample AWS Lambda function that you want to convert into MCP tools
calc_lambda_resp = utils.create_gateway_lambda(
    "calc/lambda_function_code.zip", lambda_function_name="calc_lambda_gateway"
)

if calc_lambda_resp is not None:
    if calc_lambda_resp["exit_code"] == 0:
        print(
            "Lambda function created with ARN: ",
            calc_lambda_resp["lambda_function_arn"],
        )
    else:
        print(
            "Lambda function creation failed with message: ",
            calc_lambda_resp["lambda_function_arn"],
        )

Reading code from zip file
Creating IAM role for lambda function
IAM role calc_lambda_gateway_lambda_iamrole already exists. Using the same ARN arn:aws:iam::165361166149:role/calc_lambda_gateway_lambda_iamrole
Creating lambda function
AWS Lambda function calc_lambda_gateway already exists. Using the same ARN arn:aws:lambda:us-east-1:165361166149:function:calc_lambda_gateway
Lambda function created with ARN:  arn:aws:lambda:us-east-1:165361166149:function:calc_lambda_gateway


In [22]:
calc_lambda_resp["lambda_function_arn"]

'arn:aws:lambda:us-east-1:165361166149:function:calc_lambda_gateway'

## Creating Cognito

In [23]:
cognito_response = utils.setup_cognito_user_pool()

bearer_token = utils.get_bearer_token(
    client_id=cognito_response["client_id"],
    username="testuser",
    password="MyPassword123!",
)

Creating Cognito User Pool: MCPServerPool
User Pool created with ID: us-east-1_tk0ZfLujQ
Creating Cognito App Client: MCPServerPoolClient
App Client created with ID: h9majoiq19ld8qs0nd3cqjb1n
Creating Cognito user: testuser
Setting permanent password for user: testuser
Pool ID: us-east-1_tk0ZfLujQ
Discovery URL: https://cognito-idp.us-east-1.amazonaws.com/us-east-1_tk0ZfLujQ/.well-known/openid-configuration
Client ID: h9majoiq19ld8qs0nd3cqjb1n
Authenticating user: testuser
Bearer token obtained successfully


In [24]:
gateway_role_arn = utils.create_gateway_iam_role(
    lambda_arns=[
        calc_lambda_resp["lambda_function_arn"],
        # restaurant_lambda_resp["lambda_function_arn"],
    ]
)

Creating IAM role: GatewaySearchAgentCoreRole
IAM role GatewaySearchAgentCoreRole already exists. Retrieving existing role...
Updated policy for existing role: arn:aws:iam::165361166149:role/GatewaySearchAgentCoreRole


### Let's create a few helper functions for using the control plane APIs

In [36]:
def read_apispec(json_file_path):
    try:
        # read json file and return contents as string
        with open(json_file_path, "r") as file:
            # Parse JSON to Python object
            api_spec = json.load(file)
            return api_spec

    except FileNotFoundError:
        return f"Error: File {json_file_path} not found"
    except Exception as e:
        return f"An unexpected error occurred: {str(e)}"


def list_gateways():
    response = agentcore_client.list_gateways()
    print(json.dumps(response, indent=2, default=str))
    return response

### Create Gateway helper function

Here is a helper function for creating an AgentCore Gateway given a name and
description. It uses Amazon Cognito as its IdP, and pulls the allowed client ID
and the discovery URL from environment variables, as those were already defined.
It also defaults to enabling semantic search on the resulting Gateway, and uses
a predefined IAM role.

In [37]:
def create_gateway(gateway_name, gateway_desc):
    # Use Cognito for Inbound OAuth to our Gateway
    auth_config = {
        "customJWTAuthorizer": {
            "allowedClients": [cognito_response["client_id"]],
            "discoveryUrl": cognito_response["discovery_url"],
        }
    }
    # Enable semantic search of tools
    search_config = {
        "mcp": {"searchType": "SEMANTIC", "supportedVersions": ["2025-03-26"]}
    }
    # Create the gateway
    response = agentcore_client.create_gateway(
        name=gateway_name,
        roleArn=gateway_role_arn,
        authorizerType="CUSTOM_JWT",
        description=gateway_desc,
        protocolType="MCP",
        authorizerConfiguration=auth_config,
        protocolConfiguration=search_config,
    )
    print(json.dumps(response, indent=2, default=str))
    return response["gatewayId"]

### Create Gateway Target helper function

This function creates a new AWS Lambda target on an existing Gateway.
Simply provide the gateway ID, the name and description of the new target,
the ARN of the existing AWS Lambda function, and the JSON schema describing
the interfaces to the tools you want to expose from the gateway.

In [38]:
def create_gatewaytarget(gateway_id, target_name, target_descr, lambda_arn, api_spec):
    # Add a Lambda target to the gateway
    response = agentcore_client.create_gateway_target(
        gatewayIdentifier=gateway_id,
        name=target_name,
        description=target_descr,
        targetConfiguration={
            "mcp": {
                "lambda": {
                    "lambdaArn": lambda_arn,
                    "toolSchema": {"inlinePayload": api_spec},
                }
            }
        },
        # Use IAM as credential provider
        credentialProviderConfigurations=[
            {"credentialProviderType": "GATEWAY_IAM_ROLE"}
        ],
    )
    return response["targetId"]

## Creating your AgentCore Gateway
Before we setup your first Gateway, let's take a quick look at how 
Gateway provides security, both for inbound requests to use MCP tools,
and outbound access from the Gateway to tools and resources.

![How does it work](images/gateway_secure_access.png)

Now let's create the gateway for this tutorial, providing a name and a description.

In [39]:
list_gateways()

{
  "ResponseMetadata": {
    "RequestId": "281311a2-9ac0-434d-8f25-b3f66d11823c",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "date": "Thu, 16 Oct 2025 16:43:09 GMT",
      "content-type": "application/json",
      "content-length": "2075",
      "connection": "keep-alive",
      "x-amzn-requestid": "281311a2-9ac0-434d-8f25-b3f66d11823c",
      "x-amzn-remapped-x-amzn-requestid": "69adcb80-e8bf-42e8-8a7f-a672e9638688",
      "x-amzn-remapped-content-length": "2075",
      "x-amzn-remapped-connection": "keep-alive",
      "x-amz-apigw-id": "SjIInGGhIAMEjhw=",
      "x-amzn-trace-id": "Root=1-68f1209d-2e35fa9a5ee1f11d6abf2fe4",
      "x-amzn-remapped-date": "Thu, 16 Oct 2025 16:43:09 GMT"
    },
    "RetryAttempts": 0
  },
  "items": [
    {
      "gatewayId": "ac-gateway-mcp-server-tv5rao2k84",
      "name": "ac-gateway-mcp-server",
      "status": "READY",
      "description": "AgentCore Gateway with MCP Server target",
      "createdAt": "2025-10-14 03:47:18.920371+00:00",


{'ResponseMetadata': {'RequestId': '281311a2-9ac0-434d-8f25-b3f66d11823c',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 16 Oct 2025 16:43:09 GMT',
   'content-type': 'application/json',
   'content-length': '2075',
   'connection': 'keep-alive',
   'x-amzn-requestid': '281311a2-9ac0-434d-8f25-b3f66d11823c',
   'x-amzn-remapped-x-amzn-requestid': '69adcb80-e8bf-42e8-8a7f-a672e9638688',
   'x-amzn-remapped-content-length': '2075',
   'x-amzn-remapped-connection': 'keep-alive',
   'x-amz-apigw-id': 'SjIInGGhIAMEjhw=',
   'x-amzn-trace-id': 'Root=1-68f1209d-2e35fa9a5ee1f11d6abf2fe4',
   'x-amzn-remapped-date': 'Thu, 16 Oct 2025 16:43:09 GMT'},
  'RetryAttempts': 0},
 'items': [{'gatewayId': 'ac-gateway-mcp-server-tv5rao2k84',
   'name': 'ac-gateway-mcp-server',
   'status': 'READY',
   'description': 'AgentCore Gateway with MCP Server target',
   'createdAt': datetime.datetime(2025, 10, 14, 3, 47, 18, 920371, tzinfo=tzutc()),
   'updatedAt': datetime.datetime(2025, 10, 14, 3, 4

In [40]:
from botocore.exceptions import ClientError


# {
#       "gatewayId": "ac-gateway-mcp-server-tv5rao2k84",
#       "name": "ac-gateway-mcp-server",
#       "status": "READY",
#       "description": "AgentCore Gateway with MCP Server target",
# ...
#       "protocolType": "MCP"
#     }


# Check if gateway already exists
existing_gateways = list_gateways()
gatewayId = None

for gateway in existing_gateways.get('items', []):
    if gateway['name'] == GATEWAY_NAME:
        gatewayId = gateway['gatewayId']
        print(f"Gateway '{GATEWAY_NAME}' already exists with ID: {gatewayId}")
        break

if not gatewayId:
    print(f"Create gateway with name: {GATEWAY_NAME}")
    try:
        gatewayId = create_gateway(
            gateway_name=GATEWAY_NAME, gateway_desc=GATEWAY_DESCRIPTION
        )
        print(f"Gateway created with id: {gatewayId}.")
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConflictException':
            # Gateway exists, get its ID
            existing_gateways = list_gateways()
            for gateway in existing_gateways.get('gateways', []):
                if gateway['name'] == GATEWAY_NAME:
                    gatewayId = gateway['gatewayId']
                    print(f"Gateway '{GATEWAY_NAME}' already exists with ID: {gatewayId}")
                    break
        else:
            raise


{
  "ResponseMetadata": {
    "RequestId": "1bb61779-f149-4b9d-8b7f-e7897a98b97d",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "date": "Thu, 16 Oct 2025 16:44:42 GMT",
      "content-type": "application/json",
      "content-length": "2075",
      "connection": "keep-alive",
      "x-amzn-requestid": "1bb61779-f149-4b9d-8b7f-e7897a98b97d",
      "x-amzn-remapped-x-amzn-requestid": "a641968c-c7ad-4432-a747-6bd0d3e70b47",
      "x-amzn-remapped-content-length": "2075",
      "x-amzn-remapped-connection": "keep-alive",
      "x-amz-apigw-id": "SjIXKFbkIAMENRw=",
      "x-amzn-trace-id": "Root=1-68f120fa-3362c92c3e09617f19d66df8",
      "x-amzn-remapped-date": "Thu, 16 Oct 2025 16:44:42 GMT"
    },
    "RetryAttempts": 0
  },
  "items": [
    {
      "gatewayId": "ac-gateway-mcp-server-tv5rao2k84",
      "name": "ac-gateway-mcp-server",
      "status": "READY",
      "description": "AgentCore Gateway with MCP Server target",
      "createdAt": "2025-10-14 03:47:18.920371+00:00",


In [None]:
# print(f"Create gateway with name: {GATEWAY_NAME}")
# gatewayId = create_gateway(
#     gateway_name=GATEWAY_NAME, gateway_desc=GATEWAY_DESCRIPTION
# )
# print(f"Gateway created with id: {gatewayId}.")

# gateway w/o cognito

In [94]:
import boto3
import json
import time
from boto3.session import Session
import botocore
from botocore.exceptions import ClientError
import requests
import time

# def create_agentcore_gateway_role(gateway_name):
#     iam_client = boto3.client('iam')
#     agentcore_gateway_role_name = f'agentcore-{gateway_name}-role'
#     boto_session = Session()
    
#     region = boto_session.region_name
#     account_id = boto3.client("sts").get_caller_identity()["Account"]
#     role_policy = {
#         "Version": "2012-10-17",
#         "Statement": [{
#                 "Sid": "VisualEditor0",
#                 "Effect": "Allow",
#                 "Action": [
#                     "bedrock-agentcore:*",
#                     "bedrock:*",
#                     "agent-credential-provider:*",
#                     "iam:PassRole",
#                     "secretsmanager:GetSecretValue",
#                     "lambda:InvokeFunction"
#                 ],
#                 "Resource": "*"
#             }
#         ]
#     }

In [96]:
import time
import boto3
# CreateGateway with Amazon IAM.
gateway_client = boto3.client('bedrock-agentcore-control', region_name = os.environ['AWS_DEFAULT_REGION'])
 
#no need to run below if already run once
agentcore_gateway_iam_role = create_agentcore_gateway_role("sample-lambdagateway")

create_response = None
if agentcore_gateway_iam_role is None:
    print("Failed to create IAM role")
else:
    create_response = gateway_client.create_gateway(
        name='AgentCoreGateway-wo-Cognito',
        roleArn=agentcore_gateway_iam_role['Role']['Arn'],
        protocolType='MCP',
        authorizerType='AWS_IAM',
        description='AgentCore Gateway with AWS Lambda target type using Amazon IAM for ingress auth'
    )
    print(create_response)
# print("Agentcore gateway role ARN: ", agentcore_gateway_iam_role['Role']['Arn']) 

# Retrieve the GatewayID used for GatewayTarget creation
gatewayID = create_response["gatewayId"]
gatewayURL = create_response["gatewayUrl"]
print(gatewayID)
time.sleep(10)

Failed to create IAM role


TypeError: 'NoneType' object is not subscriptable

# Adding AgentCore Gateway Targets


In this tutorial, we assume you already have installed a pair of Lambda functions, one for doing simple
math calculations, and another that simulates creating a restaurant reservation. We'll add a Gateway Target
for each of these functions.

Once we've added those targets, we'll add additional targets to simply drive higher MCP tool counts that
help us demonstrate the power of AgentCore Gateway search.

### Target 1: Add Calculator Lambda Target w/ custom JSON Payload

In [41]:
calc_api_spec = read_apispec("./calc/calc-api.json")
print(f"API spec for calc has {len(calc_api_spec)} functions\n")
calc_lambda_arn = calc_lambda_resp["lambda_function_arn"]
print(f"Calc Lambda ARN: {calc_lambda_arn}")

time.sleep(5)
calcTargetId = create_gatewaytarget(
    gateway_id=gatewayId,
    lambda_arn=calc_lambda_arn,
    target_name="CalcTools",
    target_descr="Calculation Tools",
    api_spec=calc_api_spec,
)
print(f"CalcTools Target created with id: {calcTargetId} on gateway: {gatewayId}")

API spec for calc has 79 functions

Calc Lambda ARN: arn:aws:lambda:us-east-1:165361166149:function:calc_lambda_gateway


ConflictException: An error occurred (ConflictException) when calling the CreateGatewayTarget operation: A target with name 'CalcTools' already exists in this gateway

################### check till here ####################

### Target 2: Add OpenAPI Target w/ yaml spec

#### Upload the Zendesk support OpenAPI yaml file in S3

In [None]:
# # Create an S3 client
# session = boto3.session.Session()
# s3_client = session.client('s3')
# sts_client = session.client('sts')

# # Retrieve AWS account ID and region
# account_id = sts_client.get_caller_identity()["Account"]
# region = session.region_name
# # Define parameters
# # bucket_name = '' # Your s3 bucket to upload the OpenAPI json file.
# bucket_name = f'agentcore-gateway-{account_id}-{region}'
# file_path = 'openapi-specs/Zendesk-support-apis.yaml'
# object_key = 'Zendesk-support-apis.yaml'
# # Upload the file using put_object and read response
# try:
#     with open(file_path, 'rb') as file_data:
#         response = s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=file_data)

#     # Construct the ARN of the uploaded object with account ID and region
#     openapi_s3_uri = f's3://{bucket_name}/{object_key}'
#     print(f'Uploaded object S3 URI: {openapi_s3_uri}')
# except Exception as e:
#     print(f'Error uploading file: {e}')

Error uploading file: An error occurred (NoSuchBucket) when calling the PutObject operation: The specified bucket does not exist


In [None]:
# # Test 1: Basic imports
# print("Testing imports...")
# import boto3
# print("✓ boto3 imported")

# # Test 2: Session creation
# print("Testing session...")
# session = boto3.Session()
# print("✓ Session created")

# # Test 3: Credentials check
# print("Testing credentials...")
# creds = session.get_credentials()
# if creds:
#     print("✓ Credentials found")
# else:
#     print("✗ No credentials")
#     exit()

# # Test 4: STS call with timeout
# print("Testing STS...")
# try:
#     sts_client = session.client('sts')
#     account_id = sts_client.get_caller_identity()["Account"]
#     print(f"✓ Account: {account_id}")
# except Exception as e:
#     print(f"✗ STS failed: {e}")
#     exit()

# print("All tests passed!")


Testing imports...
✓ boto3 imported
Testing session...
✓ Session created
Testing credentials...
✓ Credentials found
Testing STS...
✓ Account: 165361166149
All tests passed!


### Create outbound auth credentials provider

In [50]:
from botocore.config import Config
from botocore.exceptions import ClientError
from pprint import pprint
ZENDESK_DOMAIN="<Zendek domain url>"
# ZENDESK_AUTH_ENDPOINT="https://<Zendeskl-domain>/oauth/authorizations/new"
# ZENDESK_TOKEN_ENDPOINT="https://<Zendesk-domain>/oauth/tokens"
ZENDESK_AUTH_ENDPOINT="https://example.zendesk.com/oauth/authorizations/new"
ZENDESK_TOKEN_ENDPOINT="https://example.zendesk.com/oauth/tokens"
ZENDESK_OAUTH_ISSUER="https://example.zendesk.com/oauth/tokens"
ZENDESK_CLIENT_ID="YOUR-CLIENT_ID" # Your Zendesk OAuth client -  client id 
ZENDESK_SECRET="YOUR-CLIENT_ID"  # Your Zendesk OAuth client -  client id 

sdk_config = Config(
    region_name=os.environ['AWS_DEFAULT_REGION'],
    retries={"max_attempts": 2, "mode": "standard"},
)

acps = boto3.client(
    service_name="bedrock-agentcore-control",
    config=sdk_config,
)

provider_config= {
    "customOauth2ProviderConfig": {
         "oauthDiscovery": {
             "authorizationServerMetadata": {
                 "issuer": ZENDESK_DOMAIN,
                 "authorizationEndpoint": ZENDESK_AUTH_ENDPOINT,
                 "tokenEndpoint": ZENDESK_TOKEN_ENDPOINT,
                 "issuer": ZENDESK_OAUTH_ISSUER,  # Add this
                 "responseTypes": ["token"]
             }
         },
         "clientId": ZENDESK_CLIENT_ID,
         "clientSecret": ZENDESK_SECRET
     }
 }

# response = acps.create_oauth2_credential_provider(
#     name="ZendeskOAuthTokenCfg", 
#     credentialProviderVendor="CustomOauth2", 
#     oauth2ProviderConfigInput=provider_config
# )


try:
    response = acps.create_oauth2_credential_provider(
        name="ZendeskOAuthTokenCfg", 
        credentialProviderVendor="CustomOauth2", 
        oauth2ProviderConfigInput=provider_config
    )
    print("Created new credential provider")
    print(json.dumps(response, indent=2, default=str))
    credentialProviderARN = response['credentialProviderArn']
    
except ClientError as e:
    if 'already exists' in str(e):
        print("Credential provider 'ZendeskOAuthTokenCfg' already exists")
        # Get existing provider ARN
        providers = acps.list_oauth2_credential_providers()
        for provider in providers.get('credentialProviders', []):
            if provider['name'] == 'ZendeskOAuthTokenCfg':
                credentialProviderARN = provider['credentialProviderArn']
                break
    else:
        raise

print(f"Egress Credentials provider ARN: {credentialProviderARN}")


pprint(response)
credentialProviderARN = response['credentialProviderArn']
pprint(f"Egress Credentials provider ARN, {credentialProviderARN}")

Credential provider 'ZendeskOAuthTokenCfg' already exists
Egress Credentials provider ARN: arn:aws:bedrock-agentcore:us-east-1:165361166149:token-vault/default/oauth2credentialprovider/ZendeskOAuthTokenCfg
{'ResponseMetadata': {'HTTPHeaders': {'connection': 'keep-alive',
                                      'content-length': '848',
                                      'content-type': 'application/json',
                                      'date': 'Thu, 16 Oct 2025 17:03:50 GMT',
                                      'x-amz-apigw-id': 'SjLKeH16oAMEnVw=',
                                      'x-amzn-remapped-connection': 'keep-alive',
                                      'x-amzn-remapped-content-length': '848',
                                      'x-amzn-remapped-date': 'Thu, 16 Oct '
                                                              '2025 17:03:50 '
                                                              'GMT',
                                      'x-amzn-rema

Create an OpenAPI Target

In [51]:
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
import os

try:
    # Check credentials
    session = boto3.Session()
    if not session.get_credentials():
        raise NoCredentialsError()
    
    s3_client = session.client('s3')
    sts_client = session.client('sts')
    
    # Get account info
    account_id = sts_client.get_caller_identity()["Account"]
    region = session.region_name
    bucket_name = f'agentcore-gateway-{account_id}-{region}'
    
    # Check file exists
    file_path = 'openapi-specs/Zendesk-support-apis.yaml'
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")
    
    # Create bucket if needed
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            if region == 'us-east-1':
                s3_client.create_bucket(Bucket=bucket_name)
            else:
                s3_client.create_bucket(
                    Bucket=bucket_name,
                    CreateBucketConfiguration={'LocationConstraint': region}
                )
        else:
            raise
    
    # Upload file
    with open(file_path, 'rb') as file_data:
        s3_client.put_object(Bucket=bucket_name, Key='Zendesk-support-apis.yaml', Body=file_data)
    
    openapi_s3_uri = f's3://{bucket_name}/Zendesk-support-apis.yaml'
    print(f'Success: {openapi_s3_uri}')

except NoCredentialsError:
    print("Error: AWS credentials not found")
except FileNotFoundError as e:
    print(f"Error: {e}")
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'AccessDenied':
        print("Error: Insufficient S3 permissions")
    elif error_code == 'BucketAlreadyExists':
        print("Error: Bucket name already taken")
    else:
        print(f"AWS Error: {error_code}")
except Exception as e:
    print(f"Unexpected error: {e}")


Success: s3://agentcore-gateway-165361166149-us-east-1/Zendesk-support-apis.yaml


IMPORTANT: Make sure server URL in the OpenAPI file is pointing to your own endpoint URL. Gateway reads the server URL from the OpenAPI file and calls the endpoint. Before uploading it to s3, please make sure you do this change.

In [54]:
gateway_client = boto3.client('bedrock-agentcore-control', region_name = os.environ['AWS_DEFAULT_REGION'])

# S3 Uri for OpenAPI spec file
openapi_s3_target_config = {
    "mcp": {
          "openApiSchema": {
              "s3": {
                  "uri": openapi_s3_uri
              }
          }
      }
}

credential_config = [
    {
        "credentialProviderType" : "OAUTH",
        "credentialProvider": {
            "oauthCredentialProvider": {
                "providerArn": credentialProviderARN, 
                "scopes": ["tickets:read", "read", "tickets:write", "write"] 
            }
        }
    }
  ]

target_name="DemoOpenAPIGW"
response = gateway_client.create_gateway_target(
    gatewayIdentifier=gatewayId,
    name=target_name,
    description='OpenAPI Target with S3Uri using SDK',
    targetConfiguration=openapi_s3_target_config,
    credentialProviderConfigurations=credential_config)

# Printing the request ID and timestamp for you to report the defects. Please include them while reporting issues/defects  
response_metadata = response['ResponseMetadata']

### Target 3: Add OpenAPI Target w/ API-Key

#### Upload the NASA Open API json file in S3

In [58]:
# Create an S3 client
session = boto3.session.Session()
s3_client = session.client('s3')
sts_client = session.client('sts')

# Retrieve AWS account ID and region
account_id = sts_client.get_caller_identity()["Account"]
region = session.region_name
# Define parameters
# Your s3 bucket to upload the OpenAPI json file.
bucket_name = f'agentcore-gateway-{account_id}-{region}'
file_path = 'openapi-specs/nasa_mars_insights_openapi.json'
object_key = 'nasa_mars_insights_openapi.json'
# Upload the file using put_object and read response
try:
    if region == "us-east-1":
        s3bucket = s3_client.create_bucket(
            Bucket=bucket_name
        )
    else:
        s3bucket = s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={
                'LocationConstraint': region
            }
        )
    with open(file_path, 'rb') as file_data:
        response = s3_client.put_object(
            Bucket=bucket_name,
            Key=object_key,
            Body=file_data
        )

    # Construct the ARN of the uploaded object with account ID and region
    openapi_s3_uri = f's3://{bucket_name}/{object_key}'
    print(f'Uploaded object S3 URI: {openapi_s3_uri}')
except Exception as e:
    print(f'Error uploading file: {e}')

Uploaded object S3 URI: s3://agentcore-gateway-165361166149-us-east-1/nasa_mars_insights_openapi.json


#### Configure outbound auth and add gateway target

In [59]:
# S3 Uri for OpenAPI spec file
nasa_openapi_s3_target_config = {
    "mcp": {
          "openApiSchema": {
              "s3": {
                  "uri": openapi_s3_uri
              }
          }
      }
}

# API Key credentials provider configuration
api_key_credential_config = [
    {
        "credentialProviderType" : "API_KEY", 
        "credentialProvider": {
            "apiKeyCredentialProvider": {
                    "credentialParameterName": "api_key", # Replace this with the name of the api key name expected by the respective API provider. For passing token in the header, use "Authorization"
                    "providerArn": credentialProviderARN,
                    "credentialLocation":"QUERY_PARAMETER", # Location of api key. Possible values are "HEADER" and "QUERY_PARAMETER".
                    #"credentialPrefix": " " # Prefix for the token. Valid values are "Basic". Applies only for tokens.
            }
        }
    }
  ]

targetname='DemoOpenAPITargetS3NasaMars'
response = gateway_client.create_gateway_target(
    gatewayIdentifier=gatewayId,
    name=targetname,
    description='OpenAPI Target with API-Key',
    targetConfiguration=nasa_openapi_s3_target_config,
    credentialProviderConfigurations=api_key_credential_config)

### Target 4: Add OpenAPI Target w/ yaml spec - Octank Financials

In [61]:
from botocore.config import Config
from botocore.exceptions import ClientError
from pprint import pprint
OCTANK_DOMAIN="<Octank domain url>"

OCTANK_AUTH_ENDPOINT="https://example.octank.com/oauth/authorizations/new"
OCTANK_TOKEN_ENDPOINT="https://example.octank.com/oauth/tokens"
OCTANK_OAUTH_ISSUER="https://example.octank.com/oauth/tokens"
OCTANK_CLIENT_ID="YOUR-CLIENT_ID" # Your octank OAuth client -  client id 
OCTANK_SECRET="YOUR-CLIENT_ID"  # Your octank OAuth client -  client id 

sdk_config = Config(
    region_name=os.environ['AWS_DEFAULT_REGION'],
    retries={"max_attempts": 2, "mode": "standard"},
)

acps = boto3.client(
    service_name="bedrock-agentcore-control",
    config=sdk_config,
)

provider_config= {
    "customOauth2ProviderConfig": {
         "oauthDiscovery": {
             "authorizationServerMetadata": {
                 "issuer": OCTANK_DOMAIN,
                 "authorizationEndpoint": OCTANK_AUTH_ENDPOINT,
                 "tokenEndpoint": OCTANK_TOKEN_ENDPOINT,
                 "issuer": OCTANK_OAUTH_ISSUER, 
                 "responseTypes": ["token"]
             }
         },
         "clientId": OCTANK_CLIENT_ID,
         "clientSecret": OCTANK_SECRET
     }
 }

try:
    response = acps.create_oauth2_credential_provider(
        name="OctankOAuthTokenCfg", 
        credentialProviderVendor="CustomOauth2", 
        oauth2ProviderConfigInput=provider_config
    )
    print("Created new Octank credential provider")
    print(json.dumps(response, indent=2, default=str))
    credentialProviderARN = response['credentialProviderArn']
    
except ClientError as e:
    if 'already exists' in str(e):
        print("Credential provider 'OctankOAuthTokenCfg' already exists")
        # Get existing provider ARN
        providers = acps.list_oauth2_credential_providers()
        for provider in providers.get('credentialProviders', []):
            if provider['name'] == 'OctankOAuthTokenCfg':
                credentialProviderARN = provider['credentialProviderArn']
                break
    else:
        raise

print(f"Egress Credentials provider ARN: {credentialProviderARN}")


pprint(response)
credentialProviderARN = response['credentialProviderArn']
pprint(f"Egress Credentials provider ARN, {credentialProviderARN}")

Created new Octank credential provider
{
  "ResponseMetadata": {
    "RequestId": "2e03f582-95ba-4b8c-a8a7-aa9299f50c59",
    "HTTPStatusCode": 201,
    "HTTPHeaders": {
      "date": "Thu, 16 Oct 2025 18:31:36 GMT",
      "content-type": "application/json",
      "content-length": "842",
      "connection": "keep-alive",
      "x-amzn-requestid": "2e03f582-95ba-4b8c-a8a7-aa9299f50c59",
      "x-amzn-remapped-x-amzn-requestid": "cb65c2fb-f7cc-4de7-a362-9df8035d7be8",
      "x-amzn-remapped-content-length": "842",
      "x-amzn-remapped-connection": "keep-alive",
      "x-amz-apigw-id": "SjYBXEU7IAMEBgA=",
      "x-amzn-trace-id": "Root=1-68f13a08-3b36c97e7cc3e2b32c456693",
      "x-amzn-remapped-date": "Thu, 16 Oct 2025 18:31:36 GMT"
    },
    "RetryAttempts": 0
  },
  "clientSecretArn": {
    "secretArn": "arn:aws:secretsmanager:us-east-1:165361166149:secret:bedrock-agentcore-identity!default/oauth2/OctankOAuthTokenCfg-8ygBIj"
  },
  "name": "OctankOAuthTokenCfg",
  "credentialProvid

Create an OpenAPI Target

In [62]:
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
import os

try:
    # Check credentials
    session = boto3.Session()
    if not session.get_credentials():
        raise NoCredentialsError()
    
    s3_client = session.client('s3')
    sts_client = session.client('sts')
    
    # Get account info
    account_id = sts_client.get_caller_identity()["Account"]
    region = session.region_name
    bucket_name = f'agentcore-gateway-{account_id}-{region}'
    
    # Check file exists
    file_path = 'openapi-specs/octank-financials-apis.yaml'
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")
    
    # Create bucket if needed
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            if region == 'us-east-1':
                s3_client.create_bucket(Bucket=bucket_name)
            else:
                s3_client.create_bucket(
                    Bucket=bucket_name,
                    CreateBucketConfiguration={'LocationConstraint': region}
                )
        else:
            raise
    
    # Upload file
    with open(file_path, 'rb') as file_data:
        s3_client.put_object(Bucket=bucket_name, Key='octank-financials-apis.yaml', Body=file_data)
    
    openapi_s3_uri = f's3://{bucket_name}/octank-financials-apis.yaml'
    print(f'Success: {openapi_s3_uri}')

except NoCredentialsError:
    print("Error: AWS credentials not found")
except FileNotFoundError as e:
    print(f"Error: {e}")
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'AccessDenied':
        print("Error: Insufficient S3 permissions")
    elif error_code == 'BucketAlreadyExists':
        print("Error: Bucket name already taken")
    else:
        print(f"AWS Error: {error_code}")
except Exception as e:
    print(f"Unexpected error: {e}")


Success: s3://agentcore-gateway-165361166149-us-east-1/octank-financials-apis.yaml


IMPORTANT: Make sure server URL in the OpenAPI file is pointing to your own endpoint URL. Gateway reads the server URL from the OpenAPI file and calls the endpoint. Before uploading it to s3, please make sure you do this change.

In [None]:
gateway_client = boto3.client('bedrock-agentcore-control', region_name = os.environ['AWS_DEFAULT_REGION'])

# S3 Uri for OpenAPI spec file
openapi_s3_target_config = {
    "mcp": {
          "openApiSchema": {
              "s3": {
                  "uri": openapi_s3_uri
              }
          }
      }
}

# credential_config = [
#     {
#         "credentialProviderType" : "OAUTH",
#         "credentialProvider": {
#             "oauthCredentialProvider": {
#                 "providerArn": credentialProviderARN, 
#                 "scopes": ["tickets:read", "read", "tickets:write", "write"] 
#             }
#         }
#     }
#   ]

credential_config = [
    {
        "credentialProviderType" : "OAUTH",
        "credentialProvider": {
            "oauthCredentialProvider": {
                "providerArn": credentialProviderARN,
                "scopes": ["read", "write"] 
            }
        }
    }
  ]


# target_name="OctankFinancialsOpenAPIGW"
# response = gateway_client.create_gateway_target(
#     gatewayIdentifier=gatewayId,
#     name=target_name,
#     description='OpenAPI Target for Octank Financials',
#     targetConfiguration=openapi_s3_target_config,
#     credentialProviderConfigurations=credential_config)

target_name = "OctankFinancialsOpenAPIGW"

# Check if target already exists
try:
    targets_response = gateway_client.list_gateway_targets(gatewayIdentifier=gatewayId)
    targets_list = targets_response.get('items', [])
    existing_target = next((t for t in targets_list if t['name'] == target_name), None)
    
    if existing_target:
        target_id = existing_target['targetId']
        status = existing_target.get('status', 'Unknown')
        print(f"Target already exists - ID: {target_id}, Name: {existing_target['name']}, Status: {status}")
    else:
        # Create new target
        response = gateway_client.create_gateway_target(
            gatewayIdentifier=gatewayId,
            name=target_name,
            description='OpenAPI Target for Octank Financials',
            targetConfiguration=openapi_s3_target_config,
            credentialProviderConfigurations=credential_config)
        
        time.sleep(30)
        
               
        # Wait for target creation
        print("Waiting 30 seconds for target creation...")
        # Check status after creation
        status = status_response.get('status', 'Unknown')
        target_id = response['targetId']
        status_response = gateway_client.get_gateway_target(
            gatewayIdentifier=gatewayId,
            targetId=target_id)
        print(f"Created new target - ID: {target_id}, Name: {existing_target['name']}, Status: {status}")
        
except Exception as e:
    print(f"Error: {e}")


Waiting 30 seconds for target creation...
Error: name 'status_response' is not defined


In [None]:
# From your create_gateway_target response
target_id = response['targetId']  # This should be a 10-character ID like "abc123def4"

# Then use it to get status
response = agentcore_client.get_gateway_target(
    gatewayIdentifier=gatewayId,
    targetId=target_id
)

print(f"Target Status: {response['status']}")
print(f"Target State: {response.get('targetState', 'N/A')}")


ValidationException: An error occurred (ValidationException) when calling the GetGatewayTarget operation: 1 validation error detected: Value 'OctankFinancialsOpenAPIGW' at 'targetId' failed to satisfy constraint: Member must satisfy regular expression pattern: [0-9a-zA-Z]{10}

ParamValidationError: Parameter validation failed:
Missing required parameter in input: "targetId"
Unknown parameter in input: "targetIdentifier", must be one of: gatewayIdentifier, targetId

In [65]:

response_metadata = response['ResponseMetadata']

print(response_metadata)

{'RequestId': '5c14909d-5a3a-49d2-8222-b6f6ad0ad021', 'HTTPStatusCode': 202, 'HTTPHeaders': {'date': 'Thu, 16 Oct 2025 18:32:12 GMT', 'content-type': 'application/json', 'content-length': '776', 'connection': 'keep-alive', 'x-amzn-requestid': '5c14909d-5a3a-49d2-8222-b6f6ad0ad021', 'x-amzn-remapped-x-amzn-requestid': 'e89d151a-b00d-4c2c-bd9b-af970a0949b3', 'x-amzn-remapped-content-length': '776', 'x-amzn-remapped-connection': 'keep-alive', 'x-amz-apigw-id': 'SjYG8G9QoAMEDUg=', 'x-amzn-trace-id': 'Root=1-68f13a2c-5968c66e495af31c0d74dbdf', 'x-amzn-remapped-date': 'Thu, 16 Oct 2025 18:32:12 GMT'}, 'RetryAttempts': 0}


## SKIP if not doing semantic search To demonstrate the power of gateway search, now we add a few more copies of the Calculator target, 
so that we end up with 300+ MCP tools exposed.

In [None]:
# def add_more_tools(gatewayId):
#     time.sleep(10)
#     calcTargetId = create_gatewaytarget(
#         gateway_id=gatewayId,
#         lambda_arn=calc_lambda_arn,
#         target_name="Calc2",
#         target_descr="Calculation 2 Tools",
#         api_spec=calc_api_spec,
#     )
#     print(f"Calc2 Target created with id: {calcTargetId} on gateway: {gatewayId}")
#     time.sleep(10)
#     calcTargetId = create_gatewaytarget(
#         gateway_id=gatewayId,
#         lambda_arn=calc_lambda_arn,
#         target_name="Calc3",
#         target_descr="Calculation 3 Tools",
#         api_spec=calc_api_spec,
#     )
#     print(f"Calc3 Target created with id: {calcTargetId} on gateway: {gatewayId}")
#     time.sleep(10)
#     calcTargetId = create_gatewaytarget(
#         gateway_id=gatewayId,
#         lambda_arn=calc_lambda_arn,
#         target_name="Calc4",
#         target_descr="Calculation 4 Tools",
#         api_spec=calc_api_spec,
#     )
#     print(f"Calc4 Target created with id: {calcTargetId} on gateway: {gatewayId}")

In [None]:
# add_more_tools(gatewayId=gatewayId)

In [60]:
resp = agentcore_client.list_gateway_targets(gatewayIdentifier=gatewayId)
targets = resp["items"]
for target in resp["items"]:
    print(f"{target['name']} - {target['description']}")

DemoOpenAPIGW - OpenAPI Target with S3Uri using SDK
CalcTools - Calculation Tools
DemoOpenAPITargetS3NasaMars - OpenAPI Target with API-Key


# Searching for tools from a Gateway

### Getting familiar with MCP list tools before we search

Let's define some utility functions to retrieve our MCP endpoint URL for a given Gateway ID, and to 
retrieve our JWT OAuth access token to securely use our Gateway.

In [None]:
def get_gateway_endpoint(gateway_id):
    response = agentcore_client.get_gateway(gatewayIdentifier=gateway_id)
    gateway_url = response["gatewayUrl"]
    return gateway_url

Now that our Gateway is created and has targets, let's grab the MCP URL to that
Gateway. We can retrieve the endpoint URL from the Gateway control plane based on the Gateway ID.

#### Using MCP Inspector against your Gateway

Now that we have an endpoint URL for the MCP server, and we have a JWT bearer token, you may want to explore
the MCP server with the MCP Inspector tool. MCP Inspector is an open source tool that can connect to any MCP
server, lets you list the tools provided, and even provides an easy to use tool invocation experience. 

From your terminal window, simply enter `npx @modelcontextprotocol/inspector` to launch the MCP Inspector. Then paste
in your Gateway endpoint URL and your JWT token to connect. Once connected, try out List Tools and Invoke Tool.

Here's a sample screenshot.

![MCP Inspector](images/mcp_inspector.png)

In [None]:
gatewayEndpoint = get_gateway_endpoint(gateway_id=gatewayId)
print(f"Gateway Endpoint - MCP URL: {gatewayEndpoint}")

MCP server security is based on OAuth. To interact with our Gateway, we'll need to
retrieve a JWT OAuth access token from our IdP.

In [None]:
jwtToken = utils.get_bearer_token(
    client_id=cognito_response["client_id"],
    username="testuser",
    password="MyPassword123!",
)
print(f"Bearer token: {jwtToken}")

In [None]:
!npx @modelcontextprotocol/inspector

#### Creating helper functions that use jsonrpc to invoke MCP tools or list them
Let's define a helper function called `invoke_gateway_tool` that uses jsonrpc to invoke any of the
MCP tools exposed by an MCP Server, including of course, your Gateway. Given an endpoint URL and a JWT token,
you can use this utility to invoke any of the MCP tools that AgentCore Gateway made available
for you when you added Gateway Targets to your Gateway.

In [None]:
def invoke_gateway_tool(gateway_endpoint, jwt_token, tool_params):
    # print(f"Invoking tool {tool_params['name']}")

    requestBody = {
        "jsonrpc": "2.0",
        "id": 2,
        "method": "tools/call",
        "params": tool_params,
    }
    response = requests.post(
        gateway_endpoint,
        json=requestBody,
        headers={
            "Authorization": f"Bearer {jwt_token}",
            "Content-Type": "application/json",
        },
    )

    return response.json()

Here's another utility function for using MCP's `tools/list` method for listing the MCP tools
available from your Gateway. Given a Gateway ID and a JWT Token, it retrieves the full set
of tools from that Gateway, and returns a list in agent-ready form. The returned list contains 
Strands Agents MCPAgentTool objects that are suitable for handing your Agent. 

Note that `tools/list` call is paginated, so the function needs to loop, getting a page of
tools at a time, until the `nextCursor` field is no longer populated. The utility function directly
calls the endpoint using HTTPS and the jsonrpc protocol. This is a lower level way to list tools
compared to the `MCPClient` class provided by Strands Agents. We'll see that experience later.

In [None]:
def get_all_agent_tools_from_mcp_endpoint(gateway_endpoint, jwt_token, client):
    more_tools = True
    tools_count = 0
    tools_list = []

    requestBody = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}
    next_cursor = ""

    while more_tools:
        if tools_count == 0:
            requestBody["params"] = {}
        else:
            print(f"\nGetting next page of tools since a next cursor was returned\n")
            requestBody["params"] = {"cursor": next_cursor}

        headers = {
            "Authorization": f"Bearer {jwt_token}",
            "Content-Type": "application/json",
        }

        print(f"\n\nListing tools for gateway {gateway_endpoint}")

        response = requests.post(gateway_endpoint, json=requestBody, headers=headers)

        tools_json = response.json()
        tools_count += len(tools_json["result"]["tools"])

        for tool in tools_json["result"]["tools"]:
            mcp_tool = MCPTool(
                name=tool["name"],
                description=tool["description"],
                inputSchema=tool["inputSchema"],
            )
            mcp_agent_tool = MCPAgentTool(mcp_tool, client)
            short_descr = tool["description"][0:40] + "..."
            print(f"adding tool '{mcp_agent_tool.tool_name}' - {short_descr}")
            tools_list.append(mcp_agent_tool)

        if "nextCursor" in tools_json["result"]:
            next_cursor = tools_json["result"]["nextCursor"]
            more_tools = True
        else:
            more_tools = False

    print(f"\nTotal tools found: {tools_count}\n")
    return tools_list

Lets use this helper function and see the results.

In [None]:
client = MCPClient(
    lambda: streamablehttp_client(
        f"{gatewayEndpoint}", headers={"Authorization": f"Bearer {jwtToken}"}
    )
)
with client:
    all_tools = get_all_agent_tools_from_mcp_endpoint(
        gateway_endpoint=gatewayEndpoint, jwt_token=jwtToken, client=client
    )
    print(f"\nFound {len(all_tools)} tools using jsonrpc to list MCP tools\n")

#### Using Strands Agents list_tools_sync() with pagination
If you have written any Python based MCP client, you are likely familiar with the `list_tools_sync()` method 
which returns the set of tools available from the MCP Server which the client is associated
with. But did you know MCP list tools is also paginated? By default, you will only get the first small
subset of tools returned. For simple MCP servers, you may not have noticed this, but for many real world 
MCP servers, your code needs to loop, grabbing pages of tools at a time
until there are no more pages remaining. The following utility `get_all_mcp_tools_from_mcp_client` does exactly that. 
It returns the full list of tools from a given Strands Agent MCP Client.

In [None]:
def get_all_mcp_tools_from_mcp_client(client):
    more_tools = True
    tools = []
    pagination_token = None
    while more_tools:
        tmp_tools = client.list_tools_sync(pagination_token=pagination_token)
        tools.extend(tmp_tools)
        if tmp_tools.pagination_token is None:
            more_tools = False
        else:
            more_tools = True
            pagination_token = tmp_tools.pagination_token
    return tools

Let's give it a try with our Gateway and find out how many tools the Python client finds. 
First we create an MCPClient object based on our endpoint URL and our JWT bearer token. Then 
we retrieve the full set of tools across many pages of tools returned by the MCP server.
Given the targets we added earlier, this should return 300+ tools.

In [None]:
client = MCPClient(
    lambda: streamablehttp_client(
        f"{gatewayEndpoint}", headers={"Authorization": f"Bearer {jwtToken}"}
    )
)
with client:
    all_tools = get_all_mcp_tools_from_mcp_client(client)
    print(f"\nFound {len(all_tools)} tools from list_tools_sync() on mcp client\n")

We have now seen 3 different ways to get the full set of tools from your Gateway using it
as an MCP Server: 

1. directly using jsonrpc
2. using the `list_tools_sync()` method on the Strands Agent MCPClient
3. using the MCP Inspector tool (which uses jsonrpc behind the scenes). 

For typical developers building agents, you'll be using option 2.

### Using the built-in Gateway semantic search tool
Now lets try our first semantic search on the Gateway using its built-in search tool provided as
an additional MCP tool that gets added to your MCP tool list.

First let's define a simple utility function to execute the search tool using MCP.
Just like for listing tools, we need the gateway endpoint and JWT token. Other than that,
all we need to pass in is the search query. The Gateway search tool will do the rest,
matching that query against the serverless vector store that it automatically manages on your behalf.

In [None]:
def tool_search(gateway_endpoint, jwt_token, query):
    toolParams = {
        "name": "x_amz_bedrock_agentcore_search",
        "arguments": {"query": query},
    }
    toolResp = invoke_gateway_tool(
        gateway_endpoint=gateway_endpoint, jwt_token=jwt_token, tool_params=toolParams
    )
    tools = toolResp["result"]["structuredContent"]["tools"]
    return tools

In [None]:
start_time = time.time()
tools_found = tool_search(
    gateway_endpoint=gatewayEndpoint,
    jwt_token=jwtToken,
    query="find me 3 credit research tools",
)
end_time = time.time()
print(
    f"tool search via direct Gateway invocation took {(end_time - start_time):.2f} seconds"
)
print(f"Top tool: {tools_found[0]['name']}")

Notice how fast the search returns, in under a second in most cases. The results are returned
in descending order of search relevance based on matching the query to the tool metadata.
The most relevant tools are first on the list. The intial implementation of search gives back
up to 10 results. You could then use all of these tools in your agent, or simply pick a subset of
the most relevant matches.

# Using Strands Agents with an MCP server that has many tools

First, we select a model to use with our Strands Agent. 
For this notebook, we are using Amazon Bedrock models, but Strands and AgentCore
can work with any LLM.

In [None]:
bedrockmodel = BedrockModel(
    model_id="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
    temperature=0.7,
    streaming=True,
    boto_session=session,
)

#### Simple Strands Agent using AgentCore Gateway for agent tools
Now lets show how easy it is to use a Strands Agent to leverage an MCP Server
provided by AgentCore Gateway. In our
simple example, we ask the agent to add some numbers.

In [None]:
jwtToken = utils.get_bearer_token(
    client_id=cognito_response["client_id"],
    username="testuser",
    password="MyPassword123!",
)
client = MCPClient(
    lambda: streamablehttp_client(
        f"{gatewayEndpoint}", headers={"Authorization": f"Bearer {jwtToken}"}
    )
)
with client:
    all_tools = get_all_mcp_tools_from_mcp_client(client)
    print(f"\nFound {len(all_tools)} tools from list_tools_sync() on mcp client\n")

    simple_agent = Agent(
        model=bedrockmodel, tools=all_tools, callback_handler=null_callback_handler
    )
    result = simple_agent("add 100 plus 50 pass ")
    print(f"{result.message['content'][0]['text']}")

The Strands Agents framework also lets you bypass the agent event loop, invoking an MCP tool directly.
Since Gateway tools are exposed as native MCP tools, this can be done against Gateway tools as well. Here
we call a Gateway MCP tool using the `agent.tool.<tool_name>(args)` syntax:

```python
direct_result = simple_agent.tool.Calc2___add_numbers(firstNumber=10, secondNumber=20)
resp_json = json.loads(direct_result['content'][0]['text'])
```

In [None]:
jwtToken = utils.get_bearer_token(
    client_id=cognito_response["client_id"],
    username="testuser",
    password="MyPassword123!",
)
client = MCPClient(
    lambda: streamablehttp_client(
        f"{gatewayEndpoint}", headers={"Authorization": f"Bearer {jwtToken}"}
    )
)
with client:
    all_tools = get_all_mcp_tools_from_mcp_client(client)
    print(f"\nFound {len(all_tools)} tools from list_tools_sync() on mcp client\n")

    simple_agent = Agent(
        model=bedrockmodel, tools=all_tools, callback_handler=null_callback_handler
    )
    direct_result = simple_agent.tool.Calc2___add_numbers(
        firstNumber=10, secondNumber=20
    )
    print(f"direct result = {direct_result}")

In [None]:
def get_search_tool(client):
    mcp_tool = MCPTool(
        name="x_amz_bedrock_agentcore_search",
        description="A special tool that returns a trimmed down list of tools given a context. Use this tool only when there are many tools available and you want to get a subset that matches the provided context.",
        inputSchema={
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "search query to use for finding tools",
                }
            },
            "required": ["query"],
        },
    )
    return MCPAgentTool(mcp_tool, client)

In [None]:
def search_using_strands(client, query):
    simple_agent = Agent(
        model=bedrockmodel,
        tools=[get_search_tool(client)],
        callback_handler=null_callback_handler,
    )

    direct_result = simple_agent.tool.x_amz_bedrock_agentcore_search(query=query)

    resp_json = json.loads(direct_result["content"][0]["text"])
    search_results = resp_json["tools"]
    print(json.dumps(search_results, indent=4))
    return search_results

In [None]:
def find_strands_tools(client, query, top_n):
    strands_mcp_tools = []
    results = search_using_strands(client, query)
    for tool in results[:top_n]:
        mcp_tool = MCPTool(
            name=tool["name"],
            description=tool["description"],
            inputSchema=tool["inputSchema"],
        )
        strands_mcp_tools.append(MCPAgentTool(mcp_tool, client))
    return strands_mcp_tools

In [None]:
jwtToken = utils.get_bearer_token(
    client_id=cognito_response["client_id"],
    username="testuser",
    password="MyPassword123!",
)
client = MCPClient(
    lambda: streamablehttp_client(
        f"{gatewayEndpoint}", headers={"Authorization": f"Bearer {jwtToken}"}
    )
)
with client:
    simple_agent = Agent(
        model=bedrockmodel,
        tools=[get_search_tool(client)],
        callback_handler=null_callback_handler,
    )

    direct_result = simple_agent.tool.x_amz_bedrock_agentcore_search(
        query="find equity trading tools"
    )

    resp_json = json.loads(direct_result["content"][0]["text"])
    search_results = resp_json["tools"]
    print(json.dumps(search_results, indent=4))

In [None]:
jwtToken = utils.get_bearer_token(
    client_id=cognito_response["client_id"],
    username="testuser",
    password="MyPassword123!",
)
client = MCPClient(
    lambda: streamablehttp_client(
        f"{gatewayEndpoint}", headers={"Authorization": f"Bearer {jwtToken}"}
    )
)
with client:
    results = search_using_strands(client, "find trading tools")
    print(json.dumps(search_results[0], indent=4))

    results = search_using_strands(client, "find credit research tools")
    print(json.dumps(search_results[0], indent=4))

# Adding tool search results to a Strands Agent

Now let's look at how the tools returned from a search can be added to a
Strands Agent. To make the coding simpler, let's provide a utility function that
maps tool search results to Strands MCPAgentTool objects. Simply pass in the 
search results, and indicate how many of those results you want to pass to your
agent.

In [None]:
import json

def tools_to_strands_mcp_tools(tools, top_n):
    strands_mcp_tools = []
    for tool in tools[:top_n]:
        print(f"Converting tool: {tool['name']}")  # Debug info
        mcp_tool = MCPTool(
            name=tool["name"],
            description=tool["description"],
            inputSchema=tool["inputSchema"],
        )
        strands_mcp_tools.append(MCPAgentTool(mcp_tool, client))
        
    return strands_mcp_tools

In [None]:
jwtToken = utils.get_bearer_token(
    client_id=cognito_response["client_id"],
    username="testuser",
    password="MyPassword123!",
)
client = MCPClient(
    lambda: streamablehttp_client(
        f"{gatewayEndpoint}", headers={"Authorization": f"Bearer {jwtToken}"}
    )
)
with client:
    agent = Agent(
        model=bedrockmodel,
        tools=find_strands_tools(
            client,
            "tools for doing addition, subtraction, multiplication, division",
            10,
        ),
    )
    result = agent("(10*2)/(5-3)")
    print(f"{result.message['content'][0]['text']}")

In [None]:
%%time

jwtToken = utils.get_bearer_token(
    client_id=cognito_response["client_id"],
    username="testuser",
    password="MyPassword123!",
)
client = MCPClient(
    lambda: streamablehttp_client(
        f"{gatewayEndpoint}", headers={"Authorization": f"Bearer {jwtToken}"}
    )
)
with client:
    print("Searching for an ADDING tool from endpoint with full set of tools...")
    tools_found = tool_search(
        gateway_endpoint=gatewayEndpoint,
        jwt_token=jwtToken,
        query="tools for multiplying two numbers",
    )
    print(f"Top tool found: {tools_found[0]['name']}\n")

    agent = Agent(model=bedrockmodel, tools=tools_to_strands_mcp_tools(tools_found, 1))
    result = agent("10 * 70")
    print(f"{result.message['content'][0]['text']}")

Notice the latency improvement. This example using a subset of tools from Gateway search is significantly faster than
agent invocation when depending on hundreds of tools.

# Showing 3x latency improvement by using tool search

Now that we know how to use Gateway MCP tools from a Strands agent, and we know how to search for tools and
add them to an agent, lets show the power of search. We'll highlight the significant latency reduction
and input token usage that can be delivered.

To demonstrate the latency and token reductions, we compare 2 approaches side by side:

1. **Without search**. We add the full set of MCP tools that the MCP server exposes (300+ in our case) to our agent and let the agent do its tool selection and invocation accordingly.
2. **Using search**. In the second approach, we do a search based on the topic at hand, and only send in the most relevant tools to the agent. To prove the point, we use two different topics: math (adding numbers), and food (booking a restaurant reservation), each requiring a different set of tools.

To normalize the latency distribution and get a meaningful comparison, we perform multiple iterations of
each approach. Also, to avoid overstating the gains, when doing the search approach, we include not only the
latency of the agent invocation, but also the latency of performing the tool search. For each 
iteration, we hand the agent two tasks: 

1. Math task -  add 2 numbers 
2. Food task - book a restaurant reservation

The results below demonstrate the benefits, highlighting 3x latency reduction, and even greater reduction in
input token usage. Note that while token usage savings translate to cost savings, that may not be as impactful due to
the relatively lower cost of input tokens (for many model providers, input tokens are much lest costly). Even so, for 
large scale agent deployment, even input token usage costs can add up, so dynamic search can help reduce 
agent runtime costs as well.

#### Measure latency and token usage for agent using the entire set of MCP tools

In [None]:
iterations = 2
full_tokens = light_tokens = 0
full_elapsed_time = light_elapsed_time = 0

jwtToken = utils.get_bearer_token(
    client_id=cognito_response["client_id"],
    username="testuser",
    password="MyPassword123!",
)
client = MCPClient(
    lambda: streamablehttp_client(
        f"{gatewayEndpoint}", headers={"Authorization": f"Bearer {jwtToken}"}
    )
)

In [None]:
with client:
    all_tools = get_all_mcp_tools_from_mcp_client(client)
    print(f"\nFound {len(all_tools)} tools from list_tools_sync() on mcp client\n")
    heavy_agent = Agent(
        model=bedrockmodel, tools=all_tools, callback_handler=null_callback_handler
    )

    math_input = "add 100 plus <iteration>"
    food_input = (
        "book me a table for 2 at Burger King under name Jo Smith at 7pm August <day>"
    )

    print("using agent with ALL tools...")
    start_time = time.time()

    for i in range(iterations):
        result = heavy_agent(math_input.replace("<iteration>", str(i + 1)))
        print(f"{i+1}) {result.message['content'][0]['text']}")

        result = heavy_agent(food_input.replace("<day>", str(i + 1)))
        print(f"{i+1}) {result.message['content'][0]['text']}")

    end_time = time.time()
    full_tokens = result.metrics.accumulated_usage["totalTokens"]
    full_elapsed_time = end_time - start_time
    print(f"\nTotal time: {full_elapsed_time:.1f} s, tokens: {full_tokens:,d}\n")

#### Measure latency and token usage for agents using Gateway Search
Now we'll use a dynamic approach, calling search to find relevant tools, and then calling the
agent with only those relevant tools. Note that since we are resetting the agent on each 
conversation turn, we're also intializing the message list from conversation history of the prior turn.

In [None]:
with client:
    print("using agent with ONLY tools from focused search...")
    start_time = time.time()
    messages = []

    light_agent = Agent()

    for i in range(iterations):
        print("Searching for an ADDING tool from endpoint with full set of tools...")
        tools_found = tool_search(
            gateway_endpoint=gatewayEndpoint,
            jwt_token=jwtToken,
            query="tools for simply adding two numbers",
        )
        print(f"Top tool found: {tools_found[0]['name']}\n")
        light_agent = Agent(
            model=bedrockmodel,
            tools=tools_to_strands_mcp_tools(tools_found, 1),
            messages=messages,
            callback_handler=null_callback_handler,
        )
        light_result = light_agent(math_input.replace("<iteration>", str(i + 1)))
        print(f"{i+1}) {light_result.message['content'][0]['text']}")
        messages = light_agent.messages

        print(
            "Searching for a RESTAURANT BOOKING tool from endpoint with full set of tools..."
        )
        tools_found = tool_search(
            gateway_endpoint=gatewayEndpoint,
            jwt_token=jwtToken,
            query="tools for booking a restaurant reservation",
        )
        print(f"Top tool found: {tools_found[0]['name']}\n")
        light_agent = Agent(
            model=bedrockmodel,
            tools=tools_to_strands_mcp_tools(tools_found, 1),
            messages=messages,
            callback_handler=null_callback_handler,
        )
        light_result = light_agent(food_input.replace("<day>", str(i + 1)))
        print(f"{i+1}) {light_result.message['content'][0]['text']}")
        messages = light_agent.messages
        light_tokens = light_result.metrics.accumulated_usage["totalTokens"]
    end_time = time.time()

    light_elapsed_time = end_time - start_time
    print(f"\nTotal time: {light_elapsed_time:.1f} s, tokens: {light_tokens:,d}\n")

#### Compare results, higlighting benefits of search

In [None]:
print(
    f"\n\nLatency without search: {full_elapsed_time:.1f}s, using search: {light_elapsed_time:.1f}s"
)
print(f"Tokens without search: {full_tokens:,d}, using search: {light_tokens:,d}")

# Conclusion
In this tutorial, you have learned about Amazon Bedrock AgentCore Gateway and its built-in 
fully managed semantic search capability. You have seen the following:

- how to create a gateway with semantic search enabled
- how to add multiple gateway targets to surface 300+ MCP tools from a single endpoint
- how to list the tools on your gateway using 3 different approaches
- how to use the built-in semantic search tool to find relevant tools
- how to integrate search with your Strands Agent
- how to compare performance of an agent using a server with hundreds of tools versus one that uses semantic search to narrow tools to a specific topic

AgentCore Gateway search is helpful for more advanced use cases as well. By offering the search as a native
MCP tool and not just a control plane API, you can imagine giving your agents more autonomy to discover new
MCP servers, and find new capabilities at runtime leading to breakthroughs in solving more challenging problems.
In addition, search is an important foundation for MCP registries and supporting agent developers as they 
design and build new agents.

# Cleaning up resources

First let's define some helper functions for cleaning up AgentCore Gateway resources.

In [None]:
def delete_gatewaytarget(gateway_id):
    response = agentcore_client.list_gateway_targets(gatewayIdentifier=gateway_id)

    print(f"Found {len(response['items'])} targets for the gateway")

    for target in response["items"]:
        print(
            f"Deleting target with Name: {target['name']} and Id: {target['targetId']}"
        )

        response = agentcore_client.delete_gateway_target(
            gatewayIdentifier=gateway_id, targetId=target["targetId"]
        )
        time.sleep(20)


def delete_gateway(gateway_id):
    response = agentcore_client.delete_gateway(gatewayIdentifier=gateway_id)

### Deleting Gateway Targets

In [None]:
delete_gatewaytarget(gateway_id=gatewayId)

### Deleting the Gateway itself

In [None]:
delete_gateway(gateway_id=gatewayId)

In [None]:
lambda_arns = [
    calc_lambda_resp["lambda_function_arn"],
    restaurant_lambda_resp["lambda_function_arn"],
]

for arn in lambda_arns:
    if utils.delete_gateway_lambda(arn):
        print(f"Deleted Lambda: {arn}")
    else:
        print(f"Lambda {arn} not found or deletion failed")

In [None]:
# Gateway role cleanup
if utils.delete_gateway_iam_role():
    print("Gateway IAM role deleted")
else:
    print("Gateway IAM role not found or deletion failed")

# Cognito cleanup
if utils.delete_cognito_user_pool():
    print("Cognito pool deleted")
else:
    print("✗ Failed to delete Cognito pool")