## Handle asynchronous calls for Bedrock Models with concurrent calls

In [40]:
import os
import re
import sys
import json
import yaml
import time
import glob
import logging
import pandas as pd
from typing import Dict
from pathlib import Path
from tokenizer_utils import count_tokens
from bedrock_utils import get_bedrock_client
from utils import  (
    get_rouge_l_score,
    get_cosine_similarity,
    parse_model_response,
    is_amazon_model)

In [41]:
## Set the logger to log all of the information in
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)


In [42]:
## Initialize the config file to get all global constants
CONFIG_FILE_PATH = "config.yaml"


In [43]:
# read the config yaml file
fpath = CONFIG_FILE_PATH
with open(fpath, 'r') as yaml_in:
    config = yaml.safe_load(yaml_in)
logger.info(f"config read from {fpath} -> {json.dumps(config, indent=2)}")

[2024-01-12 19:35:57,781] p47653 {3034282685.py:5} INFO - config read from config.yaml -> {
  "app_name": "contact-center-transcript-summarization",
  "aws": {
    "region": "us-east-1",
    "sagemaker_execution_role": "Admin"
  },
  "dir": {
    "data": "data",
    "raw": "data/raw",
    "golden": "data/raw/golden",
    "prompts": "data/prompts",
    "models": "data/models",
    "metrics": "data/metrics",
    "completions": "data/completions",
    "async_completions": "data/async_completions"
  },
  "data": {
    "raw_data_file": "data.csv",
    "golden_transcript": "data/raw/golden/transcript.txt",
    "golden_transcript_summary": "data/raw/golden/summary.txt"
  },
  "prompt": {
    "very_large_prompt": {
      "sleep_time": 180,
      "threshold": 70000
    },
    "normal_prompt": {
      "sleep_time": 60
    }
  },
  "max_retries": 3,
  "desired_word_count_for_summary": 80,
  "experiments": [
    {
      "name": "single-line-reason",
      "prompt_template": null,
      "reps": 3,


In [44]:
## Initialize the bedrock client
bedrock_client = get_bedrock_client()

[2024-01-12 19:35:57,925] p47653 {credentials.py:1278} INFO - Found credentials in shared credentials file: ~/.aws/credentials


Create new client
  Using region: None
boto3 Bedrock client successfully created!
bedrock-runtime(https://bedrock-runtime.us-east-1.amazonaws.com)


In [45]:
boto3_bedrock = get_bedrock_client(runtime=False)
fm_list_response = boto3_bedrock.list_foundation_models()
fm_list = fm_list_response['modelSummaries']
df_fm = pd.DataFrame(fm_list)
display(df_fm)


[2024-01-12 19:35:58,032] p47653 {credentials.py:1278} INFO - Found credentials in shared credentials file: ~/.aws/credentials


Create new client
  Using region: None
boto3 Bedrock client successfully created!
bedrock(https://bedrock.us-east-1.amazonaws.com)


Unnamed: 0,modelArn,modelId,modelName,providerName,inputModalities,outputModalities,responseStreamingSupported,customizationsSupported,inferenceTypesSupported,modelLifecycle
0,arn:aws:bedrock:us-east-1::foundation-model/am...,amazon.titan-tg1-large,Titan Text Large,Amazon,[TEXT],[TEXT],True,[],[ON_DEMAND],{'status': 'ACTIVE'}
1,arn:aws:bedrock:us-east-1::foundation-model/am...,amazon.titan-e1t-medium,Titan Text Embeddings,Amazon,[TEXT],[EMBEDDING],,[],[ON_DEMAND],{'status': 'LEGACY'}
2,arn:aws:bedrock:us-east-1::foundation-model/am...,amazon.titan-image-generator-v1:0,Titan Image Generator G1,Amazon,"[TEXT, IMAGE]",[IMAGE],,[FINE_TUNING],"[ON_DEMAND, PROVISIONED]",{'status': 'ACTIVE'}
3,arn:aws:bedrock:us-east-1::foundation-model/am...,amazon.titan-image-generator-v1,Titan Image Generator G1,Amazon,"[TEXT, IMAGE]",[IMAGE],,[],[ON_DEMAND],{'status': 'ACTIVE'}
4,arn:aws:bedrock:us-east-1::foundation-model/am...,amazon.titan-embed-g1-text-02,Titan Text Embeddings v2,Amazon,[TEXT],[EMBEDDING],,[],[ON_DEMAND],{'status': 'ACTIVE'}
5,arn:aws:bedrock:us-east-1::foundation-model/am...,amazon.titan-text-lite-v1:0:4k,Titan Text G1 - Lite,Amazon,[TEXT],[TEXT],True,"[FINE_TUNING, CONTINUED_PRE_TRAINING]",[PROVISIONED],{'status': 'ACTIVE'}
6,arn:aws:bedrock:us-east-1::foundation-model/am...,amazon.titan-text-lite-v1,Titan Text G1 - Lite,Amazon,[TEXT],[TEXT],True,[],[ON_DEMAND],{'status': 'ACTIVE'}
7,arn:aws:bedrock:us-east-1::foundation-model/am...,amazon.titan-text-express-v1:0:8k,Titan Text G1 - Express,Amazon,[TEXT],[TEXT],True,"[FINE_TUNING, CONTINUED_PRE_TRAINING]",[PROVISIONED],{'status': 'ACTIVE'}
8,arn:aws:bedrock:us-east-1::foundation-model/am...,amazon.titan-text-express-v1,Titan Text G1 - Express,Amazon,[TEXT],[TEXT],True,[],[ON_DEMAND],{'status': 'ACTIVE'}
9,arn:aws:bedrock:us-east-1::foundation-model/am...,amazon.titan-embed-text-v1:2:8k,Titan Embeddings G1 - Text,Amazon,[TEXT],[EMBEDDING],False,[],[PROVISIONED],{'status': 'ACTIVE'}


In [46]:
transcript_files = glob.glob(os.path.join(config['dir']['raw'], "*", "*transcript.txt"))
logger.info(f"found {len(transcript_files)} transcript_files ->\n{transcript_files}")

[2024-01-12 19:35:58,309] p47653 {3139140113.py:2} INFO - found 5 transcript_files ->
['data/raw/0/call_center_transcript_1_transcript.txt', 'data/raw/1/call_center_transcript_0_transcript.txt', 'data/raw/4/call_center_transcript_4_transcript.txt', 'data/raw/3/call_center_transcript_3_transcript.txt', 'data/raw/2/call_center_transcript_2_transcript.txt']


In [47]:
## Re load the utils file to make sure all functions are included from the utils class before importing them
import importlib
import utils
importlib.reload(utils)

<module 'utils' from '/Users/madhurpt/Downloads/bedrock-contact-center-tasks-eval-main-3/utils.py'>

In [48]:
from json import JSONEncoder
import re
from pathlib import Path
import os
import time
import utils
import json
from create_payload import model_payloads



## Create a payload referring to the model_payloads script
def create_payloads_for_all_models(transcript_files, config):

    ## initialize a payload dict
    all_payloads = []

    ## Iterate through all of the transcript files available
    for idx, tf in enumerate(transcript_files):
        transcript = Path(tf).read_text()
        fname = os.path.basename(tf)
        file_id = "_".join(fname.split('_')[:-1])

        ## Iterate through every experiment for each transcript file
        for experiment in config['experiments']:
            exp_name = experiment['name']
            model_list = experiment['model_list']
            
            ## Iterate through each model for each transcript file
            for model_info in model_list:
                model_name = model_info['model']
                model = config['bedrock_models'].get(model_name)

                if model is None:
                    logger.error(f"model={model_name} not found in bedrock_models")
                    continue
                
                # Use the imported create_payload function to generate the payload
                payload_dict = {
                    "model_id": model_name,
                    "payload": model_payloads(transcript, model_name)
                }
                all_payloads.append(payload_dict)

    return all_payloads

print("All model payloads per transcript per bedrock model ->")
create_payloads_for_all_models(transcript_files, config)



All model payloads per transcript per bedrock model ->


[{'model_id': 'amazon.titan-text-express-v1',
  'payload': {'inputText': "A: I wanted to discuss our strategy around generative AI and how we should approach this emerging technology. As you know, several applications have captured a lot of attention recently. \nAction item: Set up a follow-up meeting to brainstorm ideas for where generative AI could be applicable in our products\n\nB: Yes, generative AI is definitely a hot topic right now. All the major tech companies seem to be investing heavily in this space.\nAction item: Research current generative AI initiatives at other tech companies to analyze the competitive landscape\n\nA: Exactly. I think we need to have a plan here too or risk falling behind. What kind of applications do you see for generative AI in our products? Could it be used to automate certain processes or enhance our users' experience?\nAction item: Outline high-level ideas for where generative AI could drive automation or enhance user experience in our products\n\n

In [49]:
import asyncio
import json
import requests as req
import botocore.session
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from typing import Dict, List

SERVICE_NAME = 'bedrock'
region = 'us-east-1'

## Utilizing the bedrock REST API to get inference from each bedrock model
def get_inference(model_id: str, payload: Dict) -> Dict:
    try:
        endpoint = f"https://{SERVICE_NAME}-runtime.{region}.amazonaws.com/model/{model_id}/invoke"
        request_body = json.dumps(payload)

        request = AWSRequest(method='POST', url=endpoint, data=request_body, headers={'content-type': 'application/json'})
        session = botocore.session.Session()
        sigv4 = SigV4Auth(session.get_credentials(), SERVICE_NAME, region)
        sigv4.add_auth(request)
        prepped = request.prepare()

        response = req.post(prepped.url, headers=prepped.headers, data=request_body)
        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error: Received status code {response.status_code}, Response: {response.text}")
            return None
    except Exception as e:
        print(f"Exception occurred: {e}")
        return None

## Utilize the async calls for creating a thread of model invocations
async def async_calls_on_model(model_id, payload):
    try:
        response = await asyncio.to_thread(get_inference, model_id, payload)
        return response
    except Exception as e:
        print(f"Error in async_calls_on_model: {e}")
        return None

async def async_invoke_model(model_name, payloads):
    responses = []
    for payload in payloads:
        response = await async_calls_on_model(model_name, payload)
        responses.append(response)
    return responses


In [50]:
import csv
import tokenizer_utils
from tokenizer_utils import count_tokens

## Filter the model payloads by the type of model id offering at bedrock for simpler calling
def filter_payloads_for_model(model_name, all_payloads):
    return [payload for payload in all_payloads if payload['model_id'] == model_name]

## Function to process a model through invoking it with transcripts through various concurrencies in config.yml
async def process_model(model_info, all_payloads, csv_writer, concurrency_level):
    model_id = model_info['model']
    ## filter the model by the id
    model_payloads = filter_payloads_for_model(model_id, all_payloads)

    ## loop through the conc levels and the model_payloads
    for i in range(0, len(model_payloads), concurrency_level):
        batch_payloads = [payload['payload'] for payload in model_payloads[i:i + concurrency_level]]
        print(f"Running {model_id} at concurrency level {concurrency_level}...")

        ## track metrics: latency
        start_time = time.time()
        responses = await async_invoke_model(model_id, batch_payloads)
        end_time = time.time()
        latency = (end_time - start_time)  # in seconds

        # Log and write each response to the CSV
        for j, response in enumerate(responses):
            csv_writer.writerow({
                'Model_id': model_id,
                'input token count': count_tokens(f'{batch_payloads[j]}'),
                'Latency (seconds)': latency,
                'Concurrency Level': concurrency_level,
                'call transcript': batch_payloads[j],
                'Response': json.dumps(response)
            })
            
            print(f"Response {j+1} at concurrency level {concurrency_level}: {response}")

        print(f"Latency: {latency} seconds\n")

## save to csv function
def init_csv_file(csv_file_path):
    with open(csv_file_path, 'w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['Model_id', 'input token count', 'Latency (seconds)', 'Concurrency Level', 'call transcript', 'Response'])
        writer.writeheader()

In [51]:
import time
## Function to process all of the models to invoke all transcripts one by one based on the concurrency level
async def process_all_models(all_payloads, config, csv_file_path):
    with open(csv_file_path, 'a', newline='') as file:
        csv_writer = csv.DictWriter(file, fieldnames=['Model_id', 'input token count', 'Latency (seconds)', 'Concurrency Level', 'call transcript', 'Response'])

        ## Loop through each concurrency level for each experiment and each model, and create tasks to call process model on the model id and payloads
        for concurrency_level in range(1, max(max(model_info['concurrency_metric']) for experiment in config['experiments'] for model_info in experiment['model_list']) + 1):
            tasks = []
            for experiment in config['experiments']:
                for model_info in experiment['model_list']:
                    if concurrency_level in model_info['concurrency_metric']:
                        task = asyncio.create_task(process_model(model_info, all_payloads, csv_writer, concurrency_level))
                        time.sleep(2)
                        tasks.append(task)
            await asyncio.gather(*tasks)

csv_file_path = 'async_bedrock_model_performance.csv'
init_csv_file(csv_file_path)

# Create payloads
all_payloads = create_payloads_for_all_models(transcript_files, config)

# Run the event loop and get results
loop = asyncio.get_event_loop()
if loop.is_running():
    task = asyncio.ensure_future(process_all_models(all_payloads, config, csv_file_path))
else:
    loop.run_until_complete(process_all_models(all_payloads, config, csv_file_path))