19 changes: 19 additions & 0 deletions tests/integration-tests/conftest.py
@@ -48,6 +48,7 @@
delete_s3_bucket,
generate_stack_name,
get_architecture_supported_by_instance_type,
get_instance_info,
get_vpc_snakecase_value,
random_alphanumeric,
set_credentials,
@@ -826,6 +827,24 @@ def architecture(request, instance, region):
return supported_architecture


@pytest.fixture()
def default_threads_per_core(request, instance, region):
"""Return the default threads per core for the given instance type."""
# NOTE: currently, .metal instances do not include the DefaultThreadsPerCore
# attribute in their VCpuInfo section. This is a known limitation of the
# EC2 DescribeInstanceTypes API. For these instance types, the assumption
# is that if the instance's supported architectures include x86_64 the
# default is 2; otherwise it is 1.
logging.info(f"Getting default threads per core for instance type {instance}")
instance_type_data = get_instance_info(instance, region)
threads_per_core = instance_type_data.get("VCpuInfo", {}).get("DefaultThreadsPerCore")
if threads_per_core is None:
supported_architectures = instance_type_data.get("ProcessorInfo", {}).get("SupportedArchitectures", [])
threads_per_core = 2 if "x86_64" in supported_architectures else 1
logging.info(f"Defaul threads per core for instance type {instance} : {threads_per_core}")
return threads_per_core


@pytest.fixture(scope="session")
def key_name(request):
"""Return the EC2 key pair name to be used."""
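The get_instance_info helper imported at the top of this diff is defined elsewhere in the test utilities. A minimal sketch of what it plausibly looks like, assuming it wraps the EC2 DescribeInstanceTypes API via boto3 (the name and signature come from the import and call site above; the body is an assumption):

import boto3


def get_instance_info(instance_type, region):
    """Fetch the DescribeInstanceTypes entry for one instance type (sketch; the real helper is not shown in this diff)."""
    ec2_client = boto3.client("ec2", region_name=region)
    # DescribeInstanceTypes takes a list of types and returns a matching list;
    # with a single type requested, the first entry is the one we want.
    return ec2_client.describe_instance_types(InstanceTypes=[instance_type])["InstanceTypes"][0]

The returned dict is what the fixture above probes for VCpuInfo.DefaultThreadsPerCore and, as a fallback, ProcessorInfo.SupportedArchitectures.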
@@ -28,14 +28,22 @@
# HT disabled via CpuOptions
@pytest.mark.dimensions("sa-east-1", "c5.xlarge", "alinux2", "sge")
@pytest.mark.dimensions("sa-east-1", "c5.xlarge", "centos7", "torque")
def test_sit_disable_hyperthreading(region, scheduler, instance, os, pcluster_config_reader, clusters_factory):
def test_sit_disable_hyperthreading(
region, scheduler, instance, os, pcluster_config_reader, clusters_factory, default_threads_per_core
):
"""Test Disable Hyperthreading for SIT clusters."""
slots_per_instance = fetch_instance_slots(region, instance)
cluster_config = pcluster_config_reader()
cluster = clusters_factory(cluster_config)
remote_command_executor = RemoteCommandExecutor(cluster)
scheduler_commands = get_scheduler_commands(scheduler, remote_command_executor)
_test_disable_hyperthreading_settings(remote_command_executor, scheduler_commands, slots_per_instance, scheduler)
_test_disable_hyperthreading_settings(
remote_command_executor,
scheduler_commands,
slots_per_instance,
scheduler,
default_threads_per_core=default_threads_per_core,
)

assert_no_errors_in_logs(remote_command_executor, scheduler)

@@ -47,7 +55,9 @@ def test_sit_disable_hyperthreading(region, scheduler, instance, os, pcluster_co
@pytest.mark.dimensions("us-west-2", "m4.xlarge", "centos8", "slurm")
# HT disabled via CpuOptions
@pytest.mark.dimensions("us-west-1", "c5.xlarge", "ubuntu1804", "slurm")
def test_hit_disable_hyperthreading(region, scheduler, instance, os, pcluster_config_reader, clusters_factory):
def test_hit_disable_hyperthreading(
region, scheduler, instance, os, pcluster_config_reader, clusters_factory, default_threads_per_core
):
"""Test Disable Hyperthreading for HIT clusters."""
slots_per_instance = fetch_instance_slots(region, instance)
cluster_config = pcluster_config_reader()
@@ -61,6 +71,7 @@ def test_hit_disable_hyperthreading(region, scheduler, instance, os, pcluster_co
scheduler,
hyperthreading_disabled=False,
partition="ht-enabled",
default_threads_per_core=default_threads_per_core,
)
_test_disable_hyperthreading_settings(
remote_command_executor,
@@ -69,6 +80,7 @@ def test_hit_disable_hyperthreading(region, scheduler, instance, os, pcluster_co
scheduler,
hyperthreading_disabled=True,
partition="ht-disabled",
default_threads_per_core=default_threads_per_core,
)

assert_no_errors_in_logs(remote_command_executor, scheduler)
@@ -81,17 +93,20 @@ def _test_disable_hyperthreading_settings(
scheduler,
hyperthreading_disabled=True,
partition=None,
default_threads_per_core=2,
):
expected_cpus_per_instance = slots_per_instance // 2 if hyperthreading_disabled else slots_per_instance
expected_threads_per_core = 1 if hyperthreading_disabled else 2
expected_cpus_per_instance = (
slots_per_instance // default_threads_per_core if hyperthreading_disabled else slots_per_instance
)
expected_threads_per_core = 1 if hyperthreading_disabled else default_threads_per_core

# Test disable hyperthreading on head node
logging.info("Test Disable Hyperthreading on head node")
result = remote_command_executor.run_remote_command("lscpu")
if partition:
# If a partition is supplied, assume this is a HIT cluster, where HT settings
# are applied at the queue level. In this case, HT is not disabled on the head node.
assert_that(result.stdout).matches(r"Thread\(s\) per core:\s+{0}".format(2))
assert_that(result.stdout).matches(r"Thread\(s\) per core:\s+{0}".format(default_threads_per_core))
_assert_active_cpus(result.stdout, slots_per_instance)
else:
assert_that(result.stdout).matches(r"Thread\(s\) per core:\s+{0}".format(expected_threads_per_core))
@@ -128,10 +143,12 @@ def _test_disable_hyperthreading_settings(
# Check scale-up to 2 nodes
if partition:
result = scheduler_commands.submit_command(
"hostname > /shared/hostname.out", slots=slots_per_instance, partition=partition
"hostname > /shared/hostname.out", slots=2 * expected_cpus_per_instance, partition=partition
)
else:
result = scheduler_commands.submit_command("hostname > /shared/hostname.out", slots=slots_per_instance)
result = scheduler_commands.submit_command(
"hostname > /shared/hostname.out", slots=2 * expected_cpus_per_instance
)
job_id = scheduler_commands.assert_job_submitted(result.stdout)
scheduler_commands.wait_job_completed(job_id)
scheduler_commands.assert_job_succeeded(job_id)
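For concreteness, here is how the slot arithmetic above works out, assuming c5.xlarge (4 vCPUs, 2 threads per core on x86_64), one of the instance types in the test dimensions; the numbers are a worked example, not values taken from the diff:

slots_per_instance = 4           # fetch_instance_slots for c5.xlarge
default_threads_per_core = 2     # x86_64 default
# With hyperthreading disabled, each node exposes one thread per core:
expected_cpus_per_instance = slots_per_instance // default_threads_per_core  # 4 // 2 = 2
expected_threads_per_core = 1
# The scale-up job requests 2 * expected_cpus_per_instance = 4 slots, which cannot
# fit on a single 2-CPU node, so the scheduler must scale out to 2 nodes.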