integ tests: implement test to benchmark scaling performance
Signed-off-by: Francesco De Martino <fdm@amazon.com>
demartinofra committed Jun 25, 2019
1 parent 95da11c commit 8213ae2
Showing 7 changed files with 342 additions and 2 deletions.
27 changes: 27 additions & 0 deletions tests/integration-tests/README.md
@@ -174,6 +174,25 @@ python -m test_runner \
Keep in mind, the cluster you pass can have different `scheduler`, `os` or other features
than what is specified in the test. This can break the tests in unexpected ways. Be mindful.

### Benchmark and performance tests

Performance tests are disabled by default because of the high resource utilization their execution involves.
To run them, use the following options (an example invocation follows the list):
* `--benchmarks`: run benchmark tests. This disables the execution of all tests defined under the `tests/` directory.
* `--benchmarks-target-capacity`: set the target capacity for benchmark tests (default: 200).
* `--benchmarks-max-time`: set the maximum waiting time, in minutes, for benchmark tests (default: 30).
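
For example, a benchmark run can be launched as follows (a sketch: only the benchmark-specific options are shown; the required options and dimension filters described earlier in this README still apply):

```
python -m test_runner \
    --benchmarks \
    --benchmarks-target-capacity 1000 \
    --benchmarks-max-time 60
```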

The same dimension and feature filters used for the integration tests can also be applied to benchmark tests.

The output produced by the performance tests is stored under the following directory tree:
```
tests_outputs
└── $timestamp..out
    └── benchmarks: directory storing the outputs produced by the benchmark tests
        ├── test_scaling_speed.py-test_scaling_speed[c5.xlarge-eu-west-1-centos7-slurm].png
        └── ...
```

## Write Integration Tests

All integration tests are defined in the `integration-tests/tests` directory.
@@ -507,3 +526,11 @@ def vpc(cfn_stacks_factory):
    cfn_stacks_factory.create_stack(stack)
    return stack
```

### Benchmark and performance tests

Benchmark and performance tests follow the same rules described above for a normal integration test.
The only differences are the following (a minimal sketch follows the list):
- the tests are defined under the `benchmarks/` directory
- they are not executed by default together with the rest of the integration tests
- they write their output to a dedicated `benchmarks` directory created inside the configured output dir
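
A minimal sketch of such a test is shown below. The fixtures and options mirror the ones used by `test_scaling_performance.py` in this commit; the test name, the config template variables and the produced artifact are hypothetical placeholders:

```python
import logging
from pathlib import Path

import pytest


@pytest.mark.benchmarks  # benchmark tests are not part of the default integration test run
def test_example_benchmark(region, scheduler, os, instance, pcluster_config_reader, clusters_factory, request):
    """Create a cluster, run a workload and store the results under <output_dir>/benchmarks."""
    scaling_target = request.config.getoption("benchmarks_target_capacity")
    cluster_config = pcluster_config_reader(scaling_target=scaling_target)
    cluster = clusters_factory(cluster_config)
    logging.info("Benchmarking cluster %s with a scaling target of %s nodes", cluster.cfn_name, scaling_target)

    # ... submit jobs and collect measurements here ...

    # Store any produced artifact in the dedicated benchmarks output directory.
    benchmarks_dir = Path(request.config.getoption("output_dir")) / "benchmarks"
    benchmarks_dir.mkdir(parents=True, exist_ok=True)
    (benchmarks_dir / "example_result.txt").write_text("collected measurements go here")
```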
11 changes: 11 additions & 0 deletions tests/integration-tests/benchmarks/__init__.py
@@ -0,0 +1,11 @@
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file.
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied.
# See the License for the specific language governing permissions and limitations under the License.
245 changes: 245 additions & 0 deletions tests/integration-tests/benchmarks/test_scaling_performance.py
@@ -0,0 +1,245 @@
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file.
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import datetime
import json
import logging
import os
from time import sleep

import boto3
import pytest
from retrying import RetryError, retry

from assertpy import assert_that
from remote_command_executor import RemoteCommandExecutor
from tests.common.assertions import assert_no_errors_in_logs
from tests.common.schedulers_common import get_scheduler_commands
from time_utils import minutes, seconds

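# CloudWatch metric widget definition (JSON) used to render the scaling graph: it overlays the custom
# ComputeNodesCount metric published by this test with the cluster's ASG metrics and is turned into a PNG
# via the CloudWatch GetMetricWidgetImage API.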
METRIC_WIDGET_TEMPLATE = """
{{
    "metrics": [
        [ "ParallelCluster/benchmarking/{cluster_name}", "ComputeNodesCount", {{ "stat": "Maximum", "label": \
"ComputeNodesCount Max" }} ],
        [ "...", {{ "stat": "Minimum", "label": "ComputeNodesCount Min" }} ],
        [ "AWS/AutoScaling", "GroupDesiredCapacity", "AutoScalingGroupName", "{asg_name}", {{ "stat": "Maximum", \
"label": "GroupDesiredCapacity" }} ],
        [ ".", "GroupInServiceInstances", ".", ".", {{ "stat": "Maximum", "label": "GroupInServiceInstances" }} ]
    ],
    "view": "timeSeries",
    "stacked": false,
    "stat": "Maximum",
    "period": 1,
    "title": "{title}",
    "width": 1400,
    "height": 700,
    "start": "{graph_start_time}",
    "end": "{graph_end_time}",
    "annotations": {{
        "horizontal": [
            {{
                "label": "Scaling Target",
                "value": {scaling_target}
            }}
        ],
        "vertical": [
            {{
                "label": "Start Time",
                "value": "{start_time}"
            }},
            {{
                "label": "End Time",
                "value": "{end_time}"
            }}
        ]
    }},
    "yAxis": {{
        "left": {{
            "showUnits": false,
            "label": "Count"
        }},
        "right": {{
            "showUnits": true
        }}
    }}
}}"""


@pytest.mark.schedulers(["slurm", "sge"])
@pytest.mark.benchmarks
def test_scaling_performance(region, scheduler, os, instance, pcluster_config_reader, clusters_factory, request):
    """The test runs benchmarks for the scaling logic."""
    benchmarks_max_time = request.config.getoption("benchmarks_max_time")

    benchmark_params = {
        "region": region,
        "scheduler": scheduler,
        "os": os,
        "instance": instance,
        "scaling_target": request.config.getoption("benchmarks_target_capacity"),
        "scaledown_idletime": 2,
        "job_duration": 60,
    }

    cluster_config = pcluster_config_reader(
        scaledown_idletime=benchmark_params["scaledown_idletime"], scaling_target=benchmark_params["scaling_target"]
    )
    cluster = clusters_factory(cluster_config)
    remote_command_executor = RemoteCommandExecutor(cluster)
    scheduler_commands = get_scheduler_commands(scheduler, remote_command_executor)

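    # Enable detailed ASG metrics, then submit a single sleep job sized to the scaling target so that the
    # cluster has to scale up to the target and, once the job completes, back down to zero.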
    _enable_asg_metrics(region, cluster)
    logging.info("Starting benchmark with following parameters: %s", benchmark_params)
    start_time = datetime.datetime.utcnow()
    if scheduler == "sge":
        kwargs = {"slots": _get_instance_vcpus(region, instance) * benchmark_params["scaling_target"]}
    else:
        kwargs = {"nodes": benchmark_params["scaling_target"]}
    result = scheduler_commands.submit_command("sleep {0}".format(benchmark_params["job_duration"]), **kwargs)
    scheduler_commands.assert_job_submitted(result.stdout)
    compute_nodes_time_series, timestamps = _publish_compute_nodes_metric(
        scheduler_commands,
        max_monitoring_time=minutes(benchmarks_max_time),
        region=region,
        cluster_name=cluster.cfn_name,
    )
    end_time = timestamps[-1]

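    # Render the CloudWatch graph of the scaling behavior and verify that the fleet reached the scaling target,
    # eventually scaled back down to zero and produced no errors in the sqswatcher/jobwatcher logs.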
    logging.info("Benchmark completed. Producing outputs and performing assertions.")
    benchmark_params["total_time"] = "{0}seconds".format(int((end_time - start_time).total_seconds()))
    _produce_benchmark_metrics_report(
        benchmark_params,
        region,
        cluster.cfn_name,
        cluster.asg,
        start_time.replace(tzinfo=datetime.timezone.utc).isoformat(),
        end_time.replace(tzinfo=datetime.timezone.utc).isoformat(),
        benchmark_params["scaling_target"],
        request,
    )
    assert_that(max(compute_nodes_time_series)).is_equal_to(benchmark_params["scaling_target"])
    assert_that(compute_nodes_time_series[-1]).is_equal_to(0)
    assert_no_errors_in_logs(remote_command_executor, ["/var/log/sqswatcher", "/var/log/jobwatcher"])


def _publish_compute_nodes_metric(scheduler_commands, max_monitoring_time, region, cluster_name):
    logging.info("Monitoring scheduler status and publishing metrics")
    cw_client = boto3.client("cloudwatch", region_name=region)
    compute_nodes_time_series = [0]
    timestamps = [datetime.datetime.utcnow()]

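    # Poll the scheduler every 20 seconds, recording a data point at every change in the number of compute nodes,
    # until the fleet has scaled up and back down to 0 (i.e. the last recorded sample is 0 again) or
    # max_monitoring_time has elapsed.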
    @retry(
        retry_on_result=lambda _: len(compute_nodes_time_series) == 1 or compute_nodes_time_series[-1] != 0,
        wait_fixed=seconds(20),
        stop_max_delay=max_monitoring_time,
    )
    def _watch_compute_nodes_allocation():
        try:
            compute_nodes = scheduler_commands.compute_nodes_count()
            logging.info("Publishing metric: count={0}".format(compute_nodes))
            cw_client.put_metric_data(
                Namespace="ParallelCluster/benchmarking/{cluster_name}".format(cluster_name=cluster_name),
                MetricData=[{"MetricName": "ComputeNodesCount", "Value": compute_nodes, "Unit": "Count"}],
            )
            # add values only if there is a transition.
            if compute_nodes_time_series[-1] != compute_nodes:
                compute_nodes_time_series.append(compute_nodes)
                timestamps.append(datetime.datetime.utcnow())
        except Exception as e:
            logging.warning("Failed while watching nodes allocation with exception: %s", e)
            raise

    try:
        _watch_compute_nodes_allocation()
    except RetryError:
        # ignoring this error in order to perform assertions on the collected data.
        pass

    logging.info(
        "Monitoring completed: %s", "compute_nodes_time_series [" + " ".join(map(str, compute_nodes_time_series)) + "]"
    )
    logging.info("Sleeping for 3 minutes to wait for the metrics to propagate...")
    sleep(180)

    return compute_nodes_time_series, timestamps


def _enable_asg_metrics(region, cluster):
    logging.info("Enabling ASG metrics for %s", cluster.asg)
    boto3.client("autoscaling", region_name=region).enable_metrics_collection(
        AutoScalingGroupName=cluster.asg,
        Metrics=["GroupDesiredCapacity", "GroupInServiceInstances", "GroupTerminatingInstances"],
        Granularity="1Minute",
    )


def _publish_metric(region, instance, os, scheduler, state, count):
    cw_client = boto3.client("cloudwatch", region_name=region)
    logging.info("Publishing metric: state={0} count={1}".format(state, count))
    cw_client.put_metric_data(
        Namespace="parallelcluster/benchmarking/test_scaling_speed/{region}/{instance}/{os}/{scheduler}".format(
            region=region, instance=instance, os=os, scheduler=scheduler
        ),
        MetricData=[
            {
                "MetricName": "ComputeNodesCount",
                "Dimensions": [{"Name": "state", "Value": state}],
                "Value": count,
                "Unit": "Count",
            }
        ],
    )


def _produce_benchmark_metrics_report(
    benchmark_params, region, cluster_name, asg_name, start_time, end_time, scaling_target, request
):
    title = ", ".join("{0}={1}".format(key, val) for (key, val) in benchmark_params.items())
    graph_start_time = _to_datetime(start_time) - datetime.timedelta(minutes=2)
    graph_end_time = _to_datetime(end_time) + datetime.timedelta(minutes=2)
    widget_metric = METRIC_WIDGET_TEMPLATE.format(
        cluster_name=cluster_name,
        asg_name=asg_name,
        start_time=start_time,
        end_time=end_time,
        title=title,
        graph_start_time=graph_start_time,
        graph_end_time=graph_end_time,
        scaling_target=scaling_target,
    )
    logging.info(widget_metric)
    cw_client = boto3.client("cloudwatch", region_name=region)
    response = cw_client.get_metric_widget_image(MetricWidget=widget_metric)
    _write_results_to_outdir(request, response["MetricWidgetImage"])


def _to_datetime(timestamp):
    return datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z")


def _write_results_to_outdir(request, image_bytes):
    out_dir = request.config.getoption("output_dir")
    os.makedirs("{out_dir}/benchmarks".format(out_dir=out_dir), exist_ok=True)
    graph_dst = "{out_dir}/benchmarks/{test_name}.png".format(
        out_dir=out_dir, test_name=request.node.nodeid.replace("::", "-")
    )
    with open(graph_dst, "wb") as image:
        image.write(image_bytes)


def _get_instance_vcpus(region, instance):
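    # Read the vcpus count for the given instance type from the instances.json file stored in the regional
    # {region}-aws-parallelcluster S3 bucket.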
    bucket_name = "{0}-aws-parallelcluster".format(region)
    s3 = boto3.resource("s3", region_name=region)
    instances_file_content = s3.Object(bucket_name, "instances/instances.json").get()["Body"].read()
    instances = json.loads(instances_file_content)
    return int(instances[instance]["vcpus"])
24 changes: 24 additions & 0 deletions in the cluster config template used by the scaling benchmark
@@ -0,0 +1,24 @@
[global]
cluster_template = default

[aws]
aws_region_name = {{ region }}

[cluster default]
base_os = {{ os }}
key_name = {{ key_name }}
vpc_settings = parallelcluster-vpc
scheduler = {{ scheduler }}
master_instance_type = {{ instance }}
compute_instance_type = {{ instance }}
initial_queue_size = 0
max_queue_size = {{ scaling_target + 10 }}
scaling_settings = custom

[scaling custom]
scaledown_idletime = {{ scaledown_idletime }}

[vpc parallelcluster-vpc]
vpc_id = {{ vpc_id }}
master_subnet_id = {{ public_subnet_id }}
compute_subnet_id = {{ private_subnet_id }}
2 changes: 2 additions & 0 deletions tests/integration-tests/conftest.py
@@ -61,6 +61,8 @@ def pytest_addoption(parser):
    parser.addoption(
        "--no-delete", action="store_true", default=False, help="Don't delete stacks after tests are complete."
    )
    parser.addoption("--benchmarks-target-capacity", help="set the target capacity for benchmarks tests", type=int)
    parser.addoption("--benchmarks-max-time", help="set the max waiting time in minutes for benchmarks tests", type=int)


def pytest_generate_tests(metafunc):
2 changes: 1 addition & 1 deletion tests/integration-tests/reports_generator.py
@@ -125,7 +125,7 @@ def _put_metrics(cw_client, namespace, metrics, dimensions, timestamp):
        time.sleep(put_metric_sleep_interval)

    failures_errors = metrics["failures"] + metrics["errors"]
    failure_rate = float(failures_errors) / metrics["total"] * 100
    failure_rate = float(failures_errors) / metrics["total"] * 100 if metrics["total"] > 0 else 0
    additional_metrics = [
        {"name": "failures_errors", "value": failures_errors, "unit": "Count"},
        {"name": "failure_rate", "value": failure_rate, "unit": "Percent"},
33 changes: 32 additions & 1 deletion tests/integration-tests/test_runner.py
@@ -69,6 +69,9 @@
    "vpc_stack": None,
    "cluster": None,
    "no_delete": False,
    "benchmarks": False,
    "benchmarks_target_capacity": 200,
    "benchmarks_max_time": 30,
}


@@ -177,6 +180,24 @@ def _init_argparser():
        help="Don't delete stacks after tests are complete.",
        default=TEST_DEFAULTS.get("no_delete"),
    )
    parser.add_argument(
        "--benchmarks",
        help="run benchmarks tests. This disables the execution of all tests defined under the tests directory.",
        action="store_true",
        default=TEST_DEFAULTS.get("benchmarks"),
    )
    parser.add_argument(
        "--benchmarks-target-capacity",
        help="set the target capacity for benchmarks tests",
        default=TEST_DEFAULTS.get("benchmarks_target_capacity"),
        type=int,
    )
    parser.add_argument(
        "--benchmarks-max-time",
        help="set the max waiting time in minutes for benchmarks tests",
        default=TEST_DEFAULTS.get("benchmarks_max_time"),
        type=int,
    )

    return parser

@@ -188,7 +209,17 @@ def _is_file(value):


def _get_pytest_args(args, regions, log_file, out_dir):
    pytest_args = ["-s", "-vv", "-l", "--rootdir=./tests"]
    pytest_args = ["-s", "-vv", "-l"]

    if args.benchmarks:
        # Benchmarks and integration tests are mutually exclusive: collect only the benchmarks directory.
        pytest_args.append("--ignore=./tests")
        pytest_args.append("--rootdir=./benchmarks")
        pytest_args.append("--benchmarks-target-capacity={0}".format(args.benchmarks_target_capacity))
        pytest_args.append("--benchmarks-max-time={0}".format(args.benchmarks_max_time))
    else:
        pytest_args.append("--rootdir=./tests")
        pytest_args.append("--ignore=./benchmarks")

    # Show all tests durations
    pytest_args.append("--durations=0")
    # Run only tests with the given markers
# Run only tests with the given markers
