## Run Inference on all deployed endpoints: Various combinations of payloads, concurrency levels, model configurations

---

_This notebook works best with the conda_python3 kernel on a ml.t3.medium machine_.

#### This step of our solution design runs inferences on all deployed model endpoints (with different configurations, concurrency levels and payload sizes). This notebook calls the endpoints concurrently and asynchronously to generate responses and record metrics. Here are some of the key components:

- **Accessing the deployed endpoints**, creating a predictor object for these endpoints to call them during inference time.

- **Functions to define metrics**: This notebook sets the stage for the metrics that are recorded while all of these models are invoked for benchmarking purposes.

- **Running Actual Inferences**: Once the metrics are defined, we set up a blocking function, `get_inference`, that runs inference on a single payload. We then run a series of asynchronous functions (defined in the cells below) to send asynchronous inference requests to the deployed models. Requests are sent as combinations: payloads of different sizes (listed in the config.yml file) are paired with different concurrency levels (for example, we first go through all batches of payloads with a concurrency level of 1, then 2, and then 4). You can set these to your desired values.


### Import all of the necessary libraries below to run this notebook


In [None]:
# Decide where the fmbench package is imported from, based on INTERACTIVE_MODE_SET:
# if interactive mode is set to no -> pickup fmbench from Python installation path
# if interactive mode is set to yes -> pickup fmbench from the current path (one level above this notebook)
# if interactive mode is not defined -> treated as "yes" (the default passed to os.environ.get below)
# the premise is that if run non-interactively then it can only be run through main.py which will set interactive mode to no
import os
import sys

if os.environ.get("INTERACTIVE_MODE_SET", "yes") == "yes":
    # add the parent of the current working directory to the import path
    # NOTE(review): append() places this entry after site-packages, so an installed
    # fmbench (if any) would still be imported first — confirm this is intended.
    sys.path.append(os.path.dirname(os.getcwd()))


In [None]:
import glob
import time
import json
import io
import uuid
import copy
import boto3
import asyncio
import logging
import botocore
import itertools
import sagemaker
import numpy as np
import pandas as pd
import importlib.util
from pathlib import Path
from fmbench.utils import *
from fmbench.globals import *
from fmbench import defaults
from datetime import datetime
from datetime import timezone
from datetime import timedelta
from transformers import AutoTokenizer
from sagemaker.predictor import Predictor
import importlib.resources as pkg_resources
from sagemaker.serializers import JSONSerializer
from typing import Dict, List, Optional, Tuple, Union
from fmbench.scripts.pricing import load_and_update_pricing
from fmbench.scripts.bedrock_predictor import BedrockPredictor


#### Configure logging (the globally initialized variables from `fmbench/globals.py` were imported in the cell above)


In [None]:
# Configure the root logger for this notebook: INFO level and a single stream handler
# whose format includes the process id and the source location of each message.
logger = logging.getLogger()
# Drop any handlers installed earlier (e.g. when this cell is re-run) so messages are not duplicated
logger.handlers.clear()
logger.setLevel(logging.INFO)

formatter = logging.Formatter('[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)


#### Load the Config.yml file that contains information that is used across this benchmarking environment


In [None]:
# Parse the main benchmarking config; CONFIG_FILE and load_main_config presumably come from
# the fmbench star imports above — confirm. The parsed config is echoed to the log.
config = load_main_config(CONFIG_FILE)
logger.info(json.dumps(config, indent=2))


#### Load the pricing config file


In [None]:
# Resolve the pricing config named by config["pricing"] against the `configs` directory
# packaged with fmbench, collect the instance types used by the experiments, and load
# the pricing data through load_and_update_pricing (with PRICING_FALLBACK_YAML_PATH as
# its fallback argument).
# NOTE(review): os.path.join keeps an absolute config["pricing"] as-is, but an s3/https
# URL would be joined under the configs directory — confirm URLs are not expected here.
pricing_file_path: str = config["pricing"]

# stays None if load_and_update_pricing below raises
pricing_config: Optional[Dict] = None

config_dir = Path(pkg_resources.files("fmbench"), "configs")
logger.info(f"Using fmbench.configs directory: {config_dir}")

pricing_module = Path(pricing_file_path)
logger.info(
    f"pricing config provided for inference from this model is --> {pricing_module}"
)
pricing_file_path = os.path.join(config_dir, pricing_module)
logger.info(f"pricing config file path is --> {pricing_file_path}")

# instance types of every experiment that declares one (falsy/missing values are skipped)
instance_list = list(
    filter(
        None,
        (exp.get("instance_type") for exp in config.get("experiments", [])),
    )
)
logger.info(f"Extracted instances from the main config --> {instance_list}")

pricing_config = load_and_update_pricing(
    pricing_file_path, PRICING_FALLBACK_YAML_PATH, instance_list
)
logger.info(f"pricing config file recorded: {json.dumps(pricing_config, indent=2)}")


In [None]:
# local-time timestamp string for this run (not referenced elsewhere in the visible notebook)
date_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")


In [None]:
## getting access to the s3 bucket where endpoints.json for different models resides
# NOTE: the S3 reads in this notebook go through get_s3_object; this client is only
# referenced by commented-out code further below.
s3_client = boto3.client("s3")


### Access the deployed model endpoints from the endpoints.json file


In [None]:
# Load endpoints.json (the list of deployed endpoints) from the configured S3 bucket.
# A missing key, a None object, or unparsable JSON is not fatal: Bedrock models and
# bring-your-own external endpoints have no entry there, so an empty list is used.
# Any other S3 client error is re-raised.
try:
    s3_object = get_s3_object(config["aws"]["bucket"], ENDPOINT_LIST_PATH)
    if s3_object is not None:
        try:
            endpoint_info_list = json.loads(s3_object)
            logger.info(
                f"Found information for {len(endpoint_info_list)} endpoints in bucket={config['aws']['bucket']}, key={ENDPOINT_LIST_PATH}"
            )
            logger.info(json.dumps(endpoint_info_list, indent=2))
        except (TypeError, json.JSONDecodeError) as json_error:
            logger.warning(
                f"Failed to parse JSON from S3 object: {json_error}. Using empty list for endpoints."
            )
            endpoint_info_list = []
    else:
        logger.warning(
            f"Retrieved None object from bucket={config['aws']['bucket']}, key={ENDPOINT_LIST_PATH}. Using empty list for endpoints."
        )
        endpoint_info_list = []
except (FileNotFoundError, botocore.exceptions.ClientError) as e:
    # only a NoSuchKey client error means "no endpoints file"; anything else is a real failure
    if (
        isinstance(e, botocore.exceptions.ClientError)
        and e.response["Error"]["Code"] != "NoSuchKey"
    ):
        raise
    logger.warning(
        f"Key {ENDPOINT_LIST_PATH} not found in bucket {config['aws']['bucket']}. Using an empty list for endpoints for bedrock models/bring your own externalized endpoints."
    )
    endpoint_info_list = []

In [None]:
# Names of all endpoints recorded in endpoints.json (empty for Bedrock / external endpoints)
endpoint_name_list = [
    info["endpoint"]["EndpointName"] for info in endpoint_info_list
]

logger.info(
    f"there are {len(endpoint_name_list)} deployed endpoint(s), "
    f"endpoint_name_list->{endpoint_name_list}"
)


### Creating functions to define and calculate metrics during the time of invocations


In [None]:
def safe_sum(vals: List) -> Union[int, float]:
    """Return the sum of the truthy entries of ``vals``.

    ``None`` entries (and other falsy values such as ``0``) are skipped, so a list that
    contains missing measurements can still be summed; an empty input sums to ``0``.
    """
    return sum(value for value in vals if value)


def safe_div(n: Union[int, float], d: Union[int, float]) -> Optional[Union[int, float]]:
    """Return ``n / d``, or ``None`` when the divisor is falsy (``0``, ``0.0`` or ``None``)."""
    if not d:
        return None
    return n / d


def stat_summaries(
    responses: List[Dict], metric_name: str, successes: int
) -> Tuple[Optional[float], Optional[float], Optional[float], Optional[float]]:
    """Summarize one per-request metric across a chunk of responses.

    Args:
        responses: per-request result dicts; each must contain the key ``metric_name``
            (its value may be ``None`` when the request did not report the metric).
        metric_name: key of the metric to summarize, e.g. ``"latency"``.
        successes: number of successful requests in the chunk. Kept for backward
            compatibility; the mean is taken over the requests that actually reported
            the metric, so requests with a ``None`` value do not drag it down.

    Returns:
        ``(p50, p95, p99, mean)`` over the reported values, or four ``None`` values when
        no request reported the metric.
    """
    # Only None means "not reported"; 0.0 is a legitimate measurement and must be kept.
    reported = [r[metric_name] for r in responses if r[metric_name] is not None]
    if not reported:
        return None, None, None, None
    metric_p50, metric_p95, metric_p99 = np.percentile(reported, [50, 95, 99])
    metric_mean = sum(reported) / len(reported)
    return metric_p50, metric_p95, metric_p99, metric_mean


# Represents the function to calculate all of the metrics at the time of inference
def calculate_metrics(
    responses, chunk, elapsed_async, experiment_name, concurrency, payload_file
) -> Dict:
    """Aggregate the per-request results of one concurrently-sent chunk.

    Args:
        responses: per-request dicts (as built by ``set_metrics``), one per payload.
        chunk: the payloads that were sent concurrently.
        elapsed_async: wall-clock seconds the whole chunk took.
        experiment_name: copied into the result.
        concurrency: copied into the result.
        payload_file: copied into the result.

    Returns:
        A dict of request counts, error rate, token counts/means, throughputs
        (per second of ``elapsed_async``) and latency/TTFT/TPOT percentiles.
    """
    transactions = len(chunk)

    # a request counts as an error when no completion text or completion token count came back
    errors = [
        r
        for r in responses
        if r["completion"] is None or r["completion_tokens"] is None
    ]
    successes = transactions - len(errors)

    # token counts; safe_sum skips the None values reported by failed requests
    all_prompts_token_count = safe_sum([r["prompt_tokens"] for r in responses])
    prompt_token_throughput = round(all_prompts_token_count / elapsed_async, 2)
    prompt_token_count_mean = safe_div(all_prompts_token_count, successes)
    all_completions_token_count = safe_sum([r["completion_tokens"] for r in responses])
    completion_token_throughput = round(all_completions_token_count / elapsed_async, 2)
    completion_token_count_mean = safe_div(all_completions_token_count, successes)

    # derive the per-minute rate from the unrounded per-second rate so the 2-decimal
    # rounding of TPS is not amplified 60x
    raw_transactions_per_second = successes / elapsed_async
    transactions_per_second = round(raw_transactions_per_second, 2)
    transactions_per_minute = int(raw_transactions_per_second * 60)

    # percentile/mean summaries of the per-request timing metrics
    latency_p50, latency_p95, latency_p99, latency_mean = stat_summaries(
        responses, "latency", successes
    )
    ttft_p50, ttft_p95, ttft_p99, ttft_mean = stat_summaries(
        responses, "time_to_first_token", successes
    )
    tpot_p50, tpot_p95, tpot_p99, tpot_mean = stat_summaries(
        responses, "time_per_output_token", successes
    )

    return {
        "experiment_name": experiment_name,
        "concurrency": concurrency,
        "payload_file": payload_file,
        "errors": errors,
        "successes": successes,
        # None (instead of ZeroDivisionError) for an empty chunk
        "error_rate": safe_div(len(errors), transactions),
        "all_prompts_token_count": all_prompts_token_count,
        "prompt_token_count_mean": prompt_token_count_mean,
        "prompt_token_throughput": prompt_token_throughput,
        "all_completions_token_count": all_completions_token_count,
        "completion_token_count_mean": completion_token_count_mean,
        "completion_token_throughput": completion_token_throughput,
        "transactions": transactions,
        "transactions_per_second": transactions_per_second,
        "transactions_per_minute": transactions_per_minute,
        #'latency_mean': latency_mean,
        "latency_p50": latency_p50,
        "latency_p95": latency_p95,
        "latency_p99": latency_p99,
        #'TTFT_mean': ttft_mean,
        "TTFT_p50": ttft_p50,
        #'TTFT_p95': ttft_p95,
        "TTFT_p99": ttft_p99,
        #'TPOT_mean': tpot_mean,
        "TPOT_p50": tpot_p50,
        #'TPOT_p95': tpot_p95,
        "TPOT_p99": tpot_p99,
    }


### Set a blocking function and a series of asynchronous concurrent model prompt invocations


In [None]:
def set_metrics(
    endpoint_name=None,
    prompt=None,
    ground_truth=None,
    base64_img=None,
    question=None,
    payload_file=None,
    inference_params=None,
    completion=None,
    prompt_tokens=None,
    completion_tokens=None,
    latency=None,
    time_to_first_token=None,
    time_per_output_token=None,
    time_to_last_token=None,
    uuid=None,
) -> Dict:
    """Build the flat per-request record stored for every inference.

    The entries of ``inference_params`` are merged into the record between
    ``payload_file`` and ``completion``. A key in ``inference_params`` that duplicates
    one of the named fields raises ``TypeError`` (duplicate keyword to ``dict``).
    ``inference_params=None`` (the default) contributes no keys; previously it
    raised ``TypeError`` from ``**None``.

    Returns:
        A new dict with one key per named field plus the inference parameter keys.
    """
    return dict(
        endpoint_name=endpoint_name,
        prompt=prompt,
        question=question,
        ground_truth=ground_truth,
        base64_img=base64_img,
        payload_file=payload_file,
        **(inference_params or {}),
        completion=completion,
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        latency=latency,
        time_to_first_token=time_to_first_token,
        time_per_output_token=time_per_output_token,
        time_to_last_token=time_to_last_token,
        uuid=uuid,
    )


# function to get inference
def get_inference(predictor, payload, payload_file) -> Dict:
    """Send one payload to ``predictor`` (blocking) and return its per-request record.

    Args:
        predictor: predictor exposing ``get_prediction(payload)``, ``endpoint_name`` and
            ``inference_parameters``.
        payload: request dict; ``payload["inputs"]`` is the prompt and the optional
            ``ground_truth``, ``base64_img`` and ``question`` keys are copied into the record.
        payload_file: name of the payload file the request came from.

    Returns:
        The dict built by ``set_metrics``. If anything raises, the error is logged and a
        record with completion/token/latency fields set to ``None`` is returned instead,
        so one failed request does not propagate out of the concurrent ``gather``.
    """
    # created before the try so the error path can always reference it
    request_uuid = uuid.uuid4().hex
    try:
        logger.info(f"get_inference, sending request with uuid={request_uuid}")
        resp = predictor.get_prediction(payload)
        logger.info(f"get_inference, response for uuid={request_uuid}, resp={resp}")

        # the open source long bench dataset has ground truth responses in a list. In this
        # case, use all of the elements in the list (comma-joined) as the ground truth;
        # otherwise the value (a string, or None when absent) is used as-is
        ground_truth = payload.get("ground_truth")
        if isinstance(ground_truth, list):
            ground_truth = ",".join(map(str, ground_truth))

        response_json = resp.get("response_json")
        generated_text = (
            response_json.get("generated_text", "") if response_json is not None else ""
        )

        response = set_metrics(
            endpoint_name=predictor.endpoint_name,
            prompt=payload["inputs"],
            ground_truth=ground_truth,
            base64_img=payload.get("base64_img"),
            question=payload.get("question"),
            payload_file=payload_file,
            # `or {}` so a predictor without inference parameters still yields a record
            inference_params=predictor.inference_parameters or {},
            completion=generated_text,
            prompt_tokens=resp["prompt_tokens"],
            completion_tokens=resp["completion_tokens"],
            latency=resp["latency"],
            time_to_first_token=resp["time_to_first_token"],
            time_per_output_token=resp["time_per_output_token"],
            time_to_last_token=resp["time_to_last_token"],
            uuid=request_uuid,
        )

        # log the output of the prediction
        logger.info(
            f"get_inference, done, uuid={request_uuid}, endpoint={predictor.endpoint_name}, "
            f"prompt_tokens={resp['prompt_tokens']}, completion_tokens={resp['completion_tokens']}, "
            f"latency={resp['latency']}"
        )

    except Exception as e:
        logger.error(
            f"get_inference, uuid={request_uuid}, error occurred with {predictor.endpoint_name}, exception={str(e)}"
        )
        # keyword arguments: the previous positional call put inference_parameters into
        # ground_truth and passed inference_params=None, which made set_metrics raise
        # inside this handler
        response = set_metrics(
            endpoint_name=predictor.endpoint_name,
            prompt=payload.get("inputs"),
            payload_file=payload_file,
            inference_params=predictor.inference_parameters or {},
            uuid=request_uuid,
        )
    return response


### Setting a series of asynchronous functions to invoke and run inferences concurrently and asynchronously


In [None]:
# Run the blocking get_inference call in a worker thread so that many requests can be
# awaited concurrently from the event loop.
async def async_get_inference(predictor, payload: Dict, payload_file: str) -> Dict:
    """Await ``get_inference(predictor, payload, payload_file)`` run via ``asyncio.to_thread``."""
    result = await asyncio.to_thread(get_inference, predictor, payload, payload_file)
    return result


# Fan out one thread-backed inference per payload and wait for all of them together.
async def async_get_all_inferences(
    predictor, payload_list: List, payload_file: str
) -> List:
    """Send every payload in ``payload_list`` concurrently and collect the results.

    ``asyncio.gather`` returns the results in the same order as ``payload_list``.
    """
    logger.info(f"async_get_all_inferences, length of payload_list={len(payload_list)}")
    pending = [
        async_get_inference(predictor, single_payload, payload_file)
        for single_payload in payload_list
    ]
    return await asyncio.gather(*pending)


In [None]:
from concurrent.futures import ThreadPoolExecutor


# Send one chunk of payloads concurrently, tag each response with the experiment name and
# concurrency level, and summarize the chunk.
async def run_inferences(
    predictor: sagemaker.base_predictor.Predictor,
    chunk: List,
    experiment: Dict,
    concurrency: int,
    payload_file: str,
) -> Tuple[List, Dict]:
    """Run all payloads in ``chunk`` concurrently against ``predictor``.

    Returns:
        ``(responses, metrics)``: the per-request records (each extended in place with
        ``experiment_name`` and ``concurrency``) and the chunk summary from
        ``calculate_metrics``. The elapsed time passed to ``calculate_metrics`` is measured
        with ``time.perf_counter`` around the whole concurrent batch.
    """
    logger.info(f"processing chunk with concurrency={concurrency}")
    started_at = time.perf_counter()
    responses = await async_get_all_inferences(predictor, chunk, payload_file)
    elapsed_async = time.perf_counter() - started_at

    for record in responses:
        record.update(experiment_name=experiment["name"], concurrency=concurrency)

    metrics = calculate_metrics(
        responses,
        chunk,
        elapsed_async,
        experiment["name"],
        concurrency,
        payload_file,
    )
    return responses, metrics


In [None]:
from fmbench.scripts.fmbench_predictor import FMBenchPredictor


# Function to create the predictor for the experiment we are iterating over
def create_predictor_for_experiment(
    experiment: Dict, config: Dict, endpoint_info_list: List
) -> Optional[FMBenchPredictor]:
    """Build the predictor for ``experiment`` from its inference script.

    The module named by ``experiment["inference_script"]`` is loaded from the installed
    ``fmbench/scripts`` directory and its ``create_predictor(ep_name, inference_spec,
    metadata)`` function is called.

    - If ``endpoint_info_list`` has an entry whose ``experiment_name`` matches the
      experiment, the endpoint name (and production variant name, when present) come from
      that entry, and ``use_messages_api_format`` is added to the metadata if set.
    - Otherwise the model is treated as an external/Bedrock endpoint and
      ``experiment["ep_name"]`` is used with ``metadata=None``.

    ``config`` is accepted for interface compatibility but is not read here.

    Returns:
        The predictor, or ``None`` when the inference script file does not exist.
    """
    matched = [
        info
        for info in endpoint_info_list
        if info["experiment_name"] == experiment["name"]
    ]
    logger.info(f"endpoint info found is: {matched}")
    has_deployed_endpoint = bool(matched)

    if has_deployed_endpoint:
        ep_name = matched[0]["endpoint"]["EndpointName"]
        inference_spec = experiment.get("inference_spec")
        logger.info(
            f"experiment name={experiment['name']}, ep_name={ep_name}, "
            f"inference_spec={inference_spec}"
        )
    else:
        # bring-your-own / Bedrock endpoint: the name comes from the experiment itself
        ep_name = experiment["ep_name"]
        inference_spec = experiment.get("inference_spec")
        logger.info(
            f"seems like an external endpoint, no endpoint info found, going to use "
            f"ep_name={ep_name}"
        )

    # inference scripts live in the scripts directory of the installed fmbench package
    scripts_dir = Path(pkg_resources.files("fmbench"), "scripts")
    logger.info(f"Using fmbench.scripts directory: {scripts_dir}")
    scripts_dir.mkdir(parents=True, exist_ok=True)

    module_name = Path(experiment["inference_script"]).stem
    logger.info(f"script provided for inference from this model={module_name}")
    script_path = scripts_dir / f"{module_name}.py"
    logger.info(f"script path={script_path}")

    if not script_path.exists():
        logger.error(f"script {script_path} not found.")
        return None

    logger.info(f"deploying using local code: {script_path}")

    # import the script as a module; it is registered in sys.modules before execution,
    # following the importlib "importing a source file directly" recipe
    module_spec = importlib.util.spec_from_file_location(module_name, str(script_path))
    inference_module = importlib.util.module_from_spec(module_spec)
    sys.modules[module_name] = inference_module
    module_spec.loader.exec_module(inference_module)

    metadata: Optional[Dict] = None
    if has_deployed_endpoint:
        endpoint_desc = matched[0].get("endpoint")
        if endpoint_desc:
            variants = endpoint_desc.get("ProductionVariants")
            if variants is not None:
                variant_name = variants[0].get("VariantName")
                metadata = dict(variant_name=variant_name)
                logger.info(f"ep_name={ep_name}, variant_name={variant_name}")
        use_messages_api_format = experiment.get("use_messages_api_format")
        if use_messages_api_format:
            metadata = metadata if metadata is not None else {}
            metadata["use_messages_api_format"] = use_messages_api_format
    logger.info(f"ep_name={ep_name}, metadata={metadata}")
    return inference_module.create_predictor(ep_name, inference_spec, metadata)


In [None]:
# Here, we will process combinations of concurrency levels, the payload files
# and then loop through the different combinations to make payloads splitted
# in terms of the concurrency metric and how we can run it and make inference


def create_payload_dict(jline: str, experiment: Dict) -> Dict:
    """Decode one JSON Lines record into a payload dict.

    ``experiment`` is accepted for interface compatibility but is not used.
    """
    return json.loads(jline)


def _split_into_chunks(payload_list: List, concurrency: int) -> List[List]:
    """Split a non-empty ``payload_list`` into chunks of exactly ``concurrency`` payloads.

    The list is first padded (cycling from its start) up to ``concurrency`` entries, then
    cut into consecutive slices; a short trailing slice is topped up by cycling through
    its own entries. ``payload_list`` itself is not modified.
    """
    padded = list(payload_list)
    original_len = len(padded)
    for i in range(concurrency - original_len):
        padded.append(padded[i % original_len])

    chunks = [
        padded[start : start + concurrency]
        for start in range(0, len(padded), concurrency)
    ]
    for chunk in chunks:
        chunk_len = len(chunk)
        for i in range(concurrency - chunk_len):
            chunk.append(chunk[i % chunk_len])
    return chunks


def _limit_chunk_count(chunks: List[List], max_req_cnt: int, min_req_cnt: int) -> List[List]:
    """Keep at most ``max_req_cnt`` chunks, then repeat chunks (cycling from the first)
    until there are at least ``min_req_cnt``."""
    if len(chunks) > max_req_cnt:
        chunks = chunks[0:max_req_cnt]
    # use the length *after* truncation; a stale length could index past the end
    base_len = len(chunks)
    if base_len == 0:
        return chunks
    for i in range(min_req_cnt - base_len):
        chunks.append(chunks[i % base_len])
    return chunks


def create_combinations(experiment: Dict) -> List[Tuple]:
    """Build the work list for ``experiment``: one entry per (concurrency, payload file).

    Each payload file is read from S3 under ``PROMPTS_DIR/<source_data_prefix>/`` in the
    configured bucket and parsed as JSON Lines (blank lines are skipped). Its payloads are
    split into chunks of exactly ``concurrency`` requests, and the number of chunks is kept
    between ``datasets.min_iters_per_combination`` and
    ``datasets.max_iters_per_combination`` from the module-level ``config``.

    Files that cannot be read/parsed or contain no payloads are logged and skipped.

    Returns:
        A list of ``(concurrency, payload_file, chunks)`` tuples.
    """
    combinations_data = []

    # Repeat for each concurrency level
    combinations = list(
        itertools.product(experiment["concurrency_levels"], experiment["payload_files"])
    )
    logger.info(f"there are {len(combinations)} combinations of {combinations} to run")

    max_req_cnt = config["datasets"].get(
        "max_iters_per_combination", defaults.MAX_REQ_CNT
    )
    min_req_cnt = config["datasets"].get(
        "min_iters_per_combination", defaults.MIN_REQ_CNT
    )
    logger.info(
        f"setting the max iterations per combination to: {max_req_cnt}, min iterations per combination to: {min_req_cnt}"
    )

    for concurrency, payload_file in combinations:
        s3_file_path = f"{PROMPTS_DIR}/{config['s3_read_data']['source_data_prefix']}/{payload_file}"
        logger.info(
            f"s3 path where the payload files are being read from -> {s3_file_path}"
        )

        try:
            payload_file_content = get_s3_object(
                bucket=config["aws"]["bucket"], key=s3_file_path
            )
            # skip blank lines so a stray empty line does not fail the whole file
            payload_list = [
                create_payload_dict(jline, experiment)
                for jline in payload_file_content.splitlines()
                if jline.strip()
            ]
            fp: str = f"s3://{config['aws']['bucket']}/{s3_file_path}"
            logger.info(f"read from {fp}, contains {len(payload_list)} lines")
        except Exception as e:
            logger.error(f"Error reading file from S3: {e}")
            continue

        # an empty file would otherwise cause a modulo-by-zero while padding
        if not payload_list:
            logger.warning(
                f"payload file {payload_file} contains no payloads, "
                f"skipping concurrency={concurrency}"
            )
            continue

        logger.info(
            f"creating combinations for concurrency={concurrency}, "
            f"payload_file={payload_file}, payload_list length={len(payload_list)}"
        )

        payload_list_splitted = _split_into_chunks(payload_list, concurrency)
        len_before = len(payload_list_splitted)
        payload_list_splitted = _limit_chunk_count(
            payload_list_splitted, max_req_cnt, min_req_cnt
        )

        logger.info(
            f"after only retaining chunks of length {concurrency}, "
            f"we have {len(payload_list_splitted)} chunks, previously we had {len_before} chunks"
        )
        combinations_data.append((concurrency, payload_file, payload_list_splitted))
    logger.info(f"there are {len(combinations_data)} combinations for {experiment}")
    return combinations_data


In [None]:
# for each experiment
#   - for each endpoint and concurrency in an experiment


def clear_dir(dir_path: str):
    """Delete every entry matched by the glob ``<dir_path>/*``.

    Glob semantics apply, so dot-files are left in place; each match is passed to
    ``os.remove``, so a subdirectory inside ``dir_path`` would raise. A non-existent
    ``dir_path`` matches nothing and is a no-op.
    """
    pattern = os.path.join(dir_path, "*")
    matches = glob.glob(pattern)
    for file_path in matches:
        os.remove(file_path)


_ = list(map(clear_dir, [METRICS_PER_INFERENCE_DIR, METRICS_PER_CHUNK_DIR]))

# Initializing the experiment run cost to 0
exp_cost: Optional[float] = None

# To keep track of the experiment durations and the time it takes for
# the model endpoint to be in service to calculate cost association
experiment_durations: List[float] = []

# start the timer before the start of inferences
current_time = datetime.now(timezone.utc)
logger.info(f"current time recorded while running this experiment is {current_time} ")

num_experiments: int = len(config["experiments"])

# dataframe list to hold metrics for each endpoint
df_ep_metrics_list = []
# list for holding predictors and run start and end timestamp
# because cloud watch metrics are available after a 1-minute delay
predictors_and_metrics_timestamp_list = []
all_responses_list: List[Dict] = []
for e_idx, experiment in enumerate(config["experiments"]):
    # Start timer for the experiment
    experiment_start_time = time.perf_counter()
    predictor = create_predictor_for_experiment(experiment, config, endpoint_info_list)

    if predictor is None:
        logger.error(
            f"predictor could not be created for experiment={experiment}, moving to next..."
        )
        continue

    combination_data = create_combinations(experiment)

    prompt_tokens_total: int = 0
    completion_tokens_total: int = 0
    for concurrency, payload_file, split_payload in combination_data:
        # track time at minute boundaries
        experiment_at_concurrency_start_dttm = datetime.utcnow().replace(
            second=0, microsecond=0
        )
        for chunk_index, chunk in enumerate(split_payload):
            logger.info(
                f"experiment_index={e_idx+1}/{num_experiments}, "
                f"concurrency={concurrency}, payload_file={payload_file}, "
                f"chunk_index={chunk_index+1}/{len(split_payload)}"
            )

            # set concurrency level in Python asyncio so that the number of threads
            # is set to same as concurrency level otherwise number of threads defaults
            # to number of processors*5 (see
            # https://stackoverflow.com/questions/75885213/how-to-increase-asyncio-thread-limits-in-an-existing-co-routine)
            loop = asyncio.get_running_loop()
            loop.set_default_executor(ThreadPoolExecutor(max_workers=concurrency))
            responses, metrics = await run_inferences(
                predictor, chunk, experiment, concurrency, payload_file
            )
            if metrics:
                logger.info(f"metrics={json.dumps(metrics, indent=2, default=str)}")
                prompt_tokens_total += metrics.get("all_prompts_token_count", 0)
                completion_tokens_total += metrics.get("all_completions_token_count", 0)
                metrics_json = json.dumps(metrics, indent=2)
                metrics_file_name = f"{time.time()}.json"
                metrics_s3_path = os.path.join(METRICS_PER_CHUNK_DIR, metrics_file_name)
                write_to_s3(
                    metrics_json,
                    config["aws"]["bucket"],
                    "",
                    METRICS_PER_CHUNK_DIR,
                    metrics_file_name,
                )

            if responses:
                save_s3_list = []
                all_responses_list.extend(responses)

                for r in responses:
                    response_json = json.dumps(r, indent=2)
                    response_file_name = f"{time.time()}.json"
                    response_s3_path = os.path.join(
                        METRICS_PER_INFERENCE_DIR, response_file_name
                    )
                    save_s3_list.append(
                        (
                            response_json,
                            config["aws"]["bucket"],
                            "",
                            METRICS_PER_INFERENCE_DIR,
                            response_file_name,
                        )
                    )
                write_multiple_to_s3(save_s3_list)

        # save endpoint metrics
        experiment_at_concurrency_end_dttm = datetime.utcnow().replace(
            second=0, microsecond=0
        )
        # if the endtime and start time are in the same minute then move the endtime to the next
        # minute otherwise cloudwatch would return an empty resonse
        time_delta_in_seconds = (
            experiment_at_concurrency_end_dttm - experiment_at_concurrency_start_dttm
        ).seconds
        if time_delta_in_seconds < 60:
            experiment_at_concurrency_end_dttm += timedelta(seconds=60)

        predictors_and_metrics_timestamp_list.append(
            (
                predictor,
                experiment_at_concurrency_start_dttm,
                experiment_at_concurrency_end_dttm,
                concurrency,
                experiment["instance_type"],
            )
        )

    # Experiment done, stopping the timer for this given experiment
    experiment_end_time = time.perf_counter()

    # calculating the duration of this given endpoint inference time
    experiment_duration = experiment_end_time - experiment_start_time
    logger.info(f"the {experiment['name']} ran for {experiment_duration} seconds")

    # calculating the per second cost for this instance type
    exp_instance_type: str = experiment["instance_type"]

    # cost for this given exp
    logger.info(f"metrics json is: {metrics}")

    # calculate the cost of run for both sagemaker and external models
    # use the pricing config file here to get the pricing
    exp_cost = predictor.calculate_cost(
        exp_instance_type,
        experiment.get("instance_count"),
        pricing_config,
        experiment_duration,
        prompt_tokens_total,
        completion_tokens_total,
    )
    logger.info(
        f"the cost for running {experiment['name']} running on "
        f"{exp_instance_type} for {experiment_duration}s is ${exp_cost}"
    )

    experiment_durations.append(
        {
            "experiment_name": experiment["name"],
            "instance_type": exp_instance_type,
            "instance_count": experiment.get("instance_count"),
            "duration_in_seconds": f"{experiment_duration:.2f}",
            "cost": f"{exp_cost:.6f}",
        }
    )

    logger.info(
        f"experiment={e_idx+1}/{num_experiments}, name={experiment['name']}, "
        f"duration={experiment_duration:.6f} seconds, exp_cost={exp_cost:.6f}, done"
    )

    # cleanup the endpoint if configured
    run_steps = config["run_steps"]
    cleanup = any([run_steps[s] for s in run_steps if "cleanup" in s])
    if cleanup is True:
        logger.info(f"going to attempt cleanup the endpoint")
        predictor.shutdown()
    else:
        logger.info(f"cleanup is set to false, not deleting endpoints at this time")


In [None]:
# CloudWatch metrics lag by about a minute, so wait once before querying the windows
# recorded for every (experiment, concurrency) pair.
sleep_time: int = 60
logger.info(
    f"going to sleep for {sleep_time}s before querying metrics from the endpoint"
)
time.sleep(sleep_time)
logger.info(f"after sleep for {sleep_time}s before querying metrics from the endpoint")

for (
    predictor,
    experiment_at_concurrency_start_dttm,
    experiment_at_concurrency_end_dttm,
    concurrency,
    instance_type,
) in predictors_and_metrics_timestamp_list:
    df_ep_metrics = predictor.get_metrics(
        experiment_at_concurrency_start_dttm, experiment_at_concurrency_end_dttm
    )
    # nothing to record for this window
    if df_ep_metrics is None:
        continue

    if SAGEMAKER_EP_NAME_COL in df_ep_metrics.columns:
        # SageMaker utilization metrics: place instance type and concurrency right after
        # the timestamp and endpoint name columns
        df_ep_metrics.insert(2, 'instance_type', instance_type)
        df_ep_metrics.insert(3, 'concurrency', concurrency)
    else:
        # other (e.g. EC2) utilization metrics are recorded unchanged
        logger.info(f"The metrics utilization metrics are from a non-sagemaker deployed model. Recording the metrics into the report.")
    df_ep_metrics_list.append(df_ep_metrics)


In [None]:
# After all experiments are done, concatenate the per-experiment endpoint metrics
# into a single dataframe and save it to S3 as one CSV file.
df_all_ep_metrics: Optional[pd.DataFrame] = None
if len(df_ep_metrics_list) > 0:
    # ignore_index=True renumbers the rows: the individual frames may carry
    # overlapping index labels, and a unique index keeps later row filtering safe
    df_all_ep_metrics = pd.concat(df_ep_metrics_list, ignore_index=True)
    csv_ep_metrics = io.StringIO()
    df_all_ep_metrics.to_csv(csv_ep_metrics, index=False)
    csv_ep_metrics = csv_ep_metrics.getvalue()
    logger.info(f"shape of all df_all_ep_metrics is {df_all_ep_metrics.shape}")
    logger.info(df_all_ep_metrics.head())
    write_to_s3(
        csv_ep_metrics, config["aws"]["bucket"], "", METRICS_DIR, ENDPOINT_METRICS_FNAME
    )
    fpath: str = (
        f"s3://{config['aws']['bucket']}/{METRICS_DIR}/{ENDPOINT_METRICS_FNAME}"
    )
    logger.info(f"all endpoint metrics saved to {fpath}")
else:
    # report the length of the (empty) list; the single-frame variable
    # df_ep_metrics may be undefined here if no predictor produced metrics
    logger.error(
        f"length of df_ep_metrics_list is {len(df_ep_metrics_list)}, nothing to save"
    )


In [None]:
if df_all_ep_metrics is not None:
    # This check is only for endpoint metrics for SageMaker
    if SAGEMAKER_EP_NAME_COL in df_all_ep_metrics.columns:
        # Delete the first row of each endpoint's metrics because it may not have
        # started at a minute boundary and would then be an entry for an
        # incomplete minute, unless it is the only entry for that endpoint, in
        # which case we have no choice but to leave it as is.
        # NOTE(review): "first" means first in the current row order, which
        # assumes rows are chronological per endpoint; confirm get_metrics sorts
        # by timestamp.
        ep_groups = df_all_ep_metrics.groupby(SAGEMAKER_EP_NAME_COL)

        # cumcount() numbers rows within each endpoint from the top and
        # cumcount(ascending=False) from the bottom. A row that is first but not
        # also last therefore belongs to an endpoint with more than one row.
        # .to_numpy() makes the mask positional, so duplicate index labels
        # (possible after concatenating per-window frames) cannot cause rows of
        # other endpoints to be dropped.
        is_first = (ep_groups.cumcount() == 0).to_numpy()
        is_not_last = (ep_groups.cumcount(ascending=False) > 0).to_numpy()

        logger.info(f"shape of df_ep_metrics before removing first row for each endpoint = {df_all_ep_metrics.shape}")
        # assign back to df_all_ep_metrics so the summary cell sees the filtered
        # rows; .copy() makes later column assignments act on an independent frame
        df_all_ep_metrics = df_all_ep_metrics[~(is_first & is_not_last)].copy()
        logger.info(f"shape of df_ep_metrics after removing first row for each endpoint = {df_all_ep_metrics.shape}")


In [None]:
if df_all_ep_metrics is not None:
    if SAGEMAKER_EP_NAME_COL in df_all_ep_metrics.columns:
        # Convert ModelLatency from microseconds to milliseconds. This is guarded
        # because, like the other metrics below, the column may be absent.
        if "ModelLatency" in df_all_ep_metrics.columns:
            df_all_ep_metrics['ModelLatency'] = df_all_ep_metrics['ModelLatency'] / 1000

        # Aggregation per metric column: utilization and latency are averaged,
        # invocation/error counts are summed
        aggregate_metric_cols = {
            "CPUUtilization": "mean",
            "DiskUtilization": "mean",
            "GPUMemoryUtilization": "mean",
            "GPUUtilization": "mean",
            "MemoryUtilization": "mean",
            "ModelLatency": "mean",
            "InvocationsPerInstance": "sum",
            "Invocations": "sum",
            "Invocation5XXErrors": "sum",
            "Invocation4XXErrors": "sum"
        }

        # Filter out columns not present in the DataFrame
        existing_columns = {col: func for col, func in aggregate_metric_cols.items() if col in df_all_ep_metrics.columns}
        logger.info(f"Found the following metric columns: {existing_columns}, going to summarize the results.")

        # Summarize for each endpoint, instance type and concurrency level; the
        # endpoint column is referenced through the same constant as the check above
        df_ep_metrics_summarized = (
            df_all_ep_metrics.groupby(
                by=[SAGEMAKER_EP_NAME_COL, "instance_type", "concurrency"]
            )
            .agg(existing_columns)
            .reset_index()
        )

        logger.info(f"df_ep_metrics_summarized = {df_ep_metrics_summarized}")
        csv_ep_metrics = io.StringIO()
        df_ep_metrics_summarized.to_csv(csv_ep_metrics, index=False)
        csv_ep_metrics = csv_ep_metrics.getvalue()
        logger.info(f"shape of all df_ep_metrics_summarized is {df_ep_metrics_summarized.shape}")
        logger.info(df_ep_metrics_summarized.head())
    else:
        # If "EndpointName" is not present (non-SageMaker utilization metrics,
        # e.g. EC2), report the configured quantiles of every numeric column.
        logger.info("EndpointName column not found, performing analytics on the DataFrame.")
        numeric_cols = df_all_ep_metrics.select_dtypes(include='number').columns
        quantile_df = df_all_ep_metrics[numeric_cols].quantile(UTILIZATION_QUANTILE_METRICS)
        # Label each quantile as a percentage, e.g. 0.25 -> "25%". The same
        # UTILIZATION_QUANTILE_METRICS name as above is used, and ":g" avoids the
        # truncation int() suffers from (int(0.29 * 100) == 28).
        UTILIZATION_PERCENT_METRICS = [f"{q * 100:g}%" for q in UTILIZATION_QUANTILE_METRICS]
        quantile_df.index = UTILIZATION_PERCENT_METRICS
        quantile_df.index.name = 'percentile'
        # Mapping from raw metric column names to report-friendly labels
        quantile_df_column_mapping = {
            "percentile": "Percentile",
            "cpu_percent_mean": "CPU Utilization %",
            "memory_percent_mean": "Memory Utilization %",
            "memory_used_mean": "Memory Used (GB)",
            "gpu_utilization_mean": "GPU Utilization %",
            "gpu_memory_used_mean": "GPU Memory Used (GB)",
            "gpu_memory_free_mean": "GPU Memory Free (GB)",
            "gpu_memory_total_mean": "GPU Memory Total (GB)"
        }

        # Rename columns in the existing DataFrame (the percentile labels are the
        # index, which is written to the CSV as its first column)
        quantile_df.rename(columns=quantile_df_column_mapping, inplace=True)
        csv_ep_metrics = io.StringIO()
        quantile_df.to_csv(csv_ep_metrics)
        csv_ep_metrics = csv_ep_metrics.getvalue()

    write_to_s3(csv_ep_metrics,
                config['aws']['bucket'],
                "",
                METRICS_DIR,
                ENDPOINT_METRICS_SUMMARIZED_FNAME)
    fpath: str = f"s3://{config['aws']['bucket']}/{METRICS_DIR}/{ENDPOINT_METRICS_SUMMARIZED_FNAME}"
    logger.info(f"all endpoint metrics summarized saved to {fpath}")


In [None]:
# After all experiments are done, summarize experiment durations and costs and
# save them to S3 as a CSV file.
df_durations = pd.DataFrame(experiment_durations)
logger.info(f"experiment durations: {df_durations}")

# Convert the DataFrame to CSV and write it to S3
csv_buffer_cost = io.StringIO()
df_durations.to_csv(csv_buffer_cost, index=False)
experiment_associated_cost = csv_buffer_cost.getvalue()

write_to_s3(
    experiment_associated_cost,
    config["aws"]["bucket"],
    "",
    METRICS_DIR,
    SUMMARY_MODEL_ENDPOINT_COST_PER_INSTANCE,
)
fpath: str = (
    f"s3://{config['aws']['bucket']}/{METRICS_DIR}/{SUMMARY_MODEL_ENDPOINT_COST_PER_INSTANCE}"
)
logger.info(f"summary for cost of instance per endpoint per run saved to {fpath}")

# Costs are stored as formatted strings, so convert them back to float before
# summing. With no recorded experiments the dataframe has no "cost" column.
total_cost = (
    sum(df_durations["cost"].astype(float)) if "cost" in df_durations.columns else 0
)
logger.info(f"total cost of all experiments: ${total_cost}")


In [None]:
# Build one dataframe from all inference responses, taken from the in-memory
# all_responses_list (one record per response) rather than re-read from S3.
df_responses = pd.DataFrame(all_responses_list)
logger.info(f"created dataframe of shape {df_responses.shape} from all responses")
df_responses.head()


In [None]:
# Read every per-chunk metrics object stored under METRICS_PER_CHUNK_DIR in the
# configured bucket and build a single dataframe from them.
s3_files = list_s3_files(config["aws"]["bucket"], METRICS_PER_CHUNK_DIR)

# Every object is parsed as JSON. NOTE(review): this assumes list_s3_files
# returns only the .json metric files; confirm no other objects live there.
json_list = [
    json.loads(get_s3_object(config["aws"]["bucket"], key)) for key in s3_files
]

# Create DataFrame
df_metrics = pd.DataFrame(json_list)
logger.info(
    f"created dataframe of shape {df_metrics.shape} from all per-chunk metrics files"
)
df_metrics.head()


In [None]:
# One row per configured experiment; json_normalize flattens nested keys into
# dotted column names, and "name" is exposed as "experiment_name".
df_experiments = pd.json_normalize(config["experiments"]).rename(
    columns={"name": "experiment_name"}
)

# Make sure a "deploy" column exists so the filter below works even when no
# experiment sets it.
if "deploy" not in df_experiments.columns:
    logger.info(
        "deploy not setup for any experiment, setting it to None for all experiments"
    )
    df_experiments["deploy"] = None

# Experiments whose deploy flag is anything other than True (False, None or
# missing) do not deploy an endpoint.
skip_deploy_mask = df_experiments["deploy"].ne(True)
df_experiments_skip_deploy = df_experiments[skip_deploy_mask]

logger.info(
    f"df_experiments shape={df_experiments.shape}, "
    f"df_experiments_skip_deploy shape={df_experiments_skip_deploy.shape}"
)
logger.info(df_experiments_skip_deploy.head())
logger.info(
    f"df_experiments_skip_deploy={df_experiments_skip_deploy[['experiment_name', 'instance_type']]}"
)


In [None]:
# Build df_endpoints: one row per experiment with its instance type/count and,
# for SageMaker deployments, endpoint/model/container details. It is then merged
# into the per-request responses (df_responses) to produce df_results.
if endpoint_info_list:
    # json_normalize flattens the nested endpoint info into dotted column names
    # such as "endpoint_config.ProductionVariants"
    df_endpoints = pd.json_normalize(endpoint_info_list)
    production_variants_col = "endpoint_config.ProductionVariants"
    if (
        production_variants_col in df_endpoints.columns
        and df_endpoints[production_variants_col].notna().all()
    ):
        # instance type/count are taken from the first production variant only
        df_endpoints["instance_type"] = df_endpoints[production_variants_col].map(
            lambda variants: variants[0]["InstanceType"]
        )
        df_endpoints["instance_count"] = df_endpoints[production_variants_col].map(
            lambda variants: variants[0]["InitialInstanceCount"]
        )
    # Otherwise keep whatever "instance_type"/"instance_count" columns the
    # endpoint info already has. Absent ones are dropped by the column filter
    # below, and df_results gets both values from the experiment config after
    # the merge.
    cols_for_env = [c for c in df_endpoints.columns if "Environment" in c]
    logger.info(f"cols_for_env={cols_for_env}")
    cols_of_interest = [
        "experiment_name",
        "instance_type",
        "instance_count",
        "endpoint.EndpointName",
        "model_config.ModelName",
        "model_config.PrimaryContainer.Image",
        "model_config.PrimaryContainer.ModelDataSource.S3DataSource.S3Uri",
    ]
    cols_of_interest.extend(cols_for_env)

    # keep only the columns that are present, then strip the dotted prefixes,
    # e.g. "endpoint.EndpointName" -> "EndpointName"
    cols_of_interest = [c for c in cols_of_interest if c in df_endpoints.columns]
    df_endpoints = df_endpoints[cols_of_interest]
    cols_of_interest_renamed = [c.split(".")[-1] for c in cols_of_interest]
    df_endpoints.columns = cols_of_interest_renamed

    # log df_responses columns to help diagnose the experiment_name merge below
    logger.info(f"columns in df_responses: {df_responses.columns}")

# if the endpoint list is empty, create columns specific to the bedrock/other supported
# models, which includes the name of the endpoint, experiment name, model name, etc
else:
    # Create an empty DataFrame with the desired columns
    logger.info("the endpoint_info_list is empty, creating an empty dataframe")
    df_endpoints = pd.DataFrame(
        columns=[
            "experiment_name",
            "instance_type",
            "instance_count",
            "EndpointName",
            "ModelName",
            "Image",
            "S3Uri",
        ]
    )

# At this point df_endpoints is either filled with SageMaker endpoint info or
# empty (e.g. when using Bedrock). Append the experiments that did not deploy an
# endpoint so the rest of the analysis proceeds the same way for SageMaker and
# non-SageMaker deployments.
if len(df_experiments_skip_deploy) != 0:
    logger.info(
        f"adding {len(df_experiments_skip_deploy)} "
        f"experiments to df_endpoints from df_experiments_skip_deploy"
    )
    df_endpoints = pd.concat(
        [
            df_endpoints,
            # reindex (rather than plain column selection) tolerates configs in
            # which no experiment sets instance_count
            df_experiments_skip_deploy.reindex(
                columns=["experiment_name", "instance_type", "instance_count"]
            ),
        ]
    )
logger.info(f"df_endpoints shape={df_endpoints.shape}, df_endpoints={df_endpoints}")
logger.info(
    f"df_endpoints has {len(df_endpoints.experiment_name.unique())} experiments, "
    f"{df_endpoints.experiment_name.unique()}"
)

logger.info(f"df_responses shape={df_responses.shape}, df_responses={df_responses}")
logger.info(
    f"df_responses has {len(df_responses.experiment_name.unique())} experiments, "
    f"{df_responses.experiment_name.unique()}"
)
# Left join: every response row is kept even if its experiment has no
# endpoint information
df_results = pd.merge(
    left=df_responses,
    right=df_endpoints,
    how="left",
    on="experiment_name",
)
logger.info(f"df_results shape={df_results.shape}, df_results={df_results}")
logger.info(
    f"df_results has {len(df_results.experiment_name.unique())} experiments, "
    f"{df_results.experiment_name.unique()}"
)
for e, experiment in enumerate(config["experiments"]):
    experiment_name = experiment["name"]
    instance_type = experiment["instance_type"]
    # read with .get(), as instance_count is read elsewhere in this notebook
    instance_count = experiment.get("instance_count")

    logger.info(
        f"index {e+1}, experiment_name={experiment_name}, instance type={instance_type}"
    )
    # Overwrite instance type/count on every row whose experiment_name matches
    is_this_experiment = df_results["experiment_name"] == experiment_name
    df_results.loc[is_this_experiment, "instance_type"] = instance_type
    df_results.loc[is_this_experiment, "instance_count"] = instance_count

# Inspect the result
logger.info(
    f"after adding experiment info, df_results shape={df_results.shape}, df_results={df_results}"
)


In [None]:
# Display the combined endpoint/experiment table (df_endpoints) built in the previous cell.
df_endpoints


In [None]:
# Serialize the per-request results (df_results) to CSV and upload them under
# METRICS_DIR, using the file name template from the report config.
csv_buffer = io.StringIO()
df_results.to_csv(csv_buffer, index=False)
csv_data_results = csv_buffer.getvalue()
results_file_name = config["report"]["per_inference_request_file"].format(
    datetime=date_time
)
results_s3_path = os.path.join(METRICS_DIR, results_file_name)
logger.info(f"results s3 path for per inference csv --> {results_s3_path}")
write_to_s3(
    csv_data_results, config["aws"]["bucket"], "", METRICS_DIR, results_file_name
)
# report the bucket the file was actually written to (config["aws"]["bucket"]),
# as the other "saved ... in s3://" messages in this notebook do
logger.info(
    f"saved results dataframe of shape={df_results.shape} in s3://{config['aws']['bucket']}/{results_s3_path}"
)


In [None]:
# Record where this run's metrics live (the value of METRICS_DIR) in a small
# text file, both locally under METADATA_DIR and in S3 under DATA_DIR.
metrics_path_fname = "metrics_path.txt"

os.makedirs(METADATA_DIR, exist_ok=True)
metrics_path_file = os.path.join(METADATA_DIR, metrics_path_fname)
logger.info(f"the metrics metadata path is saved here --> {metrics_path_file}")

with open(metrics_path_file, "w") as fh:
    fh.write(METRICS_DIR)

write_to_s3(METRICS_DIR, config["aws"]["bucket"], "", DATA_DIR, metrics_path_fname)

logger.info(
    f"the information on the defined path for results on these metrics are given in this --> {METRICS_DIR}"
)


In [None]:
# Display the per-chunk metrics dataframe read from S3, before it is merged with df_endpoints.
df_metrics


In [None]:
logger.info(f"df_metrics cols = {df_metrics.columns}")
logger.info(f"df_endpoints cols = {df_endpoints.columns}")

# Attach endpoint/instance details to every per-chunk metrics row; the left
# join keeps rows whose experiment has no match in df_endpoints.
df_metrics = df_metrics.merge(df_endpoints, how="left", on="experiment_name")
logger.info(df_metrics)

# Destination key: METRICS_DIR plus the file name template from the report config
metrics_file_name = config["report"]["all_metrics_file"].format(datetime=date_time)
metrics_s3_path = os.path.join(METRICS_DIR, metrics_file_name)
logger.info(f"results s3 path for metrics csv --> {metrics_s3_path}")

# Serialize to CSV in memory and upload
csv_buffer = io.StringIO()
df_metrics.to_csv(csv_buffer, index=False)
csv_data_metrics = csv_buffer.getvalue()
write_to_s3(
    csv_data_metrics, config["aws"]["bucket"], "", METRICS_DIR, metrics_file_name
)
logger.info(
    f"saved metrics results dataframe of shape={df_metrics.shape} in s3://{config['aws']['bucket']}/{metrics_s3_path}"
)
