From 164bd72270488b704ef8d8d524dd902d8e40331e Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Sun, 16 Nov 2025 14:34:01 -0500 Subject: [PATCH 01/13] NCCL_SOCKET_IFNAME is required for multinode rccl-test runs --- lib/rccl_lib.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/rccl_lib.py b/lib/rccl_lib.py index 9c828c2a..f7ee7ee0 100644 --- a/lib/rccl_lib.py +++ b/lib/rccl_lib.py @@ -317,6 +317,7 @@ def rccl_cluster_test( phdl, shdl, test_name, cluster_node_list, vpc_node_list, -x PATH={PATH} \ -x LD_LIBRARY_PATH={LD_LIBRARY_PATH} \ -x NCCL_IB_HCA={ib_hca_list} \ + -x NCCL_SOCKET_IFNAME={oob_port} \ --mca btl ^vader,openib \ --mca btl_tcp_if_include {oob_port}\ -x UCX_NET_DEVICES={net_dev_list} \ @@ -484,6 +485,7 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod -x PATH={PATH} \ -x LD_LIBRARY_PATH={LD_LIBRARY_PATH} \ -x NCCL_IB_HCA={ib_hca_list} \ + -x NCCL_SOCKET_IFNAME={oob_port} \ --mca btl ^vader,openib \ --mca btl_tcp_if_include {oob_port}\ -x UCX_NET_DEVICES={net_dev_list} \ From b132cf018eaa093e4d012d35ad372505493aa96a Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Mon, 17 Nov 2025 07:59:43 -0500 Subject: [PATCH 02/13] Pydantic models for validating rccl-tests results --- models/rccl.py | 120 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 models/rccl.py diff --git a/models/rccl.py b/models/rccl.py new file mode 100644 index 00000000..abb75f2f --- /dev/null +++ b/models/rccl.py @@ -0,0 +1,120 @@ +from pydantic import BaseModel, Field, model_validator, ConfigDict, field_validator +from typing import Annotated, Literal +import math + +NonNegativeInt = Annotated[int, Field(ge=0)] +PositiveInt = Annotated[int, Field(gt=0)] +NonNegativeFloat = Annotated[float, Field(ge=0.0)] +Collective = Literal['AllReduce', 'AllGather', 'Scatter', 'Gather', 'ReduceScatter', 'SendRecv', 'AllToAll', 'AllToAllV', 'Broadcast'] +Type = Literal[ + 'int8', 'int32', 'int64', + 
'uint8', 'uint32', 'uint64', + 'float', 'double', + 'half', 'bfloat16', + 'fp8_e4m3', 'fp8_e5m2' +] +Redop = Literal['sum', 'prod', 'min', 'max', 'avg', 'all', 'none'] +InPlace = Literal[0, 1] + +class RcclTestsSingleNodeRaw(BaseModel): + """ + This class represents the schema for single noderccl-test results, while serializing rccl-test input + if we don't adhere to this schema, we fail immediately preventing weird behaviour later on + in the processing pipeline + """ + model_config = ConfigDict(frozen=True) + numCycle: NonNegativeInt + name: Collective + size: PositiveInt + type: Type + redop: Redop + inPlace: InPlace + time: NonNegativeFloat + algBw: NonNegativeFloat + busBw: NonNegativeFloat + wrong: int + + @field_validator('time', 'algBw', 'busBw') + @classmethod + def validate_not_nan_inf(cls, v: float, info) -> float: + """Ensure no NaN/Inf values in measurements.""" + if math.isnan(v) or math.isinf(v): + raise ValueError(f'{info.field_name} cannot be NaN/Inf, got {v}') + return v + + +class RcclTestsMultinodeRaw(BaseModel): + """ + This class represents the schema for multi node rccl-test results, while serializing rccl-test input + if we don't adhere to this schema, we fail immediately preventing weird behaviour later on + in the processing pipeline + """ + model_config = ConfigDict(frozen=True) + numCycle: NonNegativeInt + name: Collective + nodes: PositiveInt + ranks: PositiveInt + ranksPerNode: PositiveInt + gpusPerRank: PositiveInt + size: PositiveInt + type: Type + redop: Redop + inPlace: InPlace + time: NonNegativeFloat + algBw: NonNegativeFloat + busBw: NonNegativeFloat + wrong: int + + @model_validator(mode='after') + def validate_ranks_relationship(self): + """Ensure ranks = nodes * ranksPerNode.""" + expected_ranks = self.nodes * self.ranksPerNode + if self.ranks != expected_ranks: + raise ValueError( + f"ranks ({self.ranks}) must equal nodes ({self.nodes}) × " + f"ranksPerNode ({self.ranksPerNode}) = {expected_ranks}" + ) + return self + + 
@field_validator('time', 'algBw', 'busBw') + @classmethod + def validate_not_nan_inf(cls, v: float, info) -> float: + """Ensure no NaN/Inf values in measurements.""" + if math.isnan(v) or math.isinf(v): + raise ValueError(f'{info.field_name} cannot be NaN/Inf, got {v}') + return v + +class RcclTestsAggregated(BaseModel): + """ + This class represents the aggregated schema for rccl-test results + """ + # Grouping keys + model_config = ConfigDict(frozen=True, populate_by_name=True) + name: Collective = Field(alias='collective') + size: PositiveInt + type: Type + + # Aggregated metrics + busBw_mean: NonNegativeFloat = Field(alias='busbw_mean') + busBw_std: NonNegativeFloat = Field(alias='busbw_std') + algBw_mean: NonNegativeFloat = Field(alias='algbw_mean') + algBw_std: NonNegativeFloat = Field(alias='algbw_std') + time_mean: NonNegativeFloat = Field(alias='time_mean') + time_std: NonNegativeFloat = Field(alias='time_std') + + #Metadata + num_runs: NonNegativeInt = Field(alias='numCycles', description='Number of cycles aggregated') + + @field_validator('busBw_std', 'algBw_std', 'time_std') + @classmethod + def handle_nan_std(cls, v: float, info) -> float: + """ + Convert NaN (from single-value std) to 0.0. + Pandas returns NaN for std of single value, which is correct mathematically, + but we interpret it as 0 variability. 
+ """ + if math.isnan(v): + return 0.0 + if math.isinf(v): + raise ValueError(f'{info.field_name} cannot be Inf') + return v \ No newline at end of file From 357992022e65a9d050bd630605cffc33d6b9e0b4 Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Mon, 17 Nov 2025 07:59:59 -0500 Subject: [PATCH 03/13] make models a module --- models/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 models/__init__.py diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 00000000..e69de29b From da9a6817e506f41e7264171120dbad26b01d2cbf Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Mon, 17 Nov 2025 08:01:14 -0500 Subject: [PATCH 04/13] Pass dtype and number of cycles are argument from config --- tests/rccl/rccl_multinode_cvs.py | 2 ++ tests/rccl/rccl_multinode_default_cvs.py | 2 ++ tests/rccl/rccl_singlenode_cvs.py | 2 ++ 3 files changed, 6 insertions(+) diff --git a/tests/rccl/rccl_multinode_cvs.py b/tests/rccl/rccl_multinode_cvs.py index 93cb282f..2aa6150e 100644 --- a/tests/rccl/rccl_multinode_cvs.py +++ b/tests/rccl/rccl_multinode_cvs.py @@ -407,6 +407,8 @@ def test_rccl_perf(phdl, shdl, cluster_dict, config_dict, rccl_collective, rccl_ end_msg_size = config_dict['end_msg_size'], \ step_function = config_dict['step_function'], \ threads_per_gpu = config_dict['threads_per_gpu'], \ + data_types = config_dict['data_types'], \ + no_of_cycles = config_dict['no_of_cycles'], \ warmup_iterations = config_dict['warmup_iterations'], \ no_of_iterations = config_dict['no_of_iterations'], \ check_iteration_count = config_dict['check_iteration_count'], \ diff --git a/tests/rccl/rccl_multinode_default_cvs.py b/tests/rccl/rccl_multinode_default_cvs.py index 36800ef4..40d44be1 100644 --- a/tests/rccl/rccl_multinode_default_cvs.py +++ b/tests/rccl/rccl_multinode_default_cvs.py @@ -345,6 +345,8 @@ def test_rccl_perf(phdl, shdl, cluster_dict, config_dict, rccl_collective ): end_msg_size = config_dict['end_msg_size'], \ 
step_function = config_dict['step_function'], \ threads_per_gpu = config_dict['threads_per_gpu'], \ + data_types = config_dict['data_types'], \ + no_of_cycles = config_dict['no_of_cycles'], \ warmup_iterations = config_dict['warmup_iterations'], \ no_of_iterations = config_dict['no_of_iterations'], \ check_iteration_count = config_dict['check_iteration_count'], \ diff --git a/tests/rccl/rccl_singlenode_cvs.py b/tests/rccl/rccl_singlenode_cvs.py index 3cd62bb5..3ee6142f 100644 --- a/tests/rccl/rccl_singlenode_cvs.py +++ b/tests/rccl/rccl_singlenode_cvs.py @@ -309,6 +309,8 @@ def test_singlenode_perf(phdl, cluster_dict, config_dict, rccl_collective ): debug_level = config_dict['debug_level'], \ rccl_result_file = config_dict['rccl_result_file'], \ no_of_local_ranks = config_dict['no_of_local_ranks'], \ + data_types = config_dict['data_types'], \ + no_of_cycles = config_dict['no_of_cycles'], \ verify_bus_bw = config_dict['verify_bus_bw'], \ verify_bw_dip = config_dict['verify_bw_dip'], \ verify_lat_dip = config_dict['verify_lat_dip'], \ From 0a30ec3b9bc8996cf865f38f222299a3b9233925 Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Mon, 17 Nov 2025 08:03:16 -0500 Subject: [PATCH 05/13] updated requirements for pydantic & pandas --- .gitignore | 3 +++ requirements.txt | 2 ++ 2 files changed, 5 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..88d0dd45 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +pycache/ +__pycache__/ +*.pyc diff --git a/requirements.txt b/requirements.txt index f88fbab6..11f34007 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,5 @@ pytest-html pytest-repeat pytest-dependency xlsxwriter +pydantic>=2.0 +pandas \ No newline at end of file From 80149a2d8e5fc4de2f14fc0ccc037c70b4cb008b Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Mon, 17 Nov 2025 09:59:56 -0500 Subject: [PATCH 06/13] Refactor to reduce model duplication using inheritance --- models/__init__.py | 
13 ++++++++++++ models/rccl.py | 49 ++++++++++++---------------------------------- 2 files changed, 26 insertions(+), 36 deletions(-) diff --git a/models/__init__.py b/models/__init__.py index e69de29b..14ca68f7 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -0,0 +1,13 @@ +"""CVS Pydantic models for data validation.""" + +from models.rccl import ( + RcclTests, + RcclTestsMultinodeRaw, + RcclTestsAggregated, +) + +__all__ = [ + 'RcclTests', + 'RcclTestsMultinodeRaw', + 'RcclTestsAggregated', +] \ No newline at end of file diff --git a/models/rccl.py b/models/rccl.py index abb75f2f..636ff99c 100644 --- a/models/rccl.py +++ b/models/rccl.py @@ -16,12 +16,8 @@ Redop = Literal['sum', 'prod', 'min', 'max', 'avg', 'all', 'none'] InPlace = Literal[0, 1] -class RcclTestsSingleNodeRaw(BaseModel): - """ - This class represents the schema for single noderccl-test results, while serializing rccl-test input - if we don't adhere to this schema, we fail immediately preventing weird behaviour later on - in the processing pipeline - """ + +class RcclTests(BaseModel): model_config = ConfigDict(frozen=True) numCycle: NonNegativeInt name: Collective @@ -42,28 +38,16 @@ def validate_not_nan_inf(cls, v: float, info) -> float: raise ValueError(f'{info.field_name} cannot be NaN/Inf, got {v}') return v - -class RcclTestsMultinodeRaw(BaseModel): +class RcclTestsMultinodeRaw(RcclTests): """ This class represents the schema for multi node rccl-test results, while serializing rccl-test input if we don't adhere to this schema, we fail immediately preventing weird behaviour later on in the processing pipeline """ - model_config = ConfigDict(frozen=True) - numCycle: NonNegativeInt - name: Collective nodes: PositiveInt ranks: PositiveInt ranksPerNode: PositiveInt gpusPerRank: PositiveInt - size: PositiveInt - type: Type - redop: Redop - inPlace: InPlace - time: NonNegativeFloat - algBw: NonNegativeFloat - busBw: NonNegativeFloat - wrong: int @model_validator(mode='after') def 
validate_ranks_relationship(self): @@ -76,14 +60,6 @@ def validate_ranks_relationship(self): ) return self - @field_validator('time', 'algBw', 'busBw') - @classmethod - def validate_not_nan_inf(cls, v: float, info) -> float: - """Ensure no NaN/Inf values in measurements.""" - if math.isnan(v) or math.isinf(v): - raise ValueError(f'{info.field_name} cannot be NaN/Inf, got {v}') - return v - class RcclTestsAggregated(BaseModel): """ This class represents the aggregated schema for rccl-test results @@ -93,17 +69,18 @@ class RcclTestsAggregated(BaseModel): name: Collective = Field(alias='collective') size: PositiveInt type: Type - - # Aggregated metrics - busBw_mean: NonNegativeFloat = Field(alias='busbw_mean') - busBw_std: NonNegativeFloat = Field(alias='busbw_std') - algBw_mean: NonNegativeFloat = Field(alias='algbw_mean') - algBw_std: NonNegativeFloat = Field(alias='algbw_std') - time_mean: NonNegativeFloat = Field(alias='time_mean') - time_std: NonNegativeFloat = Field(alias='time_std') + inPlace: InPlace #Metadata - num_runs: NonNegativeInt = Field(alias='numCycles', description='Number of cycles aggregated') + num_runs: PositiveInt = Field(description='Number of cycles aggregated') + + # Aggregated metrics + busBw_mean: NonNegativeFloat + busBw_std: NonNegativeFloat + algBw_mean: NonNegativeFloat + algBw_std: NonNegativeFloat + time_mean: NonNegativeFloat + time_std: NonNegativeFloat @field_validator('busBw_std', 'algBw_std', 'time_std') @classmethod From e5073d7ce50f3af56b04e3859ca5117c9248e65e Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Mon, 17 Nov 2025 10:02:02 -0500 Subject: [PATCH 07/13] Single node rccl-tests aggregation for each data type --- lib/rccl_lib.py | 153 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 115 insertions(+), 38 deletions(-) diff --git a/lib/rccl_lib.py b/lib/rccl_lib.py index f7ee7ee0..6558e197 100644 --- a/lib/rccl_lib.py +++ b/lib/rccl_lib.py @@ -5,17 +5,23 @@ All code contained here is Property of Advanced 
Micro Devices, Inc. ''' +#Standard libraries import re import sys import os +from typing import List, Dict +from pathlib import Path -import globals - -log = globals.log +#Third party libraries +import pandas as pd +from pydantic import ValidationError +import globals +from models.rccl import RcclTests, RcclTestsAggregated, RcclTestsMultinodeRaw from utils_lib import * from verify_lib import * +log = globals.log rccl_err_dict = { @@ -209,8 +215,44 @@ def convert_to_graph_dict(result_dict): - - +def aggregate_rccl_test_results(validated_results: List[RcclTests]) -> List[RcclTestsAggregated]: + """ + Aggregate multiple rccl-test results into mean/std per (name, size, type, inPlace) + Args: validated_results: List[RcclTests] - list of validated rccl-test results + Returns: List[RcclTestsAggregated] - list of aggregated rccl-test results with mean/std per (name, size, type, inPlace) + """ + if not validated_results: + raise ValueError("validated_results list cannot be empty") + log.info(f"Aggregating {len(validated_results)} RCCL test results") + data = [result.model_dump() for result in validated_results] + df = pd.DataFrame(data) + agg_df = df.groupby(['name', 'size', 'type', 'inPlace'], as_index=False).agg( + busBw_mean=('busBw', 'mean'), + busBw_std=('busBw', 'std'), + algBw_mean=('algBw', 'mean'), + algBw_std=('algBw', 'std'), + time_mean=('time', 'mean'), + time_std=('time', 'std'), + num_runs=('numCycle', 'count') + ) + agg_results = [] + errors = [] + + for row_dict in agg_df.to_dict('records'): + try: + agg_results.append(RcclTestsAggregated.model_validate(row_dict)) + except ValidationError as e: + error_msg = f"Validation failed for row {row_dict}: {e}" + log.error(error_msg) + errors.append(error_msg) + + # Report any validation failures + if errors: + error_summary = "\n".join(errors) + fail_test(f"Aggregation validation failed:\n{error_summary}") + + log.info(f"Successfully validated {len(agg_results)} aggregated results") + return agg_results @@ -544,7 
+586,8 @@ def rccl_single_node_test( phdl, test_name, cluster_node_list, \ step_function=2, warmup_iterations=10, no_of_iterations=1, \ check_iteration_count=1, debug_level='INFO', \ rccl_result_file='/tmp/rccl_result_output.json', no_of_local_ranks=8, \ - verify_bus_bw=False, verify_bw_dip=True, verify_lat_dip=True, exp_results_dict=None ): + data_types=['float'], no_of_cycles=10, verify_bus_bw=False, verify_bw_dip=True, + verify_lat_dip=True, exp_results_dict=None ) -> List[Dict]: """ Run an Single Node RCCL collective test @@ -580,47 +623,81 @@ def rccl_single_node_test( phdl, test_name, cluster_node_list, \ LD_LIBRARY_PATH=f'{RCCL_PATH}:{ROCM_PATH}/lib:$LD_LIBRARY_PATH' - - cmd = f'''export NCCL_DEBUG={debug_level}; \ - export PATH={PATH}; \ - export LD_LIBRARY_PATH={LD_LIBRARY_PATH}; \ - {RCCL_TESTS_INSTALL_DIR}/{test_name} -b {start_msg_size} -e {end_msg_size} -f {step_function} \ - -g {no_of_local_ranks} -c {check_iteration_count} -w {warmup_iterations} \ - -Z json -x {rccl_result_file}''' - - print('%%%%%%%%%%%%%%%%') - print(cmd) - print('%%%%%%%%%%%%%%%%') + # Run rccl test for each data type mentioned in the data_types list + all_raw_results = [] + all_validated_results = [] + base_path = Path(rccl_result_file) + for dtype in data_types: + # Create a unique result file for each data type + dtype_result_file = f'{base_path.parent}/{base_path.stem}_{dtype}.json' + cmd = f'''export NCCL_DEBUG={debug_level}; \ + export PATH={PATH}; \ + export LD_LIBRARY_PATH={LD_LIBRARY_PATH}; \ + {RCCL_TESTS_INSTALL_DIR}/{test_name} -b {start_msg_size} -e {end_msg_size} -f {step_function} \ + -g {no_of_local_ranks} -c {check_iteration_count} -w {warmup_iterations} \ + -d {dtype} -N {no_of_cycles} -Z json -x {dtype_result_file}''' + + print('%%%%%%%%%%%%%%%%') + print(cmd) + print('%%%%%%%%%%%%%%%%') + try: + out_dict = phdl.exec(cmd, timeout=500) + for node in out_dict.keys(): + scan_rccl_logs(out_dict[node]) + except Exception as e: + log.error(f'Hit Exceptions with rccl 
cmd {cmd} - exception {e}') + fail_test(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') + + # Read the JSON results emitted by the RCCL test binary + result_dict_out = phdl.exec(f'cat {dtype_result_file}') + dtype_result_out = json.loads(result_dict_out[head_node].replace( '\n', '').replace( '\r', '')) + # Validate the results against the schema fail if results are not valid + try: + validated = [RcclTests.model_validate(test_result) for test_result in dtype_result_out] + log.info(f'Validation passed: {len(validated)} RcclTests schema validation passed') + all_validated_results.extend(validated) + all_raw_results.extend(dtype_result_out) + except ValidationError as e: + log.error(f'Validation Failed: {e}') + fail_test(f'RCCL Test {dtype} schema validation failed: {e}') + + # Save the results to a main result file + with open(rccl_result_file, 'w') as f: + json.dump(all_raw_results, f, indent=2) + log.info(f'Saved combined results from all all data types to {rccl_result_file}') + + # Validate the results against the schema and aggregate if multiple results are found, fail if results are not valid try: - out_dict = phdl.exec(cmd, timeout=500) - for node in out_dict.keys(): - scan_rccl_logs(out_dict[node]) - except Exception as e: - log.error(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') - fail_test(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') - - # Read the JSON results emitted by the RCCL test binary - result_dict_out = phdl.exec(f'cat {rccl_result_file}') - result_out = json.loads(result_dict_out[head_node].replace( '\n', '').replace( '\r', '')) + if (len(all_validated_results) > 1): + aggregated_rccl_tests = aggregate_rccl_test_results(all_validated_results) + log.info(f'Aggregation passed: {len(aggregated_rccl_tests)} RcclTestsAggregated schema validation passed') + # Note: current we are saving the aggregated results, but we could instead use this for final report generation + aggregated_path = 
f'{base_path.parent}/{base_path.stem}_aggregated.json' + with open(aggregated_path, 'w') as f: + json.dump([result.model_dump() for result in aggregated_rccl_tests], f, indent=2) + log.info(f'Saved aggregated results to {aggregated_path}') + else: + log.info(f'Aggregation skipped: only one run found') + except ValidationError as e: + log.error(f'Validation Failed: {e}') + fail_test(f'RCCL Test schema validation failed: {e}') + except ValueError as e: + log.error(f'Aggregation failed: {e}') + fail_test(f'RCCL Test aggregation failed: {e}') # Collect basic GPU information via rocm-smi smi_out_dict = phdl.exec('rocm-smi -a | head -30') + # NOTE: the bw & lat verification functions could also be refractored into RcclTestsRaw model to avoid code duplication # If requested, verify measured bus bandwidths against provided expected Bandwidth if re.search( 'True', verify_bus_bw, re.I ): - for node in result_dict_out.keys(): - result_out = json.loads(result_dict_out[node].replace( '\n', '').replace( '\r', '')) - if test_name in exp_results_dict.keys(): - check_bus_bw( test_name, result_out, exp_results_dict[test_name] ) + if test_name in exp_results_dict.keys(): + check_bus_bw( test_name, all_raw_results, exp_results_dict[test_name] ) if re.search( 'True', verify_bw_dip, re.I ): - for node in result_dict_out.keys(): - result_out = json.loads(result_dict_out[node].replace( '\n', '').replace( '\r', '')) - check_bw_dip( test_name, result_out, ) + check_bw_dip( test_name, all_raw_results, ) if re.search( 'True', verify_lat_dip, re.I ): - for node in result_dict_out.keys(): - result_out = json.loads(result_dict_out[node].replace( '\n', '').replace( '\r', '')) - check_lat_dip( test_name, result_out, ) + check_lat_dip( test_name, all_raw_results, ) - return result_out + return all_raw_results From e241bd47e179ad666fdfbb5e60298c047978f2da Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Mon, 17 Nov 2025 10:09:11 -0500 Subject: [PATCH 08/13] Pass no of cycles and dtypes through 
config dict --- input/config_file/rccl/rccl_config.json | 2 ++ input/config_file/rccl/single_node_mi355_rccl.json | 2 ++ 2 files changed, 4 insertions(+) diff --git a/input/config_file/rccl/rccl_config.json b/input/config_file/rccl/rccl_config.json index b87061d5..b7e389b4 100644 --- a/input/config_file/rccl/rccl_config.json +++ b/input/config_file/rccl/rccl_config.json @@ -25,6 +25,8 @@ "end_msg_size": "16g", "step_function": "2", "threads_per_gpu": "1", + "data_types": [ "int8", "int32"], + "no_of_cycles": "10", "warmup_iterations": "10", "no_of_iterations": "1", "check_iteration_count": "1", diff --git a/input/config_file/rccl/single_node_mi355_rccl.json b/input/config_file/rccl/single_node_mi355_rccl.json index 4d849fce..870161f9 100644 --- a/input/config_file/rccl/single_node_mi355_rccl.json +++ b/input/config_file/rccl/single_node_mi355_rccl.json @@ -11,6 +11,8 @@ "start_msg_size": "1024", "end_msg_size": "16g", "step_function": "2", + "data_types": [ "int8", "int32"], + "no_of_cycles": "10", "warmup_iterations": "10", "no_of_iterations": "1", "check_iteration_count": "1", From 9bc6483374ee784765b300f73c3a02840517520a Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Mon, 17 Nov 2025 11:22:20 -0500 Subject: [PATCH 09/13] iterate through dtypes and run multinode rccl-tests with validation --- lib/rccl_lib.py | 61 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/lib/rccl_lib.py b/lib/rccl_lib.py index 6558e197..d93318bf 100644 --- a/lib/rccl_lib.py +++ b/lib/rccl_lib.py @@ -433,6 +433,7 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod nccl_proto='simple', gid_index=1, qp_count=1, \ start_msg_size=1024, end_msg_size='16g', \ step_function=2, threads_per_gpu=1, warmup_iterations=10, no_of_iterations=1, \ + data_types=['float'], no_of_cycles=10, \ check_iteration_count=1, debug_level='INFO', \ rccl_result_file='/tmp/rccl_result_output.json', no_of_local_ranks=8, \ 
ib_rx_queue_len=8192, ucx_tls='tcp', hcoll_enable_mcast_all=0, \ @@ -516,7 +517,13 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod shdl.exec(cmd) - cmd = f'''{MPI_INSTALL_DIR}/mpirun --np {no_of_global_ranks} \ + all_raw_results = [] + all_validated_results = [] + base_path = Path(rccl_result_file) + for dtype in data_types: + # Create a unique result file for each data type + dtype_result_file = f'{base_path.parent}/{base_path.stem}_{dtype}.json' + cmd = f'''{MPI_INSTALL_DIR}/mpirun --np {no_of_global_ranks} \ --allow-run-as-root \ --hostfile /tmp/rccl_hosts_file.txt \ -x NCCL_DEBUG={debug_level} \ @@ -535,24 +542,38 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod -x NCCL_NET_PLUGIN={nccl_net_plugin} \ {RCCL_TESTS_INSTALL_DIR}/{test_name} -b {start_msg_size} -e {end_msg_size} -f {step_function} \ -g {threads_per_gpu} -c {check_iteration_count} -w {warmup_iterations} \ - -Z json -x {rccl_result_file} + -d {dtype} -N {no_of_cycles} -Z json -x {dtype_result_file} ''' - print('%%%%%%%%%%%%%%%%') - print(cmd) - print('%%%%%%%%%%%%%%%%') - try: - out_dict = shdl.exec(cmd, timeout=500) - output = out_dict[head_node] - #print(output) - scan_rccl_logs(output) - except Exception as e: - log.error(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') - fail_test(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') + print('%%%%%%%%%%%%%%%%') + print(cmd) + print('%%%%%%%%%%%%%%%%') + try: + out_dict = shdl.exec(cmd, timeout=500) + output = out_dict[head_node] + #print(output) + scan_rccl_logs(output) + except Exception as e: + log.error(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') + fail_test(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') - # Read the JSON results emitted by the RCCL test binary - result_dict_out = shdl.exec(f'cat {rccl_result_file}') - result_out = json.loads(result_dict_out[head_node].replace( '\n', '').replace( '\r', '')) + # Read the JSON results emitted by the 
RCCL test binary + result_dict_out = shdl.exec(f'cat {dtype_result_file}') + dtype_result_out = json.loads(result_dict_out[head_node].replace( '\n', '').replace( '\r', '')) + # Validate the results against the schema fail if results are not valid + try: + validated = [RcclTestsMultinodeRaw.model_validate(test_result) for test_result in dtype_result_out] + log.info(f'Validation passed: {len(validated)} RcclTests schema validation passed') + all_validated_results.extend(validated) + all_raw_results.extend(dtype_result_out) + except ValidationError as e: + log.error(f'Validation Failed: {e}') + fail_test(f'RCCL Test {dtype} schema validation failed: {e}') + + # Save the results to a main result file + with open(rccl_result_file, 'w') as f: + json.dump(all_raw_results, f, indent=2) + log.info(f'Saved combined results from all data types to {rccl_result_file}') # Collect basic GPU information via rocm-smi @@ -563,15 +584,15 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod # If requested, verify measured bus bandwidths against provided expected Bandwidth if re.search( 'True', verify_bus_bw, re.I ): if test_name in exp_results_dict.keys(): - check_bus_bw( test_name, result_out, exp_results_dict[test_name] ) + check_bus_bw( test_name, all_raw_results, exp_results_dict[test_name] ) if re.search( 'True', verify_bw_dip, re.I ): - check_bw_dip( test_name, result_out, ) + check_bw_dip( test_name, all_raw_results, ) if re.search( 'True', verify_lat_dip, re.I ): - check_lat_dip( test_name, result_out, ) + check_lat_dip( test_name, all_raw_results, ) - return result_out + return all_raw_results From 25006bd14826a7dec1c91b62131a07907b351611 Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Mon, 17 Nov 2025 11:57:38 -0500 Subject: [PATCH 10/13] For multinode Rccl-tests model take nodes,ranks etc as optional metadata --- models/rccl.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/models/rccl.py b/models/rccl.py 
index 636ff99c..c150ef36 100644 --- a/models/rccl.py +++ b/models/rccl.py @@ -1,7 +1,10 @@ -from pydantic import BaseModel, Field, model_validator, ConfigDict, field_validator -from typing import Annotated, Literal +# std libs +from typing import Annotated, Literal, Optional import math +# pydantic libs +from pydantic import BaseModel, Field, model_validator, ConfigDict, field_validator + NonNegativeInt = Annotated[int, Field(ge=0)] PositiveInt = Annotated[int, Field(gt=0)] NonNegativeFloat = Annotated[float, Field(ge=0.0)] @@ -82,6 +85,12 @@ class RcclTestsAggregated(BaseModel): time_mean: NonNegativeFloat time_std: NonNegativeFloat + # Multinode metadata (optional, None for single-node tests) + nodes: Optional[PositiveInt] = None + ranks: Optional[PositiveInt] = None + ranksPerNode: Optional[PositiveInt] = None + gpusPerRank: Optional[PositiveInt] = None + @field_validator('busBw_std', 'algBw_std', 'time_std') @classmethod def handle_nan_std(cls, v: float, info) -> float: From 8d2fb680af70259f20ed1c31d29d94f9a48470f6 Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Mon, 17 Nov 2025 12:49:24 -0500 Subject: [PATCH 11/13] Aggregate multinode rccl-test results --- lib/rccl_lib.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/lib/rccl_lib.py b/lib/rccl_lib.py index d93318bf..521c92fe 100644 --- a/lib/rccl_lib.py +++ b/lib/rccl_lib.py @@ -575,6 +575,25 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod json.dump(all_raw_results, f, indent=2) log.info(f'Saved combined results from all data types to {rccl_result_file}') + # Validate the results against the schema and aggregate if multiple results are found, fail if results are not valid + try: + if len(all_validated_results) >= 1: + aggregated_rccl_tests = aggregate_rccl_test_results(all_validated_results) + log.info(f'Aggregation passed: {len(aggregated_rccl_tests)} RcclTestsAggregated schema validation passed') + # Note: currently we are 
saving the aggregated results, but we could instead use this for final report generation + aggregated_path = f'{base_path.parent}/{base_path.stem}_aggregated.json' + with open(aggregated_path, 'w') as f: + json.dump([result.model_dump() for result in aggregated_rccl_tests], f, indent=2) + log.info(f'Saved aggregated results to {aggregated_path}') + else: + log.info(f'Aggregation skipped: only one run found') + except ValidationError as e: + log.error(f'Validation Failed: {e}') + fail_test(f'RCCL Test schema validation failed: {e}') + except ValueError as e: + log.error(f'Aggregation failed: {e}') + fail_test(f'RCCL Test aggregation failed: {e}') + # Collect basic GPU information via rocm-smi smi_out_dict = shdl.exec('rocm-smi -a | head -30') @@ -689,10 +708,10 @@ def rccl_single_node_test( phdl, test_name, cluster_node_list, \ # Validate the results against the schema and aggregate if multiple results are found, fail if results are not valid try: - if (len(all_validated_results) > 1): + if len(all_validated_results) >= 1: aggregated_rccl_tests = aggregate_rccl_test_results(all_validated_results) log.info(f'Aggregation passed: {len(aggregated_rccl_tests)} RcclTestsAggregated schema validation passed') - # Note: current we are saving the aggregated results, but we could instead use this for final report generation + # Note: currently we are saving the aggregated results, but we could instead use this for final report generation aggregated_path = f'{base_path.parent}/{base_path.stem}_aggregated.json' with open(aggregated_path, 'w') as f: json.dump([result.model_dump() for result in aggregated_rccl_tests], f, indent=2) From 25b31af7ef06c0bb506cd19fbf442406569f9dbe Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Mon, 17 Nov 2025 12:50:00 -0500 Subject: [PATCH 12/13] For multinode aggregation retain metadata --- lib/rccl_lib.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/lib/rccl_lib.py b/lib/rccl_lib.py index 
521c92fe..a3fb08df 100644 --- a/lib/rccl_lib.py +++ b/lib/rccl_lib.py @@ -223,9 +223,42 @@ def aggregate_rccl_test_results(validated_results: List[RcclTests]) -> List[Rccl """ if not validated_results: raise ValueError("validated_results list cannot be empty") + + # Check if these are multinode results and validate consistency + multinode_config = None + if isinstance(validated_results[0], RcclTestsMultinodeRaw): + # Extract config from first result + first = validated_results[0] + multinode_config = { + 'nodes': first.nodes, + 'ranks': first.ranks, + 'ranksPerNode': first.ranksPerNode, + 'gpusPerRank': first.gpusPerRank + } + + # Validate all results have same config + for i, result in enumerate(validated_results): + if not isinstance(result, RcclTestsMultinodeRaw): + raise ValueError( + f"Mixed single-node and multi-node results at index {i}" + ) + if (result.nodes != multinode_config['nodes'] or + result.ranks != multinode_config['ranks'] or + result.ranksPerNode != multinode_config['ranksPerNode'] or + result.gpusPerRank != multinode_config['gpusPerRank']): + raise ValueError( + f"Inconsistent cluster config at index {i}: " + f"expected {multinode_config}, got " + f"nodes={result.nodes}, ranks={result.ranks}, " + f"ranksPerNode={result.ranksPerNode}, gpusPerRank={result.gpusPerRank}" + ) + log.info(f"Validated consistent multinode config: {multinode_config}") + log.info(f"Aggregating {len(validated_results)} RCCL test results") data = [result.model_dump() for result in validated_results] df = pd.DataFrame(data) + + # Group and aggregate agg_df = df.groupby(['name', 'size', 'type', 'inPlace'], as_index=False).agg( busBw_mean=('busBw', 'mean'), busBw_std=('busBw', 'std'), @@ -235,6 +268,12 @@ def aggregate_rccl_test_results(validated_results: List[RcclTests]) -> List[Rccl time_std=('time', 'std'), num_runs=('numCycle', 'count') ) + + # Add multinode config if present + if multinode_config: + for key, value in multinode_config.items(): + agg_df[key] = value + 
agg_results = [] errors = [] From 0c3eef03c9a16b90f96f8c04691de93e86410afe Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Mon, 17 Nov 2025 15:07:37 -0500 Subject: [PATCH 13/13] Multinode rccl-tests aggregation --- lib/rccl_lib.py | 93 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 23 deletions(-) diff --git a/lib/rccl_lib.py b/lib/rccl_lib.py index a3fb08df..1cef332c 100644 --- a/lib/rccl_lib.py +++ b/lib/rccl_lib.py @@ -9,6 +9,7 @@ import re import sys import os +import json from typing import List, Dict from pathlib import Path @@ -304,6 +305,7 @@ def rccl_cluster_test( phdl, shdl, test_name, cluster_node_list, vpc_node_list, nccl_proto='simple', gid_index=1, qp_count=1, \ start_msg_size=1024, end_msg_size='16g', \ step_function=2, threads_per_gpu=1, warmup_iterations=10, no_of_iterations=1, \ + data_types=['float'], no_of_cycles=10, \ check_iteration_count=1, debug_level='INFO', \ rccl_result_file='/tmp/rccl_result_output.json', no_of_local_ranks=8, \ ib_rx_queue_len=8192, ucx_tls='tcp', hcoll_enable_mcast_all=0, \ @@ -334,13 +336,15 @@ def rccl_cluster_test( phdl, shdl, test_name, cluster_node_list, vpc_node_list, nccl_algo, nccl_proto, gid_index, qp_count, ...: NCCL/UCX/MPI tuning parameters. start_msg_size, end_msg_size, step_function: Message size sweep setup. threads_per_gpu, warmup_iterations, check_iteration_count: Test execution tuning. + data_types: List of data types to test (e.g., ['float', 'half']). + no_of_cycles: Number of cycles to run for each data type. debug_level: NCCL_DEBUG level. rccl_result_file: Path where the RCCL test writes JSON results (-Z json -x file). verify_bus_bw: If 'True' (string), compare bus BW vs expected thresholds. exp_results_dict: Dict of expected results per test for verification. Returns: - result_out: The raw JSON string read from rccl_result_file on the head node. + all_raw_results: List of dictionaries containing all test results from all data types. 
""" print(f'Starting RCCL Test ..........................................{test_name}') @@ -387,7 +391,13 @@ def rccl_cluster_test( phdl, shdl, test_name, cluster_node_list, vpc_node_list, shdl.exec(cmd) - cmd = f'''{MPI_INSTALL_DIR}/mpirun --np {no_of_global_ranks} \ + all_raw_results = [] + all_validated_results = [] + base_path = Path(rccl_result_file) + for dtype in data_types: + # Create a unique result file for each data type + dtype_result_file = f'{base_path.parent}/{base_path.stem}_{dtype}.json' + cmd = f'''{MPI_INSTALL_DIR}/mpirun --np {no_of_global_ranks} \ --allow-run-as-root \ --hostfile /tmp/rccl_hosts_file.txt \ -x NCCL_DEBUG={debug_level} \ @@ -418,24 +428,57 @@ def rccl_cluster_test( phdl, shdl, test_name, cluster_node_list, vpc_node_list, -x NCCL_NET_PLUGIN={nccl_net_plugin} \ {RCCL_TESTS_INSTALL_DIR}/{test_name} -b {start_msg_size} -e {end_msg_size} -f {step_function} \ -g {threads_per_gpu} -c {check_iteration_count} -w {warmup_iterations} \ - -Z json -x {rccl_result_file} + -d {dtype} -N {no_of_cycles} -Z json -x {dtype_result_file} ''' - print('%%%%%%%%%%%%%%%%') - print(cmd) - print('%%%%%%%%%%%%%%%%') - try: - out_dict = shdl.exec(cmd, timeout=500) - output = out_dict[head_node] - #print(output) - scan_rccl_logs(output) - except Exception as e: - log.error(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') - fail_test(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') + print('%%%%%%%%%%%%%%%%') + print(cmd) + print('%%%%%%%%%%%%%%%%') + try: + out_dict = shdl.exec(cmd, timeout=500) + output = out_dict[head_node] + #print(output) + scan_rccl_logs(output) + except Exception as e: + log.error(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') + fail_test(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') - # Read the JSON results emitted by the RCCL test binary - result_dict_out = shdl.exec(f'cat {rccl_result_file}') - result_out = json.loads(result_dict_out[head_node].replace( '\n', '').replace( '\r', '')) + # Read the JSON 
results emitted by the RCCL test binary + result_dict_out = shdl.exec(f'cat {dtype_result_file}') + dtype_result_out = json.loads(result_dict_out[head_node].replace( '\n', '').replace( '\r', '')) + # Validate the results against the schema fail if results are not valid + try: + validated = [RcclTestsMultinodeRaw.model_validate(test_result) for test_result in dtype_result_out] + log.info(f'Validation passed: {len(validated)} RcclTests schema validation passed') + all_validated_results.extend(validated) + all_raw_results.extend(dtype_result_out) + except ValidationError as e: + log.error(f'Validation Failed: {e}') + fail_test(f'RCCL Test {dtype} schema validation failed: {e}') + + # Save the results to a main result file + with open(rccl_result_file, 'w') as f: + json.dump(all_raw_results, f, indent=2) + log.info(f'Saved combined results from all data types to {rccl_result_file}') + + # Validate the results against the schema and aggregate if multiple results are found, fail if results are not valid + try: + if len(all_validated_results) >= 1: + aggregated_rccl_tests = aggregate_rccl_test_results(all_validated_results) + log.info(f'Aggregation passed: {len(aggregated_rccl_tests)} RcclTestsAggregated schema validation passed') + # Note: currently we are saving the aggregated results, but we could instead use this for final report generation + aggregated_path = f'{base_path.parent}/{base_path.stem}_aggregated.json' + with open(aggregated_path, 'w') as f: + json.dump([result.model_dump() for result in aggregated_rccl_tests], f, indent=2) + log.info(f'Saved aggregated results to {aggregated_path}') + else: + log.info(f'Aggregation skipped: only one run found') + except ValidationError as e: + log.error(f'Validation Failed: {e}') + fail_test(f'RCCL Test schema validation failed: {e}') + except ValueError as e: + log.error(f'Aggregation failed: {e}') + fail_test(f'RCCL Test aggregation failed: {e}') # Collect basic GPU information via rocm-smi @@ -446,15 +489,15 @@ def 
rccl_cluster_test( phdl, shdl, test_name, cluster_node_list, vpc_node_list, # If requested, verify measured bus bandwidths against provided expected Bandwidth if re.search( 'True', verify_bus_bw, re.I ): if test_name in exp_results_dict.keys(): - check_bus_bw( test_name, result_out, exp_results_dict[test_name] ) + check_bus_bw( test_name, all_raw_results, exp_results_dict[test_name] ) if re.search( 'True', verify_bw_dip, re.I ): - check_bw_dip( test_name, result_out, ) + check_bw_dip( test_name, all_raw_results, ) if re.search( 'True', verify_lat_dip, re.I ): - check_lat_dip( test_name, result_out, ) + check_lat_dip( test_name, all_raw_results, ) - return result_out + return all_raw_results @@ -503,13 +546,15 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod nccl_algo, nccl_proto, gid_index, qp_count, ...: NCCL/UCX/MPI tuning parameters. start_msg_size, end_msg_size, step_function: Message size sweep setup. threads_per_gpu, warmup_iterations, check_iteration_count: Test execution tuning. + data_types: List of data types to test (e.g., ['float', 'half']). + no_of_cycles: Number of cycles to run for each data type. debug_level: NCCL_DEBUG level. rccl_result_file: Path where the RCCL test writes JSON results (-Z json -x file). verify_bus_bw: If 'True' (string), compare bus BW vs expected thresholds. exp_results_dict: Dict of expected results per test for verification. Returns: - result_out: The raw JSON string read from rccl_result_file on the head node. + all_raw_results: List of dictionaries containing all test results from all data types. """ print(f'Starting RCCL Test ..........................................{test_name}') @@ -680,11 +725,13 @@ def rccl_single_node_test( phdl, test_name, cluster_node_list, \ threads_per_gpu, warmup_iterations, check_iteration_count: Test execution tuning. debug_level: NCCL_DEBUG level. rccl_result_file: Path where the RCCL test writes JSON results (-Z json -x file). 
+ data_types: List of data types to test (e.g., ['float', 'half']). + no_of_cycles: Number of cycles to run for each data type. verify_bus_bw: If 'True' (string), compare bus BW vs expected thresholds. exp_results_dict: Dict of expected results per test for verification. Returns: - result_out: The raw JSON string read from rccl_result_file on all nodes + all_raw_results: List of dictionaries containing all test results from all data types. """ print(f'Starting RCCL Test ..........................................{test_name}')