# Bedrock Basics
This notebook provides basic functionality for talking with Bedrock.  The functions are:

  - fill_defaults: Fills in all the default values for each call.  Allows you to send Bedrock as much or as little information as you like. Used by ask_bedrock.
  - calc_cost:  Calculates the cost, in dollars, for each call.  Based on public pricing as of 5/1/2024, us-east-1 region.  Used by ask_bedrock.
  - create_message_json:  takes an array of conversation turns and wraps them in the JSON format used by the Converse API
  - ask_bedrock:  Use the converse API to send a query to Bedrock.
  - ask_bedrock_threaded:  Call ask_bedrock in parallel threads for maximum efficiency.

All of the functions work around the idea of a Query object.  A Query represents a single call to the Bedrock API.  The program should load up the Query with all the info Bedrock needs, like model and prompt, and then after Bedrock is called, the response to that prompt is stored in the same Query object.  To support multiturn conversations, the full conversation history is added to a Query object. See part 3 of this notebook for examples.
  
#### This notebook has three sections:
  1) Prepare the environment and install dependencies.
  2) Define the basic functions described above.
  3) Example use for each of the functions.
     - Basic Use
     - Threaded Basic Use
     - Basic Muli-turn Conversation Use
     - Basic Image Input
     - Basic caching
     - Tool Use (Function Calling)
    

#### This notebook was authored by Justin Muller.  Email: justmul@amazon.com

## 1) Prepare the environment and install dependencies.
First install and update the libraries we need.

In [3]:
import os, tabulate, json, time

In [None]:
!pip install --upgrade --force-reinstall --quiet boto3

In [2]:
#Connect with Claude via Bedrock using Boto3
import boto3
from botocore.config import Config

#increase the standard time out limits in boto3, because Bedrock may take a while to respond to large requests.
my_config = Config(
    connect_timeout=60*5,
    read_timeout=60*5,
)
bedrock = boto3.client(service_name='bedrock-runtime',config=my_config)
bedrock_service = boto3.client(service_name='bedrock',config=my_config)

In [137]:
# list the endpoints we have avalaible.
# If the model you want is not listed here, check to make sure that you have added access to the model in this region via the console.:
models = [["Model Name","Model ID"]]
modelName_to_ID = {} #used to make code more readable by using model names rather than their endpoint ID.
model_list = bedrock_service.list_inference_profiles()["inferenceProfileSummaries"]
for line in model_list:
    models.append([line['inferenceProfileName'],line['inferenceProfileId']])
    modelName_to_ID[line['inferenceProfileName']] = line['inferenceProfileId']
tabulate.tabulate(models, tablefmt='html')

0,1
Model Name,Model ID
US Anthropic Claude 3 Sonnet,us.anthropic.claude-3-sonnet-20240229-v1:0
US Anthropic Claude 3 Opus,us.anthropic.claude-3-opus-20240229-v1:0
US Anthropic Claude 3 Haiku,us.anthropic.claude-3-haiku-20240307-v1:0
US Meta Llama 3.2 11B Instruct,us.meta.llama3-2-11b-instruct-v1:0
US Meta Llama 3.2 3B Instruct,us.meta.llama3-2-3b-instruct-v1:0
US Meta Llama 3.2 90B Instruct,us.meta.llama3-2-90b-instruct-v1:0
US Meta Llama 3.2 1B Instruct,us.meta.llama3-2-1b-instruct-v1:0
US Anthropic Claude 3.5 Sonnet,us.anthropic.claude-3-5-sonnet-20240620-v1:0
US Anthropic Claude 3.5 Haiku,us.anthropic.claude-3-5-haiku-20241022-v1:0


## 2) Define the basic functions

  - fill_defaults: Fills in all the default values for each call.  Allows you to send Bedrock as much or as little information as you like. Used by ask_bedrock.
  - calc_cost:  Calculates the cost, in dollars, for each call.  Based on public pricing as of 5/1/2024, us-east-1 region.  Used by ask_bedrock.
  - create_message_json:  takes an array of conversation turns and wraps them in the JSON format used by the Converse API
  - ask_bedrock:  Use the converse API to send a query to Bedrock.
  - ask_bedrock_threaded:  Call ask_bedrock in parallel threads for maximum efficiency.

In [39]:
def fill_defaults(query):
    """
    Fills in all the default values for each call.  Allows you to send Bedrock as much or as little information  as you like. Used by ask_bedrock.
    This only fills in values that are missing.  Change values here to make them default for all calls.
    If the prompt is a simple string, it also converts the prompt to a json format expected by the Converse API.
    """
    #prompt defaults
    if not 'prompt' in query:query['prompt'] = "What is your quest?"
    if not 'system' in query:query['system'] = None
    if not 'tools' in query:query['tools'] = None
    if not 'model' in query:query['model'] = "US Nova Lite"
    if not 'modelID' in query:query['modelID'] = modelName_to_ID[query['model']]

    #Inference defaults
    if not 'maxTokens' in query:query['maxTokens'] = None
    if not 'stopSequences' in query:query['stopSequences'] = []
    if not 'temperature' in query:query['temperature'] = 0.5

    #Converse requires that the prompt be a JSON opject, so change to that if it is currently a string.
    if type(query['prompt'])==str:
        query['prompt'] = create_message_json([["user",query['prompt']]])

    #Converse requires that the system prompt be a list, so change to that if it is currently a string.
    #if the system propmt is an empty string, leave it along since it won't be sent to Bedrock.
    if type(query['system'])==str and query['system'] != None:
        query['system'] = [{"text": query['system']}]

    #Converse wants the tools as an object, so convert to that.
    if query['tools'] is not None:
       query['tools']={"tools": query['tools']}

In [131]:
#helper function for converting tokens to public pricing for Bedrock, as of May 7 2025.
def calc_cost(model,usage):
    """
    Calculates the cost, in dollars, for each call.  Based on public pricing as of 12/2/2024, us-east-1 region.  Used by ask_bedrock.
    model is the model name used to make the call.
    usage is the usage object returned by the call the Converse API as part of the response. 
    """
    cost = 0
    input_tokens = usage['inputTokens']
    output_tokens = usage['outputTokens']
    read_cache_tokens = 0
    read_cache_cost = 0
    #for anthropic models, there's an extra charge for cache writes
    write_cache_cost = 0
    write_cache_tokens = 0
    
    if 'cacheWriteInputTokens' in usage:#this usage only shows up when a cache point is included in the prompt.
        read_cache_tokens = usage['cacheReadInputTokens']
        write_cache_tokens = usage['cacheWriteInputTokens']
        read_cache_cost = -1 #set to negative 1, so we can catch cases where cache was used, but we don't have pricing info.
        write_cache_cost = -1 #set to negative 1, so we can catch cases where cache was used, but we don't have pricing info.

    million=1000000
    thousand=1000
    match model:
        case "us.anthropic.claude-3-haiku-20240307-v1:0":
            input_cost = 0.00025/thousand
            output_cost = 0.00125/thousand
        case "us.anthropic.claude-3-5-haiku-20241022-v1:0":
            input_cost = 0.0008/thousand
            output_cost = 0.004/thousand
            read_cache_cost = 0.00008/thousand
            write_cache_cost = 0.001/thousand
        case "us.anthropic.claude-3-5-sonnet-20241022-v2:0":
            input_cost = 0.003/thousand
            output_cost = 0.015/thousand
            read_cache_cost = 0.0003/thousand
            write_cache_cost = 0.00375/thousand
        case "us.anthropic.claude-3-7-sonnet-20250219-v1:0":
            input_cost = 0.003/thousand
            output_cost = 0.015/thousand
            read_cache_cost = 0.0003/thousand
            write_cache_cost = 0.00375/thousand
        case "us.meta.llama3-2-90b-instruct-v1:0":
            input_cost = 0.00072/thousand
            output_cost = 0.00072/thousand
        case "us.meta.llama3-2-11b-instruct-v1:0":
            input_cost = 0.00016/thousand
            output_cost = 0.00016/thousand
        case "us.amazon.nova-pro-v1:0":
            input_cost = 0.0008/thousand
            output_cost = 0.0032/thousand
            read_cache_cost = 0.0002/thousand
            write_cache_cost = input_cost
        case "us.amazon.nova-premier-v1:0":
            input_cost = 0.0025/thousand
            output_cost = 0.0125/thousand
        case "us.amazon.nova-lite-v1:0":
            input_cost = 0.00006/thousand
            output_cost = 0.00024/thousand
            read_cache_cost = 0.000015/thousand
            write_cache_cost = input_cost
        case "us.amazon.nova-micro-v1:0":
            input_cost = 0.000035/thousand
            output_cost = 0.00014/thousand
            read_cache_cost = 0.00000875/thousand
            write_cache_cost = input_cost
        case _:
            print ("Warning!  No pricing data found for this model.  Setting cost to 0.  Please update calc_cost().")
            input_cost = 0
            output_cost = 0
            cache_cost = 0
            
    if write_cache_cost == -1 or read_cache_cost == -1:
        print ("Warning!  No pricing data found for CACHE for this model.  Setting cost to 0.  Please update calc_cost().")
        input_cost = 0
        output_cost = 0
        cache_cost = 0
        
    cost = input_tokens*input_cost + output_tokens*output_cost + write_cache_tokens*write_cache_cost + read_cache_tokens*read_cache_cost
    return cost

In [119]:
def create_message_json(messages):
    """
    takes an array of conversation turns and wraps them in the JSON format used by the Converse API
    Each element of the array should be a pair (TYPE,CONTENT) where TYPE identifies which turn it is,
    and CONTENT is the content for that turn.  TYPE can be user, assistant, image, tool_request or tool_result
    This is the expected format:

    user, prompt string
    assistant, response string
    tool_request, toolID, tool name, tool input JSON
    tool_result, toolID, tool result string
    image, image location

    This function exists because the Converse API uses a high number of nested dictionaries, which can be hard to track.
    This function simplifies tracking conversation history because it can be stored in an array of ordered turns.
    """
    message_jsons = []
    for msg in messages:
        if msg[0]=="user":
            message_jsons.append({"role": "user","content": [{"text": msg[1]}]})
        elif msg[0]=="assistant":
            message_jsons.append({"role": "assistant","content": [{"text": msg[1]}]})
        elif msg[0]=="cachePoint":
            message_jsons.append({"role": "user","content": [{"cachePoint": {"type": msg[1]}}]})
        elif msg[0]=="tool_request":
            message_jsons.append({"role": "assistant","content": [{"toolUse": {"toolUseId":msg[1],"name":msg[2],"input":msg[3]}}]})
        elif msg[0]=="tool_result":
            message_jsons.append({"role": "user","content": [{"toolResult": {"toolUseId":msg[1],"content":[{"json":{"result":msg[2]}}]}}]})
        elif msg[0]=="image":
            with open(msg[1], "rb") as f:
                image = f.read()
            filename, file_extension = os.path.splitext(msg[1])
            file_extension = file_extension.replace(".","")
            if file_extension == "jpg": file_extension = "jpeg" #requirment of the Converse API
            message_jsons.append({"role": "user","content": [{"image": {"format":file_extension,"source":{"bytes":image}}}]})
        else:
            raise(Exception("Error!  Message type not recognized:",msg[0]))
    #pack concurent turns together.  Converse requires that the array always alternates between user and assistiant, so if any are two in a row, they need to be in the same user block.
    packed_messages = []
    current_message = ""
    for i, this_msg in enumerate(message_jsons):
        if i == 0:
            current_message = (this_msg['role'],this_msg['content'])
            if i+1>=len(message_jsons):#this is the only message, no need to pack more
                packed_messages = [this_msg]
            continue
        if this_msg['role'] == current_message[0]:#next message is the same role as current, so pack it in
            current_message[1].append(this_msg['content'][0])
        else:#this is a new role, so save the previous stuff, and make this the new stuff.
            packed_messages.append({"role": current_message[0],"content": current_message[1]})
            current_message = (this_msg['role'],this_msg['content'])
            
        if i+1>=len(message_jsons):#this is the last message, save it to the packed list.
                packed_messages.append({"role": current_message[0],"content": current_message[1]})
    return packed_messages
    

In [120]:
def ask_bedrock(query, DEBUG=False):
    """
    Use the converse API to send a query to Claude.
    Note that if something goes wrong with calling the model, the stop reason will be ERROR, and the output message will be the error message.
    """
    #first, fill in any values this query is missing.
    fill_defaults(query)

    #set up the inference configuration options
    inference_config = {
        "stopSequences": query['stopSequences'],
        "temperature": query['temperature']
    }

    if query['maxTokens'] is not None:
        inference_config["maxTokens"] = query['maxTokens'],

    #build the parameters for calling bedrock, which change depending on the type of call.
    query_parameters = {}
    query_parameters['modelId'] = query['modelID']
    query_parameters['messages'] = query['prompt']
    query_parameters['inferenceConfig'] = inference_config

    if query['system'] is not  None:
        query_parameters['system'] = query['system']

    if query['tools'] is not  None:
        query_parameters['toolConfig'] = query['tools']
    
    try:
        #make the call to Bedrock
        response = bedrock.converse(**query_parameters)
        
        #unpack the response from Bedrock
        query['stopReason'] = response['stopReason']
            
        if query['stopReason'] == "tool_use":
            #for tool use, we capture the relevant tool related information.
            content_blocks = response['output']['message']['content']
            for content in content_blocks:#skip to the block with the tool use request
                if not 'toolUse' in content:continue
                query['output'] = content#used as a conversation turn.
                query['toolUseId'] = content['toolUse']['toolUseId']
                query['toolName'] = content['toolUse']['name']
                query['toolInput'] = content['toolUse']['input']
        else:
            query['output'] = response['output']['message']['content'][0]['text']

        #grab the usage information
        query['usage'] = response['usage'] #contains input and output token counts
        query['latencyMs'] = response['metrics']['latencyMs']
        query['cost'] = calc_cost(query['modelID'],response['usage'])
        
    except Exception as E:
        if DEBUG:
            print ("Warning!  Model returned the following error:")
            print (E)
        query['output'] = str(E)
        query['stopReason'] = "ERROR"

In [121]:
from queue import Queue
from threading import Thread

# Threading function for queue processing.
def thread_request(q):
    while not q.empty():
        this_query = q.get()    #fetch new work from the Queue
        try:
            ask_bedrock(this_query[0],DEBUG=this_query[1])
        except Exception as e:
            print('Error with threaded query:',str(e))
        #signal to the queue that task has been processed
        q.task_done()
    return True

def ask_bedrock_threaded(queries,MAX_THREADS = 50,DEBUG=False):
    '''
    Call ask_bedrock in parallel threads for maximum efficiency.
    queries is just a list of query objects.  The threads do not return data because they add to each query object directly.
    MAX_THREADS is how many queries to make in parallel.  Adjust this to avoid throttling.
    '''
    q = Queue(maxsize=0)
    num_theads = min(MAX_THREADS, len(queries))
    
    #Populating Queue with tasks
    for query in queries:
        q.put((query,DEBUG))
        
    #Starting worker threads on queue processing
    if DEBUG:print("Starting %s threads."%str(num_theads))
    for i in range(num_theads):
        #print('Starting thread ', i)
        worker = Thread(target=thread_request, args=(q,))
        worker.daemon = True
        worker.start()

    #now we wait until the queue has been processed
    q.join()

## 3) Example Use

### 3a) Basic use

In [122]:
#Just send a prompt, everything else is a default value.
query = {}
query["prompt"] = "Hello!"
ask_bedrock(query)
print (query['output'])

Hello! How can I assist you today? If you have any questions or need information on a particular topic, feel free to ask. Whether it's about science, technology, history, or something else, I'm here to help.


### 3b) Threading Basic Use

In [104]:
%%time
#testing ask_bedrock_threaded with the default values
query_1 = {'prompt':"In three words, what is love?"}
query_2 = {'prompt':"In three words, what is love?"}
query_3 = {'prompt':"In three words, what is love?"}
ask_bedrock_threaded([query_1,query_2,query_3], DEBUG=False)
print (query_1['output'], " (Latency:%sms)"%query_1['latencyMs'])
print (query_2['output'], " (Latency:%sms)"%query_2['latencyMs'])
print (query_3['output'], " (Latency:%sms)"%query_3['latencyMs'])

Infinite, beautiful, complex. 

These words encapsulate the multifaceted nature of love, highlighting its boundless potential, its capacity to bring beauty into the world, and the intricate layers that make it such a profound and intricate human experience.  (Latency:395ms)
Infinite, beautiful, complex. 

These words capture the multifaceted nature of love, suggesting its boundless potential, its capacity to bring beauty into the world, and the intricate layers that make it such a profound and challenging human experience.  (Latency:397ms)
Infinite, beautiful, complex. 

These words capture some of the essence of love, suggesting its boundless nature, its capacity to bring beauty into the world, and the intricate, multifaceted experience it often represents.  (Latency:360ms)
CPU times: user 12.1 ms, sys: 0 ns, total: 12.1 ms
Wall time: 414 ms


### 3c) Basic Muli-turn Conversation Use

In [256]:
conversation_history = []
print ("Please start the conversation: (enter STOP to end)")
while True:
    user_input = input("User:")
    if user_input == "STOP": break
    conversation_history.append(['user',user_input])
    query = {}
    query['prompt'] = create_message_json(conversation_history)
    ask_bedrock(query)
    response = query['output']
    print ("Assistant:",response)
    conversation_history.append(['assistant',response])


Please start the conversation: (enter STOP to end)


User: Hello!


Assistant: Hello! How can I assist you today?


User: What do you think the color green tastes like?


Assistant: That's an interesting and imaginative question! Since colors are a visual experience and tastes are a sensory experience, there isn't a direct correspondence between them. However, we can use our creativity and associations to imagine what a color might taste like.

Some common associations with the color green include:

- Fresh, crisp flavors like green apples, limes, or fresh herbs like mint or basil.
- Earthy, vegetal flavors like green beans, asparagus, or fresh greens like spinach or kale.
- Grassy, herbal flavors like freshly cut grass or green tea.

So if I had to imagine what the color green might taste like, I'd envision something bright, fresh, and slightly bitter or tangy - perhaps a blend of green apple, fresh herbs, and leafy greens. Of course, this is just my interpretation using my imagination and knowledge. What do you think green might taste like?


User: STOP


### 3d) Basic Image Input

In [254]:
image_query = {}
prompt = create_message_json([("user","what is this a picture of?  Please be concise."),('image','duck.jpg')])
image_query['prompt'] = prompt
ask_bedrock(image_query)
print (image_query["output"])

This is a picture of a large, inflatable yellow rubber duck floating in the water in front of a cityscape.


### 3e) Basic caching
#### To use caching, simple add ("cachePoint","default") to your prompt.  Everything before that point is added to the cache.

In [135]:
cache_query = {}

#caching only works for prompts longer than 1K tokens, so lets make a long prompt.
long_string = "I love tacos! :)  " * 1000

prompt = create_message_json([("user",long_string),("cachePoint","default"),("user","What do I love?  Respond with a single word.")])
cache_query['prompt'] = prompt
ask_bedrock(cache_query)
print ("First call:")
print ("Latency: ",cache_query["latencyMs"])
print ("Usage: ",cache_query["usage"])
print ("Cost for 1000 of these calls: ", cache_query["cost"]*1000)
ask_bedrock(cache_query)
print ("Second call:")
print ("Latency: ",cache_query["latencyMs"])
print ("Usage: ",cache_query["usage"])
print ("Cost for 1000 of these calls: ", cache_query["cost"]*1000)

First call:
Latency:  387
Usage:  {'inputTokens': 12, 'outputTokens': 2, 'totalTokens': 6013, 'cacheReadInputTokens': 0, 'cacheWriteInputTokens': 5999}
Cost for 1000 of these calls:  0.36114
Second call:
Latency:  212
Usage:  {'inputTokens': 12, 'outputTokens': 2, 'totalTokens': 6013, 'cacheReadInputTokens': 5999, 'cacheWriteInputTokens': 0}
Cost for 1000 of these calls:  0.09118500000000002


### 3f) Basic Tool Use
#### Start by defining our tool

In [105]:
#the actual tool:
def example_weather_tool(time_of_day):
    if time_of_day == 'AM':
        return "Sunny"
    if time_of_day == 'PM':
        return "Rainy"
    else:
        return "Error"

#the config so that the model knows about this tool
example_tool_config = [
    {
        "toolSpec": {
            "name": "weather",
            "description": "Get the local weather.",
            "inputSchema": {
                "json": {
                    "type": "object",
                    "properties": {
                        "time_of_day": {
                            "type": "string",
                            "description": "The time of day to get weather for, either AM or PM."
                        }
                    },
                    "required": ["time_of_day"]
                }
            }
        }
    }
]

#### Now we make a call where the model may want to use a tool:

In [106]:
#create a list of one or more tools.  ask_clade will package this into the proper call format.
query = {}
query["tools"] = example_tool_config
msg_1 = "What is the weather this morning?"
query["prompt"] = msg_1
ask_bedrock(query)
print ("Tool requested:",query['toolName'])
print ("Tool input:",query['toolInput'])

Tool requested: weather
Tool input: {'time_of_day': 'AM'}


#### Next, call the tool as requested by the model

In [107]:

tool_result = example_weather_tool(query['toolInput']['time_of_day'])

#### Finally, return the tool's response to the model.

In [108]:
tool_request_from_claude = query['output']
message_list = []
message_list.append(["user",msg_1])
message_list.append(["tool_request",query['toolUseId'],query['toolName'],query['toolInput']])
message_list.append(['tool_result',query['toolUseId'],tool_result])

query_2 = {}
#when passing the tool results to the model, it still needs to understand to original tool.
query_2["tools"] = example_tool_config
#change our message history into the Converse API nested JSON format.
prompt = create_message_json(message_list)
query_2['prompt'] = prompt

#send the full message history, tool use request, and tool use response to Claude so that it can answer the original question.
ask_bedrock(query_2)
print (query_2['output'])

<thinking> The weather tool has provided the result for the morning weather. I can now provide this information to the User. </thinking>
The weather this morning is sunny.


### Next example?