Skip to content

Commit

Permalink
integ tests: various fixes for EFA tests
Browse files Browse the repository at this point in the history
Signed-off-by: Francesco De Martino <fdm@amazon.com>
  • Loading branch information
demartinofra committed Jun 7, 2019
1 parent 4a81d77 commit e997919
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 91 deletions.
8 changes: 6 additions & 2 deletions tests/integration-tests/remote_command_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def run_remote_command(
raise RemoteCommandExecutionError(result)
return result

def run_remote_script(self, script_file, args=None, log_error=True, additional_files=None, hide=False):
    """
    Execute a script remotely on the cluster master node.

    The script is first copied to the remote host (basename only), then run
    through a login bash shell so the user's environment/modules are loaded.

    :param script_file: local path to the script to execute remotely.
    :param args: args to pass to the script when invoked.
    :param log_error: log errors.
    :param additional_files: additional files to copy before executing script.
    :param hide: do not print command output to the local stdout
    :return: result of the execution.
    """
    script_name = os.path.basename(script_file)
    self.__connection.put(script_file, script_name)
    if not args:
        args = []
    return self.run_remote_command(
        ["/bin/bash", "--login", script_name] + args,
        log_error=log_error,
        additional_files=additional_files,
        hide=hide,
    )

def _copy_additional_files(self, files):
Expand Down
61 changes: 19 additions & 42 deletions tests/integration-tests/tests/common/schedulers_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,7 @@ def get_job_exit_status(self, job_id):
pass

@abstractmethod
def submit_interactive_command(self, command, nodes=1):
"""
Submit a interactive command to the scheduler.
:param command: command to submit.
:return: result from remote command execution.
"""
pass

@abstractmethod
def submit_command(self, command, nodes=1):
def submit_command(self, command, nodes=1, slots=None):
"""
Submit a job to the scheduler.
Expand All @@ -76,7 +66,7 @@ def submit_command(self, command, nodes=1):
pass

@abstractmethod
def submit_script(self, script, nodes=1, additional_files=None):
def submit_script(self, script, nodes=1, slots=None, additional_files=None):
"""
Submit a job to the scheduler by using a script file.
Expand Down Expand Up @@ -125,13 +115,10 @@ def assert_job_submitted(self, awsbsub_output): # noqa: D102
assert_that(match).is_not_none()
return match.group(1)

def submit_interactive_command(self, command, nodes=1): # noqa: D102
raise NotImplementedError

def submit_command(self, command, nodes=1, slots=None):  # noqa: D102
    # awsbsub only exposes a node count; `slots` is accepted for signature
    # compatibility with the other scheduler implementations and ignored here.
    return self._remote_command_executor.run_remote_command('echo "{0}" | awsbsub -n {1}'.format(command, nodes))

def submit_script(self, script, nodes=1, additional_files=None, slots=None):  # noqa: D102
    # Script submission is not supported for AWS Batch in these tests;
    # use submit_command instead.
    raise NotImplementedError

def assert_job_succeeded(self, job_id, children_number=0): # noqa: D102
Expand Down Expand Up @@ -174,16 +161,6 @@ def assert_job_submitted(self, qsub_output, is_array=False): # noqa: D102
assert_that(match).is_not_none()
return match.group(1)

def submit_interactive_command(self, command, nodes=1, slots=None): # noqa: D102
flags = ""
if nodes != 1:
raise Exception("SGE does not support nodes option")
if slots:
flags += "-pe mpi {0} ".format(slots)
return self._remote_command_executor.run_remote_command(
"echo '{0}' | qrsh {1}".format(command, flags), raise_on_error=False
)

def submit_command(self, command, nodes=1, slots=None, hold=False): # noqa: D102
flags = ""
if nodes != 1:
Expand All @@ -196,13 +173,16 @@ def submit_command(self, command, nodes=1, slots=None, hold=False): # noqa: D10
"echo '{0}' | qsub {1}".format(command, flags), raise_on_error=False
)

def submit_script(self, script, nodes=1, slots=None, additional_files=None):  # noqa: D102
    # Copy the list so the caller's `additional_files` is not mutated by
    # appending the script itself.
    files = list(additional_files) if additional_files else []
    files.append(script)
    flags = ""
    if slots:
        # Request a parallel environment with the given slot count.
        flags += "-pe mpi {0} ".format(slots)
    script_name = os.path.basename(script)
    return self._remote_command_executor.run_remote_command(
        "qsub {0} {1}".format(flags, script_name), additional_files=files
    )

def assert_job_succeeded(self, job_id, children_number=0): # noqa: D102
Expand Down Expand Up @@ -243,32 +223,32 @@ def assert_job_submitted(self, sbatch_output): # noqa: D102
assert_that(match).is_not_none()
return match.group(1)

def submit_interactive_command(self, command, nodes=1, host=None): # noqa: D102
submission_command = "srun -N {0} --wrap='{1}'".format(nodes, command)
if host:
submission_command += " --nodelist={0}".format(host)
return self._remote_command_executor.run_remote_command(submission_command)

def submit_command(self, command, nodes=1, slots=None, host=None):  # noqa: D102
    # Wrap the command in an sbatch submission; -N pins the node count.
    submission_command = "sbatch -N {0} --wrap='{1}'".format(nodes, command)
    if host:
        submission_command += " --nodelist={0}".format(host)
    if slots:
        # -n sets the total number of tasks for the job.
        submission_command += " -n {0}".format(slots)
    return self._remote_command_executor.run_remote_command(submission_command)

def submit_script(self, script, nodes=1, slots=None, host=None, additional_files=None):  # noqa: D102
    # Copy the list so the caller's `additional_files` is not mutated by
    # appending the script itself.
    files = list(additional_files) if additional_files else []
    files.append(script)
    script_name = os.path.basename(script)
    submission_command = "sbatch"
    if host:
        submission_command += " --nodelist={0}".format(host)
    if slots:
        # -n sets the total number of tasks for the job.
        submission_command += " -n {0}".format(slots)
    if nodes > 1:
        # BUGFIX: the node count must be formatted here, not the slot count
        # (previous code did " -N {0}".format(slots)).
        submission_command += " -N {0}".format(nodes)
    # BUGFIX: only the script name belongs here; previous code passed two
    # positional args to a "{1}"-only format string.
    submission_command += " {0}".format(script_name)
    return self._remote_command_executor.run_remote_command(submission_command, additional_files=files)

def assert_job_succeeded(self, job_id, children_number=0):  # noqa: D102
    result = self._remote_command_executor.run_remote_command("scontrol show jobs -o {0}".format(job_id))
    # Assert instead of returning a bool so callers fail loudly when the
    # job did not complete (the stale `return ... in result.stdout` line
    # from the old implementation is removed).
    assert_that(result.stdout).contains("JobState=COMPLETED")

def compute_nodes_count(self): # noqa: D102
result = self._remote_command_executor.run_remote_command("sinfo --Node --noheader | grep compute | wc -l")
Expand Down Expand Up @@ -297,9 +277,6 @@ def get_job_exit_status(self, job_id): # noqa: D102
def assert_job_submitted(self, qsub_output):  # noqa: D102
    # Torque is not exercised by these integration tests; the method exists
    # only to satisfy the SchedulerCommands interface.
    raise NotImplementedError

def submit_interactive_command(self, command, nodes=1): # noqa: D102
raise NotImplementedError

def submit_command(self, command, nodes=1, slots=None):  # noqa: D102
    # Signature kept consistent with SchedulerCommands.submit_command
    # (nodes/slots accepted, backward compatible). Torque is not exercised
    # by these integration tests.
    raise NotImplementedError

Expand Down
72 changes: 42 additions & 30 deletions tests/integration-tests/tests/test_efa/test_efa.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,67 +17,79 @@
from remote_command_executor import RemoteCommandExecutor
from tests.common.schedulers_common import get_scheduler_commands

INSTANCES_TO_SLOTS_MAP = {"c5n.18xlarge": 72, "p3dn.24xlarge": 96, "i3en.24xlarge": 96}

@pytest.mark.regions(["us-east-1"])

@pytest.mark.regions(["us-east-1", "eu-west-1"])
@pytest.mark.instances(["c5n.18xlarge", "p3dn.24xlarge", "i3en.24xlarge"])
@pytest.mark.oss(["alinux", "centos7", "ubuntu1604"])
@pytest.mark.schedulers(["sge", "slurm"])
@pytest.mark.usefixtures("os", "instance", "scheduler")
def test_efa(scheduler, pcluster_config_reader, clusters_factory, test_datadir):
@pytest.mark.usefixtures("os", "region")
def test_efa(scheduler, instance, pcluster_config_reader, clusters_factory, test_datadir):
"""
Test all EFA Features.
Grouped all tests in a single function so that cluster can be reused for all of them.
"""
scaledown_idletime = 3
max_queue_size = 5
cluster_config = pcluster_config_reader(scaledown_idletime=scaledown_idletime, max_queue_size=max_queue_size)
max_queue_size = 2
slots_per_instance = INSTANCES_TO_SLOTS_MAP[instance]
cluster_config = pcluster_config_reader(max_queue_size=max_queue_size)
cluster = clusters_factory(cluster_config)
remote_command_executor = RemoteCommandExecutor(cluster)
scheduler_commands = get_scheduler_commands(scheduler, remote_command_executor)

_test_efa_installed(remote_command_executor)
_test_efa_mpi(remote_command_executor, scheduler, test_datadir)
_test_efa_installed(scheduler_commands, remote_command_executor)
_test_efa_mpi(remote_command_executor, scheduler_commands, test_datadir, slots_per_instance)
_test_osu_benchmarks(remote_command_executor, scheduler_commands, test_datadir, slots_per_instance)


def _test_efa_installed(scheduler_commands, remote_command_executor):
    """Check the EFA device is visible on compute nodes and absent on the master.

    Expected lspci output on compute nodes contains:
    00:06.0 Ethernet controller: Amazon.com, Inc. Device efa0
    """
    logging.info("Testing EFA installed")
    # Run lspci through the scheduler so it executes on a compute node;
    # capture the output on the shared filesystem to read it back.
    result = scheduler_commands.submit_command("/sbin/lspci > /shared/lspci.out")

    job_id = scheduler_commands.assert_job_submitted(result.stdout)
    scheduler_commands.wait_job_completed(job_id)
    scheduler_commands.assert_job_succeeded(job_id)

    # Check EFA interface is present on compute node
    result = remote_command_executor.run_remote_command("cat /shared/lspci.out")
    assert_that(result.stdout).contains("00:06.0 Ethernet controller: Amazon.com, Inc. Device efa0")

    # Check EFA interface not present on master
    result = remote_command_executor.run_remote_command("/sbin/lspci")
    assert_that(result.stdout).does_not_contain("00:06.0 Ethernet controller: Amazon.com, Inc. Device efa0")

def _test_efa_mpi(remote_command_executor, scheduler_commands, test_datadir, slots_per_instance):
    """Compile and run a 2-rank MPI hello-world over EFA and verify the output."""
    logging.info("Testing mpi job with EFA")
    # Compile mpi script with the EFA-provided MPI toolchain.
    remote_command_executor.run_remote_command(
        "/opt/amazon/efa/bin/mpicc -o mpi_hello_world mpi_hello_world.c",
        additional_files=[str(test_datadir / "mpi_hello_world.c")],
    )

    # Submit the job asking for two full instances' worth of slots so the
    # two ranks land on different nodes.
    result = scheduler_commands.submit_script(str(test_datadir / "mpi_submit.sh"), slots=2 * slots_per_instance)
    job_id = scheduler_commands.assert_job_submitted(result.stdout)
    scheduler_commands.wait_job_completed(job_id)
    scheduler_commands.assert_job_succeeded(job_id)

    mpi_out = remote_command_executor.run_remote_command("cat /shared/mpi.out").stdout
    # Exactly one "Hello world" line per rank.
    assert_that(mpi_out.splitlines()).is_length(2)
    assert_that(mpi_out).matches(r"Hello world from processor ip-.+, rank 0 out of 2 processors")
    assert_that(mpi_out).matches(r"Hello world from processor ip-.+, rank 1 out of 2 processors")

def _test_osu_benchmarks(remote_command_executor, scheduler_commands, test_datadir, slots_per_instance):
    """Build the OSU micro-benchmarks and run osu_latency across two nodes."""
    logging.info("Running OSU benchmarks")
    # hide=True keeps the long build log out of the local test output.
    remote_command_executor.run_remote_script(str(test_datadir / "init_osu_benchmarks.sh"), hide=True)

    # Two full instances' worth of slots so the benchmark spans two nodes.
    result = scheduler_commands.submit_script(str(test_datadir / "osu_submit.sh"), slots=2 * slots_per_instance)
    job_id = scheduler_commands.assert_job_submitted(result.stdout)
    scheduler_commands.wait_job_completed(job_id)
    scheduler_commands.assert_job_succeeded(job_id)

    # TODO: perform assertions on benchmarks results
    remote_command_executor.run_remote_command("cat /shared/osu.out")
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#!/usr/bin/env bash
# Download and build the OSU micro-benchmarks on the shared volume using the
# EFA-provided MPI compilers. "make install" is deliberately left to the
# submit script.
set -e

cd /shared
wget http://mvapich.cse.ohio-state.edu/download/mvapich/osu-micro-benchmarks-5.4.tar.gz
tar zxvf ./osu-micro-benchmarks-5.4.tar.gz
cd osu-micro-benchmarks-5.4/
./configure CC=/opt/amazon/efa/bin/mpicc CXX=/opt/amazon/efa/bin/mpicxx
make
# make install in the submit script
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ int main(int argc, char** argv) {

// Finalize the MPI environment. No more MPI calls can be made after this
MPI_Finalize();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
# Submit-side script for the EFA MPI hello-world test: runs 2 ranks total,
# one per node (-N 1 — assumes Open MPI semantics for -N; module provides
# mpirun), capturing all output on the shared filesystem.
set -e

module load openmpi
mpirun -N 1 -np 2 "mpi_hello_world" &> /shared/mpi.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
# Submit-side script for the OSU latency benchmark: one rank per node
# (--map-by ppr:1:node) so the point-to-point test crosses the network,
# capturing all output on the shared filesystem.
set -e

module load openmpi
mpirun --map-by ppr:1:node /shared/osu-micro-benchmarks-5.4/mpi/pt2pt/osu_latency &> /shared/osu.out
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ base_os = {{ os }}
key_name = {{ key_name }}
vpc_settings = parallelcluster-vpc
scheduler = {{ scheduler }}
master_instance_type = c5.xlarge
compute_instance_type = {{ instance }}
initial_queue_size = 2
maintain_initial_size = true
max_queue_size = {{ max_queue_size }}
enable_efa = compute
placement_group = DYNAMIC

Expand Down

This file was deleted.

This file was deleted.

0 comments on commit e997919

Please sign in to comment.