diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml
index 02a1ed7fbd5e..d74f988f8935 100644
--- a/.github/workflows/benchmark.yml
+++ b/.github/workflows/benchmark.yml
@@ -1,10 +1,7 @@
 name: Self-hosted runner (benchmark)

 on:
-  push:
-    branches: [main]
-  pull_request:
-    types: [ opened, labeled, reopened, synchronize ]
+  workflow_dispatch:

 concurrency:
   group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
diff --git a/.github/workflows/benchmark_v2.yml b/.github/workflows/benchmark_v2.yml
index fc9e07635185..80b31345409d 100644
--- a/.github/workflows/benchmark_v2.yml
+++ b/.github/workflows/benchmark_v2.yml
@@ -1,35 +1,7 @@
 name: Benchmark v2 Framework

 on:
-  workflow_call:
-    inputs:
-      runner:
-        description: 'GH Actions runner group to use'
-        required: true
-        type: string
-      container_image:
-        description: 'Docker image to use'
-        required: true
-        type: string
-      container_options:
-        description: 'Container options to use'
-        required: true
-        type: string
-      commit_sha:
-        description: 'Commit SHA to benchmark'
-        required: false
-        type: string
-        default: ''
-      run_id:
-        description: 'Custom run ID for organizing results (auto-generated if not provided)'
-        required: false
-        type: string
-        default: ''
-      benchmark_repo_id:
-        description: 'HuggingFace Dataset to upload results to (e.g., "org/benchmark-results")'
-        required: false
-        type: string
-        default: ''
+  workflow_dispatch:

 env:
   HF_HOME: /mnt/cache
@@ -82,4 +54,4 @@ jobs:
           --token '${{ secrets.TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN }}' \
           --log-level INFO
         env:
-          HF_TOKEN: ${{ secrets.HF_HUB_READ_TOKEN }}
\ No newline at end of file
+          HF_TOKEN: ${{ secrets.HF_HUB_READ_TOKEN }}
diff --git a/.github/workflows/benchmark_v2_a10_caller.yml b/.github/workflows/benchmark_v2_a10_caller.yml
index 6573d398b000..1a6f5beae5b8 100644
--- a/.github/workflows/benchmark_v2_a10_caller.yml
+++ b/.github/workflows/benchmark_v2_a10_caller.yml
@@ -1,11 +1,7 @@
 name: Benchmark v2 Scheduled Runner - A10 Single-GPU

 on:
-  schedule:
-    # Run daily at 16:30 UTC
-    - cron: "30 16 * * *"
-  pull_request:
-    types: [ opened, labeled, reopened, synchronize ]
+  workflow_dispatch:

 jobs:
   benchmark-v2-default:
@@ -18,4 +14,4 @@ jobs:
       commit_sha: ${{ github.sha }}
       run_id: ${{ github.run_id }}
       benchmark_repo_id: hf-internal-testing/transformers-daily-benchmarks
-    secrets: inherit
\ No newline at end of file
+    secrets: inherit
diff --git a/.github/workflows/benchmark_v2_mi325_caller.yml b/.github/workflows/benchmark_v2_mi325_caller.yml
index ed403148e596..94ca382c15d0 100644
--- a/.github/workflows/benchmark_v2_mi325_caller.yml
+++ b/.github/workflows/benchmark_v2_mi325_caller.yml
@@ -1,11 +1,7 @@
 name: Benchmark v2 Scheduled Runner - MI325 Single-GPU

 on:
-  schedule:
-    # Run daily at 16:30 UTC
-    - cron: "30 16 * * *"
-  pull_request:
-    types: [ opened, labeled, reopened, synchronize ]
+  workflow_dispatch:

 jobs:
   benchmark-v2-default:
@@ -18,4 +14,4 @@ jobs:
       commit_sha: ${{ github.sha }}
       run_id: ${{ github.run_id }}
       benchmark_repo_id: hf-internal-testing/transformers-daily-benchmarks
-    secrets: inherit
\ No newline at end of file
+    secrets: inherit
diff --git a/benchmark_v2/.gitignore b/benchmark_v2/.gitignore
index 2f3040f513f2..a4ae560adee0 100644
--- a/benchmark_v2/.gitignore
+++ b/benchmark_v2/.gitignore
@@ -1 +1,2 @@
-benchmark_results/
\ No newline at end of file
+benchmark_results/
+benchmark_results_profiles/
diff --git a/benchmark_v2/benches/__init__.py b/benchmark_v2/benches/__init__.py
deleted file mode 100644
index 
64b106a00370..000000000000 --- a/benchmark_v2/benches/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Benchmark implementations directory diff --git a/benchmark_v2/benches/llama.py b/benchmark_v2/benches/llama.py deleted file mode 100644 index 2349e75f1347..000000000000 --- a/benchmark_v2/benches/llama.py +++ /dev/null @@ -1,165 +0,0 @@ -# Copyright 2025 The HuggingFace Team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import os -from typing import Any - -import torch -from benchmark_framework import ModelBenchmark - - -os.environ["TOKENIZERS_PARALLELISM"] = "1" -torch.set_float32_matmul_precision("high") - - -class LLaMABenchmark(ModelBenchmark): - """Simplified LLaMA model benchmark implementation using the ModelBenchmark base class.""" - - def __init__(self, logger: logging.Logger): - super().__init__(logger) - self._default_prompt = "Why dogs are so cute?" # Custom prompt for LLaMA - - def get_scenario_configs(self) -> list[dict[str, Any]]: - """ - Get LLaMA-specific scenario configurations. - - Returns: - List of scenario configuration dictionaries - """ - return [ - # Eager variants - {"variant": "eager", "compile_mode": None, "use_cache": True, "description": "Eager execution with cache"}, - # Compiled variants - { - "variant": "compiled", - "compile_mode": "max-autotune", - "use_cache": True, - "description": "Compiled with max autotune", - }, - # Kernelized variant (if available) - { - "variant": "kernelized", - "compile_mode": "max-autotune", - "use_cache": True, - "description": "Kernelized execution", - }, - ] - - def _is_kernelization_available(self) -> bool: - """Check if kernelization is available for LLaMA.""" - try: - from kernels import Mode, kernelize # noqa: F401 - - return True - except ImportError: - self.logger.debug("Kernelization not available: kernels module not found") - return False - - def get_default_generation_config(self) -> dict[str, Any]: - """Get LLaMA-specific generation configuration.""" - return { - "do_sample": False, - "top_p": 1.0, - "temperature": 1.0, - "repetition_penalty": 1.0, - "max_new_tokens": None, # Will be set per scenario - } - - def get_model_init_kwargs(self, config) -> dict[str, Any]: - """Get LLaMA-specific model initialization kwargs.""" - return { - "torch_dtype": getattr(torch, config.torch_dtype), - "attn_implementation": config.attn_implementation, - "use_cache": True, - } - - def get_default_torch_dtype(self) -> str: - """Get default torch dtype for LLaMA.""" - return "float16" # LLaMA works well with float16 - - def get_default_device(self) -> str: - """Get default device for LLaMA.""" - return "cuda" # LLaMA prefers CUDA - - -def run_llama(logger, output_dir, **kwargs): - """ - Run LLaMA benchmark with the given configuration. 
- - Args: - logger: Logger instance - output_dir: Output directory for results - **kwargs: Additional configuration options - - Returns: - Path to output file if successful - """ - from benchmark_framework import BenchmarkRunner - - # Extract parameters with defaults - model_id = kwargs.get("model_id", "meta-llama/Llama-2-7b-hf") - warmup_iterations = kwargs.get("warmup_iterations", 3) - measurement_iterations = kwargs.get("measurement_iterations", 5) - num_tokens_to_generate = kwargs.get("num_tokens_to_generate", 100) - include_sdpa_variants = kwargs.get("include_sdpa_variants", True) - device = kwargs.get("device", "cuda") - torch_dtype = kwargs.get("torch_dtype", "float16") - batch_size = kwargs.get("batch_size", 1) - commit_id = kwargs.get("commit_id") - - logger.info(f"Starting LLaMA benchmark for model: {model_id}") - logger.info( - f"Configuration: warmup={warmup_iterations}, measurement={measurement_iterations}, tokens={num_tokens_to_generate}" - ) - - try: - # Create benchmark instance - benchmark = LLaMABenchmark(logger) - - # Create scenarios - scenarios = benchmark.create_scenarios( - model_id=model_id, - warmup_iterations=warmup_iterations, - measurement_iterations=measurement_iterations, - num_tokens_to_generate=num_tokens_to_generate, - include_sdpa_variants=include_sdpa_variants, - device=device, - torch_dtype=torch_dtype, - batch_size=batch_size, - ) - - logger.info(f"Created {len(scenarios)} benchmark scenarios") - - # Create runner and execute benchmarks - runner = BenchmarkRunner(logger, output_dir) - results = runner.run_benchmark(benchmark, scenarios, commit_id=commit_id) - - if not results: - logger.warning("No successful benchmark results") - return None - - # Save results - model_name = model_id.split("/")[-1] # Extract model name from ID - output_file = runner.save_results(model_name, results) - - logger.info(f"LLaMA benchmark completed successfully. Results saved to: {output_file}") - return output_file - - except Exception as e: - logger.error(f"LLaMA benchmark failed: {e}") - import traceback - - logger.debug(traceback.format_exc()) - raise diff --git a/benchmark_v2/benchmark_framework.py b/benchmark_v2/benchmark_framework.py deleted file mode 100644 index 3e4005b9f4b0..000000000000 --- a/benchmark_v2/benchmark_framework.py +++ /dev/null @@ -1,1199 +0,0 @@ -# Copyright 2025 The HuggingFace Team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import gc -import json -import logging -import os -import statistics -import sys -import threading -import time -from abc import ABC, abstractmethod -from dataclasses import asdict, dataclass, field -from datetime import datetime -from typing import Any, Optional, TypedDict, Union - -import gpustat -import numpy as np -import psutil -import torch - - -class GPUMetrics(TypedDict): - """GPU monitoring result with GPU metrics.""" - - gpu_utilization_mean: float - gpu_utilization_max: float - gpu_utilization_min: float - gpu_memory_used_mean: float - gpu_memory_used_max: float - gpu_memory_used_min: float - sample_count: int - gpu_monitoring_status: str - - -class NoGPU(TypedDict): - """GPU monitoring result without GPU metrics.""" - - gpu_monitoring_status: str - gpu_monitoring_reason: str - - -class ArchAwareTimer: - """Architecture-aware timer for supposedly better prescision""" - - def __init__(self, device: Optional[str] = None): - """ - Initialize architecture-aware timer. - - Args: - device: Device to use. If None, uses current device. - """ - self.device = device - self.use_cuda = torch.cuda.is_available() - - if self.use_cuda: - if device and device != "cpu": - self.device_obj = torch.device(device) - else: - # Fall back to CPU timing if device is CPU or CUDA not available - self.use_cuda = False - - if self.use_cuda: - try: - # Create CUDA events for timing - self.start_event = torch.cuda.Event(enable_timing=True) - self.end_event = torch.cuda.Event(enable_timing=True) - except RuntimeError: - # Fall back to CPU timing if CUDA events fail - self.use_cuda = False - - if not self.use_cuda: - self.start_time = None - self.end_time = None - - def start(self): - """Start timing.""" - if self.use_cuda: - torch.cuda.synchronize(self.device_obj) - self.start_event.record(stream=torch.cuda.current_stream(self.device_obj)) - else: - self.start_time = time.perf_counter() - - def stop(self): - """Stop timing.""" - if self.use_cuda: - self.end_event.record(stream=torch.cuda.current_stream(self.device_obj)) - torch.cuda.synchronize(self.device_obj) - else: - self.end_time = time.perf_counter() - - def elapsed_time(self) -> float: - """ - Get elapsed time in seconds. 
- - Returns: - Elapsed time in seconds - """ - if self.use_cuda: - # CUDA events return time in milliseconds, convert to seconds - return self.start_event.elapsed_time(self.end_event) / 1000.0 - else: - if self.start_time is None or self.end_time is None: - raise RuntimeError("Timer not properly started/stopped") - return self.end_time - self.start_time - - @property - def timing_method(self) -> str: - """Get the timing method being used.""" - return "CUDA Events" if self.use_cuda else "CPU perf_counter" - - def __enter__(self): - """Context manager entry.""" - self.start() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit.""" - self.stop() - - -@dataclass -class BenchmarkConfig: - """Configuration for a single benchmark scenario.""" - - name: str - model_id: str - variant: str = "eager" # "eager", "compiled", "kernelized" - warmup_iterations: int = 3 - measurement_iterations: int = 10 - num_tokens_to_generate: int = 100 - device: str = "cuda" - torch_dtype: str = "float16" - compile_mode: Optional[str] = None # None, "default", "reduce-overhead", "max-autotune" - compile_options: dict[str, Any] = field(default_factory=dict) - use_cache: bool = True - batch_size: int = 1 - sequence_length: Optional[int] = None - attn_implementation: str = "sdpa" # "eager", "sdpa", "flash_attention_2" - sdpa_backend: Optional[str] = None # None, "math", "flash_attention", "efficient_attention", "cudnn_attention" - custom_params: dict[str, Any] = field(default_factory=dict) - - -class BenchmarkScenario: - """ - A benchmark scenario that encapsulates both configuration and setup logic. - This makes it easier to define and adapt benchmarks for different models. - """ - - def __init__(self, name: str, config: BenchmarkConfig, description: str = ""): - self.name = name - self.config = config - self.description = description - self._setup_callbacks = [] - self._teardown_callbacks = [] - - def add_setup_callback(self, callback: callable): - """Add a callback to be executed during scenario setup.""" - self._setup_callbacks.append(callback) - - def add_teardown_callback(self, callback: callable): - """Add a callback to be executed during scenario teardown.""" - self._teardown_callbacks.append(callback) - - def setup(self, model, tokenizer, logger=None): - """Execute setup callbacks for this scenario.""" - for callback in self._setup_callbacks: - try: - callback(model, tokenizer, self.config, logger) - except Exception as e: - if logger: - logger.warning(f"Setup callback failed for scenario {self.name}: {e}") - - def teardown(self, model, tokenizer, logger=None): - """Execute teardown callbacks for this scenario.""" - for callback in self._teardown_callbacks: - try: - callback(model, tokenizer, self.config, logger) - except Exception as e: - if logger: - logger.warning(f"Teardown callback failed for scenario {self.name}: {e}") - - def __repr__(self): - return f"BenchmarkScenario(name='{self.name}', variant='{self.config.variant}')" - - -@dataclass -class TimingResult: - """Result from a timing measurement.""" - - time_to_first_token_seconds: Optional[float] = None - latency_seconds: float = 0.0 - tokens_per_second: Optional[float] = None - time_per_output_token_seconds: Optional[float] = None - total_tokens_generated: int = 0 - metadata: dict[str, Any] = field(default_factory=dict) - - -@dataclass -class BenchmarkStatistics: - """Statistical analysis of benchmark measurements.""" - - name: str - measurements: list[float] - mean: float - median: float - std: float - min: float 
- max: float - p25: float # 25th percentile - p75: float # 75th percentile - p90: float # 90th percentile - p95: float # 95th percentile - p99: float # 99th percentile - unit: str = "seconds" - - @classmethod - def from_measurements(cls, name: str, measurements: list[float], unit: str = "seconds") -> "BenchmarkStatistics": - """Create statistics from a list of measurements.""" - if not measurements: - raise ValueError("Cannot create statistics from empty measurements") - - measurements_array = np.array(measurements) - - return cls( - name=name, - measurements=measurements, - mean=float(np.mean(measurements_array)), - median=float(np.median(measurements_array)), - std=float(np.std(measurements_array)), - min=float(np.min(measurements_array)), - max=float(np.max(measurements_array)), - p25=float(np.percentile(measurements_array, 25)), - p75=float(np.percentile(measurements_array, 75)), - p90=float(np.percentile(measurements_array, 90)), - p95=float(np.percentile(measurements_array, 95)), - p99=float(np.percentile(measurements_array, 99)), - unit=unit, - ) - - -@dataclass -class HardwareInfo: - """Hardware information collected during benchmarking.""" - - gpu_name: str - gpu_memory_total_mb: int - cpu_count: int - memory_total_mb: int - python_version: str - torch_version: Optional[str] = None - cuda_version: Optional[str] = None - - -@dataclass -class BenchmarkMetadata: - """Metadata collected for each benchmark run.""" - - timestamp: str - commit_id: str - hardware_info: HardwareInfo - config: BenchmarkConfig - - -class GPUMonitor: - """Monitor GPU utilization during benchmark execution.""" - - def __init__(self, sample_interval: float = 0.1, logger: Optional[logging.Logger] = None): - self.sample_interval = sample_interval - self.logger = logger or logging.getLogger(__name__) - self.stop_event = threading.Event() - self.thread = None - self.gpu_utilization = [] - self.gpu_memory_used = [] - self.timestamps = [] - self.gpu_available = False - self.warning_logged = False - - # Test GPU availability on initialization - self._test_gpu_availability() - - def _test_gpu_availability(self): - """Test if GPU monitoring is available.""" - try: - gpu_stats = gpustat.GPUStatCollection.new_query() - if gpu_stats and len(gpu_stats) > 0: - self.gpu_available = True - self.logger.debug(f"GPU monitoring available: {len(gpu_stats)} GPU(s) detected") - else: - self.gpu_available = False - self.logger.debug("No GPUs detected by gpustat") - except Exception as e: - self.gpu_available = False - self.logger.debug(f"GPU monitoring not available: {e}") - - def start(self): - """Start monitoring GPU metrics.""" - if not self.gpu_available: - self.logger.debug("GPU monitoring disabled: no GPUs available") - return - - # Clear the stop event to enable monitoring - self.stop_event.clear() - self.gpu_utilization = [] - self.gpu_memory_used = [] - self.timestamps = [] - self.warning_logged = False # Reset warning flag for new monitoring session - self.thread = threading.Thread(target=self._monitor_loop) - self.thread.start() - self.logger.debug("GPU monitoring started") - - def stop_and_collect(self) -> Union[GPUMetrics, NoGPU]: - """Stop monitoring and return collected metrics.""" - if not self.gpu_available: - return NoGPU(gpu_monitoring_status="disabled", gpu_monitoring_reason="no_gpus_available") - - # Signal the monitoring thread to stop - self.stop_event.set() - if self.thread: - self.thread.join() - - if self.gpu_utilization: - metrics = GPUMetrics( - gpu_utilization_mean=statistics.mean(self.gpu_utilization), - 
gpu_utilization_max=max(self.gpu_utilization), - gpu_utilization_min=min(self.gpu_utilization), - gpu_memory_used_mean=statistics.mean(self.gpu_memory_used), - gpu_memory_used_max=max(self.gpu_memory_used), - gpu_memory_used_min=min(self.gpu_memory_used), - sample_count=len(self.gpu_utilization), - gpu_monitoring_status="success", - ) - self.logger.debug(f"GPU monitoring completed: {len(self.gpu_utilization)} samples collected") - return metrics - else: - return NoGPU(gpu_monitoring_status="failed", gpu_monitoring_reason="no_samples_collected") - - def _monitor_loop(self): - """Background monitoring loop using threading.Event for communication.""" - consecutive_failures = 0 - max_consecutive_failures = 5 - - # Continue monitoring until stop_event is set - while not self.stop_event.is_set(): - try: - gpu_stats = gpustat.GPUStatCollection.new_query() - if gpu_stats and len(gpu_stats) > 0: - gpu = gpu_stats[0] - self.gpu_utilization.append(gpu["utilization.gpu"]) - self.gpu_memory_used.append(gpu["memory.used"]) - self.timestamps.append(time.time()) - consecutive_failures = 0 # Reset failure counter on success - else: - consecutive_failures += 1 - if consecutive_failures >= max_consecutive_failures and not self.warning_logged: - self.logger.warning("GPU monitoring: No GPU data returned by gpustat") - self.warning_logged = True - - except Exception as e: - consecutive_failures += 1 - if consecutive_failures >= max_consecutive_failures and not self.warning_logged: - self.logger.warning(f"GPU monitoring failed after {max_consecutive_failures} attempts: {e}") - self.warning_logged = True - - # Use Event.wait() with timeout instead of time.sleep() - # This allows for immediate response to stop signal while still maintaining sample interval - if self.stop_event.wait(timeout=self.sample_interval): - # Event was set, break out of loop immediately - break - - -def get_hardware_info() -> HardwareInfo: - """Collect hardware information.""" - gpu_name = "unknown" - gpu_memory_total = 0 - - try: - gpu_stats = gpustat.GPUStatCollection.new_query() - if gpu_stats and len(gpu_stats) > 0: - gpu = gpu_stats[0] - gpu_name = gpu["name"] - gpu_memory_total = gpu["memory.total"] - except Exception: - pass - - torch_version = torch.__version__ - cuda_version = None - if hasattr(torch, "cuda") and torch.cuda.is_available(): - cuda_version = torch.version.cuda - - return HardwareInfo( - gpu_name=gpu_name, - gpu_memory_total_mb=gpu_memory_total, - cpu_count=psutil.cpu_count(), - memory_total_mb=int(psutil.virtual_memory().total / (1024 * 1024)), - python_version=f"{sys.version.split()[0]}", - torch_version=torch_version, - cuda_version=cuda_version, - ) - - -def flush_memory(): - """Flush GPU memory and run garbage collection.""" - gc.collect() - if hasattr(torch, "cuda") and torch.cuda.is_available(): - torch.cuda.empty_cache() - torch.cuda.reset_max_memory_allocated() - torch.cuda.reset_peak_memory_stats() - torch.cuda.synchronize() - - -def get_sdpa_backend(backend_name: Optional[str]): - """Get the SDPA backend enum from string name.""" - if backend_name is None: - return None - - try: - backend_map = { - "math": torch.nn.attention.SDPBackend.MATH, - "flash_attention": torch.nn.attention.SDPBackend.FLASH_ATTENTION, - "efficient_attention": torch.nn.attention.SDPBackend.EFFICIENT_ATTENTION, - "cudnn_attention": torch.nn.attention.SDPBackend.CUDNN_ATTENTION, - } - return backend_map.get(backend_name.lower()) - except AttributeError: - # torch.nn.attention.SDPBackend not available in older torch versions - return 
None - - -class SDPAContext: - """Context manager for SDPA kernel selection.""" - - def __init__(self, backend_name: Optional[str], logger: Optional[logging.Logger] = None): - self.backend_name = backend_name - self.logger = logger or logging.getLogger(__name__) - self.backend = get_sdpa_backend(backend_name) if backend_name else None - self.context = None - - def __enter__(self): - if self.backend is not None: - try: - self.context = torch.nn.attention.sdpa_kernel(self.backend) - self.context.__enter__() - if self.logger: - self.logger.debug(f"Using SDPA backend: {self.backend_name}") - except Exception as e: - if self.logger: - self.logger.warning(f"Failed to set SDPA backend {self.backend_name}: {e}") - self.context = None - elif self.backend_name and self.logger: - self.logger.debug( - f"SDPA backend '{self.backend_name}' requested but not using kernel context (backend={self.backend})" - ) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if self.context is not None: - try: - self.context.__exit__(exc_type, exc_val, exc_tb) - except Exception as e: - if self.logger: - self.logger.warning(f"Error exiting SDPA context: {e}") - return False - - -class AbstractModelBenchmark(ABC): - """Abstract base class for model benchmarks.""" - - def __init__(self, logger: logging.Logger): - self.logger = logger - self.model = None - self.tokenizer = None - self.device = None - self.scenarios = {} # Map of scenario_name -> BenchmarkScenario - - @abstractmethod - def create_scenarios(self, **kwargs) -> dict[str, "BenchmarkScenario"]: - """Create and return a dictionary of benchmark scenarios.""" - pass - - @abstractmethod - def setup_model(self, config: BenchmarkConfig) -> None: - """Setup the model for benchmarking with the given configuration.""" - pass - - @abstractmethod - def cleanup_model(self) -> None: - """Cleanup model resources.""" - pass - - @abstractmethod - def measure_time_to_first_token(self, config: BenchmarkConfig) -> float: - """Measure time to first token generation.""" - pass - - @abstractmethod - def measure_latency(self, config: BenchmarkConfig) -> TimingResult: - """Measure full generation latency and compute tokens/sec.""" - pass - - def prepare_inputs(self, config: BenchmarkConfig) -> Any: - """Prepare inputs for the model. Override if needed.""" - return None - - def get_scenarios(self, **kwargs) -> dict[str, "BenchmarkScenario"]: - """Get benchmark scenarios. Creates them if they don't exist.""" - if not self.scenarios: - self.scenarios = self.create_scenarios(**kwargs) - return self.scenarios - - -class ModelBenchmark(AbstractModelBenchmark): - """ - Base class for HuggingFace Transformers model benchmarks. - - This class provides common scenario creation logic and handles the standard - patterns for eager, compiled, and kernelized execution variants with different - attention implementations and SDPA backends. - """ - - def __init__(self, logger: logging.Logger): - super().__init__(logger) - self.inputs = None - self.compiled_model = None - self.past_key_values = None - self.config = None - self._default_prompt = "Why dogs are so cute?" - - @property - def default_prompt(self) -> str: - """Default prompt for text generation. Override in subclasses if needed.""" - return self._default_prompt - - def get_attention_configs(self, include_sdpa_variants: bool = True) -> list[dict[str, Any]]: - """ - Get attention implementation configurations. 
- - Args: - include_sdpa_variants: Whether to include SDPA backend variants - - Returns: - List of attention configuration dictionaries - """ - attention_configs = [ - {"attn_implementation": "eager", "sdpa_backends": [None], "desc_suffix": " with eager attention"}, - ] - - # Add SDPA variants if requested - if include_sdpa_variants: - attention_configs.append( - { - "attn_implementation": "sdpa", - "sdpa_backends": [None, "math", "flash_attention", "efficient_attention"], - "desc_suffix": "", - } - ) - - return attention_configs - - def get_scenario_configs(self) -> list[dict[str, Any]]: - """ - Get base scenario configurations. Override in subclasses to customize. - - Returns: - List of scenario configuration dictionaries - """ - return [ - # Eager variants - {"variant": "eager", "compile_mode": None, "use_cache": True, "description": "Eager execution with cache"}, - # Compiled variants - { - "variant": "compiled", - "compile_mode": "max-autotune", - "use_cache": True, - "description": "Compiled with max autotune", - }, - # Kernelized variant (if available) - { - "variant": "kernelized", - "compile_mode": "max-autotune", - "use_cache": True, - "description": "Kernelized execution", - }, - ] - - def _is_kernelization_available(self) -> bool: - """Check if kernelization is available. Override in subclasses.""" - try: - from kernels import Mode, kernelize # noqa: F401 - - return True - except ImportError: - return False - - def get_default_generation_config(self) -> dict[str, Any]: - """Get default generation configuration. Override in subclasses for model-specific defaults.""" - return {"do_sample": False, "top_p": 1.0, "temperature": 1.0} - - def get_model_init_kwargs(self, config: BenchmarkConfig) -> dict[str, Any]: - """Get model initialization kwargs. Override in subclasses for model-specific parameters.""" - return {"torch_dtype": getattr(torch, config.torch_dtype), "attn_implementation": config.attn_implementation} - - def get_default_torch_dtype(self) -> str: - """Get default torch dtype. Override in subclasses.""" - return "float16" - - def get_default_device(self) -> str: - """Get default device. 
Override in subclasses.""" - return "cuda" - - def create_scenarios(self, **kwargs) -> dict[str, "BenchmarkScenario"]: - """Create benchmark scenarios for HuggingFace models.""" - scenarios = {} - - # Extract parameters with model-specific defaults - model_id = kwargs.get("model_id", "microsoft/DialoGPT-medium") - warmup_iterations = kwargs.get("warmup_iterations", 3) - measurement_iterations = kwargs.get("measurement_iterations", 5) - num_tokens_to_generate = kwargs.get("num_tokens_to_generate", 100) - include_sdpa_variants = kwargs.get("include_sdpa_variants", True) - device = kwargs.get("device", self.get_default_device()) - torch_dtype = kwargs.get("torch_dtype", self.get_default_torch_dtype()) - batch_size = kwargs.get("batch_size", 1) - - # Get configurations - attention_configs = self.get_attention_configs(include_sdpa_variants) - scenario_configs = self.get_scenario_configs() - - # Create scenarios for each attention config and variant combination - for attn_config in attention_configs: - attn_implementation = attn_config["attn_implementation"] - sdpa_backends = attn_config["sdpa_backends"] - desc_suffix = attn_config["desc_suffix"] - - for scenario_config in scenario_configs: - for sdpa_backend in sdpa_backends: - # Skip kernelized if not available - if scenario_config["variant"] == "kernelized" and not self._is_kernelization_available(): - continue - - # Create unique config for this scenario - config = BenchmarkConfig( - name=scenario_config["variant"], - model_id=model_id, - variant=scenario_config["variant"], - compile_mode=scenario_config["compile_mode"], - use_cache=scenario_config["use_cache"], - warmup_iterations=warmup_iterations, - measurement_iterations=measurement_iterations, - num_tokens_to_generate=num_tokens_to_generate, - device=device, - torch_dtype=torch_dtype, - batch_size=batch_size, - attn_implementation=attn_implementation, - sdpa_backend=sdpa_backend if attn_implementation == "sdpa" else None, - ) - - # Create scenario name - scenario_name_parts = [scenario_config["variant"]] - if scenario_config["compile_mode"]: - scenario_name_parts.append(f"compile_{scenario_config['compile_mode']}") - - # Add attention implementation to name - if attn_implementation == "eager": - scenario_name_parts.append("eager_attn") - elif attn_implementation == "sdpa": - if sdpa_backend: - scenario_name_parts.append(f"sdpa_{sdpa_backend}") - else: - scenario_name_parts.append("sdpa_default") - - scenario_name = "_".join(scenario_name_parts) - - # Create description - description = scenario_config["description"] - if attn_implementation == "sdpa" and sdpa_backend: - description += f" with SDPA {sdpa_backend} backend" - elif attn_implementation == "sdpa": - description += " with SDPA default backend" - else: - description += desc_suffix - - # Create scenario - scenario = BenchmarkScenario(name=scenario_name, config=config, description=description) - - # Add setup callbacks based on variant - if scenario_config["variant"] == "compiled": - scenario.add_setup_callback(self._setup_compilation_callback) - elif scenario_config["variant"] == "kernelized": - scenario.add_setup_callback(self._setup_kernelization_callback) - - scenarios[scenario_name] = scenario - - return scenarios - - def _setup_compilation_callback(self, model, tokenizer, config, logger): - """Setup callback for compilation scenarios.""" - if logger: - logger.info(f"Setting up compilation with mode: {config.compile_mode}") - - # Perform torch.compile - if config.compile_mode is not None: - self.compiled_model = 
torch.compile(model, mode=config.compile_mode, **config.compile_options) - else: - self.compiled_model = torch.compile(model, **config.compile_options) - - # Setup static cache for compiled mode if needed - if config.use_cache and hasattr(self, "inputs") and self.inputs is not None: - self._setup_static_cache(config) - - def _setup_kernelization_callback(self, model, tokenizer, config, logger): - """Setup callback for kernelization scenarios.""" - if logger: - logger.info("Setting up kernelization") - - try: - from kernels import Mode, kernelize - - self.compiled_model = kernelize(model, mode=Mode.INFERENCE) - except Exception as e: - if logger: - logger.warning(f"Failed to setup kernelized mode: {e}") - logger.warning("Falling back to eager mode") - config.variant = "eager" - - def _setup_static_cache(self, config: BenchmarkConfig): - """Setup static cache for compiled models. Override if needed.""" - if hasattr(self, "inputs") and self.inputs is not None: - try: - from transformers import StaticCache - - seq_length = self.inputs["input_ids"].shape[1] - - # Get the actual device the model is on - if hasattr(self.model, "device"): - cache_device = self.model.device - else: - cache_device = self.device - - self.past_key_values = StaticCache( - config=self.model.config, - max_batch_size=config.batch_size, - max_cache_len=seq_length + config.num_tokens_to_generate, - device=cache_device, - dtype=getattr(torch, config.torch_dtype), - ) - self.logger.debug(f"StaticCache created on device: {cache_device}") - except (ImportError, TypeError) as e: - # StaticCache not available or incompatible, continue without it - self.logger.debug(f"StaticCache setup failed: {e}, continuing without cache") - self.past_key_values = None - - def setup_model(self, config: BenchmarkConfig) -> None: - """Setup the HuggingFace model for benchmarking with the given configuration.""" - - self.logger.info(f"Setting up model: {config.model_id} with variant: {config.variant}") - self.device = config.device - self.config = config - - # Load model and tokenizer - self._load_model_and_tokenizer(config) - - # Prepare inputs - self._prepare_model_inputs(config) - - # Configure generation settings - self._configure_generation(config) - - self.logger.info("Model setup complete") - - def _load_model_and_tokenizer(self, config: BenchmarkConfig): - """Load the model and tokenizer. Override in subclasses for custom loading.""" - - from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig - - # Load tokenizer - self.tokenizer = AutoTokenizer.from_pretrained(config.model_id) - if self.tokenizer.pad_token is None: - self.tokenizer.pad_token = self.tokenizer.eos_token - - # Prepare generation config - generation_config_dict = self.get_default_generation_config() - gen_config = GenerationConfig(**generation_config_dict) - - # Load model - self.logger.info("Loading model...") - - target_device = config.device - # Get model initialization kwargs - model_init_kwargs = self.get_model_init_kwargs(config) - model_init_kwargs.update({"generation_config": gen_config}) - - self.model = AutoModelForCausalLM.from_pretrained(config.model_id, **model_init_kwargs).eval() - - # Move model to target device - self.logger.info(f"Moving model to device: {target_device}") - self.model.to(target_device) - self.device = target_device # Update device to match actual device used - - def _prepare_model_inputs(self, config: BenchmarkConfig): - """Prepare model inputs. 
Override in subclasses for custom inputs.""" - # Prepare inputs - self.inputs = self.tokenizer(self.default_prompt, return_tensors="pt") - - # Move inputs to the same device as the model - if hasattr(self.model, "device"): - # Model is on a single device - model_device = self.model.device - else: - # Model might be distributed, use self.device which was set during model loading - model_device = self.device - - self.inputs = {k: v.to(model_device) for k, v in self.inputs.items()} - self.logger.debug(f"Moved inputs to device: {model_device}") - - def _configure_generation(self, config: BenchmarkConfig): - """Configure generation settings.""" - seq_length = self.inputs["input_ids"].shape[1] - self.model.generation_config.max_length = seq_length + config.num_tokens_to_generate - - def cleanup_model(self) -> None: - """Cleanup model resources.""" - if hasattr(self, "model") and self.model is not None: - del self.model - self.model = None - if hasattr(self, "compiled_model") and self.compiled_model is not None: - del self.compiled_model - self.compiled_model = None - if hasattr(self, "tokenizer") and self.tokenizer is not None: - del self.tokenizer - self.tokenizer = None - if hasattr(self, "past_key_values") and self.past_key_values is not None: - del self.past_key_values - self.past_key_values = None - - # Clear CUDA cache - flush_memory() - - def measure_time_to_first_token(self, config: BenchmarkConfig) -> float: - """Measure time to first token generation.""" - model_to_use = self.compiled_model if self.compiled_model is not None else self.model - - # Prepare generation kwargs - generation_kwargs = self._get_generation_kwargs(config, max_new_tokens=1) - - # Use CUDA timer for high-precision measurement - with ArchAwareTimer(device=config.device) as timer: - # Use SDPA context if specified - with SDPAContext(config.sdpa_backend, self.logger): - with torch.no_grad(): - _ = model_to_use.generate(**generation_kwargs) - - return timer.elapsed_time() - - def measure_latency(self, config: BenchmarkConfig) -> TimingResult: - """Measure full generation latency and compute tokens/sec.""" - model_to_use = self.compiled_model if self.compiled_model is not None else self.model - - # Prepare generation kwargs - generation_kwargs = self._get_generation_kwargs(config, max_new_tokens=config.num_tokens_to_generate) - - # Use CUDA timer for high-precision measurement - with ArchAwareTimer(device=config.device) as timer: - # Use SDPA context if specified - with SDPAContext(config.sdpa_backend, self.logger): - with torch.no_grad(): - outputs = model_to_use.generate(**generation_kwargs) - - # Calculate metrics - latency = timer.elapsed_time() - input_length = self.inputs["input_ids"].shape[1] - output_length = outputs.shape[1] - tokens_generated = output_length - input_length - - tokens_per_second = tokens_generated / latency if latency > 0 else 0 - time_per_output_token = latency / tokens_generated if tokens_generated > 0 else None - - return TimingResult( - latency_seconds=latency, - tokens_per_second=tokens_per_second, - time_per_output_token_seconds=time_per_output_token, - total_tokens_generated=tokens_generated, - metadata={ - "input_length": input_length, - "output_length": output_length, - "variant": config.variant, - "compile_mode": config.compile_mode, - "attn_implementation": config.attn_implementation, - "sdpa_backend": config.sdpa_backend, - }, - ) - - def _get_generation_kwargs(self, config: BenchmarkConfig, max_new_tokens: int) -> dict[str, Any]: - """Get generation kwargs. 
Override in subclasses for custom generation.""" - generation_config_dict = self.get_default_generation_config() - generation_kwargs = { - **self.inputs, - "max_new_tokens": max_new_tokens, - "do_sample": generation_config_dict.get("do_sample", False), - "temperature": generation_config_dict.get("temperature", 1.0), - "top_p": generation_config_dict.get("top_p", 1.0), - "pad_token_id": self.tokenizer.pad_token_id, - } - - # Handle static cache for compiled models - if self.past_key_values is not None and config.variant == "compiled": - try: - from transformers import StaticCache - - # Reset cache for each measurement - seq_length = self.inputs["input_ids"].shape[1] - - # Get the actual device the model is on - if hasattr(self.model, "device"): - cache_device = self.model.device - else: - cache_device = self.device - - fresh_cache = StaticCache( - config=self.model.config, - max_batch_size=config.batch_size, - max_cache_len=seq_length + max_new_tokens, - device=cache_device, - dtype=getattr(torch, config.torch_dtype), - ) - generation_kwargs["past_key_values"] = fresh_cache - except (ImportError, TypeError) as e: - self.logger.debug(f"Fresh StaticCache creation failed: {e}") - pass - - return generation_kwargs - - -class BenchmarkRunner: - """Main benchmark runner that coordinates benchmark execution.""" - - def __init__(self, logger: logging.Logger, output_dir: str = "benchmark_results"): - self.logger = logger - self.output_dir = output_dir - os.makedirs(output_dir, exist_ok=True) - - def run_benchmark( - self, - benchmark: ModelBenchmark, - scenarios: dict[str, BenchmarkScenario], - collect_gpu_metrics: bool = True, - commit_id: Optional[str] = None, - ) -> dict[str, dict[str, Any]]: - """ - Run benchmarks using scenarios. - - Args: - benchmark: The benchmark instance to run - scenarios: Dictionary mapping scenario names to BenchmarkScenario instances - collect_gpu_metrics: Whether to collect GPU utilization metrics - commit_id: Git commit ID for metadata (if not provided, will auto-detect from git) - - Returns: - Dictionary mapping scenario names to results with statistics - """ - all_results = {} - - for scenario_name, scenario in scenarios.items(): - self.logger.info(f"Running benchmark scenario: {scenario_name}") - config = scenario.config - - try: - # Setup model for this configuration - benchmark.setup_model(config) - - # Run scenario setup callbacks - scenario.setup(benchmark.model, benchmark.tokenizer, self.logger) - - # Quick validation: try one measurement first to see if this scenario works - try: - flush_memory() - test_result = benchmark.measure_time_to_first_token(config) - if test_result is None or test_result <= 0: - raise ValueError("Invalid measurement result") - except Exception as validation_error: - self.logger.warning(f"Skipping scenario {scenario_name}: validation failed - {validation_error}") - # Clean up and skip this scenario - try: - scenario.teardown(benchmark.model, benchmark.tokenizer, self.logger) - benchmark.cleanup_model() - except Exception: - pass - continue - - # Collect metadata - metadata = BenchmarkMetadata( - timestamp=datetime.utcnow().isoformat(), - commit_id=commit_id, - hardware_info=get_hardware_info(), - config=config, - ) - - # Initialize GPU monitor - gpu_monitor = None - if collect_gpu_metrics: - gpu_monitor = GPUMonitor(logger=self.logger) - - # Warmup runs - self.logger.info(f"Warming up with {config.warmup_iterations} iterations...") - warmup_failures = 0 - for i in range(config.warmup_iterations): - try: - _ = 
benchmark.measure_latency(config) - except Exception as e: - warmup_failures += 1 - self.logger.warning(f"Warmup iteration {i + 1} failed: {e}") - - # If more than half the warmup iterations failed, skip this scenario - if warmup_failures > config.warmup_iterations // 2: - self.logger.warning( - f"Skipping scenario {scenario_name}: too many warmup failures ({warmup_failures}/{config.warmup_iterations})" - ) - try: - scenario.teardown(benchmark.model, benchmark.tokenizer, self.logger) - benchmark.cleanup_model() - except Exception: - pass - continue - - # Start GPU monitoring - if gpu_monitor: - gpu_monitor.start() - - # Measurement runs for latency - self.logger.info(f"Measuring latency with {config.measurement_iterations} iterations...") - latency_measurements = [] - ttft_measurements = [] - tokens_per_sec_measurements = [] - itl_measurements = [] # Inter-Token Latency - measurement_failures = 0 - - for i in range(config.measurement_iterations): - try: - # Measure time to first token - ttft = benchmark.measure_time_to_first_token(config) - ttft_measurements.append(ttft) - - # Measure full latency - timing_result = benchmark.measure_latency(config) - latency_measurements.append(timing_result.latency_seconds) - - if timing_result.tokens_per_second is not None: - tokens_per_sec_measurements.append(timing_result.tokens_per_second) - - if timing_result.time_per_output_token_seconds is not None: - itl_measurements.append(timing_result.time_per_output_token_seconds) - - itl_str = ( - f", itl={timing_result.time_per_output_token_seconds:.4f}s/token" - if timing_result.time_per_output_token_seconds - else "" - ) - self.logger.debug( - f"Iteration {i + 1}: latency={timing_result.latency_seconds:.4f}s, ttft={ttft:.4f}s{itl_str}" - ) - - except Exception as e: - measurement_failures += 1 - self.logger.warning(f"Measurement iteration {i + 1} failed: {e}") - - # Stop GPU monitoring - gpu_metrics = {} - if gpu_monitor: - gpu_metrics = gpu_monitor.stop_and_collect() - - # If we don't have enough successful measurements, skip this scenario - if not latency_measurements or len(latency_measurements) < config.measurement_iterations // 2: - self.logger.warning( - f"Skipping scenario {scenario_name}: insufficient successful measurements ({len(latency_measurements)}/{config.measurement_iterations})" - ) - try: - scenario.teardown(benchmark.model, benchmark.tokenizer, self.logger) - benchmark.cleanup_model() - except Exception: - pass - continue - - # Calculate statistics - scenario_results = { - "metadata": asdict(metadata), - "measurements": {}, - "gpu_metrics": gpu_metrics, - "scenario_description": scenario.description, - } - - if latency_measurements: - latency_stats = BenchmarkStatistics.from_measurements("latency_seconds", latency_measurements) - scenario_results["measurements"]["latency_seconds"] = asdict(latency_stats) - - if ttft_measurements: - ttft_stats = BenchmarkStatistics.from_measurements( - "time_to_first_token_seconds", ttft_measurements - ) - scenario_results["measurements"]["time_to_first_token_seconds"] = asdict(ttft_stats) - - if tokens_per_sec_measurements: - tps_stats = BenchmarkStatistics.from_measurements( - "tokens_per_second", tokens_per_sec_measurements, "tokens/sec" - ) - scenario_results["measurements"]["tokens_per_second"] = asdict(tps_stats) - - if itl_measurements: - itl_stats = BenchmarkStatistics.from_measurements( - "time_per_output_token_seconds", itl_measurements, "seconds/token" - ) - scenario_results["measurements"]["time_per_output_token_seconds"] = asdict(itl_stats) - 
- # Log summary - if latency_measurements: - self.logger.info(f"Latency: {latency_stats.mean:.4f}±{latency_stats.std:.4f}s (mean±std)") - if ttft_measurements: - self.logger.info(f"TTFT: {ttft_stats.mean:.4f}±{ttft_stats.std:.4f}s (mean±std)") - if tokens_per_sec_measurements: - self.logger.info(f"Throughput: {tps_stats.mean:.2f}±{tps_stats.std:.2f} tokens/sec (mean±std)") - if itl_measurements: - self.logger.info(f"ITL: {itl_stats.mean:.4f}±{itl_stats.std:.4f}s/token (mean±std)") - - # Add note about partial results if some measurements failed - if measurement_failures > 0: - scenario_results["warnings"] = [f"Some measurements failed ({measurement_failures} failures)"] - self.logger.info(f"Scenario completed with {measurement_failures} measurement failures") - - # Run scenario teardown callbacks - scenario.teardown(benchmark.model, benchmark.tokenizer, self.logger) - - # Cleanup model - benchmark.cleanup_model() - - all_results[scenario_name] = scenario_results - - except Exception as e: - self.logger.warning(f"Skipping scenario {scenario_name}: setup failed - {e}") - import traceback - - self.logger.debug(traceback.format_exc()) - - # Try to clean up if possible - try: - scenario.teardown(benchmark.model, benchmark.tokenizer, self.logger) - benchmark.cleanup_model() - except Exception: - pass - # Skip storing failed scenarios - just continue to the next one - finally: - try: - scenario.teardown(benchmark.model, benchmark.tokenizer, self.logger) - benchmark.cleanup_model() - except Exception as cleanup_error: - self.logger.warning(f"Cleanup failed for scenario {scenario_name}: {cleanup_error}") - - flush_memory() - - return all_results - - def save_results(self, model_name: str, results: dict[str, dict[str, Any]]) -> str: - """Save benchmark results to JSON file.""" - # Create model-specific subdirectory - model_dir = os.path.join(self.output_dir, model_name) - os.makedirs(model_dir, exist_ok=True) - - # Create filename with timestamp - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - filename = f"{model_name}_benchmark_{timestamp}.json" - filepath = os.path.join(model_dir, filename) - - # Prepare output structure - output_data = {"model_name": model_name, "benchmark_scenarios": []} - - for config_name, config_results in results.items(): - scenario = { - "scenario_name": config_name, - "metadata": config_results["metadata"], - "measurements": config_results["measurements"], - "gpu_metrics": config_results.get("gpu_metrics", {}), - } - output_data["benchmark_scenarios"].append(scenario) - - # Save to JSON file - with open(filepath, "w") as f: - json.dump(output_data, f, indent=2, default=str) - - self.logger.info(f"Results saved to {filepath}") - return filepath diff --git a/benchmark_v2/framework/benchmark_config.py b/benchmark_v2/framework/benchmark_config.py new file mode 100644 index 000000000000..c1754d8aa80c --- /dev/null +++ b/benchmark_v2/framework/benchmark_config.py @@ -0,0 +1,218 @@ +import hashlib +import json +import logging +from typing import Any, Optional + + +KERNELIZATION_AVAILABLE = False +try: + from kernels import Mode, kernelize # noqa: F401 + + KERNELIZATION_AVAILABLE = True +except ImportError: + pass + +logger = logging.getLogger(__name__) + + +class BenchmarkConfig: + """Configuration for a single benchmark scenario.""" + + def __init__( + self, + warmup_iterations: int = 5, + measurement_iterations: int = 20, + gpu_monitoring: bool = False, # False by default because it slows down the benchmark by a lot + batch_size: int = 1, + sequence_length: int = 128, + 
num_tokens_to_generate: int = 128, + attn_implementation: str = "eager", + sdpa_backend: Optional[str] = None, + compile_mode: Optional[str] = None, + compile_options: Optional[dict[str, Any]] = None, + kernelize: bool = False, + name: Optional[str] = None, + skip_validity_check: bool = False, + ) -> None: + # Benchmark parameters + self.warmup_iterations = warmup_iterations + self.measurement_iterations = measurement_iterations + self.gpu_monitoring = gpu_monitoring + # Input parameters + self.batch_size = batch_size + self.sequence_length = sequence_length + self.num_tokens_to_generate = num_tokens_to_generate + # Generation parameters + self.attn_implementation = attn_implementation + self.sdpa_backend = sdpa_backend + # Optimization parameters + self.compile_mode = compile_mode + self.compile_options = compile_options if compile_options is not None else {} + self.kernelize = kernelize + # Constant parameters + self.dtype = "torch.bfloat16" + self.device = "cuda" + + self.check_validity(skip_validity_check) + self.name = name if name is not None else self.infer_name() + + def check_validity(self, skip_validity_check: bool = False) -> None: + if skip_validity_check: + return + # Flash attention does not support compile mode, so we turn it off # FIXME: it would be better to support it + is_fa = self.attn_implementation == "flash_attention_2" + is_fa |= self.attn_implementation == "sdpa" and self.sdpa_backend == "flash_attention" + if is_fa: + logger.warning("Flash attention does not support compile mode. Turning off compile mode.") + self.compile_mode = None + + @property + def hash(self) -> str: + return hashlib.sha256(json.dumps(self.to_dict()).encode()).hexdigest() + + def infer_name(self, compact: bool = True) -> str: + """Infer a human-readable name for the benchmark config, either compact or verbose.""" + if compact: + iter_str = f"w{self.warmup_iterations}_i{self.measurement_iterations}" + gpu_monitor_str = "monitored" if self.gpu_monitoring else "unmonitored" + dimensions_str = f"b{self.batch_size}_s{self.sequence_length}_n{self.num_tokens_to_generate}" + attn_code = self.attn_implementation + attn_code += f"_{self.sdpa_backend}" if self.attn_implementation == "sdpa" else "" + compile_str = f"compiled_{self.compile_mode}" if self.compile_mode is not None else "uncompiled" + kernelize_str = "kernelized" if self.kernelize else "unkernelized" + sep = "-" + else: + iter_str = f"{self.warmup_iterations} warmup, {self.measurement_iterations} iterations" + gpu_monitor_str = ("with" if self.gpu_monitoring else "no") + " GPU monitoring" + dimensions_str = f"batch size {self.batch_size}, sequence length {self.sequence_length}, {self.num_tokens_to_generate} generated tokens" + attn_code = f"{self.attn_implementation} attention" + attn_code += f" with {self.sdpa_backend} backend" if self.attn_implementation == "sdpa" else "" + compile_str = "compiled" if self.compile_mode is not None else "not compiled" + kernelize_str = "kernelized" if self.kernelize else "not kernelized" + sep = ", " + return sep.join([iter_str, gpu_monitor_str, dimensions_str, attn_code, compile_str, kernelize_str]) + + def to_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "warmup_iterations": self.warmup_iterations, + "measurement_iterations": self.measurement_iterations, + "gpu_monitoring": self.gpu_monitoring, + "batch_size": self.batch_size, + "sequence_length": self.sequence_length, + "num_tokens_to_generate": self.num_tokens_to_generate, + "attn_implementation": self.attn_implementation, + 
"sdpa_backend": self.sdpa_backend, + "compile_mode": self.compile_mode, + "compile_options": self.compile_options, + "kernelize": self.kernelize, + } + + @classmethod + def from_dict(cls, data: dict[str, Any], skip_validity_check: bool = False) -> "BenchmarkConfig": + return cls( + warmup_iterations=data.get("warmup_iterations", 5), + measurement_iterations=data.get("measurement_iterations", 20), + gpu_monitoring=data.get("gpu_monitoring", False), + batch_size=data.get("batch_size", 1), + sequence_length=data.get("sequence_length", 128), + num_tokens_to_generate=data.get("num_tokens_to_generate", 128), + attn_implementation=data.get("attn_implementation", "eager"), + sdpa_backend=data.get("sdpa_backend"), + compile_mode=data.get("compile_mode"), + compile_options=data.get("compile_options"), + kernelize=data.get("kernelize", False), + name=data.get("name"), + skip_validity_check=skip_validity_check, + ) + + +def cross_generate_configs( + attn_impl_and_sdpa_backend: list[tuple[str, Optional[str]]], + compiled_mode: list[Optional[str]], + kernelized: list[bool], + warmup_iterations: int = 5, + measurement_iterations: int = 20, + batch_size: int = 1, + sequence_length: int = 128, + num_tokens_to_generate: int = 128, + gpu_monitoring: bool = False, # this slows down the benchmark by a lot so we disable it by default +) -> list[BenchmarkConfig]: + # Create kwargs common to all configs + kwargs = { + "warmup_iterations": warmup_iterations, + "measurement_iterations": measurement_iterations, + "batch_size": batch_size, + "sequence_length": sequence_length, + "num_tokens_to_generate": num_tokens_to_generate, + "gpu_monitoring": gpu_monitoring, + } + # Cross-generate all combinations of attn_implementation, compiled_mode, and kernelized + configs = [] + for attn_implementation, sdpa_backend in list(dict.fromkeys(attn_impl_and_sdpa_backend)): + for cm in list(dict.fromkeys(compiled_mode)): + for kernelize_on in list(dict.fromkeys(kernelized)): + config = BenchmarkConfig( + attn_implementation=attn_implementation, + sdpa_backend=sdpa_backend, + compile_mode=cm, + kernelize=kernelize_on, + **kwargs, + ) + configs.append(config) + return configs + + +def generate_all_configs( + warmup_iterations: int = 5, + measurement_iterations: int = 20, + batch_size: int = 1, + sequence_length: int = 128, + num_tokens_to_generate: int = 128, + gpu_monitoring: bool = False, +) -> list[BenchmarkConfig]: + all_attn_implementations = [ + ("flash_attention_2", None), + ("eager", None), + ("sdpa", "math"), + ("sdpa", "flash_attention"), + ("flex_attention", None), + ] + return cross_generate_configs( + attn_impl_and_sdpa_backend=all_attn_implementations, + compiled_mode=[None, "default", "reduce-overhead", "max-autotune", "max-autotune-no-cudagraphs"], + kernelized=[False, KERNELIZATION_AVAILABLE], + warmup_iterations=warmup_iterations, + measurement_iterations=measurement_iterations, + batch_size=batch_size, + sequence_length=sequence_length, + num_tokens_to_generate=num_tokens_to_generate, + gpu_monitoring=gpu_monitoring, + ) + + +def generate_default_configs( + warmup_iterations: int = 5, + measurement_iterations: int = 20, + batch_size: int = 1, + sequence_length: int = 128, + num_tokens_to_generate: int = 128, + gpu_monitoring: bool = False, +) -> list[BenchmarkConfig]: + all_attn_implementations = [ + ("flash_attention_2", None), + ("eager", None), + ("sdpa", "math"), + ("sdpa", "flash_attention"), # note: this one can fail with compile because of attn mask + ] + return cross_generate_configs( + 
attn_impl_and_sdpa_backend=all_attn_implementations, + compiled_mode=[None, "max-autotune"], + kernelized=[False, KERNELIZATION_AVAILABLE], + warmup_iterations=warmup_iterations, + measurement_iterations=measurement_iterations, + batch_size=batch_size, + sequence_length=sequence_length, + num_tokens_to_generate=num_tokens_to_generate, + gpu_monitoring=gpu_monitoring, + ) diff --git a/benchmark_v2/framework/benchmark_runner.py b/benchmark_v2/framework/benchmark_runner.py new file mode 100644 index 000000000000..b5c4796b3fe0 --- /dev/null +++ b/benchmark_v2/framework/benchmark_runner.py @@ -0,0 +1,388 @@ +import gc +import json +import logging +import os +import pathlib +import re +import time +from contextlib import nullcontext +from datetime import datetime +from queue import Queue +from typing import Any, Optional + +import torch +from tqdm import trange + +from transformers import ( + AutoModelForCausalLM, + AutoTokenizer, + CompileConfig, + GenerationConfig, + GenerationMixin, +) +from transformers.generation.streamers import BaseStreamer + +from .benchmark_config import BenchmarkConfig +from .data_classes import BenchmarkMetadata, BenchmarkResult, GPURawMetrics, pretty_print_dict +from .hardware_metrics import GPUMonitor + + +try: + from kernels import Mode, kernelize # noqa: F401 +except ImportError: + kernelize = None + Mode = None + + +DEFAULT_PROMPT = "\n".join([ + "The French Revolution was a period of political and societal change in France that began with the Estates General of 1789 and ended with the Coup of 18 Brumaire on 9 November 1799.", + "Many of the revolution's ideas are considered fundamental principles of liberal democracy, and its values remain central to modern French political discourse.", + "It was caused by a combination of social, political, and economic factors which the existing regime proved unable to manage.", + "Financial crisis and widespread social distress led to the convocation of the Estates General in May 1789, its first meeting since 1614.", + "The representatives of the Third Estate broke away and re-constituted themselves as a National Assembly in June.", + "The Storming of the Bastille in Paris on 14 July led to a series of radical measures by the Assembly, including the abolition of feudalism, state control over the Catholic Church in France, and issuing the Declaration of the Rights of Man and of the Citizen.", + "The next three years were dominated by a struggle for political control.", + "King Louis XVI's attempted flight to Varennes in June 1791 further discredited the monarchy, and military defeats after the outbreak of the French Revolutionary Wars in April 1792 led to the insurrection of 10 August 1792.", + "As a result, the monarchy was replaced by the French First Republic in September, followed by the execution of Louis XVI himself in January 1793.", + "After another revolt in June 1793, the constitution was suspended, and political power passed from the National Convention to the Committee of Public Safety, dominated by radical Jacobins led by Maximilien Robespierre.", + "About 16,000 people were sentenced by the Revolutionary Tribunal and executed in the Reign of Terror, which ended in July 1794 with the Thermidorian Reaction.", + "Weakened by external threats and internal opposition, the Committee of Public Safety was replaced in November 1795 by the Directory.", + "Its instability ended in the coup of 18 Brumaire and the establishment of the Consulate, with Napoleon Bonaparte as First Consul.", +]) # fmt: skip + + +def 
compact_json_numeric_arrays(data: dict): + # Match arrays that contain only numbers (ints/floats), whitespace, commas, and newlines + pattern = r"\[\s*\n\s*((?:\d+(?:\.\d+)?\s*,\s*)*\d+(?:\.\d+)?)\s*\n\s*\]" + + def replace_numeric_array(match): + # Get the array content + content = match.group(1) + # Remove extra whitespace but keep commas + compact_content = re.sub(r"\s+", " ", content).strip() + return f"[{compact_content}]" + + return re.sub(pattern, replace_numeric_array, json.dumps(data, indent=4, default=str), flags=re.DOTALL) + + +def get_git_revision() -> str: + base_path = pathlib.Path(__file__).parent.parent.parent + git_dir = base_path / ".git" + with (git_dir / "HEAD").open("r") as head: + ref = head.readline().split(" ")[-1].strip() + with (git_dir / ref).open("r") as git_hash: + return git_hash.readline().strip() + + +def get_sdpa_backend(backend_name: Optional[str]) -> Optional[torch.nn.attention.SDPBackend]: + """Get the SDPA backend enum from string name.""" + if backend_name is None: + return None + + try: + backend_map = { + "math": torch.nn.attention.SDPBackend.MATH, + "flash_attention": torch.nn.attention.SDPBackend.FLASH_ATTENTION, + "efficient_attention": torch.nn.attention.SDPBackend.EFFICIENT_ATTENTION, + "cudnn_attention": torch.nn.attention.SDPBackend.CUDNN_ATTENTION, + } + return backend_map.get(backend_name.lower()) + except AttributeError: + # torch.nn.attention.SDPBackend not available in older torch versions + return None + + +def flush_memory(): + """Flush GPU memory and run garbage collection.""" + gc.collect() + # Dynamo resets + torch._dynamo.reset() + torch._dynamo.reset_code_caches() + if hasattr(torch._inductor, "codecache"): + # Clear FX graph cache + if hasattr(torch._inductor.codecache, "FxGraphCache"): + torch._inductor.codecache.FxGraphCache.clear() + # Clear PyCodeCache + if hasattr(torch._inductor.codecache, "PyCodeCache"): + torch._inductor.codecache.PyCodeCache.cache_clear() + # Clear TritonFuture cache (for async compilation) + if hasattr(torch._inductor.codecache, "TritonFuture"): + if hasattr(torch._inductor.codecache.TritonFuture, "_compile_cache"): + torch._inductor.codecache.TritonFuture._compile_cache.clear() + # Clear CUDA cache + if torch.cuda.is_available(): + torch.cuda.empty_cache() + torch.cuda.reset_max_memory_allocated() + torch.cuda.reset_peak_memory_stats() + torch.cuda.synchronize() + gc.collect() + + +class BenchmarkStreamer(BaseStreamer): + def __init__(self, **kwargs) -> None: + self.timestamps = [] + self.text_queue = Queue() + + def put(self, value): + """Receives tokens and logs the timestamp of the generation.""" + self.timestamps.append(time.perf_counter()) + + def end(self): + self.timestamps.append(time.perf_counter()) + + def __iter__(self): + return self + + def __next__(self): + value = self.text_queue.get(timeout=self.timeout) + if value == self.stop_signal: + raise StopIteration() + else: + return value + + +class BenchmarkRunner: + """Main benchmark runner that coordinates benchmark execution.""" + + def __init__( + self, logger: logging.Logger, output_dir: str = "benchmark_results", commit_id: Optional[str] = None + ) -> None: + # Those stay constant for the whole run + self.logger = logger + self.output_dir = output_dir + self.commit_id = get_git_revision() if commit_id is None else commit_id + os.makedirs(self.output_dir, exist_ok=True) + self.profile_dir = None + # Attributes that are reset for each model + self._setup_for = "" + # Attributes that are reset for each run + self.model: 
Optional[GenerationMixin] = None + + def cleanup(self) -> None: + del self.model + self.model = None + flush_memory() + + def setup_one_run(self, model_id: str, config: BenchmarkConfig) -> None: + # Some attributes only need to be set once per model + if self._setup_for != model_id: + self.tokenizer = AutoTokenizer.from_pretrained(model_id) + # We set the EOS token to the padding token for open-ended generation + self.tokenizer.eos_token = self.tokenizer.pad_token + self._setup_for = model_id + + # Prepare inputs + self.inputs = self.tokenizer( + [DEFAULT_PROMPT for _ in range(config.batch_size)], + return_tensors="pt", + max_length=config.sequence_length, + truncation=True, + return_attention_mask=True, + ).to(config.device) + self.inputs["use_cache"] = True + + # Prepare generation config + gen_config = GenerationConfig( + do_sample=False, top_p=1.0, temperature=1.0, max_new_tokens=config.num_tokens_to_generate + ) + + # Prepare compile config + if config.compile_mode is not None: + gen_config.compile_config = CompileConfig(mode=config.compile_mode, options=config.compile_options) + gen_config.cache_implementation = "static" + + # Load model + self.logger.debug(f"Loading model {model_id} on device {config.device}...") + dtype = getattr(torch, config.dtype.removeprefix("torch.")) + self.model = AutoModelForCausalLM.from_pretrained( + model_id, dtype=dtype, attn_implementation=config.attn_implementation, generation_config=gen_config + ) + self.model = self.model.eval().to(config.device) + + # Kernelize the model if needed + if config.kernelize: + self.model = kernelize(self.model, mode=Mode.INFERENCE) + + def run_one_benchmark(self, model_id: str, config: BenchmarkConfig, num_tokens_to_profile: int = 0) -> None: + sdpa_ctx = nullcontext() + if config.attn_implementation == "sdpa": + sdpa_backend = get_sdpa_backend(config.sdpa_backend) + sdpa_ctx = torch.nn.attention.sdpa_kernel(sdpa_backend) + + with sdpa_ctx, torch.no_grad(): + self.logger.info(f"Running benchmark scenario: {config.name}") + + # Quick validation: try one measurement first to see if this scenario works + flush_memory() + e2e_latency, token_generation_times, decoded_output, gpu_metrics = self.time_generate( + max_new_tokens=1, gpu_monitor=None + ) + if e2e_latency < 0: + self.logger.warning(f"Skipping config {config.name}: {e2e_latency = } (no GPU monitoring)") + return None + + # Warmup runs + self.logger.info(f"Warming up with {config.warmup_iterations} iterations...") + for _ in trange(config.warmup_iterations): + _ = self.time_generate(max_new_tokens=config.num_tokens_to_generate) + self.logger.info("Warmup over.") + + # Measurement runs + result = BenchmarkResult() + self.logger.info(f"Benchmarking with {config.measurement_iterations} iterations.") + for _ in trange(config.measurement_iterations): + e2e_latency, token_generation_times, decoded_output, gpu_metrics = self.time_generate( + max_new_tokens=config.num_tokens_to_generate, + gpu_monitor=(GPUMonitor(logger=self.logger) if config.gpu_monitoring else None), + ) + result.accumulate(e2e_latency, token_generation_times, decoded_output, gpu_metrics) + self.logger.info("Benchmarking done. 
Cleaning up.") + + # Profile if needed + if num_tokens_to_profile > 0: + self.profile_generate(num_tokens_to_profile, config.name) + + return { + "metadata": BenchmarkMetadata(model_id=model_id, commit_id=self.commit_id), + "measurements": result, + "config": config, + } + + def time_generate( + self, + max_new_tokens: int, + gpu_monitor: Optional[GPUMonitor] = None, + ) -> tuple[float, list[float], str, Optional[GPURawMetrics]]: + """Time the latency of a call to model.generate() with the given (inputs) and (max_new_tokens).""" + # Prepare gpu monitoring if needed + if gpu_monitor is not None: + gpu_monitor.start() + # Prepare streamer + streamer = BenchmarkStreamer() + # Generate and time + wall_time_0 = time.perf_counter() + outputs = self.model.generate( + **self.inputs, + max_new_tokens=max_new_tokens, + streamer=streamer, + ) + wall_time_1 = time.perf_counter() + # Stop gpu monitoring if needed + gpu_metrics = gpu_monitor.stop_and_collect() if gpu_monitor is not None else None + # Check if generation had the right number of tokens + input_tokens = self.inputs["input_ids"].size(-1) + batch_size, output_tokens = outputs.shape + new_tokens = output_tokens - input_tokens + if new_tokens != max_new_tokens: + raise RuntimeError(f"Generated {new_tokens} tokens, expected {max_new_tokens}") + # Decode outputs + decoded_output = self.tokenizer.decode(outputs[0, input_tokens:], skip_special_tokens=True) + # Compute intermediate quantities + e2e_latency = wall_time_1 - wall_time_0 + token_generation_times = [t - wall_time_0 for t in streamer.timestamps[1:]] + return e2e_latency, token_generation_times, decoded_output, gpu_metrics + + def profile_generate(self, num_tokens_to_profile: int, config_name: str) -> None: + """Profile the latency of a call to model.generate() with the given (inputs) and (max_new_tokens).""" + profiler = torch.profiler.profile( + activities=[torch.profiler.ProfilerActivity.CPU, torch.profiler.ProfilerActivity.CUDA], + record_shapes=True, + ) + with profiler as prof: + _ = self.model.generate( + **self.inputs, + max_new_tokens=num_tokens_to_profile, + ) + if self.profile_dir is None: + self.profile_dir = self.output_dir + "_profiles" + os.makedirs(self.profile_dir, exist_ok=True) + prof.export_chrome_trace(f"{self.profile_dir}/{config_name}.json") + + def run_benchmarks( + self, + model_id: str, + benchmark_configs: list[BenchmarkConfig], + num_tokens_to_profile: int = 0, + pretty_print_summary: bool = True, + ) -> dict[str, Any]: + all_results = {} + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + start_time = time.perf_counter() + + n_configs = len(benchmark_configs) + for i, config in enumerate(benchmark_configs): + # Handle SDPA backend if not determined by the config (needs to be done before skipping duplicates) + if config.attn_implementation == "sdpa" and config.sdpa_backend is None: + default_backend = "flash_attention" # FIXME: torch has a _cur_sdpa_kernel_backends but it fails + self.logger.warning(f"No SDPA backend provided, using {default_backend} instead.") + config.sdpa_backend = default_backend + + # Skip if already run + if config.hash in all_results: + self.logger.info(f"Skipping duplicate config {config.name} for model {model_id} ({i + 1}/{n_configs})") + continue + + # Otherwise, run the benchmark + self.setup_one_run(model_id, config) + self.logger.info( + f"Running benchmark of model {model_id} with scenario: {config.name} ({i + 1}/{n_configs})" + ) + + # Launch benchmark in a try/except block to avoid stopping the whole run if one benchmark 
fails + try: + results = self.run_one_benchmark(model_id, config, num_tokens_to_profile) + if results is not None: + all_results[config.hash] = results + + except Exception as e: + self.logger.error(f"Error running with scenario: {config.name}:\n{repr(e)}") + # Cleanup model and save results + self.cleanup() + self.save_results(model_id, all_results, timestamp=timestamp) + + if pretty_print_summary: + print() + print("=" * 100) + print(f"Finished benchmarks in {time.perf_counter() - start_time:.2f} seconds") + print(f"Total number of benchmarks: {len(all_results)}") + if len(all_results) > 0: + print("First run metadata:") + first_key = list(all_results.keys())[0] + first_metadata = all_results[first_key]["metadata"].to_dict() + hardware_info = first_metadata.pop("hardware_info") + pretty_print_dict(first_metadata | hardware_info, tabs=1) + for value in all_results.values(): + print("=" * 100) + print(f"Config: {value['config'].infer_name(compact=False)}\n") + value["measurements"].pprint(tabs=1) + print("=" * 100) + + return all_results + + def save_results(self, model_name: str, results: dict, timestamp: str = "") -> str: + """Save benchmark results to JSON file.""" + # Create model-specific subdirectory + model_name = model_name.replace("/", "_") + model_dir = os.path.join(self.output_dir, model_name) + os.makedirs(model_dir, exist_ok=True) + + # Create filename with timestamp + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if not timestamp else timestamp + filename = f"{model_name}_benchmark_{timestamp}.json" + filepath = os.path.join(model_dir, filename) + + # Convert results to dict + converted_results = {} + for cfg_hash in results.keys(): + converted_results[cfg_hash] = { + "metadata": results[cfg_hash]["metadata"].to_dict(), + "measurements": results[cfg_hash]["measurements"].to_dict(), + "config": results[cfg_hash]["config"].to_dict(), + } + + # Save to JSON file + with open(filepath, "w") as f: + f.write(compact_json_numeric_arrays(converted_results)) + + self.logger.info(f"Results saved to {filepath}") + return filepath diff --git a/benchmark_v2/framework/data_classes.py b/benchmark_v2/framework/data_classes.py new file mode 100644 index 000000000000..b47119341a7a --- /dev/null +++ b/benchmark_v2/framework/data_classes.py @@ -0,0 +1,152 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Optional, Union + +import numpy as np + +from .hardware_metrics import GPURawMetrics, HardwareInfo + + +def compute_basic_statistics(measurements: list[float]) -> dict[str, float]: + return { + "avg": np.mean(measurements), + "std": np.std(measurements), + "min": np.min(measurements), + "med": np.median(measurements), + "max": np.max(measurements), + "p95": np.percentile(measurements, 95), + } + + +def add_unit_to_duration(stats: dict[str, float]) -> dict[str, str]: + for key in list(stats.keys()): + value = stats[key] + if value > 3600: + stats[key] = f"{(value / 3600):.2f}hr" + elif value > 60: + stats[key] = f"{(value / 60):.2f}min" + elif value > 1: + stats[key] = f"{value:.2f}s" + elif value > 1e-3: + stats[key] = f"{(value * 1e3):.2f}ms" + elif value > 1e-6: + stats[key] = f"{(value * 1e6):.2f}us" + else: + stats[key] = f"{(value * 1e9):.2f}ns" + return stats + + +def equalize_lengths_and_collate(stats: list[dict[str, str]]) -> list[str]: + keys = ["avg", "std", "min", "med", "max", "p95"] + for key in keys: + max_length = max(len(stat[key]) for stat in stats) + for stat in stats: + stat[key] = stat[key].ljust(max_length, " ") + return [" 
".join([f"{key}={stat[key]}" for key in keys]) for stat in stats] + + +def pretty_print_dict(data: dict[str, Any], tabs: int = 0) -> None: + max_key_length = max([len(key) for key in data.keys()]) + for key, value in data.items(): + tabs_str = " " * tabs + padded_key = key.ljust(max_key_length + 1, ".") + print(f"{tabs_str}{padded_key}: {value}") + + +@dataclass +class BenchmarkMetadata: + """Metadata collected for each benchmark run.""" + + model_id: str + timestamp: str + commit_id: str + hardware_info: HardwareInfo + + def __init__(self, model_id: str, commit_id: str): + self.model_id = model_id + self.timestamp = datetime.utcnow().isoformat() + self.commit_id = commit_id + self.hardware_info = HardwareInfo() + + def to_dict(self) -> dict[str, Any]: + return { + "timestamp": self.timestamp, + "commit_id": self.commit_id, + "hardware_info": self.hardware_info.to_dict(), + } + + +class BenchmarkResult: + """Result from a series of benchmark runs.""" + + def __init__(self) -> None: + self.e2e_latency = [] + self.token_generation_times = [] # time at which each token was generated (relative to start of the generation) + self.decoded_outputs = [] + self.gpu_metrics = [] + + def accumulate( + self, + e2e_latency: float, + token_generation_times: list[float], + decoded_output: str, + gpu_metrics: Optional[GPURawMetrics], + ) -> None: + self.e2e_latency.append(e2e_latency) + self.token_generation_times.append(token_generation_times) + self.decoded_outputs.append(decoded_output) + self.gpu_metrics.append(gpu_metrics) + + def to_dict(self) -> dict[str, Union[None, int, float]]: + # Save GPU metrics as None if it contains only None values + if all(gm is None for gm in self.gpu_metrics): + gpu_metrics = None + else: + gpu_metrics = [gm.to_dict() for gm in self.gpu_metrics] + return { + "e2e_latency": self.e2e_latency, + "token_generation_times": self.token_generation_times, + "decoded_outputs": self.decoded_outputs, + "gpu_metrics": gpu_metrics, + } + + @classmethod + def from_dict(cls, data: dict[str, Union[None, int, float]]) -> "BenchmarkResult": + # Handle GPU metrics, which is saved as None if it contains only None values + if data["gpu_metrics"] is None: + gpu_metrics = [None for _ in range(len(data["e2e_latency"]))] + else: + gpu_metrics = [GPURawMetrics.from_dict(gm) for gm in data["gpu_metrics"]] + # Create a new instance and accumulate the data + new_instance = cls() + for i in range(len(data["e2e_latency"])): + new_instance.accumulate( + e2e_latency=data["e2e_latency"][i], + token_generation_times=data["token_generation_times"][i], + decoded_output=data["decoded_output"][i], + gpu_metrics=gpu_metrics[i], + ) + return new_instance + + def get_measured_ttft(self) -> list[float]: + return [dt[0] for dt in self.token_generation_times if len(dt) > 0] + + def get_measured_itl(self) -> list[float]: + return [(dt[-1] - dt[0]) / (len(dt) - 1) for dt in self.token_generation_times if len(dt) > 1] + + def pprint(self, tabs: int = 0) -> None: + collated_stats = equalize_lengths_and_collate( + [ + add_unit_to_duration(compute_basic_statistics(self.e2e_latency)), + add_unit_to_duration(compute_basic_statistics(self.get_measured_ttft())), + add_unit_to_duration(compute_basic_statistics(self.get_measured_itl())), + ] + ) + pretty_print_dict( + { + "E2E Latency": collated_stats[0], + "Time to First Token": collated_stats[1], + "Inter-Token Latency": collated_stats[2], + }, + tabs=tabs, + ) diff --git a/benchmark_v2/framework/hardware_metrics.py b/benchmark_v2/framework/hardware_metrics.py new file mode 
100644 index 000000000000..d301e5dbc4e3 --- /dev/null +++ b/benchmark_v2/framework/hardware_metrics.py @@ -0,0 +1,172 @@ +import json +import logging +import subprocess +import sys +import threading +import time +from dataclasses import dataclass +from enum import Enum +from logging import Logger +from typing import Optional, Union + +import gpustat +import psutil +import torch + + +# Helpers to collect information about the hardware +def get_device_name_and_memory_total() -> tuple[str, float]: + """Returns the name and memory total of GPU 0.""" + device_name = torch.cuda.get_device_properties(0).name + device_memory_total = torch.cuda.get_device_properties(0).total_memory / 1024**3 + return device_name, device_memory_total + + +class HardwareInfo: + """A class to hold information about the hardware.""" + + def __init__(self) -> None: + # Retrieve GPU stats + try: + self.gpu_name, self.gpu_memory_total_gb = get_device_name_and_memory_total() + except Exception: + self.gpu_name, self.gpu_memory_total_gb = None, None + # Retrieve python, torch and CUDA version + self.python_version = f"{sys.version.split()[0]}" + self.torch_version = torch.__version__ + if hasattr(torch, "cuda") and torch.cuda.is_available(): + self.cuda_version = torch.version.cuda + else: + self.cuda_version = None + # Retrieve general hardware information + self.cpu_count = psutil.cpu_count() + self.memory_total_mb = int(psutil.virtual_memory().total / (1024 * 1024)) + + def to_dict(self) -> dict[str, Union[None, int, float, str]]: + return { + "gpu_name": self.gpu_name, + "gpu_memory_total_gb": self.gpu_memory_total_gb, + "python_version": self.python_version, + "torch_version": self.torch_version, + } + + +# Functions to get information about the GPU +def get_amd_gpu_stats() -> tuple[int, float]: + """Returns the utilization (in percent) and memory used (in GB) of the busiest AMD GPU.""" + rocm_smi_output = subprocess.check_output(["rocm-smi", "--json", "--showuse", "--showmeminfo", "VRAM"]) + gpu_stats = json.loads(rocm_smi_output.decode("utf-8")) + gpu_stats = [ + (card_id, stats["GPU use (%)"], stats["VRAM Total Used Memory (B)"]) for card_id, stats in gpu_stats.items() + ] + gpu_stats.sort(key=lambda x: x[1], reverse=True) + return int(gpu_stats[0][1]), float(gpu_stats[0][2]) / 1024**3 + + +def get_nvidia_gpu_stats() -> tuple[int, float]: + """Returns the utilization (in percent) and memory used (in GB) of NVIDIA GPU 0.""" + gpu_stats = gpustat.GPUStatCollection.new_query() + gpu_stats = gpu_stats[0] + return int(gpu_stats["utilization.gpu"]), float(gpu_stats["memory.used"]) / 1024**3 + + +class GPUStatsCollector: + """A class to get statistics about the GPU.
It serves as a wrapper that holds the GPU total memory and its name, + which is used to call the right function to get the utilization and memory used.""" + + def __init__(self) -> None: + self.device_name, self.device_memory_total = get_device_name_and_memory_total() + # Monkey patch the get_utilization_and_memory_used method based on the GPU type + if "amd" in self.device_name.lower(): + self.get_utilization_and_memory_used = get_amd_gpu_stats + elif "nvidia" in self.device_name.lower(): + self.get_utilization_and_memory_used = get_nvidia_gpu_stats + else: + raise RuntimeError(f"Unsupported GPU: {self.device_name}") + + def get_utilization_and_memory_used(self) -> tuple[int, float]: + """Get the GPU utilization (in percent) and memory used (in GB); overridden per GPU type in __init__.""" + raise NotImplementedError("This method is meant to be monkey patched during __init__") + + +# Simple data classes to hold the raw GPU metrics +class GPUMonitoringStatus(Enum): + """Status of GPU monitoring.""" + + SUCCESS = "success" + FAILED = "failed" + NO_GPUS_AVAILABLE = "no_gpus_available" + NO_SAMPLES_COLLECTED = "no_samples_collected" + + +@dataclass +class GPURawMetrics: + """Raw values for GPU utilization and memory used.""" + + utilization: list[float] # in percent + memory_used: list[float] # in GB + timestamps: list[float] # in seconds + timestamp_0: float # in seconds + monitoring_status: GPUMonitoringStatus + + def to_dict(self) -> dict[str, Union[None, int, float, str]]: + return { + "utilization": self.utilization, + "memory_used": self.memory_used, + "timestamps": self.timestamps, + "timestamp_0": self.timestamp_0, + "monitoring_status": self.monitoring_status.value, + } + + +# Main class, used to monitor the GPU utilization during benchmark execution +class GPUMonitor: + """Monitor GPU utilization during benchmark execution.""" + + def __init__(self, sample_interval_sec: float = 0.1, logger: Optional[Logger] = None): + self.sample_interval_sec = sample_interval_sec + self.logger = logger if logger is not None else logging.getLogger(__name__) + + self.num_available_gpus = torch.cuda.device_count() + if self.num_available_gpus == 0: + raise RuntimeError("No GPUs detected by torch.cuda.device_count().") + self.gpu_stats_getter = GPUStatsCollector() + + def start(self): + """Start monitoring GPU metrics.""" + # Clear the stop event to enable monitoring + self.stop_event = threading.Event() + self.gpu_utilization = [] + self.gpu_memory_used = [] + self.timestamps = [] + self.thread = threading.Thread(target=self._monitor_loop) + self.thread.start() + self.logger.debug("GPU monitoring started") + + def stop_and_collect(self) -> GPURawMetrics: + """Stop monitoring and return collected metrics.""" + self.stop_event.set() + self.thread.join() + if self.gpu_utilization: + timestamp_0 = self.timestamps[0] + metrics = GPURawMetrics( + utilization=self.gpu_utilization, + memory_used=self.gpu_memory_used, + timestamps=[t - timestamp_0 for t in self.timestamps], + timestamp_0=timestamp_0, + monitoring_status=GPUMonitoringStatus.SUCCESS, + ) + self.logger.debug(f"GPU monitoring completed: {len(self.gpu_utilization)} samples collected") + else: + metrics = GPURawMetrics( + utilization=[], memory_used=[], timestamps=[], timestamp_0=0.0, monitoring_status=GPUMonitoringStatus.NO_SAMPLES_COLLECTED + ) + return metrics + + def _monitor_loop(self): + """Background monitoring loop using threading.Event for communication.""" + while not self.stop_event.is_set(): + utilization, memory_used = self.gpu_stats_getter.get_utilization_and_memory_used() + self.gpu_utilization.append(utilization) + 
self.gpu_memory_used.append(memory_used) + self.timestamps.append(time.time()) + if self.stop_event.wait(timeout=self.sample_interval_sec): + break diff --git a/benchmark_v2/run_benchmarks.py b/benchmark_v2/run_benchmarks.py index d04069887f2d..85fb5a9493f5 100755 --- a/benchmark_v2/run_benchmarks.py +++ b/benchmark_v2/run_benchmarks.py @@ -19,477 +19,93 @@ """ import argparse -import importlib.util -import json import logging -import os +import random import sys import uuid -from datetime import datetime -from pathlib import Path -from typing import Any, Optional +from framework.benchmark_config import BenchmarkConfig, generate_all_configs +from framework.benchmark_runner import BenchmarkRunner -def setup_logging(log_level: str = "INFO", enable_file_logging: bool = False) -> logging.Logger: - """Setup logging configuration.""" - numeric_level = getattr(logging, log_level.upper(), None) - if not isinstance(numeric_level, int): - raise ValueError(f"Invalid log level: {log_level}") - - handlers = [logging.StreamHandler(sys.stdout)] - - if enable_file_logging: - handlers.append(logging.FileHandler(f"benchmark_run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")) - - logging.basicConfig( - level=numeric_level, format="[%(levelname)s - %(asctime)s] %(name)s: %(message)s", handlers=handlers - ) - - return logging.getLogger(__name__) - - -def discover_benchmarks(benches_dir: str) -> list[dict[str, Any]]: - """ - Discover all benchmark modules in the benches directory. - - Returns: - List of dictionaries containing benchmark module info - """ - benchmarks = [] - benches_path = Path(benches_dir) - - if not benches_path.exists(): - raise FileNotFoundError(f"Benches directory not found: {benches_dir}") - - for py_file in benches_path.glob("*.py"): - if py_file.name.startswith("__"): - continue - - module_name = py_file.stem - - try: - # Import the module - spec = importlib.util.spec_from_file_location(module_name, py_file) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - - # Check if it has a benchmark runner function - if hasattr(module, f"run_{module_name}"): - benchmarks.append( - { - "name": module_name, - "path": str(py_file), - "module": module, - "runner_function": getattr(module, f"run_{module_name}"), - } - ) - elif hasattr(module, "run_benchmark"): - benchmarks.append( - { - "name": module_name, - "path": str(py_file), - "module": module, - "runner_function": getattr(module, "run_benchmark"), - } - ) - else: - logging.warning(f"No runner function found in {py_file}") - - except Exception as e: - logging.error(f"Failed to import {py_file}: {e}") - - return benchmarks - - -def run_single_benchmark( - benchmark_info: dict[str, Any], output_dir: str, logger: logging.Logger, **kwargs -) -> Optional[str]: - """ - Run a single benchmark and return the output file path. 
- - Args: - benchmark_info: Dictionary containing benchmark module info - output_dir: Base output directory - logger: Logger instance - **kwargs: Additional arguments to pass to the benchmark - - Returns: - Path to the output file if successful, None otherwise - """ - benchmark_name = benchmark_info["name"] - runner_func = benchmark_info["runner_function"] - - logger.info(f"Running benchmark: {benchmark_name}") - - try: - # Check function signature to determine what arguments to pass - import inspect - - sig = inspect.signature(runner_func) - - # Prepare arguments based on function signature - func_kwargs = {"logger": logger, "output_dir": output_dir} - - # Add other kwargs if the function accepts them - for param_name in sig.parameters: - if param_name in kwargs: - func_kwargs[param_name] = kwargs[param_name] - - # Filter kwargs to only include parameters the function accepts - # If function has **kwargs, include all provided kwargs - has_var_kwargs = any(param.kind == param.VAR_KEYWORD for param in sig.parameters.values()) - if has_var_kwargs: - valid_kwargs = {**func_kwargs, **kwargs} - else: - valid_kwargs = {k: v for k, v in func_kwargs.items() if k in sig.parameters} - - # Run the benchmark - result = runner_func(**valid_kwargs) - - if isinstance(result, str): - # Function returned a file path - return result - else: - logger.info(f"Benchmark {benchmark_name} completed successfully") - return "completed" - - except Exception as e: - logger.error(f"Benchmark {benchmark_name} failed: {e}") - import traceback - - logger.debug(traceback.format_exc()) - return None - - -def generate_summary_report( - output_dir: str, - benchmark_results: dict[str, Any], - logger: logging.Logger, - benchmark_run_uuid: Optional[str] = None, -) -> str: - """Generate a summary report of all benchmark runs.""" - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - summary_file = os.path.join(output_dir, f"benchmark_summary_{timestamp}.json") - - summary_data = { - "run_metadata": { - "timestamp": datetime.utcnow().isoformat(), - "benchmark_run_uuid": benchmark_run_uuid, - "total_benchmarks": len(benchmark_results), - "successful_benchmarks": len([r for r in benchmark_results.values() if r is not None]), - "failed_benchmarks": len([r for r in benchmark_results.values() if r is None]), - }, - "benchmark_results": benchmark_results, - "output_directory": output_dir, - } - - with open(summary_file, "w") as f: - json.dump(summary_data, f, indent=2, default=str) - - logger.info(f"Summary report saved to: {summary_file}") - return summary_file - - -def upload_results_to_hf_dataset( - output_dir: str, - summary_file: str, - dataset_name: str, - run_id: Optional[str] = None, - token: Optional[str] = None, - logger: Optional[logging.Logger] = None, -) -> Optional[str]: - """ - Upload benchmark results to a HuggingFace Dataset. 
- Based on upload_collated_report() from utils/collated_reports.py - Args: - output_dir: Local output directory containing results - summary_file: Path to the summary file - dataset_name: Name of the HuggingFace dataset to upload to - run_id: Unique run identifier (if None, will generate one) - token: HuggingFace token for authentication (if None, will use environment variables) - logger: Logger instance - Returns: - The run_id used for the upload, None if upload failed - """ - if logger is None: - logger = logging.getLogger(__name__) - - import os - - from huggingface_hub import HfApi - - api = HfApi() - - if run_id is None: - github_run_number = os.getenv("GITHUB_RUN_NUMBER") - github_run_id = os.getenv("GITHUB_RUN_ID") - if github_run_number and github_run_id: - run_id = f"{github_run_number}-{github_run_id}" - - date_folder = datetime.now().strftime("%Y-%m-%d") - - github_event_name = os.getenv("GITHUB_EVENT_NAME") - if github_event_name != "schedule": - # Non-scheduled runs go under a runs subfolder - repo_path = f"{date_folder}/runs/{run_id}/benchmark_results" - else: - # Scheduled runs go directly under the date - repo_path = f"{date_folder}/{run_id}/benchmark_results" - - logger.info(f"Uploading benchmark results to dataset '{dataset_name}' at path '{repo_path}'") - - try: - # Upload all files in the output directory - from pathlib import Path - - output_path = Path(output_dir) - - for file_path in output_path.rglob("*"): - if file_path.is_file(): - # Calculate relative path from output_dir - relative_path = file_path.relative_to(output_path) - path_in_repo = f"{repo_path}/{relative_path}" - - logger.debug(f"Uploading {file_path} to {path_in_repo}") - - api.upload_file( - path_or_fileobj=str(file_path), - path_in_repo=path_in_repo, - repo_id=dataset_name, - repo_type="dataset", - token=token, - commit_message=f"Upload benchmark results for run {run_id}", - ) - - logger.info( - f"Successfully uploaded results to: https://huggingface.co/datasets/{dataset_name}/tree/main/{repo_path}" - ) - - return run_id - - except Exception as upload_error: - logger.error(f"Failed to upload results: {upload_error}") - import traceback - - logger.debug(traceback.format_exc()) - return None - - -def main(): - """Main entry point for the benchmarking script.""" - # Generate a unique UUID for this benchmark run - benchmark_run_uuid = str(uuid.uuid4())[:8] - - parser = argparse.ArgumentParser( - description="Run all benchmarks in the ./benches directory", - epilog=""" -Examples: - # Run all available benchmarks - python3 run_benchmarks.py - - # Run with specific model and upload to HuggingFace Dataset - python3 run_benchmarks.py --model-id meta-llama/Llama-2-7b-hf --upload-to-hf username/benchmark-results - - # Run with custom run ID and upload to HuggingFace Dataset - python3 run_benchmarks.py --run-id experiment_v1 --upload-to-hf org/benchmarks - - # Run only specific benchmarks with file logging - python3 run_benchmarks.py --include llama --enable-file-logging - """, # noqa: W293 - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - - parser.add_argument( - "--output-dir", - type=str, - default="benchmark_results", - help="Base output directory for benchmark results (default: benchmark_results)", - ) - - parser.add_argument( - "--benches-dir", - type=str, - default="./benches", - help="Directory containing benchmark implementations (default: ./benches)", - ) - - parser.add_argument( - "--log-level", - type=str, - choices=["DEBUG", "INFO", "WARNING", "ERROR"], - default="INFO", - help="Logging 
level (default: INFO)", - ) +if __name__ == "__main__": + # Parse arguments + parser = argparse.ArgumentParser() + parser.add_argument("--output-dir", type=str, default="benchmark_results", help="Output dir for benchmark results") + parser.add_argument("--log-level", type=str, choices=["DEBUG", "INFO", "WARNING", "ERROR"], default="INFO") parser.add_argument("--model-id", type=str, help="Specific model ID to benchmark (if supported by benchmarks)") - parser.add_argument("--warmup-iterations", type=int, default=3, help="Number of warmup iterations (default: 3)") + parser.add_argument("--warmup", type=int, default=5, help="Number of warmup iterations") + parser.add_argument("--iterations", type=int, default=20, help="Number of measurement iterations") - parser.add_argument( - "--measurement-iterations", type=int, default=5, help="Number of measurement iterations (default: 5)" - ) - - parser.add_argument( - "--num-tokens-to-generate", - type=int, - default=100, - help="Number of tokens to generate in benchmarks (default: 100)", - ) + parser.add_argument("--batch-size", "-b", type=int, nargs="+", help="Batch size") + parser.add_argument("--sequence-length", "-s", type=int, nargs="+", help="Sequence length") + parser.add_argument("--num-tokens-to-generate", "-n", type=int, nargs="+", help="Number of tokens to generate") - parser.add_argument("--include", type=str, nargs="*", help="Only run benchmarks matching these names") - - parser.add_argument("--exclude", type=str, nargs="*", help="Exclude benchmarks matching these names") - - parser.add_argument("--enable-file-logging", action="store_true", help="Enable file logging (disabled by default)") - - parser.add_argument( - "--commit-id", type=str, help="Git commit ID for metadata (if not provided, will auto-detect from git)" - ) - - parser.add_argument( - "--push-to-hub", - type=str, - help="Upload results to HuggingFace Dataset (provide dataset name, e.g., 'username/benchmark-results')", - ) - - parser.add_argument( - "--run-id", type=str, help="Custom run ID for organizing results (if not provided, will generate a unique ID)" - ) - - parser.add_argument( - "--token", - type=str, - help="HuggingFace token for dataset uploads (if not provided, will use HF_TOKEN environment variable)", - ) + parser.add_argument("--num-tokens-to-profile", "-p", type=int, default=0, help="Number of tokens to profile") + parser.add_argument("--commit-id", type=str, help="Git commit ID (if not provided, will auto-detect from git)") args = parser.parse_args() # Setup logging - logger = setup_logging(args.log_level, args.enable_file_logging) + benchmark_run_uuid = str(uuid.uuid4())[:8] + numeric_level = getattr(logging, args.log_level.upper()) + handlers = [logging.StreamHandler(sys.stdout)] + logging.basicConfig( + level=numeric_level, format="[%(levelname)s - %(asctime)s] %(name)s: %(message)s", handlers=handlers + ) + + logger = logging.getLogger("benchmark_v2") logger.info("Starting benchmark discovery and execution") logger.info(f"Benchmark run UUID: {benchmark_run_uuid}") logger.info(f"Output directory: {args.output_dir}") - logger.info(f"Benches directory: {args.benches_dir}") - - # Create output directory - os.makedirs(args.output_dir, exist_ok=True) - - try: - # Discover benchmarks - benchmarks = discover_benchmarks(args.benches_dir) - logger.info(f"Discovered {len(benchmarks)} benchmark(s): {[b['name'] for b in benchmarks]}") - - if not benchmarks: - logger.warning("No benchmarks found!") - return 1 - - # Filter benchmarks based on include/exclude - 
filtered_benchmarks = benchmarks - - if args.include: - filtered_benchmarks = [ - b for b in filtered_benchmarks if any(pattern in b["name"] for pattern in args.include) - ] - logger.info(f"Filtered to include: {[b['name'] for b in filtered_benchmarks]}") - if args.exclude: - filtered_benchmarks = [ - b for b in filtered_benchmarks if not any(pattern in b["name"] for pattern in args.exclude) - ] - logger.info(f"After exclusion: {[b['name'] for b in filtered_benchmarks]}") + # Error out if one of the arguments is not provided + if len(args.batch_size) * len(args.sequence_length) * len(args.num_tokens_to_generate) == 0: + raise ValueError( + "At least one of the arguments --batch-size, --sequence-length, or --num-tokens-to-generate is required" + ) - if not filtered_benchmarks: - logger.warning("No benchmarks remaining after filtering!") - return 1 + # If there is only one (batch_size, sequence_length, num_tokens_to_generate), we benchmark across configs + elif len(args.batch_size) * len(args.sequence_length) * len(args.num_tokens_to_generate) == 1: + benchmark_configs = generate_all_configs( + warmup_iterations=args.warmup, + measurement_iterations=args.iterations, + batch_size=args.batch_size[0], + sequence_length=args.sequence_length[0], + num_tokens_to_generate=args.num_tokens_to_generate[0], + ) + random.shuffle(benchmark_configs) - # Prepare common kwargs for benchmarks - benchmark_kwargs = { - "warmup_iterations": args.warmup_iterations, - "measurement_iterations": args.measurement_iterations, - "num_tokens_to_generate": args.num_tokens_to_generate, + # Otherwise, we benchmark across all combinations of dimensions + else: + kwargs = { + "warmup_iterations": args.warmup, + "measurement_iterations": args.iterations, + "gpu_monitoring": False, + "batch_size": args.batch_size[0], + "sequence_length": args.sequence_length[0], + "num_tokens_to_generate": args.num_tokens_to_generate[0], + "attn_implementation": "flex_attention", + "sdpa_backend": None, + "compile_mode": "default", + "kernelize": False, } - - if args.model_id: - benchmark_kwargs["model_id"] = args.model_id - - # Add commit_id if provided - if args.commit_id: - benchmark_kwargs["commit_id"] = args.commit_id - - # Run benchmarks - benchmark_results = {} - successful_count = 0 - - for benchmark_info in filtered_benchmarks: - result = run_single_benchmark(benchmark_info, args.output_dir, logger, **benchmark_kwargs) - - benchmark_results[benchmark_info["name"]] = result - - if result is not None: - successful_count += 1 - - # Generate summary report - summary_file = generate_summary_report(args.output_dir, benchmark_results, logger, benchmark_run_uuid) - - # Upload results to HuggingFace Dataset if requested - upload_run_id = None - if args.push_to_hub: - logger.info("=" * 60) - logger.info("UPLOADING TO HUGGINGFACE DATASET") - logger.info("=" * 60) - # Use provided run_id or fallback to benchmark run UUID - effective_run_id = args.run_id or benchmark_run_uuid - upload_run_id = upload_results_to_hf_dataset( - output_dir=args.output_dir, - summary_file=summary_file, - dataset_name=args.push_to_hub, - run_id=effective_run_id, - token=args.token, - logger=logger, - ) - if upload_run_id: - logger.info(f"Upload completed with run ID: {upload_run_id}") - else: - logger.warning("Upload failed - continuing with local results") - - # Final summary - total_benchmarks = len(filtered_benchmarks) - failed_count = total_benchmarks - successful_count - - logger.info("=" * 60) - logger.info("BENCHMARK RUN SUMMARY") - logger.info("=" * 60) - 
logger.info(f"Total benchmarks: {total_benchmarks}") - logger.info(f"Successful: {successful_count}") - logger.info(f"Failed: {failed_count}") - logger.info(f"Output directory: {args.output_dir}") - logger.info(f"Summary report: {summary_file}") - - if args.push_to_hub: - if upload_run_id: - logger.info(f"HuggingFace Dataset: {args.push_to_hub}") - logger.info(f"Run ID: {upload_run_id}") - logger.info( - f"View results: https://huggingface.co/datasets/{args.push_to_hub}/tree/main/{datetime.now().strftime('%Y-%m-%d')}/runs/{upload_run_id}" - ) - else: - logger.warning("Upload to HuggingFace Dataset failed") - - if failed_count > 0: - logger.warning(f"{failed_count} benchmark(s) failed. Check logs for details.") - return 1 - else: - logger.info("All benchmarks completed successfully!") - return 0 - - except Exception as e: - logger.error(f"Benchmark run failed: {e}") - import traceback - - logger.debug(traceback.format_exc()) - return 1 - - -if __name__ == "__main__": - sys.exit(main()) + benchmark_configs = [] + for num_tokens_to_generate in args.num_tokens_to_generate: + for sequence_length in args.sequence_length: + for batch_size in args.batch_size: + kwargs["batch_size"] = batch_size + kwargs["sequence_length"] = sequence_length + kwargs["num_tokens_to_generate"] = num_tokens_to_generate + benchmark_configs.append(BenchmarkConfig(**kwargs)) + + runner = BenchmarkRunner(logger, args.output_dir, args.commit_id) + results = runner.run_benchmarks( + args.model_id, + benchmark_configs[:3], + args.num_tokens_to_profile, + pretty_print_summary=True, + ) + # runner.save_results(args.model_id, results)