In [1]:
%pip install --upgrade -r requirements.txt


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


Basic Bedrock Integration

In [None]:
import boto3
import json
import os
import urllib3

# Read the API KEYs from the environment, replace the default values (the second argument) with your own keys if needed
payi_api_key = os.getenv("PAYI_API_KEY", "YOUR_PAYI_API_KEY")

payi_base_url = "https://api.pay-i.com"

def handle_payi_parameters(params, context, **kwargs):
    context["extra_headers"] = params.pop("extra_headers", {})

def redirect_to_payi(request, event_name, **kwargs):
    if not event_name.startswith('request-created.bedrock-runtime'):
        return
    
    parsed_url = urllib3.util.parse_url(request.url)
    route_path = parsed_url.path
    request.url = f"{payi_base_url}/api/v1/proxy/aws.bedrock{route_path}"

    request.headers['xProxy-api-key'] = payi_api_key
    request.headers['xProxy-Provider-BaseUri'] = parsed_url.scheme + "://" + parsed_url.host
    extra_headers = request.context.get('extra_headers', {})
    for key, value in extra_headers.items():
        request.headers[key] = value


def register_bedrock_client_callbacks(client, model):
    # Pass a unqiue_id to avoid registering the same callback multiple times in case this cell executed more than once

    # Process the extra_headers parameter passed to the bedrock runtime call before the AWS client validates the input parameters
    client.meta.events.register(f'provide-client-params.bedrock-runtime.{model}', handle_payi_parameters, unique_id=handle_payi_parameters)

    # Redirect the request to the Pay-i endpoint after the request has been signed. 
    client.meta.events.register_last(f'request-created', redirect_to_payi, unique_id=redirect_to_payi)
    
# Substitute the region for your regional deployment
region_name = "us-west-2"

bedrock = boto3.client(
    'bedrock-runtime',
    region_name=region_name,
    )

# Register client callbacks to handle the Pay-i extra_headers parameter in the inference calls and redirect the request to the Pay-i endpoint
register_bedrock_client_callbacks(bedrock, 'InvokeModel')
register_bedrock_client_callbacks(bedrock, 'InvokeModelWithResponseStream')
register_bedrock_client_callbacks(bedrock, 'Converse')
register_bedrock_client_callbacks(bedrock, 'ConverseStream')

request_dict = {
    "anthropic_version": "bedrock-2023-05-31",
    "max_tokens": 512,
    "temperature": 0.5,
    "messages": [
        {
            "role": "user",
            "content": [{"type": "text", "text": "this is a test"}],
        }
    ],
}

# Convert the request to JSON
request_body = json.dumps(request_dict)
model_id = 'anthropic.claude-3-haiku-20240307-v1:0'

# Invoke the model with the request.
invoke_response = bedrock.invoke_model(
    modelId=model_id,
    body=request_body,
    )

# Decode the response body.
response = invoke_response["body"].read()

response_json = json.loads(response)
print(json.dumps(response_json, indent=4))

xproxy_result = response_json['xproxy_result']
print("xproxy_result:")
print(json.dumps(xproxy_result, indent=4))


{
    "id": "msg_bdrk_01U82fQW1KfLpKMuL7DKyQAi",
    "type": "message",
    "role": "assistant",
    "model": "claude-3-haiku-20240307",
    "content": [
        {
            "type": "text",
            "text": "Okay, this is a test. I'm ready to assist you with any questions or tasks you may have."
        }
    ],
    "stop_reason": "end_turn",
    "stop_sequence": null,
    "usage": {
        "input_tokens": 11,
        "output_tokens": 26
    },
    "xproxy_result": {
        "request_id": "1816171",
        "resource_id": "75",
        "cost": {
            "currency": "usd",
            "input": {
                "base": 2.75e-06
            },
            "output": {
                "base": 3.25e-05
            },
            "total": {
                "base": 3.525e-05
            }
        }
    }
}
xproxy_result:
{
    "request_id": "1816171",
    "resource_id": "75",
    "cost": {
        "currency": "usd",
        "input": {
            "base": 2.75e-06
        },
        

Invoke stream invocation with pay-i as the proxy.

In [3]:

response = bedrock.invoke_model_with_response_stream(
    body=request_body,
    modelId=model_id, 
)

message = ""
input_tokens = None
output_tokens = None
invoke_id = None

stream = response.get('body')

for event in stream:
    chunk = event.get('chunk')
    if not chunk:
        continue

    decode = json.loads(chunk.get('bytes').decode())

    match decode['type']:
        case "message_start":
            input_tokens = decode['message']['usage']['input_tokens']
            invoke_id = decode['message']['id']
        case "content_block_start":
            message += decode['content_block']['text']
        case "content_block_delta":
            message += decode['delta']['text']
        case "message_delta":
            output_tokens = decode['usage']['output_tokens']
        case "content_block_stop" | "message_stop":
            ...

print(message)

Okay, this is a test. I'm ready to assist you with any questions or tasks you may have.


Converse stream invocation with pay-i as the proxy.

In [4]:
converse_request_dict=[
    {
        "role": "user",
        "content": [
            {
                "text": "this is a test"
            }
        ]
    }
]
converse_request_inference_config={
    "temperature": 0.5,
    "maxTokens": 512,
}

converse_response = bedrock.converse_stream(
    modelId=model_id,
    messages=converse_request_dict,
    inferenceConfig=converse_request_inference_config
)

stream = converse_response['stream']

if stream:
    for event in stream:
        # print(f'{json.dumps(event, indent=2)}')

        # decode = json.loads(event)
        
        if 'contentBlockDelta' in event:
            message += event['contentBlockDelta']['delta']['text']
        elif 'metadata' in event:
            input_tokens = event['metadata']['usage']['inputTokens']
            output_tokens = event['metadata']['usage']['outputTokens']

print(message)


Okay, this is a test. I'm ready to assist you with any questions or tasks you may have.Okay, this is a test. I'm ready to assist you with whatever you need.


Create the payi client

In [5]:
from payi import Payi

payi_client = Payi(
    api_key=payi_api_key
)

Use the Pay-i SDK to generate the headers to send a request with request tags

In [8]:
from payi.lib.helpers import create_headers

# The AWS client will only allow the extra_headers parameter if the event callbacks above are registered
invoke_response = bedrock.invoke_model(
    modelId=model_id,
    body=request_body,
    extra_headers=create_headers(request_tags=["x", "y"])
)

response = invoke_response["body"].read()
response_json = json.loads(response)
print(json.dumps(response_json, indent=4))

xproxy_result = response_json['xproxy_result']
print("xproxy_result:")
print(json.dumps(xproxy_result, indent=4))

{
    "id": "msg_bdrk_01M9jKDZvccNihb12j5mWaji",
    "type": "message",
    "role": "assistant",
    "model": "claude-3-haiku-20240307",
    "content": [
        {
            "type": "text",
            "text": "Okay, this is a test response from me."
        }
    ],
    "stop_reason": "end_turn",
    "stop_sequence": null,
    "usage": {
        "input_tokens": 11,
        "output_tokens": 14
    },
    "xproxy_result": {
        "request_id": "1816172",
        "request_tags": [
            "x",
            "y"
        ],
        "resource_id": "75",
        "cost": {
            "currency": "usd",
            "input": {
                "base": 2.75e-06
            },
            "output": {
                "base": 1.75e-05
            },
            "total": {
                "base": 2.025e-05
            }
        }
    }
}
xproxy_result:
{
    "request_id": "1816172",
    "request_tags": [
        "x",
        "y"
    ],
    "resource_id": "75",
    "cost": {
        "currency":

Create a limit and make a request with that limit

In [None]:
# Create a limit
limit_response = payi_client.limits.create(
    # As long as the limit configuration remains the same across creates, the same limit name can be used repeatedly
    limit_name='Bedrock quickstart allow limit',
    max=12.50, #$12.50 USD
    limit_type="Allow",
    limit_tags=["example_limit"]
)

limit_name = limit_response.limit.limit_name
limit_id = limit_response.limit.limit_id

print("Limit Created")
print(f"Limit Name: {limit_name}")
print(f"Limit ID: {limit_id}")

invoke_response = bedrock.invoke_model(
    modelId=model_id,
    body=request_body,
    extra_headers=create_headers(
        request_tags=["x", "y"],
        limit_ids=[limit_id]
    )
)

response = invoke_response["body"].read()
response_json = json.loads(response)
print(json.dumps(response_json, indent=4))

xproxy_result = response_json['xproxy_result']
print(json.dumps(xproxy_result, indent=4))

See limit status

In [None]:
invoke_response = payi_client.limits.retrieve(limit_id=limit_id)
print(f"Limit Name: {invoke_response.limit.limit_name}")
print(f"Limit ID: {invoke_response.limit.limit_id}")
print(f"Limit Creation Timestamp: {invoke_response.limit.limit_creation_timestamp}")
print(f"Limit Tags: {invoke_response.limit.limit_tags}")
print(f"Limit Input Base Cost: {invoke_response.limit.totals.cost.input.base}")
print(f"Limit Output Base Cost: {invoke_response.limit.totals.cost.output.base}")
print(f"Limit Total Base Cost: {invoke_response.limit.totals.cost.output.base}")

Make an ingest call with pre-computed token values

In [None]:
invoke_response = payi_client.ingest.units(
    category="system.aws.bedrock",
    resource=model_id,
    units={ "text": { "input": 50, "output": 100 } },
    limit_ids=[limit_id],
    request_tags=["a", "b"]
)

print(f"Ingest request ID: {invoke_response.request_id}")
print(f"Input Base Cost: {invoke_response.xproxy_result.cost.input.base}")
print(f"Output Base Cost: {invoke_response.xproxy_result.cost.output.base}")
print(f"Total Base Cost: {invoke_response.xproxy_result.cost.total.base}")

Reset a limit back to zero tracked cost

In [None]:
invoke_response = payi_client.limits.reset(limit_id=limit_id)
print(invoke_response.message)
print("State prior to reset: ")
print(f"Limit Name: {invoke_response.limit_history.limit_name}")
print(f"Limit ID: {invoke_response.limit_history.limit_id}")
print(f"Limit Tags: {invoke_response.limit_history.limit_tags}")
print(f"Limit Reset Timestamp: {invoke_response.limit_history.limit_reset_timestamp}")
print(f"Limit Input Base Cost: {invoke_response.limit_history.totals.cost.input.base}")
print(f"Limit Output Base Cost: {invoke_response.limit_history.totals.cost.output.base}")
print(f"Limit Total Base Cost: {invoke_response.limit_history.totals.cost.total.base}")

print("\nState after reset:")
invoke_response = payi_client.limits.retrieve(limit_id=limit_id)
print(f"Limit Name: {invoke_response.limit.limit_name}")
print(f"Limit ID: {invoke_response.limit.limit_id}")
print(f"Limit Creation Timestamp: {invoke_response.limit.limit_creation_timestamp}")
print(f"Limit Tags: {invoke_response.limit.limit_tags}")
print(f"Limit Input Base Cost: {invoke_response.limit.totals.cost}")
print(f"Limit Output Base Cost: {invoke_response.limit.totals.cost.output.base}")
print(f"Limit Total Base Cost: {invoke_response.limit.totals.cost.total.base}")

Create a small blocking limit that will prevent calls from happening that exceed the maximum, then capture the output.

In [None]:
limit_response = payi_client.limits.create(
    #As long as the limit configuration remains the same across creates, the same limit name can be used repeatedly
    limit_name='Bedrock quickstart block limit',
    max=0.00000001, 
    limit_type="block",
    limit_tags=["limit_block_example"]
)
block_limit = limit_response.limit.limit_id

print("Limit Created")
print(f"Limit Name: {limit_response.limit.limit_name}")
print(f"Limit ID: {limit_response.limit.limit_id}")

try:
    longer_request_1_dict = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 512,
        "temperature": 0.5,
        "messages": [
            {
                "role": "user",
                "content": [{"type": "text", "text": "provide me a list of toys for children 5 and under"}],
            }
        ],
    }
    longer_request_1_body =json.dumps(longer_request_1_dict)

    invoke_response = bedrock.invoke_model(
        modelId=model_id,
        body=longer_request_1_body,
        extra_headers=create_headers(
            request_tags=["x", "y"],
            limit_ids=[block_limit]
        )
    )

    response = invoke_response["body"].read()
    response_json = json.loads(response)
    print(json.dumps(response_json, indent=4))

    longer_request_2_dict = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 512,
        "temperature": 0.5,
        "messages": [
            {
                "role": "user",
                "content": [{"type": "text", "text": "tell me a short story about a toy"}],
            }
        ],
    }
    longer_request_2_body =json.dumps(longer_request_2_dict)

    invoke_response = bedrock.invoke_model(
        modelId=model_id,
        body=longer_request_2_body,
        extra_headers=create_headers(
            request_tags=["x", "y"],
            limit_ids=[block_limit]
        )
    )

    # This will note execute as invoke_model call will raise an exception due to the blocking limit returning with a 4xx HTTP status code
    response = invoke_response["body"].read()
    response_json = json.loads(response)
    print(json.dumps(response_json, indent=4))

except Exception as e:
    print(json.dumps(e.response, indent=4))

Create an experience type and send a request with it. Pay-i will auto generate an experience id that can be specified later.

In [None]:
# Create an experience type
exp_name="quickstart_experience"
exp_type_response = payi_client.experiences.types.create(
    name=exp_name,
    description="An example of an experience"
)

# Make a request using the limit, request tags, and experience
invoke_response = bedrock.invoke_model(
    modelId=model_id,
    body=request_body,
    extra_headers=create_headers(
        request_tags=["x", "y"],
        limit_ids=[limit_id],
        experience_name=exp_name
    )
)

response = invoke_response["body"].read()
response_json = json.loads(response)
print(json.dumps(response_json, indent=4))

xproxy_result = response_json['xproxy_result']
experience_id = xproxy_result['experience_id']
print("xproxy_result:")
print(json.dumps(xproxy_result, indent=4))

Send a request with a limit and user ID

In [None]:
# Make a request using the limit, request tags, and user id
invoke_response = bedrock.invoke_model(
    modelId=model_id,
    body=request_body,
    extra_headers=create_headers(
        limit_ids=[limit_id],
        # user id can be any string value
        user_id="example_user_id"
    )
)

response = invoke_response["body"].read()
response_json = json.loads(response)
print(json.dumps(response_json, indent=4))

xproxy_result = response_json['xproxy_result']
print(json.dumps(xproxy_result, indent=4))

List and then delete all limits

In [None]:
invoke_response = payi_client.limits.list()
for limit in invoke_response.items:
    print("Deleting limit with id:" + limit.limit_id)
    payi_client.limits.delete(limit.limit_id)