diff --git a/tests/integration-tests/README.md b/tests/integration-tests/README.md
index 431f38e020..4faee711c7 100644
--- a/tests/integration-tests/README.md
+++ b/tests/integration-tests/README.md
@@ -174,6 +174,25 @@ python -m test_runner \
 Keep in mind, the cluster you pass can have different `scheduler`, `os` or other features than what is specified
 in the test. This can break the tests in unexpected ways. Be mindful.
 
+### Benchmark and performance tests
+
+Performance tests are disabled by default due to the high resource utilization involved in their execution.
+To run the performance tests, use the following options:
+* `--benchmarks`: run benchmark tests. This disables the execution of all tests defined under the `tests` directory.
+* `--benchmarks-target-capacity`: set the target capacity for benchmark tests (default: 200).
+* `--benchmarks-max-time`: set the maximum waiting time, in minutes, for benchmark tests (default: 30).
+
+The same filters by dimensions and features can be applied to these tests.
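+
+For example, a benchmark-only run restricted to a single dimensions combination might look like the following
+sketch (the key and dimensions options are the same ones used for the regular integration tests and must be
+adapted to your own setup):
+```bash
+python -m test_runner \
+    --key-name "ec2-key-name" \
+    --key-path "~/.ssh/ec2-key.pem" \
+    --regions "eu-west-1" \
+    --instances "c5.xlarge" \
+    --oss "centos7" \
+    --schedulers "slurm" \
+    --benchmarks \
+    --benchmarks-target-capacity 200 \
+    --benchmarks-max-time 30
+```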
+
+The output produced by the performance tests is stored under the following directory tree:
+```
+tests_outputs
+└── $timestamp.out
+    └── benchmarks: directory storing the outputs produced by the benchmark tests
+        ├── test_scaling_performance.py-test_scaling_performance[c5.xlarge-eu-west-1-centos7-slurm].png
+        └── ...
+```
+
 ## Write Integration Tests
 
 All integration tests are defined in the `integration-tests/tests` directory.
@@ -507,3 +526,11 @@ def vpc(cfn_stacks_factory):
     cfn_stacks_factory.create_stack(stack)
     return stack
 ```
+
+### Benchmark and performance tests
+
+Benchmark and performance tests follow the same rules described above for a normal integration test.
+The only differences are the following:
+- the tests are defined under the `benchmarks/` directory
+- they are not executed by default with the rest of the integration tests
+- they write their output to a dedicated `benchmarks` directory created in the output dir
diff --git a/tests/integration-tests/benchmarks/__init__.py b/tests/integration-tests/benchmarks/__init__.py
new file mode 100644
index 0000000000..2251b11f46
--- /dev/null
+++ b/tests/integration-tests/benchmarks/__init__.py
@@ -0,0 +1,11 @@
+# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License").
+# You may not use this file except in compliance with the License.
+# A copy of the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "LICENSE.txt" file accompanying this file.
+# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
diff --git a/tests/integration-tests/benchmarks/test_scaling_performance.py b/tests/integration-tests/benchmarks/test_scaling_performance.py
new file mode 100644
index 0000000000..59fcd00fbe
--- /dev/null
+++ b/tests/integration-tests/benchmarks/test_scaling_performance.py
@@ -0,0 +1,245 @@
+# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License").
+# You may not use this file except in compliance with the License.
+# A copy of the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "LICENSE.txt" file accompanying this file.
+# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+import datetime
+import json
+import logging
+import os
+from time import sleep
+
+import boto3
+import pytest
+from retrying import RetryError, retry
+
+from assertpy import assert_that
+from remote_command_executor import RemoteCommandExecutor
+from tests.common.assertions import assert_no_errors_in_logs
+from tests.common.schedulers_common import get_scheduler_commands
+from time_utils import minutes, seconds
+
+METRIC_WIDGET_TEMPLATE = """
+    {{
+        "metrics": [
+            [ "ParallelCluster/benchmarking/{cluster_name}", "ComputeNodesCount", {{ "stat": "Maximum", "label": \
+"ComputeNodesCount Max" }} ],
+            [ "...", {{ "stat": "Minimum", "label": "ComputeNodesCount Min" }} ],
+            [ "AWS/AutoScaling", "GroupDesiredCapacity", "AutoScalingGroupName", "{asg_name}", {{ "stat": "Maximum", \
+"label": "GroupDesiredCapacity" }} ],
+            [ ".", "GroupInServiceInstances", ".", ".", {{ "stat": "Maximum", "label": "GroupInServiceInstances" }} ]
+        ],
+        "view": "timeSeries",
+        "stacked": false,
+        "stat": "Maximum",
+        "period": 1,
+        "title": "{title}",
+        "width": 1400,
+        "height": 700,
+        "start": "{graph_start_time}",
+        "end": "{graph_end_time}",
+        "annotations": {{
+            "horizontal": [
+                {{
+                    "label": "Scaling Target",
+                    "value": {scaling_target}
+                }}
+            ],
+            "vertical": [
+                {{
+                    "label": "Start Time",
+                    "value": "{start_time}"
+                }},
+                {{
+                    "label": "End Time",
+                    "value": "{end_time}"
+                }}
+            ]
+        }},
+        "yAxis": {{
+            "left": {{
+                "showUnits": false,
+                "label": "Count"
+            }},
+            "right": {{
+                "showUnits": true
+            }}
+        }}
+    }}"""
+
+
+@pytest.mark.schedulers(["slurm", "sge"])
+@pytest.mark.benchmarks
+def test_scaling_performance(region, scheduler, os, instance, pcluster_config_reader, clusters_factory, request):
+    """The test runs benchmarks for the scaling logic."""
+    benchmarks_max_time = request.config.getoption("benchmarks_max_time")
+
+    benchmark_params = {
+        "region": region,
+        "scheduler": scheduler,
+        "os": os,
+        "instance": instance,
+        "scaling_target": request.config.getoption("benchmarks_target_capacity"),
+        "scaledown_idletime": 2,
+        "job_duration": 60,
+    }
+
+    cluster_config = pcluster_config_reader(
+        scaledown_idletime=benchmark_params["scaledown_idletime"], scaling_target=benchmark_params["scaling_target"]
+    )
+    cluster = clusters_factory(cluster_config)
+    remote_command_executor = RemoteCommandExecutor(cluster)
+    scheduler_commands = get_scheduler_commands(scheduler, remote_command_executor)
+
+    _enable_asg_metrics(region, cluster)
+    logging.info("Starting benchmark with the following parameters: %s", benchmark_params)
+    start_time = datetime.datetime.utcnow()
+    if scheduler == "sge":
+        kwargs = {"slots": _get_instance_vcpus(region, instance) * benchmark_params["scaling_target"]}
+    else:
+        kwargs = {"nodes": benchmark_params["scaling_target"]}
+    result = scheduler_commands.submit_command("sleep {0}".format(benchmark_params["job_duration"]), **kwargs)
+    scheduler_commands.assert_job_submitted(result.stdout)
+    compute_nodes_time_series, timestamps = _publish_compute_nodes_metric(
+        scheduler_commands,
+        max_monitoring_time=minutes(benchmarks_max_time),
+        region=region,
+        cluster_name=cluster.cfn_name,
+    )
+    end_time = timestamps[-1]
+
+    logging.info("Benchmark completed. Producing outputs and performing assertions.")
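+    # Render a CloudWatch metric-widget image of the scale-up/scale-down behaviour, store it in the benchmarks
+    # output directory and then verify that the scaling target was reached and the cluster scaled back to 0.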
+    benchmark_params["total_time"] = "{0} seconds".format(int((end_time - start_time).total_seconds()))
+    _produce_benchmark_metrics_report(
+        benchmark_params,
+        region,
+        cluster.cfn_name,
+        cluster.asg,
+        start_time.replace(tzinfo=datetime.timezone.utc).isoformat(),
+        end_time.replace(tzinfo=datetime.timezone.utc).isoformat(),
+        benchmark_params["scaling_target"],
+        request,
+    )
+    assert_that(max(compute_nodes_time_series)).is_equal_to(benchmark_params["scaling_target"])
+    assert_that(compute_nodes_time_series[-1]).is_equal_to(0)
+    assert_no_errors_in_logs(remote_command_executor, ["/var/log/sqswatcher", "/var/log/jobwatcher"])
+
+
+def _publish_compute_nodes_metric(scheduler_commands, max_monitoring_time, region, cluster_name):
+    logging.info("Monitoring scheduler status and publishing metrics")
+    cw_client = boto3.client("cloudwatch", region_name=region)
+    compute_nodes_time_series = [0]
+    timestamps = [datetime.datetime.utcnow()]
+
+    # Poll the scheduler until the compute fleet has scaled up and then back down to 0 nodes,
+    # publishing a ComputeNodesCount metric at every iteration.
+    @retry(
+        retry_on_result=lambda _: len(compute_nodes_time_series) == 1 or compute_nodes_time_series[-1] != 0,
+        wait_fixed=seconds(20),
+        stop_max_delay=max_monitoring_time,
+    )
+    def _watch_compute_nodes_allocation():
+        try:
+            compute_nodes = scheduler_commands.compute_nodes_count()
+            logging.info("Publishing metric: count={0}".format(compute_nodes))
+            cw_client.put_metric_data(
+                Namespace="ParallelCluster/benchmarking/{cluster_name}".format(cluster_name=cluster_name),
+                MetricData=[{"MetricName": "ComputeNodesCount", "Value": compute_nodes, "Unit": "Count"}],
+            )
+            # add values only if there is a transition.
+            if compute_nodes_time_series[-1] != compute_nodes:
+                compute_nodes_time_series.append(compute_nodes)
+                timestamps.append(datetime.datetime.utcnow())
+        except Exception as e:
+            logging.warning("Failed while watching nodes allocation with exception: %s", e)
+            raise
+
+    try:
+        _watch_compute_nodes_allocation()
+    except RetryError:
+        # ignoring this error in order to perform assertions on the collected data.
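+        # (a RetryError here simply means that max_monitoring_time expired before the cluster scaled back to 0)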
+        pass
+
+    logging.info(
+        "Monitoring completed: %s", "compute_nodes_time_series [" + " ".join(map(str, compute_nodes_time_series)) + "]"
+    )
+    logging.info("Sleeping for 3 minutes to wait for the metrics to propagate...")
+    sleep(180)
+
+    return compute_nodes_time_series, timestamps
+
+
+def _enable_asg_metrics(region, cluster):
+    logging.info("Enabling ASG metrics for %s", cluster.asg)
+    boto3.client("autoscaling", region_name=region).enable_metrics_collection(
+        AutoScalingGroupName=cluster.asg,
+        Metrics=["GroupDesiredCapacity", "GroupInServiceInstances", "GroupTerminatingInstances"],
+        Granularity="1Minute",
+    )
+
+
+def _publish_metric(region, instance, os, scheduler, state, count):
+    cw_client = boto3.client("cloudwatch", region_name=region)
+    logging.info("Publishing metric: state={0} count={1}".format(state, count))
+    cw_client.put_metric_data(
+        Namespace="parallelcluster/benchmarking/test_scaling_speed/{region}/{instance}/{os}/{scheduler}".format(
+            region=region, instance=instance, os=os, scheduler=scheduler
+        ),
+        MetricData=[
+            {
+                "MetricName": "ComputeNodesCount",
+                "Dimensions": [{"Name": "state", "Value": state}],
+                "Value": count,
+                "Unit": "Count",
+            }
+        ],
+    )
+
+
+def _produce_benchmark_metrics_report(
+    benchmark_params, region, cluster_name, asg_name, start_time, end_time, scaling_target, request
+):
+    title = ", ".join("{0}={1}".format(key, val) for (key, val) in benchmark_params.items())
+    graph_start_time = _to_datetime(start_time) - datetime.timedelta(minutes=2)
+    graph_end_time = _to_datetime(end_time) + datetime.timedelta(minutes=2)
+    widget_metric = METRIC_WIDGET_TEMPLATE.format(
+        cluster_name=cluster_name,
+        asg_name=asg_name,
+        start_time=start_time,
+        end_time=end_time,
+        title=title,
+        graph_start_time=graph_start_time,
+        graph_end_time=graph_end_time,
+        scaling_target=scaling_target,
+    )
+    logging.info(widget_metric)
+    cw_client = boto3.client("cloudwatch", region_name=region)
+    response = cw_client.get_metric_widget_image(MetricWidget=widget_metric)
+    _write_results_to_outdir(request, response["MetricWidgetImage"])
+
+
+def _to_datetime(timestamp):
+    return datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z")
+
+
+def _write_results_to_outdir(request, image_bytes):
+    out_dir = request.config.getoption("output_dir")
+    os.makedirs("{out_dir}/benchmarks".format(out_dir=out_dir), exist_ok=True)
+    graph_dst = "{out_dir}/benchmarks/{test_name}.png".format(
+        out_dir=out_dir, test_name=request.node.nodeid.replace("::", "-")
+    )
+    with open(graph_dst, "wb") as image:
+        image.write(image_bytes)
+
+
+def _get_instance_vcpus(region, instance):
+    bucket_name = "{0}-aws-parallelcluster".format(region)
+    s3 = boto3.resource("s3", region_name=region)
+    instances_file_content = s3.Object(bucket_name, "instances/instances.json").get()["Body"].read()
+    instances = json.loads(instances_file_content)
+    return int(instances[instance]["vcpus"])
diff --git a/tests/integration-tests/benchmarks/test_scaling_performance/test_scaling_performance/pcluster.config.ini b/tests/integration-tests/benchmarks/test_scaling_performance/test_scaling_performance/pcluster.config.ini
new file mode 100644
index 0000000000..83f01f3ed6
--- /dev/null
+++ b/tests/integration-tests/benchmarks/test_scaling_performance/test_scaling_performance/pcluster.config.ini
@@ -0,0 +1,24 @@
+[global]
+cluster_template = default
+
+[aws]
+aws_region_name = {{ region }}
+
+[cluster default]
+base_os = {{ os }}
+key_name = {{ key_name }}
+vpc_settings = parallelcluster-vpc
+scheduler = {{ scheduler }}
+master_instance_type = {{ instance }}
+compute_instance_type = {{ instance }}
+initial_queue_size = 0
+max_queue_size = {{ scaling_target + 10 }}
+scaling_settings = custom
+
+[scaling custom]
+scaledown_idletime = {{ scaledown_idletime }}
+
+[vpc parallelcluster-vpc]
+vpc_id = {{ vpc_id }}
+master_subnet_id = {{ public_subnet_id }}
+compute_subnet_id = {{ private_subnet_id }}
diff --git a/tests/integration-tests/conftest.py b/tests/integration-tests/conftest.py
index 55c7f66bd4..975b1794f6 100644
--- a/tests/integration-tests/conftest.py
+++ b/tests/integration-tests/conftest.py
@@ -61,6 +61,8 @@ def pytest_addoption(parser):
     parser.addoption(
         "--no-delete", action="store_true", default=False, help="Don't delete stacks after tests are complete."
     )
+    parser.addoption("--benchmarks-target-capacity", help="Set the target capacity for benchmark tests", type=int)
+    parser.addoption("--benchmarks-max-time", help="Set the max waiting time in minutes for benchmark tests", type=int)
 
 
 def pytest_generate_tests(metafunc):
diff --git a/tests/integration-tests/reports_generator.py b/tests/integration-tests/reports_generator.py
index e25020b0b4..69aa37d7e2 100644
--- a/tests/integration-tests/reports_generator.py
+++ b/tests/integration-tests/reports_generator.py
@@ -125,7 +125,7 @@ def _put_metrics(cw_client, namespace, metrics, dimensions, timestamp):
         time.sleep(put_metric_sleep_interval)
 
     failures_errors = metrics["failures"] + metrics["errors"]
-    failure_rate = float(failures_errors) / metrics["total"] * 100
+    failure_rate = float(failures_errors) / metrics["total"] * 100 if metrics["total"] > 0 else 0
     additional_metrics = [
         {"name": "failures_errors", "value": failures_errors, "unit": "Count"},
         {"name": "failure_rate", "value": failure_rate, "unit": "Percent"},
diff --git a/tests/integration-tests/test_runner.py b/tests/integration-tests/test_runner.py
index 778a3f31d0..edf292d50c 100644
--- a/tests/integration-tests/test_runner.py
+++ b/tests/integration-tests/test_runner.py
@@ -69,6 +69,9 @@
     "vpc_stack": None,
     "cluster": None,
     "no_delete": False,
+    "benchmarks": False,
+    "benchmarks_target_capacity": 200,
+    "benchmarks_max_time": 30,
 }
 
 
@@ -177,6 +180,24 @@ def _init_argparser():
         help="Don't delete stacks after tests are complete.",
         default=TEST_DEFAULTS.get("no_delete"),
     )
+    parser.add_argument(
+        "--benchmarks",
+        help="Run benchmark tests. This disables the execution of all tests defined under the tests directory.",
+        action="store_true",
+        default=TEST_DEFAULTS.get("benchmarks"),
+    )
+    parser.add_argument(
+        "--benchmarks-target-capacity",
+        help="Set the target capacity for benchmark tests.",
+        default=TEST_DEFAULTS.get("benchmarks_target_capacity"),
+        type=int,
+    )
+    parser.add_argument(
+        "--benchmarks-max-time",
+        help="Set the max waiting time in minutes for benchmark tests.",
+        default=TEST_DEFAULTS.get("benchmarks_max_time"),
+        type=int,
+    )
     return parser
 
 
@@ -188,7 +209,17 @@ def _is_file(value):
 
 
 def _get_pytest_args(args, regions, log_file, out_dir):
-    pytest_args = ["-s", "-vv", "-l", "--rootdir=./tests"]
+    pytest_args = ["-s", "-vv", "-l"]
+
+    if args.benchmarks:
+        pytest_args.append("--ignore=./tests")
+        pytest_args.append("--rootdir=./benchmarks")
+        pytest_args.append("--benchmarks-target-capacity={0}".format(args.benchmarks_target_capacity))
+        pytest_args.append("--benchmarks-max-time={0}".format(args.benchmarks_max_time))
+    else:
+        pytest_args.append("--rootdir=./tests")
+        pytest_args.append("--ignore=./benchmarks")
+
     # Show all tests durations
     pytest_args.append("--durations=0")
     # Run only tests with the given markers