diff --git a/input/config_file/rccl/rccl_config.json b/input/config_file/rccl/rccl_config.json
index 813bce0f..94666f66 100644
--- a/input/config_file/rccl/rccl_config.json
+++ b/input/config_file/rccl/rccl_config.json
@@ -41,6 +41,7 @@
"nccl_ib_tc": "0",
"nccl_ib_split_data_on_qps": "0",
"nccl_pxn_disable": [ "0", "1" ],
+ "nccl_p2p_batch_enable": [ "0", "1" ],
"nccl_net_plugin": "none",
"verify_bus_bw": "False",
"verify_bw_dip": "True",
diff --git a/lib/html_lib.py b/lib/html_lib.py
index e24ac1c7..79f73faf 100644
--- a/lib/html_lib.py
+++ b/lib/html_lib.py
@@ -1100,6 +1100,7 @@ def build_rccl_result_table( filename, res_dict ):
Protocol |
QP_count |
PXN_DISABLE |
+ P2P_BATCH_ENABLE |
Msg Size |
Algo BW GB/s |
Bus BW GB/s |
@@ -1108,7 +1109,7 @@ def build_rccl_result_table( filename, res_dict ):
'''
fp.write(html_lines)
for key_nam in res_dict.keys():
- (collective,algo,protocol,qp_count,pxn_disable) = key_nam.split("-")
+ (collective,algo,protocol,qp_count,pxn_disable,p2p_batch_enable) = key_nam.split("-")
last_bw = 0.0
last_time = 0
for msg_size in res_dict[key_nam].keys():
@@ -1121,6 +1122,7 @@ def build_rccl_result_table( filename, res_dict ):
{protocol} |
{qp_count} |
{pxn_disable} |
+ {p2p_batch_enable} |
{msg_size} |
{res_dict[key_nam][msg_size]['alg_bw']} |
'''
diff --git a/lib/rccl_lib.py b/lib/rccl_lib.py
index 43af1058..821a5375 100644
--- a/lib/rccl_lib.py
+++ b/lib/rccl_lib.py
@@ -312,7 +312,7 @@ def rccl_cluster_test( phdl, shdl, test_name, cluster_node_list, vpc_node_list,
ib_rx_queue_len=8192, ucx_tls='tcp', hcoll_enable_mcast_all=0, \
nccl_cumem_enable=0, nccl_ib_timeout=30, nccl_ib_sl=0, \
nccl_ib_tc=41, nccl_ib_split_data_on_qps=0, nccl_pxn_disable=1, \
- nccl_net_plugin=None, user_password=None, \
+ nccl_p2p_batch_enable=1, nccl_net_plugin=None, user_password=None, \
min_channels=64, max_channels=64, \
data_type="float", \
user_key_file=None, verify_bus_bw=False, \
@@ -418,6 +418,7 @@ def rccl_cluster_test( phdl, shdl, test_name, cluster_node_list, vpc_node_list,
-x NCCL_IB_TC={nccl_ib_tc} \
-x NCCL_IB_SPLIT_DATA_ON_QPS={nccl_ib_split_data_on_qps} \
-x NCCL_PXN_DISABLE={nccl_pxn_disable} \
+ -x RCCL_P2P_BATCH_ENABLE={nccl_p2p_batch_enable} \
-x NCCL_NET_PLUGIN={nccl_net_plugin} \
{RCCL_TESTS_INSTALL_DIR}/{test_name} -b {start_msg_size} -e {end_msg_size} -f {step_function} \
-g {threads_per_gpu} -c {check_iteration_count} -w {warmup_iterations} \
@@ -482,7 +483,7 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod
ib_rx_queue_len=8192, ucx_tls='tcp', hcoll_enable_mcast_all=0, \
nccl_cumem_enable=0, nccl_ib_timeout=30, nccl_ib_sl=0, \
nccl_ib_tc=41, nccl_ib_split_data_on_qps=0, nccl_pxn_disable=1, \
- nccl_net_plugin=None, user_password=None, \
+ nccl_p2p_batch_enable=1, nccl_net_plugin=None, user_password=None, \
min_channels=64, max_channels=64, \
user_key_file=None, verify_bus_bw=False, \
verify_bw_dip=True, verify_lat_dip=True, exp_results_dict=None ):
@@ -584,6 +585,7 @@ def rccl_cluster_test_default( phdl, shdl, test_name, cluster_node_list, vpc_nod
--mca btl_tcp_if_exclude lo,docker0,usb0 \
-x UCX_NET_DEVICES={net_dev_list} \
-x UCX_TLS={ucx_tls} \
+ -x RCCL_P2P_BATCH_ENABLE={nccl_p2p_batch_enable} \
-x NCCL_NET_PLUGIN={nccl_net_plugin} \
{RCCL_TESTS_INSTALL_DIR}/{test_name} -b {start_msg_size} -e {end_msg_size} -f {step_function} \
-g {threads_per_gpu} -c {check_iteration_count} -w {warmup_iterations} \
diff --git a/tests/rccl/README.md b/tests/rccl/README.md
index fb371a7d..291d525c 100644
--- a/tests/rccl/README.md
+++ b/tests/rccl/README.md
@@ -11,3 +11,8 @@ This Pytest script can be run in the following fashion (for the details on argum
(myenv) [user@host]~/cvs:(main)$pytest -vvv --log-file=/tmp/test.log -s ./tests/rccl/rccl_multinode_cvs.py --cluster_file input/cluster_file/cluster.json --config_file input/config_file/rccl/rccl_config.json --html=/var/www/html/cvs/rccl.html --capture=tee-sys --self-contained-html
```
+
+## P2P Batching Restrictions
+
+- `RCCL_P2P_BATCH_ENABLE=1` is only tested on clusters with ≤32 nodes to avoid hangs observed with alltoall/alltoallv operations in larger clusters.
+- For clusters >32 nodes, only `RCCL_P2P_BATCH_ENABLE=0` is tested.
diff --git a/tests/rccl/rccl_multinode_cvs.py b/tests/rccl/rccl_multinode_cvs.py
index 93cb282f..81f19dea 100644
--- a/tests/rccl/rccl_multinode_cvs.py
+++ b/tests/rccl/rccl_multinode_cvs.py
@@ -227,6 +227,9 @@ def pytest_generate_tests(metafunc):
cfg = json.load(fp)
rccl = cfg.get("rccl", {})
+ # Get node count for P2P batching restriction
+ num_nodes = int(rccl.get("no_of_nodes", "2"))
+
# Defaults (dedup'd)
rccl_collective_list = rccl.get(
"rccl_collective",
@@ -243,8 +246,14 @@ def pytest_generate_tests(metafunc):
qp_scale_list = rccl.get("qp_scale", ["1", "2"])
nccl_pxn_disable_list = rccl.get("nccl_pxn_disable", [ "1", "0" ])
+ # P2P batching is only safe and effective for <=32 nodes
+ if num_nodes <= 32:
+ nccl_p2p_batch_enable_list = ["0", "1"]
+ else:
+ nccl_p2p_batch_enable_list = ["0"] # Only test disabled to avoid hangs
+
# Only parametrize fixtures used by this test
- all_keys = ("rccl_collective", "rccl_algo", "rccl_protocol", "qp_scale", "nccl_pxn_disable")
+ all_keys = ("rccl_collective", "rccl_algo", "rccl_protocol", "qp_scale", "nccl_pxn_disable", "nccl_p2p_batch_enable")
active = [k for k in all_keys if k in metafunc.fixturenames]
if not active:
@@ -256,6 +265,7 @@ def pytest_generate_tests(metafunc):
"rccl_protocol": rccl_protocol_list,
"qp_scale": qp_scale_list,
"nccl_pxn_disable": nccl_pxn_disable_list,
+ "nccl_p2p_batch_enable": nccl_p2p_batch_enable_list,
}
domains = [domain_by_key[k] for k in active]
@@ -329,7 +339,7 @@ def test_disable_firewall( phdl ):
def test_rccl_perf(phdl, shdl, cluster_dict, config_dict, rccl_collective, rccl_algo, \
- rccl_protocol, qp_scale, nccl_pxn_disable ):
+ rccl_protocol, qp_scale, nccl_pxn_disable, nccl_p2p_batch_enable ):
"""
Execute RCCL performance test across the cluster with given parameters.
@@ -422,6 +432,7 @@ def test_rccl_perf(phdl, shdl, cluster_dict, config_dict, rccl_collective, rccl_
nccl_ib_tc = config_dict['nccl_ib_tc'], \
nccl_ib_split_data_on_qps = config_dict['nccl_ib_split_data_on_qps'], \
nccl_pxn_disable = nccl_pxn_disable, \
+ nccl_p2p_batch_enable = nccl_p2p_batch_enable, \
nccl_net_plugin = config_dict['nccl_net_plugin'], \
user_key_file = cluster_dict['priv_key_file'], \
verify_bus_bw = config_dict['verify_bus_bw'], \
@@ -432,7 +443,7 @@ def test_rccl_perf(phdl, shdl, cluster_dict, config_dict, rccl_collective, rccl_
print(result_dict)
- key_name = f'{rccl_collective}-{rccl_algo}-{rccl_protocol}-{qp_scale}-{nccl_pxn_disable}'
+ key_name = f'{rccl_collective}-{rccl_algo}-{rccl_protocol}-{qp_scale}-{nccl_pxn_disable}-{nccl_p2p_batch_enable}'
rccl_res_dict[key_name] = result_dict
# Scan dmesg between start and end times cluster wide ..