# Parallel Benchmark Test

Run multiple benchmarks in parallel on separate RunPod GPU instances.

## Features
- **Python API**: Configure benchmarks programmatically
- **Parallel Execution**: Run multiple pods in parallel using `ThreadPoolExecutor`
- **Save Results**: Logs saved to `./benchmark_results/` as `.txt` and `.json`

**Note:** Uses `ThreadPoolExecutor` instead of `multiprocessing.Pool` because:
- Works in Jupyter notebooks on macOS (no pickling issues)
- Benchmark tasks are I/O-bound (API calls), not CPU-bound

In [None]:
import benchmaxxing
from concurrent.futures import ThreadPoolExecutor, as_completed
import os

# Print the version of the imported benchmaxxing package.
print(f"benchmaxxing version: {benchmaxxing.__version__}")

## Option 1: Parallel Execution with Config Files

Use separate YAML config files for each benchmark configuration.

In [None]:
# Config files to run in parallel:
#   6_config_1.yaml -> 1x GPU, TP=1
#   6_config_2.yaml -> 2x GPU, TP=2
#   6_config_3.yaml -> 4x GPU, TP=4 and TP=2+DP=2
config_files = [f"6_config_{index}.yaml" for index in (1, 2, 3)]

In [None]:
def run_config_file(config_path: str) -> dict:
    """Run one benchmark from a config file and report the outcome.

    Failures are captured in the returned dict instead of being raised, so
    a caller collecting several runs gets exactly one entry per config.

    Args:
        config_path: Path of the config file passed to
            ``benchmaxxing.runpod.bench``.

    Returns:
        A dict that always carries ``"config"`` and ``"status"``. On
        success ``"status"`` is ``"ok"`` and ``"result"`` holds whatever
        ``bench`` returned; on failure ``"status"`` is ``"error"`` and
        ``"error"`` holds ``"<ExceptionType>: <message>"``.
    """
    try:
        result = benchmaxxing.runpod.bench(config_path)
    except Exception as exc:
        # Keep the exception type: str(exc) alone can be empty or ambiguous
        # (e.g. a bare TimeoutError or a KeyError), which hides the cause.
        return {
            "config": config_path,
            "status": "error",
            "error": f"{type(exc).__name__}: {exc}",
        }
    return {"config": config_path, "status": "ok", "result": result}


# Run all configs in parallel, one worker thread per config file.
results = []
# ThreadPoolExecutor raises ValueError when max_workers <= 0, so keep at
# least one worker even if config_files is empty.
with ThreadPoolExecutor(max_workers=max(1, len(config_files))) as executor:
    futures = {executor.submit(run_config_file, cfg): cfg for cfg in config_files}
    # as_completed yields in completion order, not submission order.
    for future in as_completed(futures):
        results.append(future.result())

# Show the benchmark result for each config, or the error if the run failed.
for r in results:
    print(f"{r['config']}: {r.get('result', r.get('error'))}")

## Option 2: Parallel Execution with Python Kwargs

Define benchmark configs as Python dicts. Useful when you want:
- Dynamic configuration at runtime
- A unique `pod_name` per benchmark run, so each pod can be identified
- Override specific params while sharing a base config file

In [None]:
# Benchmark configurations, expanded from the compact table at the bottom.
# Every entry reuses the same base config file (model/image settings) and
# varies the GPU count, pod name, context sizes and parallelism layouts.
benchmark_configs = [
    {
        "config_path": "5_python_config.yaml",
        "gpu_count": gpus,
        "pod_name": f"bench_{gpus}gpu",
        "context_sizes": list(context_sizes),
        "parallelism_pairs": [
            {"tensor_parallel": tp, "data_parallel": dp, "pipeline_parallel": 1}
            for tp, dp in layouts
        ],
        "save_results": True,
    }
    # Columns: GPU count, context sizes, (tensor_parallel, data_parallel) layouts
    for gpus, context_sizes, layouts in (
        (1, (1024, 2048), ((1, 1),)),
        (2, (1024, 2048, 4096), ((2, 1),)),
        (4, (1024, 2048, 4096), ((4, 1), (2, 2))),
    )
]

In [None]:
def run_benchmark(config: dict) -> dict:
    """Run one benchmark from keyword arguments and report the outcome.

    Failures are captured in the returned dict instead of being raised, so
    a caller collecting several runs gets exactly one entry per config.

    Args:
        config: Keyword arguments forwarded unchanged to
            ``benchmaxxing.runpod.bench``. Its ``"pod_name"`` entry, when
            present, labels the returned result.

    Returns:
        A dict that always carries ``"pod_name"`` (``"unknown"`` when the
        config has none) and ``"status"``. On success ``"status"`` is
        ``"ok"`` and ``"result"`` holds whatever ``bench`` returned; on
        failure ``"status"`` is ``"error"`` and ``"error"`` holds
        ``"<ExceptionType>: <message>"``.
    """
    pod_name = config.get("pod_name", "unknown")
    try:
        result = benchmaxxing.runpod.bench(**config)
    except Exception as exc:
        # Keep the exception type: str(exc) alone can be empty or ambiguous
        # (e.g. a bare TimeoutError or a KeyError), which hides the cause.
        return {
            "pod_name": pod_name,
            "status": "error",
            "error": f"{type(exc).__name__}: {exc}",
        }
    return {"pod_name": pod_name, "status": "ok", "result": result}


# Run all benchmarks in parallel, one worker thread per configuration
# (each on a separate RunPod pod).
results = []
# ThreadPoolExecutor raises ValueError when max_workers <= 0, so keep at
# least one worker even if benchmark_configs is empty.
with ThreadPoolExecutor(max_workers=max(1, len(benchmark_configs))) as executor:
    futures = {executor.submit(run_benchmark, cfg): cfg for cfg in benchmark_configs}
    # as_completed yields in completion order, not submission order.
    for future in as_completed(futures):
        results.append(future.result())

# Display the benchmark result for each pod, or the error if the run failed.
for r in results:
    print(f"{r['pod_name']}: {r.get('result', r.get('error'))}")

## Option 3: Dynamic GPU Scaling Test

Generate configs programmatically for GPU scaling tests.

In [None]:
def create_scaling_config(gpu_count: int) -> dict:
    """Build the benchmark kwargs for a run on ``gpu_count`` GPUs.

    The only parallelism layout is pure tensor parallelism across all GPUs
    (TP=gpu_count, DP=1, PP=1); every other setting is fixed.
    """
    tensor_parallel_only = {
        "tensor_parallel": gpu_count,
        "data_parallel": 1,
        "pipeline_parallel": 1,
    }
    scaling_config = {
        "config_path": "5_python_config.yaml",
        "gpu_count": gpu_count,
        "pod_name": f"scale_{gpu_count}gpu",
        "context_sizes": [1024, 2048, 4096],
        "concurrency": [50, 100],
        "num_prompts": [100],
        "output_len": [128],
        "parallelism_pairs": [tensor_parallel_only],
        "save_results": True,
    }
    return scaling_config


# GPU counts to sweep, from 1 up to 8 GPUs; one scaling config is built per count.
gpu_counts = [1, 2, 4, 8]
scaling_configs = list(map(create_scaling_config, gpu_counts))

# Summarise what was generated before launching anything.
print(f"Created {len(scaling_configs)} scaling configs:")
for cfg in scaling_configs:
    print(
        f"  - {cfg['pod_name']}: {cfg['gpu_count']} GPU(s), "
        f"TP={cfg['parallelism_pairs'][0]['tensor_parallel']}"
    )

In [None]:
# Run the scaling test in parallel, one worker thread per scaling config.
results = []
# ThreadPoolExecutor raises ValueError when max_workers <= 0, so keep at
# least one worker even if scaling_configs is empty.
with ThreadPoolExecutor(max_workers=max(1, len(scaling_configs))) as executor:
    futures = {executor.submit(run_benchmark, cfg): cfg for cfg in scaling_configs}
    # as_completed yields in completion order, not submission order.
    for future in as_completed(futures):
        results.append(future.result())

# Show the benchmark result for each pod, or the error if the run failed.
print("=== Scaling Test Results ===")
for r in results:
    print(f"{r['pod_name']}: {r.get('result', r.get('error'))}")

## Check Saved Results

Results are saved to `./benchmark_results/` using the following naming pattern:
```
{name}_TP{tp}_DP{dp}_CTX{ctx}_C{concurrency}_P{prompts}_O{output}.txt
{name}_TP{tp}_DP{dp}_CTX{ctx}_C{concurrency}_P{prompts}_O{output}.json
```

In [None]:
# Directory whose saved .txt and .json result files are listed below.
results_dir = "./benchmark_results"

# Use isdir rather than exists: a regular file at this path would pass an
# existence check and then make os.listdir raise NotADirectoryError.
if os.path.isdir(results_dir):
    files = sorted(os.listdir(results_dir))
    txt_files = [f for f in files if f.endswith(".txt")]
    json_files = [f for f in files if f.endswith(".json")]

    print(f"Found {len(txt_files)} .txt files and {len(json_files)} .json files:\n")

    print("TXT files (benchmark logs):")
    for f in txt_files:
        print(f"  {f}")

    print("\nJSON files (structured results):")
    for f in json_files:
        print(f"  {f}")
else:
    print(f"No results directory found at {results_dir}")

In [None]:
# Read a sample result file
import json

# Use isdir rather than exists: a regular file at this path would pass an
# existence check and then make os.listdir raise NotADirectoryError.
if os.path.isdir(results_dir):
    # os.listdir returns entries in arbitrary order; sort so the same sample
    # file is picked on every run.
    json_files = sorted(f for f in os.listdir(results_dir) if f.endswith(".json"))
    if json_files:
        sample_file = os.path.join(results_dir, json_files[0])
        # Decode explicitly as UTF-8 (the standard JSON encoding) instead of
        # relying on the platform's default text encoding.
        with open(sample_file, encoding="utf-8") as f:
            data = json.load(f)
        print(f"Sample result from {json_files[0]}:")
        print(json.dumps(data, indent=2))