# Calling AWS Bedrock models

## 1 Boto3 version

### 1.1 Non Stream

In [9]:
import boto3
import json

# Bedrock runtime client, created from a boto3 session pinned to us-east-1
session = boto3.Session(region_name='us-east-1')
bedrock_client = session.client('bedrock-runtime')

# Request body for the Titan text model
payload = {
    "inputText": "Hi",
    "textGenerationConfig": {
        "maxTokenCount": 4096,
        "stopSequences": [],
        "temperature": 0,
        "topP": 0.9,
    },
}

# Single (non-streaming) invocation; the payload is sent as a JSON string
invoke_kwargs = {
    'modelId': 'amazon.titan-text-lite-v1',
    'contentType': 'application/json',
    'accept': 'application/json',
    'body': json.dumps(payload),
}
response = bedrock_client.invoke_model(**invoke_kwargs)

# The body is a stream-like object: read it fully, decode, and show the raw JSON text
raw_body = response['body'].read()
response_body = raw_body.decode('utf-8')
print(response_body)


{"inputTextTokenCount":3,"results":[{"tokenCount":12,"outputText":"\nBot: Hi! How can I help you?","completionReason":"FINISH"}]}


### 1.2 Stream

In [91]:
import boto3
import json

# Bedrock runtime client, created from a boto3 session pinned to us-east-1
session = boto3.Session(region_name='us-east-1')
bedrock_client = session.client('bedrock-runtime')

# Request body for the Titan text model
payload = {
    "inputText": "Write 3 paragraphs about dinassaurs.",
    "textGenerationConfig": {
        "maxTokenCount": 4096,
        "stopSequences": [],
        "temperature": 0,
        "topP": 0.9,
    },
}

# Streaming invocation: the response body is an iterable of events
invoke_kwargs = {
    'modelId': 'amazon.titan-text-lite-v1',
    'contentType': 'application/json',
    'accept': 'application/json',
    'body': json.dumps(payload),
}
response = bedrock_client.invoke_model_with_response_stream(**invoke_kwargs)

stream = response.get('body')
# Iterate only when a (truthy) stream is present; skip events without a chunk
for event in (stream or ()):
    chunk = event.get('chunk')
    if not chunk:
        continue
    # Each chunk carries JSON-encoded bytes; print the decoded object
    chunk_text = chunk.get('bytes').decode()
    print(json.loads(chunk_text))


{'outputText': '\nDinosaurs are a group of reptiles that lived on Earth from 252 to 66 million years ago. They are known for their large size, powerful limbs, and distinctive features such as tails, h', 'index': 0, 'totalOutputTextTokenCount': None, 'completionReason': None, 'inputTextTokenCount': 9}
{'outputText': "orns, and teeth. Dinosaurs were the dominant land animals for millions of years and played a crucial role in shaping the Earth's ecosystem.\n\nOne of the most famous dinosaurs is the Tyrannosaurus rex, which lived in North America during the late Cretaceous period. It ", 'index': 0, 'totalOutputTextTokenCount': None, 'completionReason': None, 'inputTextTokenCount': None}
{'outputText': 'was the largest land animal ever to walk the Earth, with a length of up to 40 feet and a weight of up to 8 tons. The Tyrannosaurus had a powerful jaw and teeth that were adapted for hunting and tearing apart prey. It also had a long tail that it used for balance and to help it move quickly t

## 2 Raw calls

### 2.1 Non Stream

In [None]:
# Load key/value pairs from a local .env file into the process environment
# (presumably the AWS credentials that later cells read via os.environ — confirm).
from dotenv import load_dotenv
load_dotenv()

In [60]:
import sys
import os
import datetime
import hashlib
import hmac
import requests
import json
from dotenv import load_dotenv
load_dotenv()

# ************* CONFIGURATION *************
# Credentials are read from the environment; both must be present to sign requests
access_key = os.getenv('AWS_ACCESS_KEY_ID')
secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')

if not (access_key is not None and secret_key is not None):
    print('Error: AWS credentials are not set in environment variables.')
    sys.exit(1)

# Signing scope: service name and region
service = 'bedrock'
region = 'us-east-1'

# Runtime host for the region, and the full invoke URL for the model
host = 'bedrock-runtime.' + region + '.amazonaws.com'
endpoint = 'https://' + host + '/model/amazon.titan-text-lite-v1/invoke'

# Request body for the Titan text model
payload = {
    "inputText": "Hi",
    "textGenerationConfig": {
        "maxTokenCount": 4096,
        "stopSequences": [],
        "temperature": 0,
        "topP": 0.9,
    },
}

# Serialized body (this exact string is both hashed for signing and sent on the wire)
content_type = 'application/json'
request_parameters = json.dumps(payload)

# ************* HELPER FUNCTIONS *************
def sign(key, msg):
    """Return the raw HMAC-SHA256 digest of ``msg`` (a str, UTF-8 encoded) under ``key`` (bytes)."""
    mac = hmac.new(key, digestmod=hashlib.sha256)
    mac.update(msg.encode('utf-8'))
    return mac.digest()

def get_signature_key(key, date_stamp, region_name, service_name):
    """Derive the signing key by chaining ``sign`` over the scope components.

    The chain starts from ``'AWS4' + key`` (UTF-8 encoded) and folds in, in order,
    the date stamp, region name, service name, and the literal ``'aws4_request'``.
    Returns the final digest as bytes.
    """
    derived = ('AWS4' + key).encode('utf-8')
    for scope_part in (date_stamp, region_name, service_name, 'aws4_request'):
        derived = sign(derived, scope_part)
    return derived

# ************* TASK 1: CREATE A CANONICAL REQUEST *************
def create_canonical_request(method, uri, query_string, canonical_headers, signed_headers, payload_hash):
    """Assemble the canonical request: the six components joined by newlines, in order."""
    components = (
        method,
        uri,
        query_string,
        canonical_headers,
        signed_headers,
        payload_hash,
    )
    return '\n'.join(components)

# ************* TASK 2: CREATE THE STRING TO SIGN *************
def create_string_to_sign(algorithm, amz_date, credential_scope, canonical_request):
    """Build the string to sign.

    Joins, with newlines: the algorithm, the request timestamp, the credential
    scope, and the hex SHA-256 digest of the UTF-8 encoded canonical request.
    """
    request_digest = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
    return '\n'.join((algorithm, amz_date, credential_scope, request_digest))

# ************* TASK 3: CALCULATE THE SIGNATURE *************
def calculate_signature(secret_key, date_stamp, region, service, string_to_sign):
    """Return the hex HMAC-SHA256 of ``string_to_sign`` under the derived signing key."""
    mac = hmac.new(
        get_signature_key(secret_key, date_stamp, region, service),
        string_to_sign.encode('utf-8'),
        hashlib.sha256,
    )
    return mac.hexdigest()

# ************* TASK 4: ADD SIGNING INFORMATION TO THE REQUEST *************
def create_authorization_header(access_key, credential_scope, signed_headers, signature):
    """Format the ``Authorization`` header value from its three signed components."""
    fields = [
        f"Credential={access_key}/{credential_scope}",
        f"SignedHeaders={signed_headers}",
        f"Signature={signature}",
    ]
    return "AWS4-HMAC-SHA256 " + ", ".join(fields)

# ************* MAIN FUNCTION *************
def main():
    """Sign and send one non-streaming invoke request, then print the result.

    Uses the module-level configuration (``access_key``, ``secret_key``,
    ``region``, ``service``, ``host``, ``endpoint``, ``content_type`` and
    ``request_parameters``) together with the signing helpers defined above.
    Prints the response object, its status code, and its body text.
    """
    method = 'POST'
    uri = '/model/amazon.titan-text-lite-v1/invoke'
    query_string = ''

    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12.
    t = datetime.datetime.now(datetime.timezone.utc)
    amz_date = t.strftime('%Y%m%dT%H%M%SZ')  # Format: YYYYMMDD'T'HHMMSS'Z'
    date_stamp = t.strftime('%Y%m%d')  # Date without time for credential scope

    # Hash of the exact body string that is sent below
    payload_hash = hashlib.sha256(request_parameters.encode('utf-8')).hexdigest()

    # Headers to sign (lowercase names)
    headers = {
        'content-type': content_type,
        'host': host,
        'x-amz-date': amz_date,
        'x-amz-content-sha256': payload_hash
    }

    # Canonical headers are "name:value\n" lines sorted by name;
    # signed headers are the same names joined with ';'.
    sorted_header_keys = sorted(headers)
    canonical_headers = ''.join(f"{key}:{headers[key]}\n" for key in sorted_header_keys)
    signed_headers = ';'.join(sorted_header_keys)

    canonical_request = create_canonical_request(
        method,
        uri,
        query_string,
        canonical_headers,
        signed_headers,
        payload_hash
    )

    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"

    string_to_sign = create_string_to_sign(
        algorithm,
        amz_date,
        credential_scope,
        canonical_request
    )

    signature = calculate_signature(
        secret_key,
        date_stamp,
        region,
        service,
        string_to_sign
    )

    headers['Authorization'] = create_authorization_header(
        access_key,
        credential_scope,
        signed_headers,
        signature
    )

    # Drop 'host' for the actual request (the requests library adds it automatically)
    request_headers = {k: v for k, v in headers.items() if k != 'host'}

    # Make the request
    print('Making request to AWS Bedrock...')
    response = requests.post(endpoint, data=request_parameters, headers=request_headers)
    print(response)

    # Report the outcome
    print(f"Response Code: {response.status_code}")
    print("Response Body:")
    print(response.text)

if __name__ == '__main__':
    main()


Making request to AWS Bedrock...
<Response [200]>
Response Code: 200
Response Body:
{"inputTextTokenCount":3,"results":[{"tokenCount":12,"outputText":"\nBot: Hi! How can I help you?","completionReason":"FINISH"}]}


### 2.2 Stream

In [94]:
import sys
import os
import datetime
import hashlib
import hmac
import requests
import json
import base64
import struct
import binascii
import io

# ************* CONFIGURATION *************
# Credentials are read from the environment; both must be present to sign requests
access_key = os.getenv('AWS_ACCESS_KEY_ID')
secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')

if not (access_key is not None and secret_key is not None):
    print('Error: AWS credentials are not set in environment variables.')
    sys.exit(1)

# Signing scope: service name and region
service = 'bedrock'
region = 'us-east-1'

# Runtime host for the region; the path is kept separately because it is also signed
host = 'bedrock-runtime.' + region + '.amazonaws.com'
uri = '/model/amazon.titan-text-lite-v1/invoke-with-response-stream'
endpoint = 'https://' + host + uri

# Request body for the Titan text model
payload = {
    "inputText": "Write 3 paragraphs about dinossaurs.",
    "textGenerationConfig": {
        "maxTokenCount": 4096,
        "stopSequences": [],
        "temperature": 0,
        "topP": 0.9,
    },
}

# Serialized body plus the content negotiation headers sent with it
request_parameters = json.dumps(payload)
content_type = 'application/json'
accept = 'application/json'  # value of the Accept header that is signed and sent

# ************* HELPER FUNCTIONS *************
def sign(key, msg):
    """HMAC-SHA256 of the UTF-8 encoding of ``msg`` keyed by ``key``; returns raw bytes."""
    encoded_msg = msg.encode('utf-8')
    mac = hmac.new(key, encoded_msg, hashlib.sha256)
    return mac.digest()

def get_signature_key(key, date_stamp, region_name, service_name):
    """Derive the signing key from the secret key and the credential scope parts.

    Each step signs the next scope component with the previous step's digest,
    starting from ``'AWS4' + key`` and ending with the literal ``'aws4_request'``.
    """
    seed = ('AWS4' + key).encode('utf-8')
    return sign(sign(sign(sign(seed, date_stamp), region_name), service_name), 'aws4_request')

# ************* TASK 1: CREATE A CANONICAL REQUEST *************
def create_canonical_request(method, uri, query_string, canonical_headers, signed_headers, payload_hash):
    """Return the canonical request string: the six arguments, newline-separated, in order."""
    newline = '\n'
    return newline.join(
        (method, uri, query_string, canonical_headers, signed_headers, payload_hash)
    )

# ************* TASK 2: CREATE THE STRING TO SIGN *************
def create_string_to_sign(algorithm, amz_date, credential_scope, canonical_request):
    """Return the newline-joined string to sign.

    The last line is the hex SHA-256 digest of the UTF-8 encoded canonical
    request; the first three are passed through unchanged.
    """
    hasher = hashlib.sha256()
    hasher.update(canonical_request.encode('utf-8'))
    lines = [algorithm, amz_date, credential_scope, hasher.hexdigest()]
    return '\n'.join(lines)

# ************* TASK 3: CALCULATE THE SIGNATURE *************
def calculate_signature(secret_key, date_stamp, region, service, string_to_sign):
    """Sign ``string_to_sign`` with the key derived for this date/region/service; returns hex."""
    derived_key = get_signature_key(secret_key, date_stamp, region, service)
    mac = hmac.new(derived_key, digestmod=hashlib.sha256)
    mac.update(string_to_sign.encode('utf-8'))
    return mac.hexdigest()

# ************* TASK 4: ADD SIGNING INFORMATION TO THE REQUEST *************
def create_authorization_header(access_key, credential_scope, signed_headers, signature):
    """Build the ``Authorization`` header value from credential, signed headers and signature."""
    credential = f"{access_key}/{credential_scope}"
    return "AWS4-HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
        credential, signed_headers, signature
    )

# ************* EVENTSTREAM PARSER FUNCTION *************
# Smallest well-formed message: 8-byte prelude + 4-byte prelude CRC + 4-byte message CRC.
_MIN_MESSAGE_LENGTH = 16

# struct formats for the fixed-width header value types of the event stream encoding.
_FIXED_WIDTH_HEADER_FORMATS = {
    2: '>b',  # byte
    3: '>h',  # short
    4: '>i',  # integer
    5: '>q',  # long
    8: '>q',  # timestamp (milliseconds since the epoch)
}


def _read_header_value(message, pos, limit, value_type):
    """Decode one header value of ``value_type`` starting at ``pos``.

    Returns ``(value, new_pos)``. ``limit`` is the end of the header section;
    a value that would run past it raises ``ValueError``.
    """
    if value_type == 0:  # boolean true, no value bytes
        return True, pos
    if value_type == 1:  # boolean false, no value bytes
        return False, pos
    if value_type in _FIXED_WIDTH_HEADER_FORMATS:
        fmt = _FIXED_WIDTH_HEADER_FORMATS[value_type]
        size = struct.calcsize(fmt)
        if pos + size > limit:
            raise ValueError('Incomplete value')
        return struct.unpack(fmt, message[pos:pos+size])[0], pos + size
    if value_type in (6, 7):  # byte array / string, both with a 2-byte length prefix
        if pos + 2 > limit:
            raise ValueError('Incomplete value length')
        value_len = struct.unpack('>H', message[pos:pos+2])[0]
        pos += 2
        if pos + value_len > limit:
            raise ValueError('Incomplete value')
        raw = message[pos:pos+value_len]
        value = raw.decode('utf-8') if value_type == 7 else bytes(raw)
        return value, pos + value_len
    if value_type == 9:  # UUID, 16 raw bytes
        if pos + 16 > limit:
            raise ValueError('Incomplete value')
        return bytes(message[pos:pos+16]), pos + 16
    raise ValueError(f'Unknown header value type: {value_type}')


def parse_event_stream_incremental(buffer):
    """Yield every complete event-stream message found at the start of ``buffer``.

    Args:
        buffer: bytes accumulated from the response so far.

    Yields:
        ``(headers, payload, message_length)`` for each complete message, where
        ``headers`` is a dict of decoded header values, ``payload`` is the raw
        payload bytes, and ``message_length`` is the total number of bytes the
        message occupies (so the caller can drop it from its buffer).

    Iteration stops quietly when the remaining bytes do not yet hold a whole
    message. Truly malformed data raises instead.

    Raises:
        ValueError: on an impossible message length, a CRC mismatch, or a
            header that does not fit inside the declared header section.
    """
    buffer_length = len(buffer)
    offset = 0
    # Need at least the 4-byte total length to know how much to wait for.
    while buffer_length - offset >= 4:
        total_length = struct.unpack('>I', buffer[offset:offset+4])[0]
        if total_length < _MIN_MESSAGE_LENGTH:
            # Without this guard the slices below come back short and
            # struct.unpack raises struct.error instead of ValueError.
            raise ValueError('Message length too small')
        if buffer_length - offset < total_length:
            # Not enough data to read the whole message yet
            break
        message = buffer[offset:offset+total_length]

        # Prelude: total length + headers length, protected by its own CRC
        headers_length = struct.unpack('>I', message[4:8])[0]
        prelude_crc = struct.unpack('>I', message[8:12])[0]
        if prelude_crc != (binascii.crc32(message[:8]) & 0xffffffff):
            raise ValueError('Prelude CRC mismatch')

        # Verify the whole message before trusting any of its contents
        message_crc = struct.unpack('>I', message[-4:])[0]
        if message_crc != (binascii.crc32(message[:-4]) & 0xffffffff):
            raise ValueError('Message CRC mismatch')

        pos = 12  # Headers start right after the prelude and its CRC
        headers_end = pos + headers_length
        if headers_end > total_length - 4:
            raise ValueError('Headers extend beyond message length')

        headers = {}
        while pos < headers_end:
            name_len = message[pos]
            pos += 1
            if pos + name_len > headers_end:
                raise ValueError('Incomplete header name')
            name = message[pos:pos+name_len].decode('utf-8')
            pos += name_len
            if pos + 1 > headers_end:
                raise ValueError('Incomplete value type')
            value_type = message[pos]
            pos += 1
            # Every value type must advance pos by its own size, otherwise the
            # next header would be read from the middle of this value.
            value, pos = _read_header_value(message, pos, headers_end, value_type)
            headers[name] = value

        # Payload sits between the headers and the trailing message CRC
        payload = message[headers_end:-4]

        yield headers, payload, total_length

        offset += total_length

# ************* MAIN FUNCTION *************
def main():
    """Sign and send a streaming invoke request, printing each chunk as it arrives.

    Uses the module-level configuration (``access_key``, ``secret_key``,
    ``region``, ``service``, ``host``, ``uri``, ``endpoint``, ``content_type``,
    ``accept`` and ``request_parameters``), the signing helpers, and
    ``parse_event_stream_incremental`` defined above.

    Prints the status code, then the decoded text of every ``chunk`` event.
    Stops with a printed message on a non-200 response, on an error/exception
    frame, or when the stream turns out to be malformed.
    """
    method = 'POST'
    query_string = ''

    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12.
    t = datetime.datetime.now(datetime.timezone.utc)
    amz_date = t.strftime('%Y%m%dT%H%M%SZ')  # Format: YYYYMMDD'T'HHMMSS'Z'
    date_stamp = t.strftime('%Y%m%d')  # Date without time for credential scope

    # Hash of the exact body string that is sent below
    payload_hash = hashlib.sha256(request_parameters.encode('utf-8')).hexdigest()

    # Headers to sign (lowercase names)
    headers = {
        'content-type': content_type,
        'host': host,
        'x-amz-date': amz_date,
        'x-amz-content-sha256': payload_hash,
        'accept': accept
    }

    # Canonical headers are "name:value\n" lines sorted by name;
    # signed headers are the same names joined with ';'.
    sorted_header_keys = sorted(headers)
    canonical_headers = ''.join(f"{key}:{headers[key]}\n" for key in sorted_header_keys)
    signed_headers = ';'.join(sorted_header_keys)

    canonical_request = create_canonical_request(
        method,
        uri,
        query_string,
        canonical_headers,
        signed_headers,
        payload_hash
    )

    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"

    string_to_sign = create_string_to_sign(
        algorithm,
        amz_date,
        credential_scope,
        canonical_request
    )

    signature = calculate_signature(
        secret_key,
        date_stamp,
        region,
        service,
        string_to_sign
    )

    headers['Authorization'] = create_authorization_header(
        access_key,
        credential_scope,
        signed_headers,
        signature
    )

    # Drop 'host' for the actual request (the requests library adds it automatically)
    request_headers = {k: v for k, v in headers.items() if k.lower() != 'host'}

    print('Making streaming request to AWS Bedrock...')
    # With stream=True the connection stays open until the body is consumed or the
    # response is closed; the context manager guarantees release on every exit path.
    with requests.post(endpoint, data=request_parameters, headers=request_headers, stream=True) as response:
        print(f"Response Code: {response.status_code}")

        if response.status_code != 200:
            print("Error in response:")
            print(response.text)
            return

        print("Streaming response:")

        buffer = b''  # Bytes received but not yet parsed into complete messages

        for chunk in response.iter_content(chunk_size=None):
            if not chunk:
                continue
            buffer += chunk

            consumed = 0
            try:
                # The parser yields every complete message currently in the buffer and
                # simply stops when the tail is incomplete; it raises only for corrupt data.
                for event_headers, event_payload, message_length in parse_event_stream_incremental(buffer):
                    consumed += message_length
                    event_type = event_headers.get(':event-type')
                    message_type = event_headers.get(':message-type')

                    # NOTE(review): failures are expected to arrive with ':message-type'
                    # set to 'error' or 'exception'; the ':event-type' check is kept
                    # from the original code.
                    if event_type == 'error' or message_type == 'error':
                        error_code = event_headers.get(':error-code')
                        error_message = event_headers.get(':error-message')
                        print(f"\nError {error_code}: {error_message}")
                        return
                    if message_type == 'exception':
                        exception_type = event_headers.get(':exception-type')
                        detail = event_payload.decode('utf-8', errors='replace')
                        print(f"\nException {exception_type}: {detail}")
                        return

                    if event_type == 'chunk':
                        # The payload is JSON whose 'bytes' field is base64-encoded
                        data = json.loads(event_payload)
                        decoded_bytes = base64.b64decode(data['bytes'])
                        print(decoded_bytes.decode('utf-8'))  # Print the text as it comes
            except ValueError as e:
                # Corrupt framing or an undecodable chunk will not fix itself by
                # waiting for more bytes, so report it instead of stalling silently.
                print(f"\nMalformed event stream: {e}")
                return

            # Keep only the unparsed tail for the next network chunk
            buffer = buffer[consumed:]

        if buffer:
            print(f"\nWarning: stream ended with {len(buffer)} unparsed bytes")

if __name__ == '__main__':
    main()


Making streaming request to AWS Bedrock...
Response Code: 200
Streaming response:
{"outputText":"\nDinosaurs are a group of reptiles that lived on Earth from 245 to 66 million years ago. They are known for their large size, powerful limbs, and distinctive features such as tails,","index":0,"totalOutputTextTokenCount":null,"completionReason":null,"inputTextTokenCount":9}
{"outputText":" horns, and feathers. Dinosaurs were the dominant species on Earth for millions of years, and they played a crucial role in shaping the planet's ecosystem.\n\nOne of the most famous dinosaurs is the Tyrannosaurus rex, which was the largest land animal to ever exist. It was about 40 f","index":0,"totalOutputTextTokenCount":null,"completionReason":null,"inputTextTokenCount":null}
{"outputText":"eet long and weighed up to 8 tons. The Tyrannosaurus had a powerful jaw and teeth that could bite through bone and flesh, and it was capable of running at speeds of up to 30 miles per hour. It was also a scavenger, f