diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..88d0dd45
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+pycache/
+__pycache__/
+*.pyc
diff --git a/input/config_file/rccl/rccl_config.json b/input/config_file/rccl/rccl_config.json
index b87061d5..b7e389b4 100644
--- a/input/config_file/rccl/rccl_config.json
+++ b/input/config_file/rccl/rccl_config.json
@@ -25,6 +25,8 @@
     "end_msg_size": "16g",
     "step_function": "2",
     "threads_per_gpu": "1",
+    "data_types": [ "int8", "int32"],
+    "no_of_cycles": "10",
     "warmup_iterations": "10",
     "no_of_iterations": "1",
     "check_iteration_count": "1",
diff --git a/input/config_file/rccl/single_node_mi355_rccl.json b/input/config_file/rccl/single_node_mi355_rccl.json
index 4d849fce..870161f9 100644
--- a/input/config_file/rccl/single_node_mi355_rccl.json
+++ b/input/config_file/rccl/single_node_mi355_rccl.json
@@ -11,6 +11,8 @@
     "start_msg_size": "1024",
     "end_msg_size": "16g",
     "step_function": "2",
+    "data_types": [ "int8", "int32"],
+    "no_of_cycles": "10",
     "warmup_iterations": "10",
     "no_of_iterations": "1",
     "check_iteration_count": "1",
diff --git a/lib/rccl_lib.py b/lib/rccl_lib.py
index 9c828c2a..1cef332c 100644
--- a/lib/rccl_lib.py
+++ b/lib/rccl_lib.py
@@ -5,17 +5,24 @@
 All code contained here is Property of Advanced Micro Devices, Inc.
 '''
 
+# Standard libraries
 import re
 import sys
 import os
+import json
+from typing import List, Dict
+from pathlib import Path
 
-import globals
-
-log = globals.log
+# Third-party libraries
+import pandas as pd
+from pydantic import ValidationError
 
+import globals
+from models.rccl import RcclTests, RcclTestsAggregated, RcclTestsMultinodeRaw
 from utils_lib import *
 from verify_lib import *
 
+log = globals.log
 
 rccl_err_dict = {
@@ -209,8 +216,83 @@ def convert_to_graph_dict(result_dict):
 
-
-
+def aggregate_rccl_test_results(validated_results: List[RcclTests]) -> List[RcclTestsAggregated]:
+    """
+    Aggregate multiple rccl-test results into mean/std per (name, size, type, inPlace).
+
+    Args:
+        validated_results: List[RcclTests] - list of validated rccl-test results
+
+    Returns:
+        List[RcclTestsAggregated] - list of aggregated rccl-test results with mean/std per (name, size, type, inPlace)
+    """
+    if not validated_results:
+        raise ValueError("validated_results list cannot be empty")
+
+    # Check if these are multinode results and validate consistency
+    multinode_config = None
+    if isinstance(validated_results[0], RcclTestsMultinodeRaw):
+        # Extract config from first result
+        first = validated_results[0]
+        multinode_config = {
+            'nodes': first.nodes,
+            'ranks': first.ranks,
+            'ranksPerNode': first.ranksPerNode,
+            'gpusPerRank': first.gpusPerRank
+        }
+
+        # Validate all results have the same config
+        for i, result in enumerate(validated_results):
+            if not isinstance(result, RcclTestsMultinodeRaw):
+                raise ValueError(
+                    f"Mixed single-node and multi-node results at index {i}"
+                )
+            if (result.nodes != multinode_config['nodes'] or
+                    result.ranks != multinode_config['ranks'] or
+                    result.ranksPerNode != multinode_config['ranksPerNode'] or
+                    result.gpusPerRank != multinode_config['gpusPerRank']):
+                raise ValueError(
+                    f"Inconsistent cluster config at index {i}: "
+                    f"expected {multinode_config}, got "
+                    f"nodes={result.nodes}, ranks={result.ranks}, "
+                    f"ranksPerNode={result.ranksPerNode}, gpusPerRank={result.gpusPerRank}"
+                )
+        log.info(f"Validated consistent multinode config: {multinode_config}")
+
+    log.info(f"Aggregating {len(validated_results)} RCCL test results")
+    data = [result.model_dump() for result in validated_results]
+    df = pd.DataFrame(data)
+
+    # Group and aggregate
+    agg_df = df.groupby(['name', 'size', 'type', 'inPlace'], as_index=False).agg(
+        busBw_mean=('busBw', 'mean'),
+        busBw_std=('busBw', 'std'),
+        algBw_mean=('algBw', 'mean'),
+        algBw_std=('algBw', 'std'),
+        time_mean=('time', 'mean'),
+        time_std=('time', 'std'),
+        num_runs=('numCycle', 'count')
+    )
+
+    # Add multinode config if present
+    if multinode_config:
+        for key, value in multinode_config.items():
+            agg_df[key] = value
+
+    agg_results = []
+    errors = []
+
+    for row_dict in agg_df.to_dict('records'):
+        try:
+            agg_results.append(RcclTestsAggregated.model_validate(row_dict))
+        except ValidationError as e:
+            error_msg = f"Validation failed for row {row_dict}: {e}"
+            log.error(error_msg)
+            errors.append(error_msg)
+
+    # Report any validation failures
+    if errors:
+        error_summary = "\n".join(errors)
+        fail_test(f"Aggregation validation failed:\n{error_summary}")
+
+    log.info(f"Successfully validated {len(agg_results)} aggregated results")
+    return agg_results
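Reviewer note: for anyone unfamiliar with pandas named aggregation, here is a minimal, self-contained sketch of the groupby pattern `aggregate_rccl_test_results()` relies on; the records and values below are fabricated for illustration, not real measurements.

```python
# Toy illustration of the named-aggregation pattern above (fabricated values).
import pandas as pd

records = [
    {'name': 'AllReduce', 'size': 1024, 'type': 'int8', 'inPlace': 0,
     'busBw': 10.0, 'algBw': 5.0, 'time': 2.0, 'numCycle': 0},
    {'name': 'AllReduce', 'size': 1024, 'type': 'int8', 'inPlace': 0,
     'busBw': 12.0, 'algBw': 6.0, 'time': 1.8, 'numCycle': 1},
]
df = pd.DataFrame(records)
agg_df = df.groupby(['name', 'size', 'type', 'inPlace'], as_index=False).agg(
    busBw_mean=('busBw', 'mean'),   # 11.0
    busBw_std=('busBw', 'std'),     # ~1.414 (sample std); NaN for a single-row group
    num_runs=('numCycle', 'count')  # 2
)
print(agg_df.to_dict('records'))
```

The single-row NaN case is why `RcclTestsAggregated.handle_nan_std` (defined later in this diff) coerces NaN std values to 0.0.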
""" print(f'Starting RCCL Test ..........................................{test_name}') @@ -306,7 +391,13 @@ def rccl_cluster_test( phdl, shdl, test_name, cluster_node_list, vpc_node_list, shdl.exec(cmd) - cmd = f'''{MPI_INSTALL_DIR}/mpirun --np {no_of_global_ranks} \ + all_raw_results = [] + all_validated_results = [] + base_path = Path(rccl_result_file) + for dtype in data_types: + # Create a unique result file for each data type + dtype_result_file = f'{base_path.parent}/{base_path.stem}_{dtype}.json' + cmd = f'''{MPI_INSTALL_DIR}/mpirun --np {no_of_global_ranks} \ --allow-run-as-root \ --hostfile /tmp/rccl_hosts_file.txt \ -x NCCL_DEBUG={debug_level} \ @@ -317,6 +408,7 @@ def rccl_cluster_test( phdl, shdl, test_name, cluster_node_list, vpc_node_list, -x PATH={PATH} \ -x LD_LIBRARY_PATH={LD_LIBRARY_PATH} \ -x NCCL_IB_HCA={ib_hca_list} \ + -x NCCL_SOCKET_IFNAME={oob_port} \ --mca btl ^vader,openib \ --mca btl_tcp_if_include {oob_port}\ -x UCX_NET_DEVICES={net_dev_list} \ @@ -336,24 +428,57 @@ def rccl_cluster_test( phdl, shdl, test_name, cluster_node_list, vpc_node_list, -x NCCL_NET_PLUGIN={nccl_net_plugin} \ {RCCL_TESTS_INSTALL_DIR}/{test_name} -b {start_msg_size} -e {end_msg_size} -f {step_function} \ -g {threads_per_gpu} -c {check_iteration_count} -w {warmup_iterations} \ - -Z json -x {rccl_result_file} + -d {dtype} -N {no_of_cycles} -Z json -x {dtype_result_file} ''' - print('%%%%%%%%%%%%%%%%') - print(cmd) - print('%%%%%%%%%%%%%%%%') + print('%%%%%%%%%%%%%%%%') + print(cmd) + print('%%%%%%%%%%%%%%%%') + try: + out_dict = shdl.exec(cmd, timeout=500) + output = out_dict[head_node] + #print(output) + scan_rccl_logs(output) + except Exception as e: + log.error(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') + fail_test(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') + + # Read the JSON results emitted by the RCCL test binary + result_dict_out = shdl.exec(f'cat {dtype_result_file}') + dtype_result_out = json.loads(result_dict_out[head_node].replace( '\n', '').replace( '\r', '')) + # Validate the results against the schema fail if results are not valid + try: + validated = [RcclTestsMultinodeRaw.model_validate(test_result) for test_result in dtype_result_out] + log.info(f'Validation passed: {len(validated)} RcclTests schema validation passed') + all_validated_results.extend(validated) + all_raw_results.extend(dtype_result_out) + except ValidationError as e: + log.error(f'Validation Failed: {e}') + fail_test(f'RCCL Test {dtype} schema validation failed: {e}') + + # Save the results to a main result file + with open(rccl_result_file, 'w') as f: + json.dump(all_raw_results, f, indent=2) + log.info(f'Saved combined results from all data types to {rccl_result_file}') + + # Validate the results against the schema and aggregate if multiple results are found, fail if results are not valid try: - out_dict = shdl.exec(cmd, timeout=500) - output = out_dict[head_node] - #print(output) - scan_rccl_logs(output) - except Exception as e: - log.error(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') - fail_test(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') - - # Read the JSON results emitted by the RCCL test binary - result_dict_out = shdl.exec(f'cat {rccl_result_file}') - result_out = json.loads(result_dict_out[head_node].replace( '\n', '').replace( '\r', '')) + if len(all_validated_results) >= 1: + aggregated_rccl_tests = aggregate_rccl_test_results(all_validated_results) + log.info(f'Aggregation passed: {len(aggregated_rccl_tests)} RcclTestsAggregated schema validation 
passed') + # Note: currently we are saving the aggregated results, but we could instead use this for final report generation + aggregated_path = f'{base_path.parent}/{base_path.stem}_aggregated.json' + with open(aggregated_path, 'w') as f: + json.dump([result.model_dump() for result in aggregated_rccl_tests], f, indent=2) + log.info(f'Saved aggregated results to {aggregated_path}') + else: + log.info(f'Aggregation skipped: only one run found') + except ValidationError as e: + log.error(f'Validation Failed: {e}') + fail_test(f'RCCL Test schema validation failed: {e}') + except ValueError as e: + log.error(f'Aggregation failed: {e}') + fail_test(f'RCCL Test aggregation failed: {e}') # Collect basic GPU information via rocm-smi @@ -364,15 +489,15 @@ def rccl_cluster_test( phdl, shdl, test_name, cluster_node_list, vpc_node_list, # If requested, verify measured bus bandwidths against provided expected Bandwidth if re.search( 'True', verify_bus_bw, re.I ): if test_name in exp_results_dict.keys(): - check_bus_bw( test_name, result_out, exp_results_dict[test_name] ) + check_bus_bw( test_name, all_raw_results, exp_results_dict[test_name] ) if re.search( 'True', verify_bw_dip, re.I ): - check_bw_dip( test_name, result_out, ) + check_bw_dip( test_name, all_raw_results, ) if re.search( 'True', verify_lat_dip, re.I ): - check_lat_dip( test_name, result_out, ) + check_lat_dip( test_name, all_raw_results, ) - return result_out + return all_raw_results @@ -390,6 +515,7 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod nccl_proto='simple', gid_index=1, qp_count=1, \ start_msg_size=1024, end_msg_size='16g', \ step_function=2, threads_per_gpu=1, warmup_iterations=10, no_of_iterations=1, \ + data_types=['float'], no_of_cycles=10, \ check_iteration_count=1, debug_level='INFO', \ rccl_result_file='/tmp/rccl_result_output.json', no_of_local_ranks=8, \ ib_rx_queue_len=8192, ucx_tls='tcp', hcoll_enable_mcast_all=0, \ @@ -420,13 +546,15 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod nccl_algo, nccl_proto, gid_index, qp_count, ...: NCCL/UCX/MPI tuning parameters. start_msg_size, end_msg_size, step_function: Message size sweep setup. threads_per_gpu, warmup_iterations, check_iteration_count: Test execution tuning. + data_types: List of data types to test (e.g., ['float', 'half']). + no_of_cycles: Number of cycles to run for each data type. debug_level: NCCL_DEBUG level. rccl_result_file: Path where the RCCL test writes JSON results (-Z json -x file). verify_bus_bw: If 'True' (string), compare bus BW vs expected thresholds. exp_results_dict: Dict of expected results per test for verification. Returns: - result_out: The raw JSON string read from rccl_result_file on the head node. + all_raw_results: List of dictionaries containing all test results from all data types. 
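A quick sketch of the per-data-type file naming used inside the loop above, since it determines where each run's JSON lands; the base path is the function's default and the dtype list mirrors the sample configs.

```python
# How dtype_result_file is derived from rccl_result_file (example values).
from pathlib import Path

rccl_result_file = '/tmp/rccl_result_output.json'  # default base path
base_path = Path(rccl_result_file)
for dtype in ['int8', 'int32']:
    dtype_result_file = f'{base_path.parent}/{base_path.stem}_{dtype}.json'
    print(dtype_result_file)
# -> /tmp/rccl_result_output_int8.json
# -> /tmp/rccl_result_output_int32.json
```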
""" print(f'Starting RCCL Test ..........................................{test_name}') @@ -473,7 +601,13 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod shdl.exec(cmd) - cmd = f'''{MPI_INSTALL_DIR}/mpirun --np {no_of_global_ranks} \ + all_raw_results = [] + all_validated_results = [] + base_path = Path(rccl_result_file) + for dtype in data_types: + # Create a unique result file for each data type + dtype_result_file = f'{base_path.parent}/{base_path.stem}_{dtype}.json' + cmd = f'''{MPI_INSTALL_DIR}/mpirun --np {no_of_global_ranks} \ --allow-run-as-root \ --hostfile /tmp/rccl_hosts_file.txt \ -x NCCL_DEBUG={debug_level} \ @@ -484,6 +618,7 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod -x PATH={PATH} \ -x LD_LIBRARY_PATH={LD_LIBRARY_PATH} \ -x NCCL_IB_HCA={ib_hca_list} \ + -x NCCL_SOCKET_IFNAME={oob_port} \ --mca btl ^vader,openib \ --mca btl_tcp_if_include {oob_port}\ -x UCX_NET_DEVICES={net_dev_list} \ @@ -491,24 +626,57 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod -x NCCL_NET_PLUGIN={nccl_net_plugin} \ {RCCL_TESTS_INSTALL_DIR}/{test_name} -b {start_msg_size} -e {end_msg_size} -f {step_function} \ -g {threads_per_gpu} -c {check_iteration_count} -w {warmup_iterations} \ - -Z json -x {rccl_result_file} + -d {dtype} -N {no_of_cycles} -Z json -x {dtype_result_file} ''' - print('%%%%%%%%%%%%%%%%') - print(cmd) - print('%%%%%%%%%%%%%%%%') + print('%%%%%%%%%%%%%%%%') + print(cmd) + print('%%%%%%%%%%%%%%%%') + try: + out_dict = shdl.exec(cmd, timeout=500) + output = out_dict[head_node] + #print(output) + scan_rccl_logs(output) + except Exception as e: + log.error(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') + fail_test(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') + + # Read the JSON results emitted by the RCCL test binary + result_dict_out = shdl.exec(f'cat {dtype_result_file}') + dtype_result_out = json.loads(result_dict_out[head_node].replace( '\n', '').replace( '\r', '')) + # Validate the results against the schema fail if results are not valid + try: + validated = [RcclTestsMultinodeRaw.model_validate(test_result) for test_result in dtype_result_out] + log.info(f'Validation passed: {len(validated)} RcclTests schema validation passed') + all_validated_results.extend(validated) + all_raw_results.extend(dtype_result_out) + except ValidationError as e: + log.error(f'Validation Failed: {e}') + fail_test(f'RCCL Test {dtype} schema validation failed: {e}') + + # Save the results to a main result file + with open(rccl_result_file, 'w') as f: + json.dump(all_raw_results, f, indent=2) + log.info(f'Saved combined results from all data types to {rccl_result_file}') + + # Validate the results against the schema and aggregate if multiple results are found, fail if results are not valid try: - out_dict = shdl.exec(cmd, timeout=500) - output = out_dict[head_node] - #print(output) - scan_rccl_logs(output) - except Exception as e: - log.error(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') - fail_test(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') - - # Read the JSON results emitted by the RCCL test binary - result_dict_out = shdl.exec(f'cat {rccl_result_file}') - result_out = json.loads(result_dict_out[head_node].replace( '\n', '').replace( '\r', '')) + if len(all_validated_results) >= 1: + aggregated_rccl_tests = aggregate_rccl_test_results(all_validated_results) + log.info(f'Aggregation passed: {len(aggregated_rccl_tests)} RcclTestsAggregated schema validation 
passed') + # Note: currently we are saving the aggregated results, but we could instead use this for final report generation + aggregated_path = f'{base_path.parent}/{base_path.stem}_aggregated.json' + with open(aggregated_path, 'w') as f: + json.dump([result.model_dump() for result in aggregated_rccl_tests], f, indent=2) + log.info(f'Saved aggregated results to {aggregated_path}') + else: + log.info(f'Aggregation skipped: only one run found') + except ValidationError as e: + log.error(f'Validation Failed: {e}') + fail_test(f'RCCL Test schema validation failed: {e}') + except ValueError as e: + log.error(f'Aggregation failed: {e}') + fail_test(f'RCCL Test aggregation failed: {e}') # Collect basic GPU information via rocm-smi @@ -519,15 +687,15 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod # If requested, verify measured bus bandwidths against provided expected Bandwidth if re.search( 'True', verify_bus_bw, re.I ): if test_name in exp_results_dict.keys(): - check_bus_bw( test_name, result_out, exp_results_dict[test_name] ) + check_bus_bw( test_name, all_raw_results, exp_results_dict[test_name] ) if re.search( 'True', verify_bw_dip, re.I ): - check_bw_dip( test_name, result_out, ) + check_bw_dip( test_name, all_raw_results, ) if re.search( 'True', verify_lat_dip, re.I ): - check_lat_dip( test_name, result_out, ) + check_lat_dip( test_name, all_raw_results, ) - return result_out + return all_raw_results @@ -542,7 +710,8 @@ def rccl_single_node_test( phdl, test_name, cluster_node_list, \ step_function=2, warmup_iterations=10, no_of_iterations=1, \ check_iteration_count=1, debug_level='INFO', \ rccl_result_file='/tmp/rccl_result_output.json', no_of_local_ranks=8, \ - verify_bus_bw=False, verify_bw_dip=True, verify_lat_dip=True, exp_results_dict=None ): + data_types=['float'], no_of_cycles=10, verify_bus_bw=False, verify_bw_dip=True, + verify_lat_dip=True, exp_results_dict=None ) -> List[Dict]: """ Run an Single Node RCCL collective test @@ -556,11 +725,13 @@ def rccl_single_node_test( phdl, test_name, cluster_node_list, \ threads_per_gpu, warmup_iterations, check_iteration_count: Test execution tuning. debug_level: NCCL_DEBUG level. rccl_result_file: Path where the RCCL test writes JSON results (-Z json -x file). + data_types: List of data types to test (e.g., ['float', 'half']). + no_of_cycles: Number of cycles to run for each data type. verify_bus_bw: If 'True' (string), compare bus BW vs expected thresholds. exp_results_dict: Dict of expected results per test for verification. Returns: - result_out: The raw JSON string read from rccl_result_file on all nodes + all_raw_results: List of dictionaries containing all test results from all data types. 
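For readers new to pydantic v2, here is a small standalone sketch of the validate-then-fail pattern both cluster functions apply to each per-dtype JSON payload; `MiniRcclTest` is a trimmed stand-in for illustration, not the real `RcclTests` schema.

```python
# Trimmed stand-in schema showing model_validate() on raw JSON records.
from typing import List
from pydantic import BaseModel, ValidationError

class MiniRcclTest(BaseModel):
    name: str
    size: int
    busBw: float

payload = [
    {'name': 'AllReduce', 'size': 1024, 'busBw': 42.0},
    {'name': 'AllReduce', 'size': 2048, 'busBw': 'oops'},  # bad record
]
try:
    validated: List[MiniRcclTest] = [MiniRcclTest.model_validate(rec) for rec in payload]
except ValidationError as e:
    # The library routes this to fail_test(), so malformed data never reaches aggregation
    print(f'schema validation failed: {e}')
```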
""" print(f'Starting RCCL Test ..........................................{test_name}') @@ -578,47 +749,81 @@ def rccl_single_node_test( phdl, test_name, cluster_node_list, \ LD_LIBRARY_PATH=f'{RCCL_PATH}:{ROCM_PATH}/lib:$LD_LIBRARY_PATH' - - cmd = f'''export NCCL_DEBUG={debug_level}; \ - export PATH={PATH}; \ - export LD_LIBRARY_PATH={LD_LIBRARY_PATH}; \ - {RCCL_TESTS_INSTALL_DIR}/{test_name} -b {start_msg_size} -e {end_msg_size} -f {step_function} \ - -g {no_of_local_ranks} -c {check_iteration_count} -w {warmup_iterations} \ - -Z json -x {rccl_result_file}''' - - print('%%%%%%%%%%%%%%%%') - print(cmd) - print('%%%%%%%%%%%%%%%%') + # Run rccl test for each data type mentioned in the data_types list + all_raw_results = [] + all_validated_results = [] + base_path = Path(rccl_result_file) + for dtype in data_types: + # Create a unique result file for each data type + dtype_result_file = f'{base_path.parent}/{base_path.stem}_{dtype}.json' + cmd = f'''export NCCL_DEBUG={debug_level}; \ + export PATH={PATH}; \ + export LD_LIBRARY_PATH={LD_LIBRARY_PATH}; \ + {RCCL_TESTS_INSTALL_DIR}/{test_name} -b {start_msg_size} -e {end_msg_size} -f {step_function} \ + -g {no_of_local_ranks} -c {check_iteration_count} -w {warmup_iterations} \ + -d {dtype} -N {no_of_cycles} -Z json -x {dtype_result_file}''' + + print('%%%%%%%%%%%%%%%%') + print(cmd) + print('%%%%%%%%%%%%%%%%') + try: + out_dict = phdl.exec(cmd, timeout=500) + for node in out_dict.keys(): + scan_rccl_logs(out_dict[node]) + except Exception as e: + log.error(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') + fail_test(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') + + # Read the JSON results emitted by the RCCL test binary + result_dict_out = phdl.exec(f'cat {dtype_result_file}') + dtype_result_out = json.loads(result_dict_out[head_node].replace( '\n', '').replace( '\r', '')) + # Validate the results against the schema fail if results are not valid + try: + validated = [RcclTests.model_validate(test_result) for test_result in dtype_result_out] + log.info(f'Validation passed: {len(validated)} RcclTests schema validation passed') + all_validated_results.extend(validated) + all_raw_results.extend(dtype_result_out) + except ValidationError as e: + log.error(f'Validation Failed: {e}') + fail_test(f'RCCL Test {dtype} schema validation failed: {e}') + + # Save the results to a main result file + with open(rccl_result_file, 'w') as f: + json.dump(all_raw_results, f, indent=2) + log.info(f'Saved combined results from all all data types to {rccl_result_file}') + + # Validate the results against the schema and aggregate if multiple results are found, fail if results are not valid try: - out_dict = phdl.exec(cmd, timeout=500) - for node in out_dict.keys(): - scan_rccl_logs(out_dict[node]) - except Exception as e: - log.error(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') - fail_test(f'Hit Exceptions with rccl cmd {cmd} - exception {e}') - - # Read the JSON results emitted by the RCCL test binary - result_dict_out = phdl.exec(f'cat {rccl_result_file}') - result_out = json.loads(result_dict_out[head_node].replace( '\n', '').replace( '\r', '')) + if len(all_validated_results) >= 1: + aggregated_rccl_tests = aggregate_rccl_test_results(all_validated_results) + log.info(f'Aggregation passed: {len(aggregated_rccl_tests)} RcclTestsAggregated schema validation passed') + # Note: currently we are saving the aggregated results, but we could instead use this for final report generation + aggregated_path = 
f'{base_path.parent}/{base_path.stem}_aggregated.json' + with open(aggregated_path, 'w') as f: + json.dump([result.model_dump() for result in aggregated_rccl_tests], f, indent=2) + log.info(f'Saved aggregated results to {aggregated_path}') + else: + log.info(f'Aggregation skipped: only one run found') + except ValidationError as e: + log.error(f'Validation Failed: {e}') + fail_test(f'RCCL Test schema validation failed: {e}') + except ValueError as e: + log.error(f'Aggregation failed: {e}') + fail_test(f'RCCL Test aggregation failed: {e}') # Collect basic GPU information via rocm-smi smi_out_dict = phdl.exec('rocm-smi -a | head -30') + # NOTE: the bw & lat verification functions could also be refractored into RcclTestsRaw model to avoid code duplication # If requested, verify measured bus bandwidths against provided expected Bandwidth if re.search( 'True', verify_bus_bw, re.I ): - for node in result_dict_out.keys(): - result_out = json.loads(result_dict_out[node].replace( '\n', '').replace( '\r', '')) - if test_name in exp_results_dict.keys(): - check_bus_bw( test_name, result_out, exp_results_dict[test_name] ) + if test_name in exp_results_dict.keys(): + check_bus_bw( test_name, all_raw_results, exp_results_dict[test_name] ) if re.search( 'True', verify_bw_dip, re.I ): - for node in result_dict_out.keys(): - result_out = json.loads(result_dict_out[node].replace( '\n', '').replace( '\r', '')) - check_bw_dip( test_name, result_out, ) + check_bw_dip( test_name, all_raw_results, ) if re.search( 'True', verify_lat_dip, re.I ): - for node in result_dict_out.keys(): - result_out = json.loads(result_dict_out[node].replace( '\n', '').replace( '\r', '')) - check_lat_dip( test_name, result_out, ) + check_lat_dip( test_name, all_raw_results, ) - return result_out + return all_raw_results diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 00000000..14ca68f7 --- /dev/null +++ b/models/__init__.py @@ -0,0 +1,13 @@ +"""CVS Pydantic models for data validation.""" + +from models.rccl import ( + RcclTests, + RcclTestsMultinodeRaw, + RcclTestsAggregated, +) + +__all__ = [ + 'RcclTests', + 'RcclTestsMultinodeRaw', + 'RcclTestsAggregated', +] \ No newline at end of file diff --git a/models/rccl.py b/models/rccl.py new file mode 100644 index 00000000..c150ef36 --- /dev/null +++ b/models/rccl.py @@ -0,0 +1,106 @@ +# std libs +from typing import Annotated, Literal, Optional +import math + +#pypdantic libs +from pydantic import BaseModel, Field, model_validator, ConfigDict, field_validator + +NonNegativeInt = Annotated[int, Field(ge=0)] +PositiveInt = Annotated[int, Field(gt=0)] +NonNegativeFloat = Annotated[float, Field(ge=0.0)] +Collective = Literal['AllReduce', 'AllGather', 'Scatter', 'Gather', 'ReduceScatter', 'SendRecv', 'AllToAll', 'AllToAllV', 'Broadcast'] +Type = Literal[ + 'int8', 'int32', 'int64', + 'uint8', 'uint32', 'uint64', + 'float', 'double', + 'half', 'bfloat16', + 'fp8_e4m3', 'fp8_e5m2' +] +Redop = Literal['sum', 'prod', 'min', 'max', 'avg', 'all', 'none'] +InPlace = Literal[0, 1] + + +class RcclTests(BaseModel): + model_config = ConfigDict(frozen=True) + numCycle: NonNegativeInt + name: Collective + size: PositiveInt + type: Type + redop: Redop + inPlace: InPlace + time: NonNegativeFloat + algBw: NonNegativeFloat + busBw: NonNegativeFloat + wrong: int + + @field_validator('time', 'algBw', 'busBw') + @classmethod + def validate_not_nan_inf(cls, v: float, info) -> float: + """Ensure no NaN/Inf values in measurements.""" + if math.isnan(v) or math.isinf(v): + raise 
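A standalone sketch of the NaN/Inf guard above, with a trimmed one-field model standing in for `RcclTests`; it shows the validator rejecting a NaN measurement at construction time.

```python
# Minimal demo of the field_validator pattern used by RcclTests (trimmed model).
import math
from pydantic import BaseModel, ValidationError, field_validator

class MiniMeasurement(BaseModel):
    busBw: float

    @field_validator('busBw')
    @classmethod
    def validate_not_nan_inf(cls, v: float, info) -> float:
        if math.isnan(v) or math.isinf(v):
            raise ValueError(f'{info.field_name} cannot be NaN/Inf, got {v}')
        return v

print(MiniMeasurement(busBw=42.0).busBw)  # 42.0
try:
    MiniMeasurement(busBw=math.nan)
except ValidationError:
    print('NaN measurement rejected')
```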
+
+class RcclTestsMultinodeRaw(RcclTests):
+    """
+    This class represents the schema for multi-node rccl-test results, used while serializing rccl-test input.
+    If the data doesn't adhere to this schema, we fail immediately, preventing weird behaviour later on
+    in the processing pipeline.
+    """
+    nodes: PositiveInt
+    ranks: PositiveInt
+    ranksPerNode: PositiveInt
+    gpusPerRank: PositiveInt
+
+    @model_validator(mode='after')
+    def validate_ranks_relationship(self):
+        """Ensure ranks = nodes * ranksPerNode."""
+        expected_ranks = self.nodes * self.ranksPerNode
+        if self.ranks != expected_ranks:
+            raise ValueError(
+                f"ranks ({self.ranks}) must equal nodes ({self.nodes}) × "
+                f"ranksPerNode ({self.ranksPerNode}) = {expected_ranks}"
+            )
+        return self
+
+class RcclTestsAggregated(BaseModel):
+    """
+    This class represents the aggregated schema for rccl-test results.
+    """
+    model_config = ConfigDict(frozen=True, populate_by_name=True)
+
+    # Grouping keys
+    name: Collective = Field(alias='collective')
+    size: PositiveInt
+    type: Type
+    inPlace: InPlace
+
+    # Metadata
+    num_runs: PositiveInt = Field(description='Number of cycles aggregated')
+
+    # Aggregated metrics
+    busBw_mean: NonNegativeFloat
+    busBw_std: NonNegativeFloat
+    algBw_mean: NonNegativeFloat
+    algBw_std: NonNegativeFloat
+    time_mean: NonNegativeFloat
+    time_std: NonNegativeFloat
+
+    # Multinode metadata (optional, None for single-node tests)
+    nodes: Optional[PositiveInt] = None
+    ranks: Optional[PositiveInt] = None
+    ranksPerNode: Optional[PositiveInt] = None
+    gpusPerRank: Optional[PositiveInt] = None
+
+    @field_validator('busBw_std', 'algBw_std', 'time_std')
+    @classmethod
+    def handle_nan_std(cls, v: float, info) -> float:
+        """
+        Convert NaN (from single-value std) to 0.0.
+        Pandas returns NaN for the std of a single value, which is correct mathematically,
+        but we interpret it as 0 variability.
+        """
+        if math.isnan(v):
+            return 0.0
+        if math.isinf(v):
+            raise ValueError(f'{info.field_name} cannot be Inf')
+        return v
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index f88fbab6..11f34007 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,3 +6,5 @@ pytest-html
 pytest-repeat
 pytest-dependency
 xlsxwriter
+pydantic>=2.0
+pandas
\ No newline at end of file
diff --git a/tests/rccl/rccl_multinode_cvs.py b/tests/rccl/rccl_multinode_cvs.py
index 93cb282f..2aa6150e 100644
--- a/tests/rccl/rccl_multinode_cvs.py
+++ b/tests/rccl/rccl_multinode_cvs.py
@@ -407,6 +407,8 @@ def test_rccl_perf(phdl, shdl, cluster_dict, config_dict, rccl_collective, rccl_
                    end_msg_size = config_dict['end_msg_size'], \
                    step_function = config_dict['step_function'], \
                    threads_per_gpu = config_dict['threads_per_gpu'], \
+                   data_types = config_dict['data_types'], \
+                   no_of_cycles = config_dict['no_of_cycles'], \
                    warmup_iterations = config_dict['warmup_iterations'], \
                    no_of_iterations = config_dict['no_of_iterations'], \
                    check_iteration_count = config_dict['check_iteration_count'], \
diff --git a/tests/rccl/rccl_multinode_default_cvs.py b/tests/rccl/rccl_multinode_default_cvs.py
index 36800ef4..40d44be1 100644
--- a/tests/rccl/rccl_multinode_default_cvs.py
+++ b/tests/rccl/rccl_multinode_default_cvs.py
@@ -345,6 +345,8 @@ def test_rccl_perf(phdl, shdl, cluster_dict, config_dict, rccl_collective ):
                    end_msg_size = config_dict['end_msg_size'], \
                    step_function = config_dict['step_function'], \
                    threads_per_gpu = config_dict['threads_per_gpu'], \
+                   data_types = config_dict['data_types'], \
+                   no_of_cycles = config_dict['no_of_cycles'], \
                    warmup_iterations = config_dict['warmup_iterations'], \
                    no_of_iterations = config_dict['no_of_iterations'], \
                    check_iteration_count = config_dict['check_iteration_count'], \
diff --git a/tests/rccl/rccl_singlenode_cvs.py b/tests/rccl/rccl_singlenode_cvs.py
index 3cd62bb5..3ee6142f 100644
--- a/tests/rccl/rccl_singlenode_cvs.py
+++ b/tests/rccl/rccl_singlenode_cvs.py
@@ -309,6 +309,8 @@ def test_singlenode_perf(phdl, cluster_dict, config_dict, rccl_collective ):
                    debug_level = config_dict['debug_level'], \
                    rccl_result_file = config_dict['rccl_result_file'], \
                    no_of_local_ranks = config_dict['no_of_local_ranks'], \
+                   data_types = config_dict['data_types'], \
+                   no_of_cycles = config_dict['no_of_cycles'], \
                    verify_bus_bw = config_dict['verify_bus_bw'], \
                    verify_bw_dip = config_dict['verify_bw_dip'], \
                    verify_lat_dip = config_dict['verify_lat_dip'], \
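Finally, a hedged sketch of how the new config keys flow from the JSON configs into these calls; `rccl_single_node_test` is stubbed out here because the real call needs live cluster handles, and note that `no_of_cycles` arrives as the string "10" exactly as stored in the config (it is only interpolated into the shell command).

```python
# Sketch: data_types / no_of_cycles flowing from the config JSON into the test call (stubbed).
import json

config_json = '{ "data_types": [ "int8", "int32" ], "no_of_cycles": "10" }'
config_dict = json.loads(config_json)

def rccl_single_node_test(data_types=['float'], no_of_cycles=10, **kwargs):
    # Stub for illustration; the real function shells out to the rccl-tests binary
    print(f'would run {no_of_cycles} cycle(s) for each of {data_types}')

rccl_single_node_test(data_types=config_dict['data_types'],
                      no_of_cycles=config_dict['no_of_cycles'])
# -> would run 10 cycle(s) for each of ['int8', 'int32']
```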