- Time to First Token (TTFT): latency before the first output token is produced.
- End-to-End Request Latency: how long it takes from submitting a query to receiving the full response.
- Time Per Output Token (TPOT): average time taken to generate each output token, in seconds per token (the inverse of the decode throughput in tokens per second). Also known as Inter-Token Latency (ITL).
- Token Generation Time (TGT): duration from first to last token.
- Total Latency: TTFT + TGT.

In [9]:
# loosely based on code acquired from https://github.com/rumanxyz/llm-perf-benchmark
import torch
import time
import GPUtil
import numpy as np
import traceback
import threading
from transformers import TextIteratorStreamer
from typing import Optional, Dict, Any, List, Union


class GPUMonitor:
    """Sample GPU memory and utilization on a background thread.

    While running, the monitor polls ``GPUtil.getGPUs()`` every
    ``monitoring_interval`` seconds and stores one sample per poll: the
    memory used and the utilization, each summed over the selected GPUs.
    Summary statistics (peak / 90th percentile) are read from the samples
    collected since the most recent ``start()``.

    Args:
        monitoring_interval: Seconds to sleep between two polls.
        gpu_indices: Positions in the list returned by ``GPUtil.getGPUs()``
            to include in each sample. ``None`` selects every GPU.
    """

    def __init__(self, monitoring_interval: float = 0.1, gpu_indices: Optional[List[int]] = None):
        self.monitoring_interval = monitoring_interval
        self.gpu_indices = gpu_indices
        self._mem_samples = []    # one summed memory reading per poll
        self._util_samples = []   # one summed utilization reading per poll
        self._is_monitoring = False
        self._monitoring_thread = None

    def _aggregate(self, gpus) -> tuple:
        """Return ``(memory_used, utilization_pct)`` summed over the selected GPUs.

        ``gpus`` is a sequence of objects exposing ``memoryUsed`` and ``load``
        (as returned by ``GPUtil.getGPUs()``). ``load`` is scaled by 100, i.e.
        it is treated as a fraction and reported as a percentage. Because the
        values are summed, utilization can exceed 100 when several GPUs are
        selected. An out-of-range index raises ``IndexError``.
        """
        if self.gpu_indices is None:
            selected = list(gpus)
        else:
            selected = [gpus[i] for i in self.gpu_indices]

        mem_mb = 0.0
        util_pct = 0.0
        for gpu in selected:
            mem_mb += gpu.memoryUsed
            util_pct += gpu.load * 100
        return mem_mb, util_pct

    def _run(self):
        """Polling loop executed on the monitoring thread until ``stop()``."""
        while self._is_monitoring:
            try:
                gpus = GPUtil.getGPUs()
                if gpus:
                    mem_mb, util_pct = self._aggregate(gpus)
                    self._mem_samples.append(mem_mb)
                    self._util_samples.append(util_pct)
                time.sleep(self.monitoring_interval)
            except Exception as e:
                # Monitoring is best effort: report the problem and stop
                # sampling rather than disturbing the benchmark itself.
                print(f"GPU monitoring error: {e}")
                break

    def start(self):
        """Discard previous samples and start polling on a daemon thread."""
        self._is_monitoring = True
        self._mem_samples.clear()
        self._util_samples.clear()

        self._monitoring_thread = threading.Thread(target=self._run, daemon=True)
        self._monitoring_thread.start()

    def stop(self):
        """Stop GPU monitoring and wait for the polling thread to exit."""
        self._is_monitoring = False
        if self._monitoring_thread:
            self._monitoring_thread.join()

    def peak_mem(self) -> float:
        """Return the largest memory sample, or 0.0 if nothing was sampled."""
        return max(self._mem_samples) if self._mem_samples else 0.0

    def p90_mem(self) -> float:
        """Return the 90th-percentile memory sample, or 0.0 if nothing was sampled."""
        return float(np.percentile(self._mem_samples, 90)) if self._mem_samples else 0.0

    def peak_util(self) -> float:
        """Return the largest utilization sample, or 0.0 if nothing was sampled."""
        return max(self._util_samples) if self._util_samples else 0.0

    def p90_util(self) -> float:
        """Return the 90th-percentile utilization sample, or 0.0 if nothing was sampled."""
        return float(np.percentile(self._util_samples, 90)) if self._util_samples else 0.0


def benchmark_single_prompt(
    model,
    tokenizer,
    input_prompt_text: str,
    temperature: float = 1.0,
    top_p: float = 0.95,
    max_new_tokens: int = 100,
    device: Optional[str] = None,
    gpu_indices: Optional[List[int]] = None,
) -> Dict[str, Any]:
    """
    Benchmark a language model's performance for a single prompt.

    The prompt is tokenized, ``model.generate`` is run on a worker thread
    with a ``TextIteratorStreamer`` attached, and the main thread consumes
    the stream to timestamp the first generated chunk. A ``GPUMonitor``
    samples the GPUs for the whole duration of the call.

    Args:
        model: Object providing ``to(device)`` and ``generate(**kwargs)``;
            the generate result must expose ``.sequences``.
        tokenizer: Callable tokenizer returning an object with
            ``input_ids`` / ``attention_mask`` and a ``to(device)`` method.
        input_prompt_text: Prompt to generate from.
        temperature: Sampling temperature. ``None`` or a value <= 0
            disables sampling (greedy decoding).
        top_p: Nucleus-sampling threshold; only used when sampling.
        max_new_tokens: Upper bound on generated tokens.
        device: Target device; defaults to ``'cuda'`` when available,
            otherwise ``'cpu'``.
        gpu_indices: GPUs to include in the GPU statistics (``None`` = all).

    Returns:
        A dict of metrics, or ``{}`` if generation failed. Keys:
        ``total_generation_time`` (generate call start to finish),
        ``time_to_first_token_seconds`` (``None`` if nothing was streamed),
        ``token_generation_time`` (first chunk to finish),
        ``time_per_output_token`` (``None`` if no token was generated),
        ``input_tokens``, ``output_tokens``, ``total_tokens``,
        ``tokens_per_second``, ``output_decode_tokens_per_second``,
        ``input_process_time_seconds`` (tokenization),
        ``e2e_latency`` (TTFT + token generation time),
        ``peak_gpu_memory_mb``, ``p90_gpu_memory_mb``,
        ``peak_gpu_utilization``, ``p90_gpu_utilization``.
    """
    # Determine device
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model.to(device)

    if device.startswith("cuda"):
        try:
            torch.cuda.reset_peak_memory_stats()
        except Exception as e:
            # Best effort only: the benchmark does not depend on this reset.
            print(f"Could not reset CUDA peak memory stats: {e}")

    # GPU monitoring setup
    gpu_monitor = GPUMonitor(monitoring_interval=0.1, gpu_indices=gpu_indices)
    gpu_monitor.start()

    try:
        # Tokenize input
        start_input_process = time.perf_counter()
        inputs = tokenizer(input_prompt_text, return_tensors="pt").to(device)
        input_process_time = time.perf_counter() - start_input_process

        do_sample = temperature is not None and temperature > 0
        generation_kwargs = {
            'input_ids': inputs.input_ids,
            'attention_mask': inputs.attention_mask,
            'max_new_tokens': max_new_tokens,
            'do_sample': do_sample,
            'return_dict_in_generate': True,
            'output_scores': False,
        }
        if do_sample:
            # Sampling knobs are meaningless (and a non-positive temperature
            # is invalid) for greedy decoding, so pass them only when sampling.
            generation_kwargs['temperature'] = temperature
            generation_kwargs['top_p'] = top_p

        # Streaming generation setup. skip_prompt=True keeps the echoed
        # prompt out of the stream; otherwise the first chunk would arrive
        # before any token is generated and TTFT would be meaningless.
        streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=False)
        generation_kwargs['streamer'] = streamer

        result_holder = {}

        def _generate():
            try:
                result_holder["out"] = model.generate(**generation_kwargs)
            except Exception as exc:
                result_holder["error"] = exc
                result_holder["trace"] = traceback.format_exc()
                # Close the stream so the consumer loop below does not
                # block forever waiting for a chunk that will never come.
                streamer.end()

        generation_start_time = time.perf_counter()
        generation_thread = threading.Thread(target=_generate, daemon=True)
        generation_thread.start()

        # Streaming generation loop: only the arrival time of the first
        # chunk matters, the text itself is discarded.
        first_token_wall = None
        try:
            for _ in streamer:
                if first_token_wall is None:
                    first_token_wall = time.perf_counter()
        except Exception as e:
            print(f"Generation error: {e}")
            print(f"Error trace:\n{traceback.format_exc()}")
            return {}

        generation_thread.join()
        # Take the end timestamp before stopping the monitor: stop() joins a
        # thread that may be sleeping, which would inflate the timings.
        generation_end_time = time.perf_counter()
    finally:
        # Stop GPU monitoring on every exit path, including failures.
        gpu_monitor.stop()

    if "error" in result_holder:
        print(f"Generation error: {result_holder['error']}")
        print(f"Error trace:\n{result_holder['trace']}")
        return {}

    if "out" not in result_holder:
        return {}

    sequences = result_holder["out"].sequences

    # NOTE(review): assumes the returned sequence contains the prompt tokens
    # followed by the generated ones (decoder-only behavior) — confirm for
    # encoder-decoder models.
    total_len = int(sequences.shape[1])
    input_tokens = int(inputs.input_ids.shape[1])
    output_tokens = max(total_len - input_tokens, 0)
    total_tokens = input_tokens + output_tokens

    total_generation_time = generation_end_time - generation_start_time

    if first_token_wall is not None:
        ttft = first_token_wall - generation_start_time
        decode_time = max(generation_end_time - first_token_wall, 1e-9)
    else:
        ttft = None
        decode_time = total_generation_time

    # Metrics
    total_tps = total_tokens / max(total_generation_time, 1e-9)
    decode_tps = output_tokens / max(decode_time, 1e-9)
    # Undefined when nothing was generated; avoid dividing by zero.
    time_per_output_token = decode_time / output_tokens if output_tokens > 0 else None
    # total_generation_time already contains TTFT, so the end-to-end latency
    # is TTFT plus the decode phase, not TTFT plus the total.
    e2e_latency = ttft + decode_time if ttft is not None else total_generation_time

    benchmark_results = {
        'total_generation_time': total_generation_time,
        'time_to_first_token_seconds': ttft,
        'token_generation_time' : decode_time,
        'time_per_output_token' : time_per_output_token,
        'input_tokens': input_tokens,
        'output_tokens': output_tokens,
        'total_tokens': total_tokens,
        'tokens_per_second': total_tps,
        'output_decode_tokens_per_second': decode_tps,
        'input_process_time_seconds': input_process_time,
        'e2e_latency' : e2e_latency,
        'peak_gpu_memory_mb': gpu_monitor.peak_mem(),
        'p90_gpu_memory_mb': gpu_monitor.p90_mem(),
        'peak_gpu_utilization': gpu_monitor.peak_util(),
        'p90_gpu_utilization': gpu_monitor.p90_util()
    }

    return benchmark_results


def benchmark_language_model(
    model,
    tokenizer,
    prompts: List[str],
    temperature: float = 1.0,
    top_p: float = 0.95,
    max_new_tokens: int = 100,
    device: Optional[str] = None,
    gpu_indices: Optional[List[int]] = None,
) -> Dict[str, Union[float, List[Dict[str, Any]]]]:
    """
    Benchmark a language model's performance across multiple prompts.

    Each prompt is run through ``benchmark_single_prompt`` with the given
    generation settings; prompts whose benchmark returns an empty result
    are skipped. The per-prompt metrics are then reduced to percentiles.

    Args:
        model: Model forwarded to ``benchmark_single_prompt``.
        tokenizer: Tokenizer forwarded to ``benchmark_single_prompt``.
        prompts: Prompts to benchmark, one run per prompt.
        temperature: Sampling temperature forwarded to each run.
        top_p: Nucleus-sampling threshold forwarded to each run.
        max_new_tokens: Generation length limit forwarded to each run.
        device: Target device forwarded to each run.
        gpu_indices: GPUs to include in GPU statistics, forwarded to each run.

    Returns:
        ``{}`` when no prompt produced a result. Otherwise a dict with the
        50th/90th percentiles (rounded to 3 decimals) of TTFT, time per
        output token, total generation time, end-to-end latency and decode
        tokens/s, plus the max and 90th percentile of the per-prompt peak
        GPU memory and peak GPU utilization. A percentile is ``None`` when
        no prompt reported a value for that metric.
    """
    prompt_results = []
    for prompt in prompts:
        result = benchmark_single_prompt(
            model,
            tokenizer,
            prompt,
            temperature=temperature,
            top_p=top_p,
            max_new_tokens=max_new_tokens,
            device=device,
            gpu_indices=gpu_indices,
        )
        if result:
            prompt_results.append(result)

    if not prompt_results:
        return {}

    def _values(key):
        # A per-prompt metric may be None (e.g. TTFT when nothing was
        # streamed); such entries cannot take part in a percentile.
        return [r[key] for r in prompt_results if r.get(key) is not None]

    def _percentile(values, q):
        if not values:
            return None
        return round(float(np.percentile(values, q)), 3)

    # Extract metric lists for aggregation
    ttft_list = _values('time_to_first_token_seconds')
    tpot_list = _values('time_per_output_token')
    tgt_list = _values('total_generation_time')
    e2e_latency_list = _values('e2e_latency')
    decode_tps_list = _values('output_decode_tokens_per_second')
    gpu_usage_list = _values('peak_gpu_memory_mb')
    gpu_util_list = _values('peak_gpu_utilization')

    # Aggregate metrics
    aggregate_results = {
        # Time to First Token (TTFT) metrics
        'p50_ttft_seconds': _percentile(ttft_list, 50),
        'p90_ttft_seconds': _percentile(ttft_list, 90),

        # Time per output token (TPOT) metrics
        'p50_tpot_seconds': _percentile(tpot_list, 50),
        'p90_tpot_seconds': _percentile(tpot_list, 90),

        # Total generation time (TGT) metrics
        'p50_tgt_seconds': _percentile(tgt_list, 50),
        'p90_tgt_seconds': _percentile(tgt_list, 90),

        # End to end latency (e2e latency) metrics
        'p50_e2elatency_seconds': _percentile(e2e_latency_list, 50),
        'p90_e2elatency_seconds': _percentile(e2e_latency_list, 90),

        # Output Decode Tokens Per Second metrics
        'p50_decode_tps': _percentile(decode_tps_list, 50),
        'p90_decode_tps': _percentile(decode_tps_list, 90),

        # GPU Memory Usage metrics
        'max_gpu_memory_mb': round(max(gpu_usage_list), 3) if gpu_usage_list else None,
        'p90_gpu_memory_mb': _percentile(gpu_usage_list, 90),

        # GPU Utilization metrics
        'max_gpu_utilization': round(max(gpu_util_list), 3) if gpu_util_list else None,
        'p90_gpu_utilization': _percentile(gpu_util_list, 90)
    }

    return aggregate_results
