3 changes: 1 addition & 2 deletions tests/integration-tests/tests/common/assertions.py
@@ -439,8 +439,7 @@ def _assert_build_image_stack_deleted(stack_name, region, timeout_seconds=600, p
pytest.fail(f"Timed-out waiting for stack {stack_name} deletion (last status: {last_status})")


def assert_regex_in_file(cluster: Cluster, compute_node_ip: str, file_name: str, pattern: str, negate: bool = True):
    rce = RemoteCommandExecutor(cluster, compute_node_ip)
def assert_regex_in_file(rce: RemoteCommandExecutor, file_name: str, pattern: str, negate: bool = True):
    file_content = read_remote_file(rce, file_name)
    assertion = assert_that(bool(re.search(pattern, file_content, re.IGNORECASE)))
    assertion.is_false() if negate else assertion.is_true()
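
For reference, a minimal usage sketch of the updated helper, assuming the test already has a Cluster object and a reachable compute node (the IP below is illustrative; the log path is the one checked elsewhere in this PR):

# Sketch only, not part of this diff: call assert_regex_in_file with an explicit executor.
rce = RemoteCommandExecutor(cluster, compute_node_ip="10.0.0.11")  # hypothetical compute node IP
# Fail the test if any warn/error/fail entry shows up in the prolog log on that node.
assert_regex_in_file(rce, "/var/log/parallelcluster/nvidia-imex-prolog.log", r"(warn|error|fail)", negate=True)
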
29 changes: 13 additions & 16 deletions tests/integration-tests/tests/ultraserver/test_gb200.py
@@ -53,30 +53,27 @@ def submit_job_imex_status(rce: RemoteCommandExecutor, queue: str, max_nodes: in
    return job_id


def assert_imex_nodes_config_is_correct(
    rce: RemoteCommandExecutor, queue_name: str, compute_resource_name: str, expected_ips: list
):
    logging.info(f"Checking IMEX nodes config contains the expected nodes: {expected_ips}")
    imex_nodes_config_file = (
        f"/opt/parallelcluster/shared/nvidia-imex/nodes_config_{queue_name}_{compute_resource_name}.cfg"
    )
    imex_config_content = read_remote_file(rce, imex_nodes_config_file)
    imex_config_content_clean = [line for line in imex_config_content.split("\n") if not line.strip().startswith("#")]
    actual_ips = [ip.strip() for ip in imex_config_content_clean]
    assert_that(actual_ips).contains_only(*expected_ips)
    logging.info(f"IMEX nodes config {imex_nodes_config_file} contains the expected nodes: {expected_ips}")
def assert_imex_nodes_config_is_correct(cluster: Cluster, queue: str, compute_resource: str, expected_ips: list):
    for compute_node_ip in cluster.get_compute_nodes_private_ip(queue, compute_resource):
        logging.info(f"Checking IMEX nodes config for compute node {compute_node_ip} contains the expected nodes: {expected_ips}")
        rce = RemoteCommandExecutor(cluster, compute_node_ip=compute_node_ip)
        imex_config_content = read_remote_file(rce, "/etc/nvidia-imex/nodes_config.cfg")
        imex_config_content_clean = [line for line in imex_config_content.split("\n") if not line.strip().startswith("#")]
        actual_ips = [ip.strip() for ip in imex_config_content_clean]
        assert_that(actual_ips).contains_only(*expected_ips)
        logging.info(f"IMEX nodes config for compute node {compute_node_ip} contains the expected nodes: {expected_ips}")


def assert_no_errors_in_logs(cluster: Cluster, queue: str, compute_resource: str):
    rce = RemoteCommandExecutor(cluster)
    logs = ["/var/log/nvidia-imex-verbose.log", "/var/log/parallelcluster/nvidia-imex-prolog.log"]
    for compute_node_ip in cluster.get_compute_nodes_private_ip(queue, compute_resource):
        rce = RemoteCommandExecutor(cluster, compute_node_ip=compute_node_ip)
        for log in logs:
            logging.info(f"Checking file {log} log does not contain any error")
            if log == "/var/log/nvidia-imex-verbose.log" and not is_existing_remote_file(rce, log):
                logging.info("IMEX log file not found. Not an issue as IMEX writes logs there only in case of errors.")
                continue
            assert_regex_in_file(cluster, compute_node_ip, log, r"(warn|error|fail)", negate=True)
            assert_regex_in_file(rce, log, r"(warn|error|fail)", negate=True)


def assert_imex_status(
@@ -210,7 +207,7 @@ def _check_imex_healthy():
f"Private IP addresses for nodes in queue {queue} and compute resource {compute_resource}: " f"{ips}"
)

assert_imex_nodes_config_is_correct(rce, queue, compute_resource, ips)
assert_imex_nodes_config_is_correct(cluster, queue, compute_resource, ips)
assert_imex_status(rce, job_id, ips, service_status="UP", node_status="READY", connection_status="CONNECTED")
assert_no_errors_in_logs(cluster, queue, compute_resource)

@@ -240,7 +237,7 @@ def assert_imex_not_configured(cluster: Cluster, queue: str, compute_resource: s

    job_id = submit_job_imex_status(rce, queue, max_nodes)

    assert_imex_nodes_config_is_correct(rce, queue, compute_resource, FAKE_IPS)
    assert_imex_nodes_config_is_correct(cluster, queue, compute_resource, FAKE_IPS)
    assert_imex_status(
        rce, job_id, FAKE_IPS, service_status="DOWN", node_status="UNAVAILABLE", connection_status="INVALID"
    )
@@ -91,40 +91,9 @@ function write_file() {
        return 1 # Not Updated
    fi

    # Try to acquire lock with timeout
Contributor: what if we do keep this as part of the prolog?

Contributor Author: Why?

Contributor (@himani2411, Sep 19, 2025): It's deadlock prevention, even though we added it for the shared file. Any scenario where more processes access this file and end up in a deadlock can be prevented if we keep it, and we have logs showing that we were in a deadlock.

    (
        if ! flock -x -w ${_lock_timeout_seconds} 200; then
            # If timeout, assume deadlock and try to recover
            info "Lock timeout after ${_lock_timeout_seconds}s, attempting deadlock recovery"
            exit 1
        fi
        echo "${_content}" > "${_file}"
    ) 200>"${_lock_file}"

    local _lock_result=$?

    if [[ ${_lock_result} -eq 0 ]]; then
        return 0 # Updated successfully
    fi

    # Deadlock recovery: remove stale lock file and retry once
    error "Potential deadlock detected for ${_file}, attempting recovery"
    rm -f "${_lock_file}"
    sleep 1 # Brief pause to avoid race conditions

    (
        if ! flock -x -w 10 200; then
            exit 1
        fi
        echo "${_content}" > "${_file}"
    ) 200>"${_lock_file}"

    if [[ $? -eq 0 ]]; then
        info "Lock acquired after deadlock recovery for ${_file}"
        return 0 # Updated
    fi

    error_exit "Failed to acquire lock for ${_file} even after deadlock recovery"
    echo "${_content}" > "${_file}"
    info "File ${_file} updated"
    return 0 # Updated
}
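
If the reviewer's concern about concurrent writers is revisited, a minimal bounded-wait variant of the plain write could look like the sketch below; the 30-second timeout and the .lock suffix are assumptions, not part of this change, and it relies on the script's existing error helper:

# Sketch only, not part of this diff: serialize writers with a bounded flock wait.
_lock_file="${_file}.lock"  # assumed lock-file naming
(
    # Block for at most 30 seconds (assumed timeout) waiting for an exclusive lock on fd 200.
    if ! flock -x -w 30 200; then
        error "Timed out waiting for lock on ${_file}"
        exit 1
    fi
    echo "${_content}" > "${_file}"
) 200>"${_lock_file}"
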

function reload_imex() {
@@ -171,8 +140,8 @@ function create_default_imex_channel() {
COMPUTE_RESOURCE_NAME=$(get_compute_resource_name "${QUEUE_NAME}-st-" $SLURMD_NODENAME)
CR_NODES=$(get_node_names "${QUEUE_NAME}" "${COMPUTE_RESOURCE_NAME}")
IPS_FROM_CR=$(get_ips_from_node_names "${CR_NODES}")
IMEX_MAIN_CONFIG="/opt/parallelcluster/shared/nvidia-imex/config_${QUEUE_NAME}_${COMPUTE_RESOURCE_NAME}.cfg"
IMEX_NODES_CONFIG="/opt/parallelcluster/shared/nvidia-imex/nodes_config_${QUEUE_NAME}_${COMPUTE_RESOURCE_NAME}.cfg"
IMEX_MAIN_CONFIG="/etc/nvidia-imex/config.cfg"
Contributor: You also need to change the nvidia-imex-status.job file, which points to a config-specific file.

Contributor Author: Good catch, done!

IMEX_NODES_CONFIG="/etc/nvidia-imex/nodes_config.cfg"

info "Queue Name: ${QUEUE_NAME}"
info "CR Name: ${COMPUTE_RESOURCE_NAME}"
@@ -8,6 +8,5 @@
sleep 45
QUEUE_NAME=$(cat "/etc/chef/dna.json" | jq -r ".cluster.scheduler_queue_name")
COMPUTE_RES_NAME=$(cat "/etc/chef/dna.json" | jq -r ".cluster.scheduler_compute_resource_name")
IMEX_CONFIG_FILE="/opt/parallelcluster/shared/nvidia-imex/config_${QUEUE_NAME}_${COMPUTE_RES_NAME}.cfg"

srun bash -c "/usr/bin/nvidia-imex-ctl -N -j -c ${IMEX_CONFIG_FILE} > result_\${SLURM_JOB_ID}_\$(hostname).out 2> result_\${SLURM_JOB_ID}_\$(hostname).err"
srun bash -c "/usr/bin/nvidia-imex-ctl -N -j > result_\${SLURM_JOB_ID}_\$(hostname).out 2> result_\${SLURM_JOB_ID}_\$(hostname).err"