# Step 3 - Evaluate Model Latency

## Introduction

Latency is one of the three critical dimensions in model migration evaluation, alongside quality and cost. Measuring performance metrics accurately is essential for making informed migration decisions.

In this notebook, we'll implement a robust framework to measure and compare latency across our candidate models. We'll generate responses using the optimized prompts we created in the previous step and collect detailed performance metrics.

## Common Latency Metrics to Measure

* Time-To-First-Token (TTFT): Measures how quickly the model produces its first response token
* Token Throughput Per Second (TTPS): Indicates how many tokens the model generates per second after the first token
* Overall Latency: Measures total time from request submission to complete response delivery

| Application Type | Primary API | Key Metrics | Examples |
|-----------------|-------------|-------------|----------|
| Streaming | [ConverseStream](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html), [InvokeModelWithResponseStream](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModelWithResponseStream.html) | TTFT, TTPS | Chatbots, virtual assistants, live content creation |
| Non-streaming | [Converse](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html), [InvokeModel](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html) | Overall Latency | Document summarization, batch processing, report generation |

Because this workshop covers a document summarization use case, which is non-streaming, we will measure overall latency only.

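
The relationship between the three metrics can be made concrete with a small sketch. It assumes you have already captured wall-clock timestamps for each request; `LatencyMetrics` and `compute_latency_metrics` are hypothetical names used for illustration, not part of the Bedrock SDK or of this workshop's code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LatencyMetrics:
    ttft_s: Optional[float]   # time to first token, in seconds
    ttps: Optional[float]     # output tokens per second after the first token
    overall_s: float          # total request-to-completion time, in seconds


def compute_latency_metrics(request_start, first_token_time, request_end, output_tokens):
    """Derive the three metrics from timestamps given in seconds.

    first_token_time is None for non-streaming calls, in which case only
    the overall latency can be reported.
    """
    overall = request_end - request_start
    if first_token_time is None:
        return LatencyMetrics(ttft_s=None, ttps=None, overall_s=overall)

    ttft = first_token_time - request_start
    generation_time = request_end - first_token_time
    # Throughput counts only the tokens produced after the first one.
    if generation_time > 0 and output_tokens > 1:
        ttps = (output_tokens - 1) / generation_time
    else:
        ttps = None
    return LatencyMetrics(ttft_s=ttft, ttps=ttps, overall_s=overall)
```

For a non-streaming call such as the ones in this notebook, only `overall_s` is populated, which matches the choice to report overall latency alone.
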
## What This Evaluation Produces

### 1. Detailed Log Files

A log file is automatically generated in your working directory as `model_latency_benchmarking-{timestamp}.log`, tracking all API calls, errors, and execution details to provide a complete audit trail and debugging information for your benchmarking process.

### 2. Results CSV Files

Results are saved in the `../outputs/` directory as `document_summarization_{model_id}_{timestamp}.csv`, containing key metrics including total latency, server-side processing time, token usage counts, API status indicators, and configuration details for comprehensive performance analysis.

These results will be used in Step 5 to create comprehensive model comparisons and visualizations.

## Benchmarking Guidelines

For statistically valid latency evaluation, consider these principles:

| Parameter | Description | Workshop Setting | Production Recommendation |
|-----------|-------------|-----------------|---------------------------|
| `invocations_per_scenario` | Repetitions per prompt | 1 (for workshop efficiency) | 10+ for statistical significance |
| `experiment_counts` | Times to repeat the whole experiment | 1 (for workshop) | Multiple runs across different days/times |
| `num_parallel_calls` | Concurrent API requests | 1 (to avoid throttling) | Match your production concurrency |

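> **⚠️ Statistical Note:** While we're using simplified parameters for this workshop, production evaluations should follow more rigorous statistical practices. The Central Limit Theorem tells us that with sufficient samples (1000+), the sample mean of our metrics will be approximately normally distributed, even though individual latency values are often right-skewed. This is what makes confidence intervals on the mean meaningful; tail behavior is better described with percentiles. Ideally, you should: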
> - Collect samples across multiple days to account for time-of-day variations
> - Include peak traffic periods in your sampling
> - Match your test distribution to your actual production traffic patterns
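
Once enough samples are collected, they can be summarized as in the sketch below. It is a minimal illustration using only the standard library; `summarize_latencies` is a hypothetical helper, the nearest-rank percentile is one of several common definitions, and the 95% interval relies on the normal approximation, so it should be treated with caution for small sample counts.

```python
import math
import statistics


def summarize_latencies(samples):
    """Summarize latency samples (seconds): mean, percentiles, and a CI on the mean."""
    if len(samples) < 2:
        raise ValueError("need at least two samples")
    ordered = sorted(samples)

    def percentile(p):
        # Nearest-rank percentile: smallest value with at least p% of samples at or below it.
        rank = max(1, math.ceil(p * len(ordered) / 100))
        return ordered[rank - 1]

    mean = statistics.mean(ordered)
    std_err = statistics.stdev(ordered) / math.sqrt(len(ordered))
    return {
        "n": len(ordered),
        "mean": mean,
        "p50": percentile(50),
        "p90": percentile(90),
        "p99": percentile(99),
        # Approximate 95% confidence interval for the mean (normal approximation).
        "mean_ci95": (mean - 1.96 * std_err, mean + 1.96 * std_err),
    }
```

Reporting p90 or p99 alongside the mean is a common choice for latency because a few slow requests can dominate the user experience without moving the average much.
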

Let's begin our latency evaluation!


In [1]:
import subprocess
import sys
import boto3
import botocore
import random
import pprint
import time
import json
import argparse
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import pytz
import os
import logging
from botocore.config import Config
from botocore.exceptions import ClientError
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from typing import List, Dict
from tqdm.notebook import tqdm
from IPython.display import display


In [2]:

### load our tracking df

evaluation_tracking_file = '../data/evaluation_tracking.csv'
evaluation_tracking = pd.read_csv(evaluation_tracking_file)
display(evaluation_tracking)

Unnamed: 0,model,model_clean_name,text_prompt,region,inference_profile
0,source_model,source_model,"\nFirst, please read the article below.\n{cont...",us-east-1,standard
1,amazon.nova-lite-v1:0,amazon.nova-lite-v1-0,## Instruction\nYour task is to read the given...,us-east-1,standard
2,us.anthropic.claude-3-5-haiku-20241022-v1:0,us.anthropic.claude-3-5-haiku-20241022-v1-0,<task>\nYour task is to provide an extremely c...,us-east-1,standard


## Configuration Setup

### Defining Our Benchmarking Parameters

The key parameters we'll configure include:

- **Data Source**: Location of our test documents
- **Experiment Repetitions**: How many times to run the complete test suite
- **Invocation Settings**: Controls for API call frequency and parallelism
- **Model Parameters**: Configuration for temperature, token limits, etc.

These settings directly impact the quality and reliability of our latency measurements.
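
Before raising any of these values, it can help to estimate how many API calls a run will make and how long the enforced sleeps alone will take. The sketch below is a rough estimate under the assumption that every invocation is followed by one sleep and that parallel workers overlap their sleeps evenly; `estimate_run` is a hypothetical helper, not part of the notebook.

```python
def estimate_run(num_documents, num_models, invocations_per_scenario,
                 experiment_counts, sleep_between_invocations, num_parallel_calls=1):
    """Return (total API calls, approximate seconds spent sleeping)."""
    total_calls = (num_documents * num_models
                   * invocations_per_scenario * experiment_counts)
    # Each invocation is followed by one sleep; workers sleep concurrently.
    sleep_seconds = total_calls * sleep_between_invocations / num_parallel_calls
    return total_calls, sleep_seconds
```

With the workshop settings (10 documents, 2 candidate models, 1 invocation, 1 experiment, 5-second sleep, 1 worker), `estimate_run(10, 2, 1, 1, 5)` gives 20 calls and 100 seconds of sleep, before counting any model response time.
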


In [3]:
# location of the prompt dataset and directory to save the results 
documents_path = "../data/document_sample_10.csv"
directory = "../outputs" ## output to this directory

dt = datetime.fromtimestamp(time.time(), tz=pytz.utc)
job_timestamp_iso = dt.strftime('%Y-%m-%dT%H:%M:%SZ')

# Configuration to repeat experiment for reliable metrics
scenario_config = {
    "sleep_between_invocations": 5, # in seconds
    "invocations_per_scenario": 1 # number of times you want to run the same prompt to get more samples - note: this means more cost 
}

# Set the number of parallel calls
num_parallel_calls = 1 ## STRONGLY RECOMMEND TO SET TO 1, TO AVOID THROTTLING IN WORKSHOP

# how many times do you want to run the experiment (increase this for longer experiments, helps with more reliable numbers)
experiment_counts = 1

# Other inference parameters
MAX_TOKENS = 1024
TEMPERATURE = 0
TOP_P = 1
EXPERIMENT_NAME = f'experiment_{job_timestamp_iso}' # your custom experiment name

In [4]:
### We lock logging because worker threads will write log entries concurrently

logging_lock = Lock()
os.makedirs(f"{directory}", exist_ok=True)
os.makedirs(f"{directory}-analysis", exist_ok=True)

# Configure logging
logging.basicConfig(filename=f"model_latency_benchmarking-{datetime.now().strftime('%Y%m%d_%H%M%S')}.log", 
                    level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')


## Evaluation Framework Implementation

### Core Benchmarking Functions

This section contains the heart of our latency evaluation system - the functions that handle the actual API calls, timing measurements, and result processing. 

> **Note for Workshop Participants**: This is the most complex part of the notebook. You don't need to fully understand every line of code, but it's helpful to grasp the overall approach to latency measurement. The key concept is that we're making controlled API calls and precisely measuring the time taken for each response.

> **For Advanced Users**: This implementation includes thread-safe logging and concurrent execution through a `ThreadPoolExecutor`, so the same code can measure latency under parallel load when `num_parallel_calls` is raised above 1.
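
One way to combine precise timing with retries on throttling is sketched below. It is an illustration under stated assumptions rather than the notebook's implementation: `timed_call_with_retry` and its parameters are hypothetical, the caller supplies a predicate that decides whether an exception counts as throttling, and only the successful attempt is timed so that backoff sleeps do not inflate the latency figure.

```python
import time


def timed_call_with_retry(call, is_throttle, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Time `call()` and retry with exponential backoff when it is throttled.

    Returns (result, latency_seconds, attempts).
    """
    for attempt in range(1, max_attempts + 1):
        # perf_counter is monotonic, so it is unaffected by system clock adjustments.
        start = time.perf_counter()
        try:
            result = call()
        except Exception as err:
            # Give up on non-throttling errors or when attempts are exhausted.
            if not is_throttle(err) or attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
            continue
        return result, time.perf_counter() - start, attempt
```

With boto3, `call` could wrap `client.converse(...)` and `is_throttle` could inspect `err.response["Error"]["Code"]` on a `ClientError`; the exact error code to match is an assumption you should confirm against the service documentation.
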


In [5]:
def post_iteration(scenario_config):
    logging.info(f'Sleeping for {scenario_config["sleep_between_invocations"]} seconds.')
    time.sleep(scenario_config["sleep_between_invocations"])

def benchmark(client, prompt, latency_inference_profile, model_id, inferenceConfig, system_config, sleep_on_throttling=5):
    # Note: latency_inference_profile and sleep_on_throttling are accepted for
    # interface compatibility but are not used by this function.
    api_call_status = 'Success'
    full_error_message = 'Success'
    dt = datetime.fromtimestamp(time.time(), tz=pytz.utc)
    job_timestamp_iso = dt.strftime('%Y-%m-%dT%H:%M:%SZ')

    messages = [{"role": "user", "content": [{"text": prompt}]}]
    # Default every returned value so a failed call still returns cleanly
    model_response, latency, model_latencyMs = None, None, None
    output_tokens, input_tokens = None, None

    try:
        # perf_counter is a monotonic clock, better suited to timing than time.time()
        start = time.perf_counter()
        response = client.converse(
            modelId=model_id,
            messages=messages,
            inferenceConfig=inferenceConfig,
            system=system_config
        )
        latency = round(time.perf_counter() - start, 2)

        # Extract token usage, server-side latency, and the generated text
        input_tokens = response["usage"]["inputTokens"]
        output_tokens = response["usage"]["outputTokens"]
        model_latencyMs = response["metrics"]["latencyMs"]
        output_list = response["output"]["message"].get("content", [])
        model_response = "\n".join(x["text"] for x in output_list)

    except ClientError as err:
        # Record the failure instead of raising so the run can continue
        full_error_message = str(err)
        api_call_status = err.response['Error']['Code']
        print(f"Got Error: {api_call_status}")
        print(f"Full Error Message: {full_error_message}")

    return model_response, latency, model_latencyMs, job_timestamp_iso, api_call_status, full_error_message, output_tokens, input_tokens

def execute_benchmark(client, scenarios, scenario_config, inferenceConfig, system_config, num_parallel_calls=4, early_break=False):
    pp = pprint.PrettyPrinter(indent=2)
    all_invocations = []
    
    # Create a master progress bar for overall scenarios
    master_pbar = tqdm(total=len(scenarios), desc="Scenarios", position=0, leave=True)
    
    def process_scenario(scenario_idx, scenario):
        local_invocations = []
        prompt = scenario['prompt']
        #print(f"=============111111==================\n{prompt}")
            
        for invocation_id in range(scenario_config["invocations_per_scenario"]):
            try:
                model_response, latency, model_latencyMs, job_timestamp_iso, api_call_status, \
                full_error_message, model_output_tokens, model_input_tokens = benchmark(
                    client,
                    prompt,
                    latency_inference_profile=scenario['latency_inference_profile'],
                    model_id=scenario['model_id'],
                    inferenceConfig = inferenceConfig,
                    system_config = system_config,
                    sleep_on_throttling=scenario_config['sleep_between_invocations']
                )

                invocation = {
                    'model': scenario['model_id'],
                    'model_clean_name': scenario['model_clean_name'],
                    'region': scenario['region'],
                    'invocation_id': invocation_id,
                    'prompt':prompt,
                    'model_response':model_response,
                    'referenceResponse': scenario['referenceResponse'],
                    'latency':latency,
                    'model_latencyMs':model_latencyMs,
                    'job_timestamp_iso': job_timestamp_iso,
                    'model_input_tokens': model_input_tokens,
                    'model_output_tokens': model_output_tokens,
                    'api_call_status': api_call_status,
                    'full_error_message': full_error_message,
                    'TEMPERATURE': TEMPERATURE,
                    'TOP_P': TOP_P,
                    'EXPERIMENT_NAME': EXPERIMENT_NAME,
                    #'task_type': scenario['task_type'],
                    'inference_profile': scenario['latency_inference_profile'],
                }
                local_invocations.append(invocation)
                
                # Thread-safe logging
                with logging_lock:
                    logging.info(f'Invocation: {invocation}')
                
                post_iteration(scenario_config=scenario_config)
                
            except Exception as e:
                with logging_lock:
                    logging.error(f"Error while processing scenario: {scenario['model_id']}. Error: {e}")
                
        # Update master progress bar when a scenario is complete
        master_pbar.update(1)
        return local_invocations

    # Execute scenarios in parallel
    with ThreadPoolExecutor(max_workers=num_parallel_calls) as executor:
        # Submit all scenarios and store futures
        future_to_scenario = {executor.submit(process_scenario, idx, scenario): scenario 
                            for idx, scenario in enumerate(scenarios)}
        
        # Print initial state
        # print(f"Total scenarios submitted: {len(future_to_scenario)}")
        # print(f"Number of parallel workers: {num_parallel_calls}")
        
        # Monitor futures as they complete
        start_time = time.time()
        
        for future in concurrent.futures.as_completed(future_to_scenario):
            scenario = future_to_scenario[future]
            current_time = time.time() - start_time
            
            try:
                result = future.result()
                all_invocations.extend(result)
            except Exception as e:
                with logging_lock:
                    logging.error(f"Scenario failed: {e}")

        master_pbar.close()
        return all_invocations

## Loading Test Data

### Preparing Our Document Samples

To evaluate latency consistently across models, we need a standardized set of input documents. We'll use the dataset from our previous steps - a collection of news articles from the XSum dataset.

Each document will be processed by each of our candidate models using their respective optimized prompts. 

The dataset includes both the source documents and reference summaries, though for latency evaluation we're primarily focused on processing the source documents.
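
Each prompt template is filled with a document before it is sent to the model. The sketch below shows one defensive way to do that substitution, assuming the template marks the insertion point with `{context}` as the prompts from the previous step do; `fill_prompt` is a hypothetical helper. Unlike `str.format`, plain replacement does not fail when a template contains other braces, for example a JSON snippet in the instructions.

```python
def fill_prompt(template, document, placeholder="{context}"):
    """Insert `document` into `template` at the placeholder, leaving other braces untouched."""
    if placeholder not in template:
        raise ValueError(f"template is missing the {placeholder} placeholder")
    return template.replace(placeholder, document)
```
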


In [6]:
documents = pd.read_csv(documents_path)
documents.head(5)

Unnamed: 0,document,referenceResponse
0,"Rita King, 81, who had dementia, died after be...",An 87-year-old man who shot his wife dead at a...
1,Scottish winger Matt Williams' early touchdown...,Worcester Warriors booked their place in the C...
2,"Joe Lawton, 17, shot himself at his family far...",The parents of a boy who killed himself after ...
3,Here's BBC Sport's day-by-day guide so you kno...,The Paralympic Games are about to reach their ...
4,"It said such a move would ""reduce the burden"" ...",British Airways should automatically compensat...


## Running Latency Evaluations

### Executing Model Benchmarks

Now we're ready to run our latency evaluations against each candidate model. This process will:

1. Skip the source model (we already have its metrics from existing data)
2. Process each candidate model sequentially
3. Generate responses for each document in our dataset
4. Capture detailed performance metrics for every API call
5. Save comprehensive results to CSV files for later analysis


> This is a time-consuming part of the notebook. Depending on your settings and the number of documents, it may take several minutes to complete as we make multiple API calls with appropriate spacing between them to avoid throttling.

Note: In the code below, we're using progress bars to help visualize the benchmarking process. This gives you real-time feedback on how far along each model's evaluation has progressed.

In [7]:
# Process each model with its own standalone progress bar
for index, evaluation in evaluation_tracking.iterrows():
    ## skip the source model
    if evaluation['model'] == "source_model":
        continue
    
    use_cases_scenarios = []
    model_id = evaluation['model']
    print(f"========================== GENERATING RESPONSE WITH MODEL {model_id} ======================\n")
    prompt = evaluation['text_prompt']
    region = evaluation['region']
    latency_inference_profile = evaluation['inference_profile']

    inferenceConfig={
        'maxTokens': MAX_TOKENS,
        'temperature': TEMPERATURE,
        'topP': TOP_P
    }
    system_config = []
    system_config.append({"text": "You are an assistant."})
    
    # Create a progress bar for documents processing for this model
    print(f"Creating scenarios for model {model_id}:")
    for _, row in tqdm(documents.iterrows(), total=len(documents), desc="Building scenarios"):
        user_prompt = prompt.format(
            context=row['document']
        )

        use_cases_scenarios.append({
            "prompt": user_prompt,
            "referenceResponse": row['referenceResponse'],
            "model_id": model_id,
            "model_clean_name": evaluation['model_clean_name'],
            "region": region,
            "latency_inference_profile": latency_inference_profile
        })
    
    # Main experiment loop with its own progress bar
    print(f"Running experiments for model {model_id}:")
    for run_count in tqdm(range(1, experiment_counts + 1), desc=f"Experiment runs"):
        selected_scenarios = random.sample(
            use_cases_scenarios, 
            k=len(use_cases_scenarios) // 1
        )
    
        with logging_lock:
            logging.info(f"{len(selected_scenarios)} scenarios x {scenario_config['invocations_per_scenario']} invocations = {len(selected_scenarios) * scenario_config['invocations_per_scenario']} total invocations")
        
        logging.info(f"Running iteration {run_count}")
        
        # Create a new client for the main thread
        client = boto3.client(
            service_name='bedrock-runtime',
            region_name=region
        )
        
        # Run the scenarios and measure times
        invocations = execute_benchmark(
            client, 
            selected_scenarios, 
            scenario_config,
            inferenceConfig,
            system_config,
            num_parallel_calls=num_parallel_calls,
            early_break=False
        )
        
        # Convert the invocations list to a pandas DataFrame
        df = pd.DataFrame(invocations)
        df['timestamp'] = pd.Timestamp.now()
        df['run_count'] = run_count

        # Write the DataFrame to a CSV file
        document_summarization_output_file_name = f"../outputs/document_summarization_{model_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

        df.to_csv(document_summarization_output_file_name, index=False)
    
        with logging_lock:
            logging.info(f"Results written to {document_summarization_output_file_name}")
            logging.info(f"Completed run {run_count} of {experiment_counts}")
    
    # Add a visual separator between models
    print(f"\n{'='*80}\n")



Creating scenarios for model amazon.nova-lite-v1:0:
Running experiments for model amazon.nova-lite-v1:0:

Creating scenarios for model us.anthropic.claude-3-5-haiku-20241022-v1:0:
Running experiments for model us.anthropic.claude-3-5-haiku-20241022-v1:0:





## Verifying Evaluation Results


Before moving on, it's important to verify that our latency evaluation ran successfully and produced the expected outputs. 

Let's examine the output directory and peek at the contents of our result files:
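
Because prompts and responses contain line breaks, a line-based preview can cut a record in the middle of a quoted field. As a complement, the sketch below parses the CSV properly and counts calls whose `api_call_status` is not `Success`. It is a minimal sketch using only the standard library; `summarize_results` is a hypothetical helper, and it takes the file contents as text.

```python
import csv
import io


def summarize_results(csv_text):
    """Return (row count, list of (model, status) for non-successful calls)."""
    reader = csv.DictReader(io.StringIO(csv_text))
    total, failures = 0, []
    for row in reader:
        total += 1
        if row["api_call_status"] != "Success":
            failures.append((row["model"], row["api_call_status"]))
    return total, failures
```

To use it on a results file, read the file with `open(path, newline="")` and pass `f.read()`; with the workshop settings you would expect one row per document and, ideally, an empty failure list.
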


In [8]:
! ls -l ../outputs/

total 356
-rw-rw-r-- 1 ec2-user ec2-user  56287 Jun 24 00:34 document_summarization_amazon.nova-lite-v1:0_20250624_003419.csv
-rw-rw-r-- 1 ec2-user ec2-user 242338 Jun  1 01:08 document_summarization_source_model.csv
-rw-rw-r-- 1 ec2-user ec2-user  58047 Jun 24 00:35 document_summarization_us.anthropic.claude-3-5-haiku-20241022-v1:0_20250624_003534.csv


In [9]:
! head -3 ../outputs/document_summarization_amazon.nova-lite-v1:0_*.csv

model,model_clean_name,region,invocation_id,prompt,model_response,referenceResponse,latency,model_latencyMs,job_timestamp_iso,model_input_tokens,model_output_tokens,api_call_status,full_error_message,TEMPERATURE,TOP_P,EXPERIMENT_NAME,inference_profile,timestamp,run_count
amazon.nova-lite-v1:0,amazon.nova-lite-v1-0,us-east-1,0,"## Instruction
Your task is to read the given article in the Ronan O'Mahony and James Cronin both notched two tries with Tommy O'Donnell and the Scannell brothers Niall and Rory also touching down for Munster.


## Saving Our Progress - Updating the Evaluation Tracking System

Now that we've successfully completed our latency evaluations, we'll update our central tracking dataframe with a `latency_evaluation_output` column that records the directory holding the latency result files.

This information will be critical in Step 5 when we consolidate all our evaluation data for the final comparison. The tracking file maintains continuity between the different evaluation steps and ensures we can easily access all results during the analysis phase.


In [10]:
## saving the progress

evaluation_tracking['latency_evaluation_output'] = "../outputs/"

evaluation_tracking.to_csv(evaluation_tracking_file, index=False)

## Summary and Next Steps

### What We've Accomplished

In this notebook, we've successfully:

1. ✅ **Implemented** a comprehensive latency evaluation framework
2. ✅ **Generated** responses from our candidate models using optimized prompts
3. ✅ **Measured** critical performance metrics including end-to-end latency
4. ✅ **Collected** token usage data for later cost analysis
5. ✅ **Saved** detailed results for further analysis


### Next Steps

In the next notebook, **Step 4 - Evaluate Model Quality**, we'll focus on the second critical dimension of model evaluation: response quality. We'll use automated techniques like LLM-as-a-Judge to assess how well each model performs the summarization task.

By combining the latency metrics we've gathered here with the quality metrics from the next step, we'll build a comprehensive understanding of each model's strengths and weaknesses, enabling data-driven migration decisions.

> **For Production Implementations:** Remember that real-world implementations would typically include more extensive testing with larger sample sizes, testing across different time periods, and evaluating under various load conditions. The approach demonstrated here can be scaled up for more robust evaluation in production scenarios.
