10 changes: 10 additions & 0 deletions jobwatcher/__init__.py
@@ -0,0 +1,10 @@
# Copyright 2013-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the
# License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.
9 changes: 9 additions & 0 deletions jobwatcher/jobwatcher.cfg
@@ -0,0 +1,9 @@
# Testing config file
# Create an ASG with name Test
[jobwatcher]
region = us-east-1
asg_name = Test
scheduler = test
proxy = NONE
cfncluster_dir = /opt/cfncluster
compute_instance_type = c4.xlarge
131 changes: 131 additions & 0 deletions jobwatcher/jobwatcher.py
@@ -0,0 +1,131 @@
#!/usr/bin/env python2.6

# Copyright 2013-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the
# License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.

__author__ = 'seaam'

import ConfigParser
import boto3
import os
import sys
import time
import logging
import json
from botocore.exceptions import ClientError
from botocore.config import Config

log = logging.getLogger(__name__)
pricing_file = '/opt/cfncluster/instances.json'

def loadSchedulerModule(scheduler):
    scheduler = 'jobwatcher.plugins.' + scheduler
    _scheduler = __import__(scheduler)
    _scheduler = sys.modules[scheduler]

    log.debug("scheduler=%s" % repr(_scheduler))

    return _scheduler

def get_instance_properties(instance_type):
    with open(pricing_file) as f:
        instances = json.load(f)
    try:
        slots = int(instances[instance_type]["vcpus"])
        log.info("Instance %s has %s slots." % (instance_type, slots))
        return {'slots': slots}
    except KeyError:
        log.error("Instance %s not found in file %s." % (instance_type, pricing_file))
        sys.exit(1)

def fetch_pricing_file(proxy_config, cfncluster_dir, region):
    s3 = boto3.resource('s3', region_name=region, config=proxy_config)
    try:
        if not os.path.exists(cfncluster_dir):
            os.makedirs(cfncluster_dir)
    except OSError as ex:
        log.critical('Could not create directory %s. Failed with exception: %s' % (cfncluster_dir, ex))
        raise
    bucket_name = '%s-cfncluster' % region
    try:
        bucket = s3.Bucket(bucket_name)
        bucket.download_file('instances/instances.json', '%s/instances.json' % cfncluster_dir)
    except ClientError as e:
        log.critical("Could not save instance mapping file %s/instances.json from S3 bucket %s. Failed with exception: %s" % (cfncluster_dir, bucket_name, e))
        raise

def main():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s [%(module)s:%(funcName)s] %(message)s'
    )

    config = ConfigParser.RawConfigParser()
    config.read('jobwatcher.cfg')
    if config.has_option('jobwatcher', 'loglevel'):
        lvl = logging._levelNames[config.get('jobwatcher', 'loglevel')]
        logging.getLogger().setLevel(lvl)
    region = config.get('jobwatcher', 'region')
    asg_name = config.get('jobwatcher', 'asg_name')
    scheduler = config.get('jobwatcher', 'scheduler')
    instance_type = config.get('jobwatcher', 'compute_instance_type')
    cfncluster_dir = config.get('jobwatcher', 'cfncluster_dir')
    _proxy = config.get('jobwatcher', 'proxy')
    proxy_config = Config()

    if not _proxy == "NONE":
        proxy_config = Config(proxies={'https': _proxy})

    # fetch the pricing file on startup
    fetch_pricing_file(proxy_config, cfncluster_dir, region)

    # load scheduler
    s = loadSchedulerModule(scheduler)

    while True:
        # get the number of vcpus per compute instance
        instance_properties = get_instance_properties(instance_type)

        # get the number of nodes requested by pending jobs
        pending = s.get_required_nodes(instance_properties)

        # get the number of nodes currently busy with running jobs
        running = s.get_busy_nodes(instance_properties)

        # connect to the asg
        asg_conn = boto3.client('autoscaling', region_name=region)

        # get current limits
        asg = asg_conn.describe_auto_scaling_groups(AutoScalingGroupNames=[asg_name])\
            .get('AutoScalingGroups')[0]

        min = asg.get('MinSize')
        current_desired = asg.get('DesiredCapacity')
        max = asg.get('MaxSize')
        log.info("min/desired/max %d/%d/%d" % (min, current_desired, max))
        log.info("Nodes requested %d, Nodes running %d" % (pending, running))

        # check to make sure the desired capacity stays within the ASG limits
        desired = running + pending
        if desired > max:
            log.info("%d requested nodes is greater than max %d. Requesting max %d." % (desired, max, max))
Contributor
This is worth making a warning-level message, since we are hitting the max limit and some of the pending jobs will not be served in the next scale-up round.

Contributor Author
This is normal behaviour. I wouldn't expect it to be a warning.

            asg_conn.update_auto_scaling_group(AutoScalingGroupName=asg_name, DesiredCapacity=max)
        elif desired <= current_desired:
            log.info("%d nodes desired, %d nodes in asg. Noop." % (desired, current_desired))
        else:
            log.info("Setting desired to %d nodes, requesting %d more nodes from asg." % (desired, desired - current_desired))
            asg_conn.update_auto_scaling_group(AutoScalingGroupName=asg_name, DesiredCapacity=desired)

        time.sleep(60)


if __name__ == '__main__':
    main()
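For reference, a minimal sketch of the change the reviewer suggests above (same message, logged at warning level); this is illustrative only and not part of the merged diff:

        if desired > max:
            # reviewer suggestion: surface the capped request at warning level
            log.warning("%d requested nodes is greater than max %d. Requesting max %d." % (desired, max, max))
            asg_conn.update_auto_scaling_group(AutoScalingGroupName=asg_name, DesiredCapacity=max)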
10 changes: 10 additions & 0 deletions jobwatcher/plugins/__init__.py
@@ -0,0 +1,10 @@
# Copyright 2013-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the
# License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.
19 changes: 19 additions & 0 deletions jobwatcher/plugins/run_command.py
@@ -0,0 +1,19 @@
import logging
import os
import shlex
import subprocess as sub
import sys

log = logging.getLogger(__name__)


def run_command(command):
    _command = shlex.split(command)
    DEV_NULL = open(os.devnull, "rb")
    try:
        process = sub.Popen(_command, env=dict(os.environ), stdout=sub.PIPE, stderr=sub.STDOUT, stdin=DEV_NULL)
        _output = process.communicate()[0]
        exitcode = process.poll()
        if exitcode != 0:
            log.error("Failed to run %s:\n%s" % (_command, _output))
        return _output
    except (OSError, sub.CalledProcessError):
        # Popen raises OSError if the executable cannot be run
        log.error("Failed to run %s\n" % _command)
        sys.exit(1)
    finally:
        DEV_NULL.close()
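The helper splits the command with shlex rather than invoking a shell, so quoted arguments (such as the '*' in the SGE qstat command below) arrive as single argv entries. A quick illustration, not part of the diff:

import shlex
shlex.split("/opt/sge/bin/lx-amd64/qstat -g d -s p -u '*'")
# ['/opt/sge/bin/lx-amd64/qstat', '-g', 'd', '-s', 'p', '-u', '*']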
36 changes: 36 additions & 0 deletions jobwatcher/plugins/sge.py
@@ -0,0 +1,36 @@
import logging
import json
from run_command import run_command

log = logging.getLogger(__name__)

# get nodes requested from pending jobs
def get_required_nodes(instance_properties):
    command = "/opt/sge/bin/lx-amd64/qstat -g d -s p -u '*'"
    _output = run_command(command)
    slots = 0
    output = _output.split("\n")[2:]
    for line in output:
        line_arr = line.split()
        if len(line_arr) >= 8:
            slots += int(line_arr[7])
    vcpus = instance_properties.get('slots')
    return -(-slots // vcpus)

# get nodes reserved by running jobs
# if a host has 1 or more jobs running on it, it'll be marked busy
def get_busy_nodes(instance_properties):
    command = "/opt/sge/bin/lx-amd64/qstat -f"
    _output = run_command(command)
    nodes = 0
    output = _output.split("\n")[2:]
    for line in output:
        line_arr = line.split()
        if len(line_arr) == 5:
            # resv/used/tot columns
            (resv, used, total) = line_arr[2].split('/')
            if int(used) > 0 or int(resv) > 0:
                nodes += 1
    return nodes

if __name__ == '__main__':
    # ad-hoc manual test
    print get_required_nodes({"slots": 4})
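The -(-slots // vcpus) expression above is integer ceiling division: pending slots are rounded up to whole instances. A worked example, assuming 4-vCPU instances:

slots, vcpus = 13, 4
nodes = -(-slots // vcpus)
# -(-13 // 4) == -(-4) == 4, i.e. 13 pending slots need 4 compute nodes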
28 changes: 28 additions & 0 deletions jobwatcher/plugins/slurm.py
@@ -0,0 +1,28 @@
import logging
from run_command import run_command

log = logging.getLogger(__name__)

# get nodes requested from pending jobs
def get_required_nodes(instance_properties):
    command = "/opt/slurm/bin/squeue -r -h -o '%t %D'"
    _output = run_command(command)
    nodes = 0
    output = _output.split("\n")
    for line in output:
        line_arr = line.split()
        if len(line_arr) == 2 and line_arr[0] == 'PD':
            nodes += int(line_arr[1])
    return nodes

# get nodes reserved by running jobs
def get_busy_nodes(instance_properties):
    command = "/opt/slurm/bin/squeue -r -h -o '%t %D'"
    _output = run_command(command)
    nodes = 0
    output = _output.split("\n")
    for line in output:
        line_arr = line.split()
        if len(line_arr) == 2 and line_arr[0] == 'R':
            nodes += int(line_arr[1])
    return nodes
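With the -r -h -o '%t %D' flags, squeue prints one unadorned line per (array-expanded) job: the state code followed by the node count. A small sketch of the parsing, using made-up sample output:

sample = "PD 3\nPD 1\nR 2\n"
pending = sum(int(count) for state, count in
              (line.split() for line in sample.split("\n") if line)
              if state == 'PD')
# pending == 4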
38 changes: 38 additions & 0 deletions jobwatcher/plugins/test.py
@@ -0,0 +1,38 @@
# Copyright 2013-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the
# License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging

log = logging.getLogger(__name__)

# get nodes requested from pending jobs
def get_required_nodes(instance_properties):
    # Test function. Change as needed.
    slots = 4
    vcpus = instance_properties.get('slots')
    return -(-slots // vcpus)

# get nodes reserved by running jobs
def get_busy_nodes(instance_properties):
    # Test function. Change as needed.
    slots = 13
    vcpus = instance_properties.get('slots')
    return -(-slots // vcpus)

def nodes(slots, instance_type):
    # Test helper: look up the vcpus for instance_type and convert a slot count to whole nodes.
    if slots <= 0:
        return 0
    with open('/opt/cfncluster/instances.json') as f:
        instances = json.load(f)
    vcpus = int(instances[instance_type]["vcpus"])
    log.info("Instance %s has %s vcpus." % (instance_type, vcpus))
    return -(-slots // vcpus)
30 changes: 30 additions & 0 deletions jobwatcher/plugins/torque.py
@@ -0,0 +1,30 @@
import logging
from run_command import run_command

log = logging.getLogger(__name__)

# get nodes requested from pending jobs
def get_required_nodes(instance_properties):
    command = "/opt/torque/bin/qstat -a"
    status = ['C', 'Q']
    _output = run_command(command)
    output = _output.split("\n")[5:]
    nodes = 0
    for line in output:
        line_arr = line.split()
        if len(line_arr) >= 10 and line_arr[9] in status:
            nodes += int(line_arr[5])
    return nodes

# get nodes reserved by running jobs
def get_busy_nodes(instance_properties):
    command = "/opt/torque/bin/qstat -a"
    status = ['R']
    _output = run_command(command)
    output = _output.split("\n")[5:]
    nodes = 0
    for line in output:
        line_arr = line.split()
        if len(line_arr) >= 10 and line_arr[9] in status:
            nodes += int(line_arr[5])
    return nodes
7 changes: 4 additions & 3 deletions setup.py
@@ -20,9 +20,10 @@ def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read()

console_scripts = ['sqswatcher = sqswatcher.sqswatcher:main',
-                   'nodewatcher = nodewatcher.nodewatcher:main']
-version = "1.5.2rc2"
-requires = ['boto3>=1.7.55', 'python-dateutil>=2.6.1']
+                   'nodewatcher = nodewatcher.nodewatcher:main',
+                   'jobwatcher = jobwatcher.jobwatcher:main']
+version = "1.5.2"
+requires = ['boto3>=1.7.52', 'boto>=2.48.0', 'python-dateutil>=2.6.1']

if sys.version_info[:2] == (2, 6):
# For python2.6 we have to require argparse since it
3 changes: 2 additions & 1 deletion tests/test.sh
@@ -4,4 +4,5 @@
set -x
echo $PATH
which nodewatcher
-which sqswatcher
+which sqswatcher
+which jobwatcher