From 2155542ca1bbd779ec661cd3b9202a2699d6baea Mon Sep 17 00:00:00 2001
From: Sean Smith
Date: Mon, 9 Jul 2018 16:39:12 +0200
Subject: [PATCH] Scale Up Based on Nodes Requested

Changes the scaling logic from a CloudWatch metric produced every two
minutes to a cron job on the node that directly sets the ASG values.

Signed-off-by: Sean Smith
---
 jobwatcher/__init__.py            |  10 +++
 jobwatcher/jobwatcher.cfg         |   9 ++
 jobwatcher/jobwatcher.py          | 131 ++++++++++++++++++++++++++++++
 jobwatcher/plugins/__init__.py    |  10 +++
 jobwatcher/plugins/run_command.py |  22 +++++
 jobwatcher/plugins/sge.py         |  34 ++++++++
 jobwatcher/plugins/slurm.py       |  28 +++++++
 jobwatcher/plugins/test.py        |  38 +++++++++
 jobwatcher/plugins/torque.py      |  30 +++++++
 setup.py                          |   7 +-
 tests/test.sh                     |   3 +-
 11 files changed, 318 insertions(+), 4 deletions(-)
 create mode 100644 jobwatcher/__init__.py
 create mode 100644 jobwatcher/jobwatcher.cfg
 create mode 100644 jobwatcher/jobwatcher.py
 create mode 100644 jobwatcher/plugins/__init__.py
 create mode 100644 jobwatcher/plugins/run_command.py
 create mode 100644 jobwatcher/plugins/sge.py
 create mode 100644 jobwatcher/plugins/slurm.py
 create mode 100644 jobwatcher/plugins/test.py
 create mode 100644 jobwatcher/plugins/torque.py

diff --git a/jobwatcher/__init__.py b/jobwatcher/__init__.py
new file mode 100644
index 000000000..1f9facc27
--- /dev/null
+++ b/jobwatcher/__init__.py
@@ -0,0 +1,10 @@
+# Copyright 2013-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the
+# License. A copy of the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/jobwatcher/jobwatcher.cfg b/jobwatcher/jobwatcher.cfg
new file mode 100644
index 000000000..070a703d2
--- /dev/null
+++ b/jobwatcher/jobwatcher.cfg
@@ -0,0 +1,9 @@
+# Testing config file
+# Create an ASG with name Test
+[jobwatcher]
+region = us-east-1
+asg_name = Test
+scheduler = test
+proxy = NONE
+cfncluster_dir = /opt/cfncluster
+compute_instance_type = c4.xlarge
diff --git a/jobwatcher/jobwatcher.py b/jobwatcher/jobwatcher.py
new file mode 100644
index 000000000..7a0bd0bed
--- /dev/null
+++ b/jobwatcher/jobwatcher.py
@@ -0,0 +1,131 @@
+#!/usr/bin/env python2.6
+
+# Copyright 2013-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the
+# License. A copy of the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
+# limitations under the License.
+
+__author__ = 'seaam'
+
+import ConfigParser
+import boto3
+import os
+import sys
+import time
+import logging
+import json
+from botocore.exceptions import ClientError
+from botocore.config import Config
+
+log = logging.getLogger(__name__)
+pricing_file = '/opt/cfncluster/instances.json'
+
+def loadSchedulerModule(scheduler):
+    scheduler = 'jobwatcher.plugins.' + scheduler
+    _scheduler = __import__(scheduler)
+    _scheduler = sys.modules[scheduler]
+
+    log.debug("scheduler=%s" % repr(_scheduler))
+
+    return _scheduler
+
+def get_instance_properties(instance_type):
+    with open(pricing_file) as f:
+        instances = json.load(f)
+        try:
+            slots = int(instances[instance_type]["vcpus"])
+            log.info("Instance %s has %s slots." % (instance_type, slots))
+            return {'slots': slots}
+        except KeyError as e:
+            log.error("Instance %s not found in file %s." % (instance_type, pricing_file))
+            exit(1)
+
+def fetch_pricing_file(proxy_config, cfncluster_dir, region):
+    s3 = boto3.resource('s3', region_name=region, config=proxy_config)
+    try:
+        if not os.path.exists(cfncluster_dir):
+            os.makedirs(cfncluster_dir)
+    except OSError as ex:
+        log.critical('Could not create directory %s. Failed with exception: %s' % (cfncluster_dir, ex))
+        raise
+    bucket_name = '%s-cfncluster' % region
+    try:
+        bucket = s3.Bucket(bucket_name)
+        bucket.download_file('instances/instances.json', '%s/instances.json' % cfncluster_dir)
+    except ClientError as e:
+        log.critical("Could not save instance mapping file instances.json in %s from S3 bucket %s. Failed with exception: %s" % (cfncluster_dir, bucket_name, e))
+        raise
+
+def main():
+    logging.basicConfig(
+        level=logging.INFO,
+        format='%(asctime)s %(levelname)s [%(module)s:%(funcName)s] %(message)s'
+    )
+
+    config = ConfigParser.RawConfigParser()
+    config.read('jobwatcher.cfg')
+    if config.has_option('jobwatcher', 'loglevel'):
+        lvl = logging._levelNames[config.get('jobwatcher', 'loglevel')]
+        logging.getLogger().setLevel(lvl)
+    region = config.get('jobwatcher', 'region')
+    asg_name = config.get('jobwatcher', 'asg_name')
+    scheduler = config.get('jobwatcher', 'scheduler')
+    instance_type = config.get('jobwatcher', 'compute_instance_type')
+    cfncluster_dir = config.get('jobwatcher', 'cfncluster_dir')
+    _proxy = config.get('jobwatcher', 'proxy')
+    proxy_config = Config()
+
+    if not _proxy == "NONE":
+        proxy_config = Config(proxies={'https': _proxy})
+
+    # fetch the pricing file on startup
+    fetch_pricing_file(proxy_config, cfncluster_dir, region)
+
+    # load scheduler
+    s = loadSchedulerModule(scheduler)
+
+    while True:
+        # get the number of vcpus per compute instance
+        instance_properties = get_instance_properties(instance_type)
+
+        # Get number of nodes requested by pending jobs
+        pending = s.get_required_nodes(instance_properties)
+
+        # Get number of nodes currently busy with running jobs
+        running = s.get_busy_nodes(instance_properties)
+
+        # connect to asg
+        asg_conn = boto3.client('autoscaling', region_name=region)
+
+        # get current limits
+        asg = asg_conn.describe_auto_scaling_groups(AutoScalingGroupNames=[asg_name])\
+            .get('AutoScalingGroups')[0]
+
+        min = asg.get('MinSize')
+        current_desired = asg.get('DesiredCapacity')
+        max = asg.get('MaxSize')
+        log.info("min/desired/max %d/%d/%d" % (min, current_desired, max))
+        log.info("Nodes requested %d, Nodes running %d" % (pending, running))
+
+        # make sure the new desired capacity stays within the asg limits
+        desired = running + pending
+        if desired > max:
+            log.info("%d requested nodes is greater than max %d. Requesting max %d." % (desired, max, max))
+            asg_conn.update_auto_scaling_group(AutoScalingGroupName=asg_name, DesiredCapacity=max)
+        elif desired <= current_desired:
+            log.info("%d nodes desired, %d nodes in asg. Noop." % (desired, current_desired))
+        else:
+            log.info("Setting desired to %d nodes, requesting %d more nodes from asg." % (desired, desired - current_desired))
+            asg_conn.update_auto_scaling_group(AutoScalingGroupName=asg_name, DesiredCapacity=desired)
+
+        time.sleep(60)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/jobwatcher/plugins/__init__.py b/jobwatcher/plugins/__init__.py
new file mode 100644
index 000000000..1f9facc27
--- /dev/null
+++ b/jobwatcher/plugins/__init__.py
@@ -0,0 +1,10 @@
+# Copyright 2013-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the
+# License. A copy of the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/jobwatcher/plugins/run_command.py b/jobwatcher/plugins/run_command.py
new file mode 100644
index 000000000..2330d12db
--- /dev/null
+++ b/jobwatcher/plugins/run_command.py
@@ -0,0 +1,22 @@
+import logging
+import shlex
+import subprocess as sub
+import os
+
+log = logging.getLogger(__name__)
+
+def run_command(command):
+    _command = shlex.split(command)
+    try:
+        DEV_NULL = open(os.devnull, "rb")
+        process = sub.Popen(_command, env=dict(os.environ), stdout=sub.PIPE, stderr=sub.STDOUT, stdin=DEV_NULL)
+        _output = process.communicate()[0]
+        exitcode = process.poll()
+        if exitcode != 0:
+            log.error("Failed to run %s:\n%s" % (_command, _output))
+        return _output
+    except sub.CalledProcessError:
+        log.error("Failed to run %s\n" % _command)
+        exit(1)
+    finally:
+        DEV_NULL.close()
diff --git a/jobwatcher/plugins/sge.py b/jobwatcher/plugins/sge.py
new file mode 100644
index 000000000..bc398f0c4
--- /dev/null
+++ b/jobwatcher/plugins/sge.py
@@ -0,0 +1,34 @@
+import logging
+import json
+from run_command import run_command
+
+log = logging.getLogger(__name__)
+
+# get nodes requested from pending jobs
+def get_required_nodes(instance_properties):
+    command = "/opt/sge/bin/lx-amd64/qstat -g d -s p -u '*'"
+    _output = run_command(command)
+    slots = 0
+    output = _output.split("\n")[2:]
+    for line in output:
+        line_arr = line.split()
+        if len(line_arr) >= 8:
+            slots += int(line_arr[7])
+    vcpus = instance_properties.get('slots')
+    return -(-slots // vcpus)
+
+# get nodes reserved by running jobs
+# if a host has 1 or more jobs running on it, it'll be marked busy
+def get_busy_nodes(instance_properties):
+    command = "/opt/sge/bin/lx-amd64/qstat -f"
+    _output = run_command(command)
+    nodes = 0
+    output = _output.split("\n")[2:]
+    for line in output:
+        line_arr = line.split()
+        if len(line_arr) == 5:
+            # resv/used/tot.
+            (resv, used, total) = line_arr[2].split('/')
+            if int(used) > 0 or int(resv) > 0:
+                nodes += 1
+    return nodes
diff --git a/jobwatcher/plugins/slurm.py b/jobwatcher/plugins/slurm.py
new file mode 100644
index 000000000..d4704b0e5
--- /dev/null
+++ b/jobwatcher/plugins/slurm.py
@@ -0,0 +1,28 @@
+import logging
+from run_command import run_command
+
+log = logging.getLogger(__name__)
+
+# get nodes requested from pending jobs
+def get_required_nodes(instance_properties):
+    command = "/opt/slurm/bin/squeue -r -h -o '%t %D'"
+    _output = run_command(command)
+    nodes = 0
+    output = _output.split("\n")
+    for line in output:
+        line_arr = line.split()
+        if len(line_arr) == 2 and line_arr[0] == 'PD':
+            nodes += int(line_arr[1])
+    return nodes
+
+# get nodes reserved by running jobs
+def get_busy_nodes(instance_properties):
+    command = "/opt/slurm/bin/squeue -r -h -o '%t %D'"
+    _output = run_command(command)
+    nodes = 0
+    output = _output.split("\n")
+    for line in output:
+        line_arr = line.split()
+        if len(line_arr) == 2 and line_arr[0] == 'R':
+            nodes += int(line_arr[1])
+    return nodes
diff --git a/jobwatcher/plugins/test.py b/jobwatcher/plugins/test.py
new file mode 100644
index 000000000..92235b9de
--- /dev/null
+++ b/jobwatcher/plugins/test.py
@@ -0,0 +1,38 @@
+# Copyright 2013-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the
+# License. A copy of the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+log = logging.getLogger(__name__)
+
+# get nodes requested from pending jobs
+def get_required_nodes(instance_properties):
+    # Test function. Change as needed.
+    slots = 4
+    vcpus = instance_properties.get('slots')
+    return -(-slots // vcpus)
+
+# get nodes reserved by running jobs
+def get_busy_nodes(instance_properties):
+    # Test function. Change as needed.
+    slots = 13
+    vcpus = instance_properties.get('slots')
+    return -(-slots // vcpus)
+
+# convert a slot count into the number of whole nodes needed
+def nodes(slots, instance_properties):
+    if slots <= 0:
+        return 0
+    vcpus = instance_properties.get('slots')
+    if not vcpus:
+        return 0
+    log.info("Instance has %s slots per node." % vcpus)
+    return -(-slots // vcpus)
diff --git a/jobwatcher/plugins/torque.py b/jobwatcher/plugins/torque.py
new file mode 100644
index 000000000..0a879a0d6
--- /dev/null
+++ b/jobwatcher/plugins/torque.py
@@ -0,0 +1,30 @@
+import logging
+from run_command import run_command
+
+log = logging.getLogger(__name__)
+
+# get nodes requested from pending jobs
+def get_required_nodes(instance_properties):
+    command = "/opt/torque/bin/qstat -a"
+    status = ['C', 'Q']
+    _output = run_command(command)
+    output = _output.split("\n")[5:]
+    nodes = 0
+    for line in output:
+        line_arr = line.split()
+        if len(line_arr) >= 10 and line_arr[9] in status:
+            nodes += int(line_arr[5])
+    return nodes
+
+# get nodes reserved by running jobs
+def get_busy_nodes(instance_properties):
+    command = "/opt/torque/bin/qstat -a"
+    status = ['R']
+    _output = run_command(command)
+    output = _output.split("\n")[5:]
+    nodes = 0
+    for line in output:
+        line_arr = line.split()
+        if len(line_arr) >= 10 and line_arr[9] in status:
+            nodes += int(line_arr[5])
+    return nodes
diff --git a/setup.py b/setup.py
index ce46e574a..2755711e6 100644
--- a/setup.py
+++ b/setup.py
@@ -20,9 +20,10 @@ def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
 
 console_scripts = ['sqswatcher = sqswatcher.sqswatcher:main',
-                   'nodewatcher = nodewatcher.nodewatcher:main']
-version = "1.5.2rc2"
-requires = ['boto3>=1.7.55', 'python-dateutil>=2.6.1']
+                   'nodewatcher = nodewatcher.nodewatcher:main',
+                   'jobwatcher = jobwatcher.jobwatcher:main']
+version = "1.5.2"
+requires = ['boto3>=1.7.52', 'boto>=2.48.0', 'python-dateutil>=2.6.1']
 
 if sys.version_info[:2] == (2, 6):
     # For python2.6 we have to require argparse since it
diff --git a/tests/test.sh b/tests/test.sh
index 044aa6e07..22340d225 100644
--- a/tests/test.sh
+++ b/tests/test.sh
@@ -4,4 +4,5 @@ set -x
 
 echo $PATH
 which nodewatcher
-which sqswatcher
\ No newline at end of file
+which sqswatcher
+which jobwatcher
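
For reviewers: below is a minimal, standalone sketch of the scale-up decision that jobwatcher.py applies on every iteration of its main loop, plus the ceiling division the plugins use to turn pending slots into whole nodes. The helper name compute_desired and the example numbers are illustrative only and are not part of this patch.

# Illustrative sketch (not part of the patch) mirroring the clamping logic in jobwatcher.py's main loop.
def compute_desired(pending, running, current_desired, max_size):
    """Return the ASG DesiredCapacity jobwatcher would request, or None for a no-op."""
    desired = running + pending      # nodes needed to cover busy nodes plus queued work
    if desired > max_size:
        return max_size              # never request more than the ASG maximum
    if desired <= current_desired:
        return None                  # enough capacity already; leave the ASG alone
    return desired                   # scale up to cover the pending jobs

# Example: 3 busy nodes, pending jobs needing 2 more nodes, ASG at 3 with max 10 -> request 5.
assert compute_desired(pending=2, running=3, current_desired=3, max_size=10) == 5
# Plugins map pending slots to whole nodes with ceiling division, e.g. 9 slots on 4-vcpu nodes -> 3 nodes.
assert -(-9 // 4) == 3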