diff --git a/perf-changelog.yaml b/perf-changelog.yaml index def63fd87..69d5c7d8a 100644 --- a/perf-changelog.yaml +++ b/perf-changelog.yaml @@ -3171,3 +3171,9 @@ description: - "Validates measured-power aggregation pipeline (PR #1558) on both NVIDIA (H200) and AMD (MI355X) hardware — different SMI tools (nvidia-smi vs amd-smi), different CSV schemas (power.draw [W] vs socket_power), same aggregator. No config change. Entry intentionally kept past merge so run-sweep produces canonical agg JSONs with avg_power_w + joules_per_output_token on main for both vendors, seeding the dashboard's day-zero data." pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1558 + +- config-keys: + - dsv4-fp4-gb300-dynamo-sglang + description: + - "Smoke run validating multinode measured-power aggregation (PR #1574). No config change; entry exists to trigger a sweep that produces the first multinode agg JSON with avg_power_w + joules_per_*_token populated from per-node srt-slurm perfmon CSVs. Validates per-source GPU-id namespacing in aggregate_power.py (without it, 14 nodes × 4 GPUs would report num_gpus=4 instead of 56) and the GPU_METRICS_CSV_GLOB env var bridge in process_result.py. Only the gb300-cw runner has the perfmon launcher changes; any gb300-nv runs in the sweep will succeed normally without power fields, which the dashboard handles gracefully (chart gates on field presence)." + pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1574 diff --git a/runners/launch_gb300-cw.sh b/runners/launch_gb300-cw.sh index 25e7f4db5..9f3222dad 100644 --- a/runners/launch_gb300-cw.sh +++ b/runners/launch_gb300-cw.sh @@ -12,8 +12,13 @@ if [[ $MODEL_PREFIX == "dsv4" && $PRECISION == "fp4" ]]; then export MODEL_PATH="/mnt/vast/models/dsv4" if [[ $FRAMEWORK == "dynamo-sglang" ]]; then - SRT_SLURM_RECIPES_REPO="https://github.com/NVIDIA/srt-slurm.git" - SRT_SLURM_RECIPES_REF="main" + # Pinned to our SemiAnalysisAI fork of NVIDIA/srt-slurm to pick up + # PR #35 (per-node nvidia-smi monitoring during the benchmark sweep) + # ahead of its upstream merge. The branch tracks PR #35's head SHA: + # to bump, re-fetch refs/pull/35/head from NVIDIA/srt-slurm and force- + # push to SemiAnalysisAI/srt-slurm:feat/inferencex-perfmon. + SRT_SLURM_RECIPES_REPO="https://github.com/SemiAnalysisAI/srt-slurm.git" + SRT_SLURM_RECIPES_REF="feat/inferencex-perfmon" SRT_RECIPE_SRC="$GITHUB_WORKSPACE/benchmarks/multi_node/srt-slurm-recipes/sglang/deepseek-v4" SRT_RECIPE_DST="recipes/sglang/deepseek-v4" elif [[ $FRAMEWORK == "dynamo-vllm" ]]; then @@ -106,6 +111,19 @@ git checkout "$SRT_SLURM_RECIPES_REF" mkdir -p "$SRT_RECIPE_DST" cp -rT "$SRT_RECIPE_SRC" "$SRT_RECIPE_DST" +# Enable per-node GPU perfmon (PR #35) on every overlaid recipe. `monitoring` +# is a top-level SrtConfig field and defaults to None, so without this the +# orchestrator's _start_perf_monitor short-circuits and no perf_samples_*.csv +# are ever written — multinode measured-power aggregation would silently +# skip. Idempotent: skips recipes that already declare `monitoring:`. +for recipe in "$SRT_RECIPE_DST"/*.yaml; do + [ -f "$recipe" ] || continue + if ! grep -q '^monitoring:' "$recipe"; then + printf '\nmonitoring:\n enabled: true\n sample_interval: 1.0\n' >> "$recipe" + echo "[perfmon] enabled monitoring in recipe: $recipe" + fi +done + echo "Installing srtctl..." # CRITICAL — uv install location. # Runner pod is x86 but compute nodes are aarch64, and /mnt/home is @@ -279,6 +297,25 @@ else echo "Warning: Logs directory not found at $LOGS_DIR" fi +# Hand the per-node perfmon CSVs off to the downstream "Process result" step +# in benchmark-multinode-tmpl.yml. srt-slurm's perfmon (PR #35) writes +# perf_samples_{node}.csv straight into $LOGS_DIR on the host. process_result.py +# already invokes aggregate_power.run() inline; teaching it to read +# GPU_METRICS_CSV_GLOB lets utils/aggregate_power.py do the multi-CSV +# aggregation (each agg JSON gets avg_power_w / joules_per_*_token patched in +# place). Use an absolute glob because process_result.py runs from +# $GITHUB_WORKSPACE, not from this srt-slurm checkout. +if [ -d "$LOGS_DIR" ]; then + perf_glob_dir="$(pwd)/$LOGS_DIR" + perf_csv_count=$(ls "$perf_glob_dir"/perf_samples_*.csv 2>/dev/null | wc -l | tr -d ' ') + if [ "$perf_csv_count" -gt 0 ]; then + echo "[perfmon] Found $perf_csv_count per-node perf_samples_*.csv under $perf_glob_dir/" + echo "GPU_METRICS_CSV_GLOB=$perf_glob_dir/perf_samples_*.csv" >> "$GITHUB_ENV" + else + echo "[perfmon] WARNING: monitoring enabled but no perf_samples_*.csv found in $perf_glob_dir — measured power aggregation will be skipped" + fi +fi + if [[ "${EVAL_ONLY:-false}" != "true" ]]; then if [ ! -d "$LOGS_DIR" ]; then exit 1 diff --git a/utils/aggregate_power.py b/utils/aggregate_power.py index 3c204085a..ab6fcef3e 100644 --- a/utils/aggregate_power.py +++ b/utils/aggregate_power.py @@ -1,12 +1,19 @@ """Aggregate measured GPU power from a vendor SMI CSV into the agg result JSON. -Reads a GPU-metrics CSV produced by `start_gpu_monitor` (nvidia-smi or amd-smi), -filters samples to the benchmark load window using start/end Unix timestamps -written by benchmark_serving.py, and patches two keys into the aggregated -result JSON consumed by InferenceX-app's ETL: +Reads a GPU-metrics CSV produced by `start_gpu_monitor` (nvidia-smi or amd-smi) +or by srt-slurm's per-node perfmon (multinode), filters samples to the benchmark +load window using start/end Unix timestamps written by benchmark_serving.py, and +patches three keys into the aggregated result JSON consumed by InferenceX-app's +ETL: - avg_power_w: mean per-GPU power draw (W) during the load window - joules_per_output_token: (avg_power_w * num_gpus * duration_s) / total_output_tokens + - joules_per_total_token: same, divided by (input + output) tokens + +Multinode: accepts multiple CSV paths (one per worker node). GPU indices are +namespaced by source CSV stem to avoid the same-index collision across nodes — +e.g. 8 nodes each reporting indices 0..3 would otherwise be miscounted as 4 +total GPUs instead of 32. The ETL (`packages/db/src/etl/benchmark-mapper.ts`) auto-captures any numeric field in the agg JSON into the `metrics` JSONB column, so no schema migration @@ -14,8 +21,8 @@ Vendor schema detection is regex-based: any timestamp-like column + any column whose name contains "power" (excluding "limit"/"cap"/"max") is picked up. -NVIDIA emits "power.draw [W]"; AMD's amd-smi varies by version. Both are -handled. +NVIDIA emits "power.draw [W]"; AMD's amd-smi varies by version; srt-slurm's +perfmon emits "power_w". All are handled. This script is best-effort. Missing or malformed CSV exits 0 without patching so a monitoring hiccup never breaks the benchmark upload. @@ -25,9 +32,11 @@ import argparse import csv +import glob as glob_module import json import re import sys +from collections.abc import Iterable from datetime import datetime, timezone from pathlib import Path from statistics import mean @@ -109,74 +118,84 @@ def _detect_columns(header: list[str]) -> tuple[str | None, str | None, str | No def aggregate_power( - csv_path: Path, + csv_path: Path | Iterable[Path], start_unix: float, end_unix: float, ) -> tuple[float, int] | None: """Return (per_gpu_avg_power_w, num_gpus) for samples in [start, end]. - Returns None if the CSV is missing, empty, has no detectable power column, - or no rows fall in the window. + Accepts either a single Path (single-node case) or an iterable of Paths + (multinode case: one CSV per worker node, all written by srt-slurm's + perfmon). For multi-path inputs, GPU indices are namespaced by source + CSV stem so the distinct-id count reflects the true total — each node + independently reports indices 0..N, and without namespacing the union + would collapse to a single node's worth. + + Returns None if no CSVs are usable, none have a detectable power column, + or no rows fall in the window across all paths. """ - if not csv_path.is_file() or csv_path.stat().st_size == 0: - return None - if end_unix <= start_unix: + paths = [csv_path] if isinstance(csv_path, Path) else list(csv_path) + if not paths or end_unix <= start_unix: return None - try: - with csv_path.open("r", newline="", encoding="utf-8", errors="replace") as f: - reader = csv.DictReader(f, skipinitialspace=True) - header = [c.strip() for c in (reader.fieldnames or [])] - reader.fieldnames = header - timestamp_col, power_col, gpu_col = _detect_columns(header) - if not timestamp_col or not power_col: - return None - - # Group power readings by sample timestamp so per-sample total power - # (sum across GPUs) is computed correctly even if rows are interleaved. - # - # per_sample_row_count is the structural divisor: it's incremented for - # every contributing row regardless of whether a GPU-index column was - # detected. per_sample_gpus / gpu_keys are only populated when gpu_col - # is present and provide the canonical num_gpus via distinct-id count. - # When gpu_col is absent (vendor schema variant whose header doesn't - # match _GPU_INDEX_COL_RE), we fall back to inferring num_gpus from - # the modal row count per timestamp — assuming one row per GPU per - # sample, which is what every SMI tool we've seen actually emits. - per_sample_total: dict[float, float] = {} - per_sample_row_count: dict[float, int] = {} - per_sample_gpus: dict[float, set[str]] = {} - gpu_keys: set[str] = set() - - for row in reader: - ts_raw = (row.get(timestamp_col) or "").strip() - pw_raw = (row.get(power_col) or "").strip() - ts = _parse_timestamp(ts_raw) - pw = _parse_power(pw_raw) - if ts is None or pw is None: - continue - if ts < start_unix or ts > end_unix: + # Only namespace when there are multiple sources — keeps single-node + # gpu_keys identical to the pre-multinode behavior so existing callers + # see the same num_gpus values. + namespace = len(paths) > 1 + + # Per-sample state accumulates across ALL paths. Bucketed by ms-rounded + # timestamp so nodes whose clocks drift sub-ms still end up in the same + # bucket (they reliably do — all sample on `time.sleep(interval)` against + # the same NTP-synced cluster clock). + per_sample_total: dict[float, float] = {} + per_sample_row_count: dict[float, int] = {} + per_sample_gpus: dict[float, set[str]] = {} + gpu_keys: set[str] = set() + saw_gpu_col = False + + for path in paths: + if not path.is_file() or path.stat().st_size == 0: + continue + try: + with path.open("r", newline="", encoding="utf-8", errors="replace") as f: + reader = csv.DictReader(f, skipinitialspace=True) + header = [c.strip() for c in (reader.fieldnames or [])] + reader.fieldnames = header + timestamp_col, power_col, gpu_col = _detect_columns(header) + if not timestamp_col or not power_col: continue - # Bucket by sample timestamp (rounded to ms to absorb sub-ms drift). - bucket = round(ts, 3) - per_sample_total[bucket] = per_sample_total.get(bucket, 0.0) + pw - per_sample_row_count[bucket] = per_sample_row_count.get(bucket, 0) + 1 if gpu_col: - gpu_id = (row.get(gpu_col) or "").strip() - if gpu_id: - per_sample_gpus.setdefault(bucket, set()).add(gpu_id) - gpu_keys.add(gpu_id) - except (OSError, csv.Error): - return None + saw_gpu_col = True + + for row in reader: + ts_raw = (row.get(timestamp_col) or "").strip() + pw_raw = (row.get(power_col) or "").strip() + ts = _parse_timestamp(ts_raw) + pw = _parse_power(pw_raw) + if ts is None or pw is None: + continue + if ts < start_unix or ts > end_unix: + continue + bucket = round(ts, 3) + per_sample_total[bucket] = per_sample_total.get(bucket, 0.0) + pw + per_sample_row_count[bucket] = per_sample_row_count.get(bucket, 0) + 1 + if gpu_col: + gpu_id = (row.get(gpu_col) or "").strip() + if gpu_id: + ns_id = f"{path.stem}:{gpu_id}" if namespace else gpu_id + per_sample_gpus.setdefault(bucket, set()).add(ns_id) + gpu_keys.add(ns_id) + except (OSError, csv.Error): + continue if not per_sample_total: return None # Per-sample divisor and overall num_gpus. - # - If a GPU column was detected, trust distinct GPU IDs (correct for any - # sampling pattern, including hot-swap or partial visibility). - # - Otherwise, infer from row count (one row per GPU per sample). - if gpu_col and gpu_keys: + # - If any path exposed a GPU column, trust distinct (namespaced) GPU IDs. + # - Otherwise, infer from row count (one row per GPU per sample, summed + # across all paths' rows that fell into the same timestamp bucket). + if saw_gpu_col and gpu_keys: num_gpus = len(gpu_keys) per_sample_mean_per_gpu = [ total / max(len(per_sample_gpus.get(ts, ())), 1) @@ -194,7 +213,16 @@ def _load_bench_window( bench_result_path: Path, ) -> tuple[float, float, float, int, int] | None: """Read (start_unix, end_unix, duration_s, total_output_tokens, total_input_tokens) - from the raw bench JSON. Returns None if any required field is missing. + from the raw bench JSON. Returns None if a window cannot be resolved. + + Window resolution order, tried in turn: + 1. benchmark_start_time_unix + benchmark_end_time_unix (our benchmark_serving.py + writes both — single-node, brackets the actual load window exactly). + 2. date + duration (srt-slurm sa-bench writes "YYYYMMDD-HHMMSS" UTC as the + result write time — multinode; treat as bench end and subtract duration + for start. Overshoots by post-bench JSON serialization, typically <5s). + 3. file mtime + duration (last resort if `date` is absent or unparseable — + same end-of-bench proxy as #2 via the result file's mtime). total_input_tokens defaults to 0 if absent (older bench JSONs may not have it); this only degrades joules_per_total_token to equal joules_per_output_token in @@ -204,18 +232,52 @@ def _load_bench_window( bench = json.loads(bench_result_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return None - start = bench.get("benchmark_start_time_unix") - end = bench.get("benchmark_end_time_unix") duration = bench.get("duration") total_output = bench.get("total_output_tokens") total_input = bench.get("total_input_tokens", 0) - if not all(isinstance(v, (int, float)) for v in (start, end, duration)): + if not isinstance(duration, (int, float)): return None if not isinstance(total_output, int) or total_output <= 0: return None if not isinstance(total_input, int) or total_input < 0: total_input = 0 - return float(start), float(end), float(duration), int(total_output), int(total_input) + + # Tier 1: explicit Unix timestamps (single-node bench_serving.py). + start = bench.get("benchmark_start_time_unix") + end = bench.get("benchmark_end_time_unix") + if isinstance(start, (int, float)) and isinstance(end, (int, float)): + return float(start), float(end), float(duration), int(total_output), int(total_input) + + # Tier 2: parse `date` field (srt-slurm sa-bench multinode). On observed + # runs the string matches file mtime to the second, confirming it's the + # JSON write time. + date_str = bench.get("date") + if isinstance(date_str, str): + try: + end_dt = datetime.strptime(date_str, "%Y%m%d-%H%M%S").replace(tzinfo=timezone.utc) + end_unix = end_dt.timestamp() + return ( + float(end_unix - duration), + float(end_unix), + float(duration), + int(total_output), + int(total_input), + ) + except ValueError: + pass + + # Tier 3: file mtime as last-resort bench-end proxy. + try: + end_unix = bench_result_path.stat().st_mtime + except OSError: + return None + return ( + float(end_unix - duration), + float(end_unix), + float(duration), + int(total_output), + int(total_input), + ) def patch_agg_result( @@ -234,7 +296,7 @@ def patch_agg_result( tmp_path.replace(agg_path) -def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int: +def run(csv_path: Path | Iterable[Path], bench_result: Path, agg_result: Path) -> int: window = _load_bench_window(bench_result) if window is None: print( @@ -244,10 +306,12 @@ def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int: return 0 start, end, duration, total_output, total_input = window - result = aggregate_power(csv_path, start, end) + paths = [csv_path] if isinstance(csv_path, Path) else list(csv_path) + result = aggregate_power(paths, start, end) if result is None: + label = str(paths[0]) if len(paths) == 1 else f"{len(paths)} CSVs" print( - f"[aggregate_power] No usable power samples in {csv_path} for " + f"[aggregate_power] No usable power samples in {label} for " f"window [{start}, {end}] — skipping", file=sys.stderr, ) @@ -291,11 +355,20 @@ def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int: def main() -> int: parser = argparse.ArgumentParser(description=__doc__.splitlines()[0]) - parser.add_argument( + source = parser.add_mutually_exclusive_group() + source.add_argument( "--csv", type=Path, - default=Path("/workspace/gpu_metrics.csv"), - help="Path to gpu_metrics.csv from start_gpu_monitor (default: /workspace/gpu_metrics.csv)", + default=None, + help="Single gpu_metrics.csv from start_gpu_monitor (single-node). " + "Falls back to /workspace/gpu_metrics.csv when neither --csv nor --csv-glob is set.", + ) + source.add_argument( + "--csv-glob", + type=str, + default=None, + help="Shell glob expanding to per-node perf_samples_*.csv files (multinode, " + "written by srt-slurm's perfmon). GPU indices are namespaced by source CSV stem.", ) parser.add_argument( "--bench-result", @@ -310,7 +383,17 @@ def main() -> int: help="Path to the agg_.json output of process_result.py (will be patched in place)", ) args = parser.parse_args() - return run(args.csv, args.bench_result, args.agg_result) + + if args.csv_glob: + paths = sorted(Path(p) for p in glob_module.glob(args.csv_glob)) + if not paths: + print( + f"[aggregate_power] No CSVs matched glob {args.csv_glob!r} — skipping", + file=sys.stderr, + ) + return 0 + return run(paths, args.bench_result, args.agg_result) + return run(args.csv or Path("/workspace/gpu_metrics.csv"), args.bench_result, args.agg_result) if __name__ == "__main__": diff --git a/utils/process_result.py b/utils/process_result.py index 5fb059473..0510fe023 100644 --- a/utils/process_result.py +++ b/utils/process_result.py @@ -139,20 +139,41 @@ def get_required_env_vars(required_vars): # Best-effort: patch measured power into the agg JSON. Never fails the run. try: + import glob as _glob_module from aggregate_power import run as _aggregate_power_run - _csv_candidates = [ - os.environ.get('GPU_METRICS_CSV'), - 'gpu_metrics.csv', - '/workspace/gpu_metrics.csv', - ] - _csv_path = next( - (Path(p) for p in _csv_candidates if p and Path(p).is_file()), - None, - ) - if _csv_path is not None: + # Multinode path: srt-slurm launchers set GPU_METRICS_CSV_GLOB after the job + # to a shell glob expanding to one perf_samples_.csv per worker node. + # Takes precedence over the single-CSV fallback — if the launcher set the + # glob, the run was multinode and there is no single-CSV fallback to make. + _csv_arg = None + _glob_pattern = os.environ.get('GPU_METRICS_CSV_GLOB') + if _glob_pattern: + _matched = sorted(Path(p) for p in _glob_module.glob(_glob_pattern)) + if _matched: + _csv_arg = _matched + else: + print( + f'[process_result] GPU_METRICS_CSV_GLOB={_glob_pattern!r} matched no files', + file=sys.stderr, + ) + + if _csv_arg is None: + # Single-node path: gpu_metrics.csv written by start_gpu_monitor in the + # bench container. + _csv_candidates = [ + os.environ.get('GPU_METRICS_CSV'), + 'gpu_metrics.csv', + '/workspace/gpu_metrics.csv', + ] + _csv_arg = next( + (Path(p) for p in _csv_candidates if p and Path(p).is_file()), + None, + ) + + if _csv_arg is not None: _aggregate_power_run( - csv_path=_csv_path, + csv_path=_csv_arg, bench_result=Path(f'{result_filename}.json'), agg_result=agg_path, ) diff --git a/utils/test_aggregate_power.py b/utils/test_aggregate_power.py index bf81ee7b1..b6f040ce8 100644 --- a/utils/test_aggregate_power.py +++ b/utils/test_aggregate_power.py @@ -15,7 +15,7 @@ import json import sys -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path import pytest @@ -445,3 +445,237 @@ def test_patch_agg_result_is_atomic_via_tempfile(tmp_path: Path): assert data["joules_per_total_token"] == 0.5 # No .tmp leftover. assert not (tmp_path / "agg.json.tmp").exists() + + +# --------------------------------------------------------------------------- # +# Multi-node CSV aggregation +# --------------------------------------------------------------------------- # + + +def test_aggregate_power_multi_node_namespaces_local_gpu_indices(tmp_path: Path): + """Two per-node CSVs each report local GPU indices 0..3. + + Without per-source namespacing the union of gpu_keys would collapse to 4 + instead of 8 — the bug this whole multinode change exists to prevent.""" + base = 1_700_000_000.0 + node1 = tmp_path / "perf_samples_node1.csv" + node2 = tmp_path / "perf_samples_node2.csv" + _write_nvidia_csv(node1, [(base + s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + _write_nvidia_csv(node2, [(base + s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + + result = aggregate_power([node1, node2], base, base + 10) + assert result is not None + avg_power, num_gpus = result + assert avg_power == pytest.approx(500.0) + assert num_gpus == 8 + + +def test_aggregate_power_multi_node_with_sub_second_clock_drift(tmp_path: Path): + """Per-node polls drift sub-second even on NTP-synced clusters. + + Node1 polls at base+s, node2 at base+s+0.3 — rows land in different ms + buckets. Each bucket is then a single-node 4-GPU slice averaging to 500W, + and the mean across all buckets is the cluster per-GPU mean.""" + base = 1_700_000_000.0 + node1 = tmp_path / "perf_samples_node1.csv" + node2 = tmp_path / "perf_samples_node2.csv" + _write_nvidia_csv(node1, [(base + s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + _write_nvidia_csv(node2, [(base + s + 0.3, gpu, 500.0) for s in range(3) for gpu in range(4)]) + + result = aggregate_power([node1, node2], base, base + 10) + assert result is not None + avg_power, num_gpus = result + assert avg_power == pytest.approx(500.0) + assert num_gpus == 8 + + +def test_aggregate_power_multi_node_asymmetric_prefill_decode_power(tmp_path: Path): + """Disagg topologies draw different per-GPU power on prefill vs decode nodes. + + 4 prefill GPUs at 600W + 4 decode GPUs at 400W: cluster mean is the + weighted average across all 8 GPUs = (4*600 + 4*400)/8 = 500W.""" + base = 1_700_000_000.0 + prefill = tmp_path / "perf_samples_prefill0.csv" + decode = tmp_path / "perf_samples_decode0.csv" + _write_nvidia_csv(prefill, [(base + s, gpu, 600.0) for s in range(3) for gpu in range(4)]) + _write_nvidia_csv(decode, [(base + s, gpu, 400.0) for s in range(3) for gpu in range(4)]) + + result = aggregate_power([prefill, decode], base, base + 10) + assert result is not None + avg_power, num_gpus = result + assert avg_power == pytest.approx(500.0) + assert num_gpus == 8 + + +def test_aggregate_power_multi_node_skips_missing_csv_silently(tmp_path: Path): + """If a node failed to start perfmon, its CSV will be absent. + + Aggregating over the remaining nodes is preferable to returning None — + losing one node's power data should not zero out the whole metric.""" + base = 1_700_000_000.0 + present = tmp_path / "perf_samples_node1.csv" + missing = tmp_path / "perf_samples_node2.csv" # never written + _write_nvidia_csv(present, [(base + s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + + result = aggregate_power([present, missing], base, base + 10) + assert result is not None + avg_power, num_gpus = result + assert avg_power == pytest.approx(500.0) + assert num_gpus == 4 # only the node that emitted data + + +def test_aggregate_power_single_path_in_list_matches_bare_path(tmp_path: Path): + """Backward compat: aggregate_power([csv], ...) == aggregate_power(csv, ...). + + Single-source behavior must not change when the caller wraps the path in a + list — otherwise process_result.py-style callers that defensively normalize + to a list would see different num_gpus values than legacy bare-path calls.""" + base = 1_700_000_000.0 + csv = tmp_path / "gpu_metrics.csv" + _write_nvidia_csv(csv, [(base + s, gpu, 500.0) for s in range(3) for gpu in range(8)]) + + bare = aggregate_power(csv, base, base + 10) + listed = aggregate_power([csv], base, base + 10) + assert bare == listed + assert bare == (pytest.approx(500.0), 8) + + +def test_aggregate_power_accepts_iterable_not_just_list(tmp_path: Path): + """Signature is Iterable[Path] — generators (e.g. Path.glob()) must work.""" + base = 1_700_000_000.0 + node1 = tmp_path / "perf_samples_node1.csv" + node2 = tmp_path / "perf_samples_node2.csv" + _write_nvidia_csv(node1, [(base + s, gpu, 500.0) for s in range(2) for gpu in range(4)]) + _write_nvidia_csv(node2, [(base + s, gpu, 500.0) for s in range(2) for gpu in range(4)]) + + result = aggregate_power(tmp_path.glob("perf_samples_*.csv"), base, base + 10) + assert result is not None + _, num_gpus = result + assert num_gpus == 8 + + +def test_run_multi_node_e2e_computes_joules_from_total_gpus(tmp_path: Path): + """End-to-end multinode: run() with a list of CSVs patches the agg JSON. + + 8 GPUs total at 500W for 10s → 40_000 J → 2.0 J/output_token for 20_000 tokens.""" + base = 1_700_000_000.0 + node1 = tmp_path / "perf_samples_node1.csv" + node2 = tmp_path / "perf_samples_node2.csv" + _write_nvidia_csv(node1, [(base + 1 + s, gpu, 500.0) for s in range(2) for gpu in range(4)]) + _write_nvidia_csv(node2, [(base + 1 + s, gpu, 500.0) for s in range(2) for gpu in range(4)]) + bench = tmp_path / "bench.json" + agg = tmp_path / "agg.json" + _write_bench_result(bench, start=base, end=base + 10, duration=10.0, total_output=20_000) + agg.write_text(json.dumps({"hw": "gb300", "conc": 8192}), encoding="utf-8") + + exit_code = run([node1, node2], bench, agg) + assert exit_code == 0 + + patched = json.loads(agg.read_text()) + assert patched["avg_power_w"] == pytest.approx(500.0) + assert patched["joules_per_output_token"] == pytest.approx(2.0) + + +def test_run_multi_node_skips_when_all_csvs_missing(tmp_path: Path): + """Entire monitoring failure (all per-node CSVs absent) skips cleanly without patching.""" + bench = tmp_path / "bench.json" + agg = tmp_path / "agg.json" + _write_bench_result(bench, start=0.0, end=10.0, duration=10.0, total_output=1000) + agg.write_text(json.dumps({"hw": "gb300"}), encoding="utf-8") + + exit_code = run([tmp_path / "absent1.csv", tmp_path / "absent2.csv"], bench, agg) + assert exit_code == 0 + + patched = json.loads(agg.read_text()) + assert "avg_power_w" not in patched + + +# --------------------------------------------------------------------------- # +# _load_bench_window fallbacks for srt-slurm multinode result JSONs +# +# srt-slurm's sa-bench result writer emits `date` + `duration` but NOT the +# benchmark_*_time_unix fields our single-node benchmark_serving.py adds. +# Without a fallback, multinode runs would always hit "No bench window in +# {bench_result}" and silently skip power aggregation end-to-end. +# --------------------------------------------------------------------------- # + + +def test_run_uses_date_field_when_unix_timestamps_absent(tmp_path: Path): + """Tier 2: parse `date` ("YYYYMMDD-HHMMSS" UTC) + `duration` for the window.""" + # End of bench at a known UTC instant; CSV samples land in [end-10, end]. + end_unix = datetime(2026, 5, 20, 3, 10, 29, tzinfo=timezone.utc).timestamp() + csv = tmp_path / "perf_samples_node0.csv" + _write_nvidia_csv(csv, [(end_unix - 1 - s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + + bench = tmp_path / "bench.json" + bench.write_text( + json.dumps( + { + "date": "20260520-031029", + "duration": 10.0, + "total_output_tokens": 1000, + "total_input_tokens": 8000, + } + ), + encoding="utf-8", + ) + agg = tmp_path / "agg.json" + agg.write_text(json.dumps({"hw": "gb300"}), encoding="utf-8") + + assert run([csv], bench, agg) == 0 + patched = json.loads(agg.read_text()) + assert patched["avg_power_w"] == pytest.approx(500.0) + # 4 GPUs × 500W × 10s = 20_000 J / 1000 output tokens = 20.0 J/output_token. + assert patched["joules_per_output_token"] == pytest.approx(20.0) + # 20_000 J / (1000 + 8000) total tokens ≈ 2.222 J/total_token. + assert patched["joules_per_total_token"] == pytest.approx(20_000 / 9_000) + + +def test_run_uses_mtime_when_date_unparseable(tmp_path: Path): + """Tier 3a: malformed `date` falls through to file mtime as bench-end proxy.""" + csv = tmp_path / "perf_samples_node0.csv" + bench = tmp_path / "bench.json" + bench.write_text( + json.dumps({"date": "not-a-date", "duration": 10.0, "total_output_tokens": 1000}), + encoding="utf-8", + ) + # CSV samples bracket bench file's mtime so they fall inside the derived window. + end_unix = bench.stat().st_mtime + _write_nvidia_csv(csv, [(end_unix - 1 - s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + + agg = tmp_path / "agg.json" + agg.write_text(json.dumps({"hw": "gb300"}), encoding="utf-8") + assert run([csv], bench, agg) == 0 + patched = json.loads(agg.read_text()) + assert patched["avg_power_w"] == pytest.approx(500.0) + + +def test_run_uses_mtime_when_no_date_field(tmp_path: Path): + """Tier 3b: bench JSON has only `duration` → file mtime is end-of-bench.""" + csv = tmp_path / "perf_samples_node0.csv" + bench = tmp_path / "bench.json" + bench.write_text( + json.dumps({"duration": 10.0, "total_output_tokens": 1000}), + encoding="utf-8", + ) + end_unix = bench.stat().st_mtime + _write_nvidia_csv(csv, [(end_unix - 1 - s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + + agg = tmp_path / "agg.json" + agg.write_text(json.dumps({"hw": "gb300"}), encoding="utf-8") + assert run([csv], bench, agg) == 0 + patched = json.loads(agg.read_text()) + assert patched["avg_power_w"] == pytest.approx(500.0) + + +def test_run_skips_when_duration_missing(tmp_path: Path): + """No tier can resolve a window without `duration` — skip cleanly.""" + csv = tmp_path / "perf_samples_node0.csv" + _write_nvidia_csv(csv, [(1_700_000_000.0, 0, 400.0)]) + bench = tmp_path / "bench.json" + bench.write_text(json.dumps({"total_output_tokens": 1000}), encoding="utf-8") + agg = tmp_path / "agg.json" + agg.write_text(json.dumps({"hw": "gb300"}), encoding="utf-8") + + assert run([csv], bench, agg) == 0 + assert "avg_power_w" not in json.loads(agg.read_text()) diff --git a/utils/test_process_result.py b/utils/test_process_result.py index 4037689ea..61d3b45fc 100644 --- a/utils/test_process_result.py +++ b/utils/test_process_result.py @@ -649,3 +649,108 @@ def test_missing_bench_timestamps_does_not_patch(self, tmp_path, single_node_env patched = json.loads(agg_path.read_text()) assert "avg_power_w" not in patched assert "joules_per_output_token" not in patched + + def test_multinode_csv_glob_aggregates_across_per_node_csvs(self, tmp_path, single_node_env_vars): + """Multinode wiring: srt-slurm launchers set GPU_METRICS_CSV_GLOB to a + shell glob expanding to one perf_samples_.csv per worker node. + process_result.py must expand it and hand the list to the aggregator, + which namespaces local GPU indices per source so they don't collide. + + Without this bridge the launcher would set the env var, process_result.py + would ignore it (fall back to a non-existent /workspace/gpu_metrics.csv), + and the chart would silently show no power data — the failure mode that + motivated catching this in the contract check.""" + start, end = 1_700_000_100.0, 1_700_000_160.0 # 60s bench window + # Two per-node CSVs at the same local indices 0-3. Without per-source + # namespacing the union would collapse to 4 GPUs instead of 8. + self._write_nvidia_csv( + tmp_path / "perf_samples_node1.csv", start, end, watts_per_gpu=600.0, num_gpus=4 + ) + self._write_nvidia_csv( + tmp_path / "perf_samples_node2.csv", start, end, watts_per_gpu=600.0, num_gpus=4 + ) + + benchmark_result = { + "model_id": "test-model", + "max_concurrency": 64, + "total_token_throughput": 1000.0, + "output_throughput": 500.0, + "benchmark_start_time_unix": start, + "benchmark_end_time_unix": end, + "duration": 60.0, + "total_output_tokens": 30_000, + } + env = { + **single_node_env_vars, + "GPU_METRICS_CSV_GLOB": str(tmp_path / "perf_samples_*.csv"), + } + + result = run_script(tmp_path, env, benchmark_result) + assert result.returncode == 0, f"Script failed: {result.stderr}" + + agg_path = tmp_path / "agg_benchmark_result.json" + patched = json.loads(agg_path.read_text()) + # 2 nodes × 4 GPUs = 8 total. Per-GPU mean stays at 600W. + assert patched["avg_power_w"] == pytest.approx(600.0, abs=0.5) + # 600W × 8 GPUs × 60s / 30_000 tokens = 9.6 J/tok. + # If namespacing failed we'd see ~4.8 (only 4 GPUs counted). + assert patched["joules_per_output_token"] == pytest.approx(9.6, abs=0.05) + + def test_multinode_csv_glob_takes_precedence_over_single_csv(self, tmp_path, single_node_env_vars): + """If both GLOB and single CSV are set, the glob wins. + + Reflects the ownership split: the multinode launcher sets the glob + after the job, while the single CSV env var is only meaningful for + single-node runs. If a stale single-CSV value leaks through (e.g. a + runner with persistent env), the glob should still take precedence.""" + start, end = 1_700_000_100.0, 1_700_000_160.0 + glob_csv = tmp_path / "perf_samples_node1.csv" + stale_csv = tmp_path / "stale_single.csv" + self._write_nvidia_csv(glob_csv, start, end, watts_per_gpu=600.0, num_gpus=4) + self._write_nvidia_csv(stale_csv, start, end, watts_per_gpu=100.0, num_gpus=1) + + benchmark_result = { + "model_id": "test-model", + "max_concurrency": 64, + "total_token_throughput": 1000.0, + "output_throughput": 500.0, + "benchmark_start_time_unix": start, + "benchmark_end_time_unix": end, + "duration": 60.0, + "total_output_tokens": 30_000, + } + env = { + **single_node_env_vars, + "GPU_METRICS_CSV_GLOB": str(tmp_path / "perf_samples_*.csv"), + "GPU_METRICS_CSV": str(stale_csv), + } + + result = run_script(tmp_path, env, benchmark_result) + assert result.returncode == 0, f"Script failed: {result.stderr}" + + agg_path = tmp_path / "agg_benchmark_result.json" + patched = json.loads(agg_path.read_text()) + # Glob respected → 600W (4 GPUs). Stale fallback would give 100W (1 GPU). + assert patched["avg_power_w"] == pytest.approx(600.0, abs=0.5) + + def test_multinode_csv_glob_empty_match_falls_through_silently(self, tmp_path, single_node_env_vars): + """If GPU_METRICS_CSV_GLOB is set but matches no files (perfmon failed + to start on any node), process_result.py still succeeds and writes the + agg JSON without power fields. The run must not block on telemetry.""" + benchmark_result = { + "model_id": "test-model", + "max_concurrency": 64, + "total_token_throughput": 1000.0, + "output_throughput": 500.0, + } + env = { + **single_node_env_vars, + "GPU_METRICS_CSV_GLOB": str(tmp_path / "perf_samples_*.csv"), + } + + result = run_script(tmp_path, env, benchmark_result) + assert result.returncode == 0, f"Script failed: {result.stderr}" + + agg_path = tmp_path / "agg_benchmark_result.json" + patched = json.loads(agg_path.read_text()) + assert "avg_power_w" not in patched