Scale Up #43
Conversation
scaling/scaling.cfg
Outdated
@@ -0,0 +1,5 @@
[scaling]
Is it a sample file? Could you add a comment at the top?
comment added. This file is used for local testing.
sqswatcher/send_sqs.sh
Outdated
cfn_region=us-east-1
cfn_sqs_queue=https://sqs.us-east-1.amazonaws.com/822857487308/AS-SQS
cfn_instance_slots=3
instance_id=i-04fcafee570cd7073
What is this instance_id? Is this file a sample?
Removed this file. It was just used for testing.
scaling/plugins/sge.py
Outdated
args = shlex.split(command)
pending = 0
try:
    _output = subprocess.check_output(args)
Alas, check_output was introduced in Python 2.7, so this is not going to work on CentOS 6 with Python 2.6.6.
We could make a helper module to cope with this. See also PR #32.
(The same comment applies to all the subprocess.check_output calls below.)
ah yes, I can change this function to use subprocess.Popen
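For reference, a Popen-based replacement that works on Python 2.6 could look roughly like this minimal sketch (the helper name mirrors the run_command used in the diff; error handling details are illustrative, not the PR's actual code):

```python
import shlex
import subprocess


def run_command(command):
    """Run a command string and return its stdout, failing on a non-zero exit."""
    args = shlex.split(command)
    process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    output = process.communicate()[0]
    if process.returncode != 0:
        raise subprocess.CalledProcessError(process.returncode, args)
    return output
```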
scaling/plugins/sge.py
Outdated
slots = run_command(command)
return nodes(slots, instance_type)

# get nodes requested from pending jobs
The comment needs to be changed (the same for the running() functions of the other modules).
changed
As a general comment about the current implementation, I'd suggest decoupling the scheduler plugin modules more cleanly from other implementation details and choices.
In particular, I wouldn't pass the instance_type down to the sge/slurm/torque modules, but rather a dict of properties; today that would be just a 'cpus' or 'processors' property.
The scaling module should then cope with the details of retrieving the AWS instance details (today, the JSON file put somewhere it knows, etc.) and then pass the properties downstream.
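A minimal sketch of what the decoupled plugin side could look like, assuming a hypothetical properties dict with a 'slots' key (the function and parameter names here are illustrative, not taken from this PR):

```python
def get_required_nodes(instance_properties, pending_slots):
    # The plugin only sees generic properties (here a slot count per node),
    # never the instance type; ceiling division gives the number of nodes
    # needed to host the pending slots.
    slots_per_node = instance_properties["slots"]
    return -(-pending_slots // slots_per_node)
```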
scaling/plugins/torque.py
Outdated
line_arr = line.split()
print(line_arr)
if len(line_arr) >= 10 and line_arr[9] in status:
    pending += int(line_arr[5])
The variable should be renamed to something like 'njobs', since these are not necessarily pending jobs.
changed.
scaling/plugins/sge.py
Outdated
for line in output:
    line_arr = line.split()
    if len(line_arr) >= 8:
        pending += int(line_arr[7])
Rename to 'njobs' or similar; see the similar comment for torque.
changed.
scaling/plugins/slurm.py
Outdated
for line in output:
    line_arr = line.split()
    if len(line_arr) == 2 and line_arr[0] == status:
        pending += int(line_arr[1])
Please rename it, see comment for 'torque'.
changed.
Are we no longer supporting openlava.py? I don't see any changes to that file.
On reviewing cfncluster.cfn.json, I think maybe we should delete openlava.py from the plugins in a separate task?
jobwatcher/jobwatcher.py
Outdated
try:
    vcpus = int(instances[instance_type]["vcpus"])
    log.info("Instance %s has %s slots." % (instance_type, vcpus))
    return {'compute_instance': vcpus}
This should be the name of the instance property. Tomorrow we may need to pass memory or GPUs, etc. Avoiding vcpu, we could call it processors, slots, ...?
changed to slots.
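For context, a sketch of how get_instance_properties could look with the renamed property (passing the instances mapping in as a parameter is an illustrative simplification; the real code loads it from the instance details JSON):

```python
import logging

log = logging.getLogger(__name__)


def get_instance_properties(instance_type, instances):
    # The dict key is now the generic property name 'slots' rather than
    # a scheduler- or instance-specific name like 'compute_instance'.
    vcpus = int(instances[instance_type]["vcpus"])
    log.info("Instance %s has %s slots." % (instance_type, vcpus))
    return {"slots": vcpus}
```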
jobwatcher/jobwatcher.py
Outdated
instance_properties = get_instance_properties(instance_type)

# Get number of nodes requested
pending = s.pending(instance_properties)
I'd name the methods for what they actually return, get_required_nodes() and get_busy_nodes() or something similar.
done
# check to make sure it's in limits
desired = running + pending
if desired > max:
    log.info("%d requested nodes is greater than max %d. Requesting max %d." % (desired, max, max))
This is worth being a warning-level message, since we are hitting the max limit and some of the pending jobs are not going to be served in the next scale-up round.
This is normal behaviour. I wouldn't expect it to be a warning.
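For reference, the clamping logic under discussion written out as a self-contained sketch (the function name and max_size are illustrative; the diff's max shadows the Python builtin):

```python
import logging

log = logging.getLogger(__name__)


def compute_desired_capacity(running, pending, max_size):
    # Clamp the requested node count to the ASG maximum. Whether the message
    # should be logged at info or warning level is the open question here.
    desired = running + pending
    if desired > max_size:
        log.info("%d requested nodes is greater than max %d. Requesting max %d."
                 % (desired, max_size, max_size))
        return max_size
    return desired
```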
jobwatcher/plugins/torque.py
Outdated
log = logging.getLogger(__name__)

# get nodes requested from pending jobs
def pending(instance_properties):
Should we consider raising an exception (logging an error, having some sort of user notification...) when a pending job requires a single node with more processors than the instance type provides?
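One possible shape for the suggested check, as a hedged sketch (the function, message wording, and exception type are illustrative, not part of this PR):

```python
import logging

log = logging.getLogger(__name__)


def check_job_fits(job_slots, instance_properties):
    # If a single pending job asks for more slots than one instance of the
    # configured type provides, no amount of scaling will ever schedule it.
    slots_per_node = instance_properties["slots"]
    if job_slots > slots_per_node:
        log.error("Job requests %d slots but the instance type provides only %d per node."
                  % (job_slots, slots_per_node))
        raise ValueError("pending job cannot fit on a single node")
```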
jobwatcher/plugins/sge.py
Outdated
return -(-slots // vcpus)

# get nodes reserved by running jobs
def running(instance_properties):
Here we're returning the theoretical number of busy hosts computed from the busy slots.
Consider the case where we have 2 instances with 2 slots each and a 1-slot job running on each of them. Current ASG desired = 2.
Then assume a 2-slot job is submitted:
#busy_hosts = 2/2 = 1
#requested_hosts = 2/2 = 1
==> #new_desired = 1 + 1 = 2 === ASG_current_desired
==> no scaling
We should discuss the possible scenarios and the best-effort approach we want to follow.
Changed. The new behaviour counts a whole node as busy if any of its slots are used or reserved.
For example, in the following output there are 8 slots filled, which previously would report 2 busy nodes; with the change it will report 3 nodes as busy.
$ qstat -f
queuename qtype resv/used/tot. load_avg arch states
---------------------------------------------------------------------------------
all.q@ip-172-31-16-37.ec2.inte BIP 0/2/4 0.00 lx-amd64
---------------------------------------------------------------------------------
all.q@ip-172-31-19-96.ec2.inte BIP 0/4/4 0.00 lx-amd64
---------------------------------------------------------------------------------
all.q@ip-172-31-20-40.ec2.inte BIP 0/0/4 0.27 lx-amd64
---------------------------------------------------------------------------------
all.q@ip-172-31-92-215.ec2.int BIP 0/2/4 0.00 lx-amd64
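A sketch of how that counting could be implemented against the qstat -f output above (the helper name and field positions are assumptions, not the PR's actual code):

```python
def get_busy_nodes(qstat_f_output):
    # Count a queue instance as busy if any of its slots are reserved or used,
    # based on the resv/used/tot column of the `qstat -f` listing.
    busy = 0
    for line in qstat_f_output.splitlines():
        fields = line.split()
        if len(fields) >= 3 and fields[2].count("/") == 2 and fields[2][0].isdigit():
            resv, used, _total = map(int, fields[2].split("/"))
            if resv + used > 0:
                busy += 1
    return busy
```

Against the example output this returns 3, matching the behaviour described above.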
jobwatcher/plugins/sge.py
Outdated
_command = shlex.split(command)
try:
    DEV_NULL = open(os.devnull, "rb")
    process = sub.Popen(_command, env=dict(os.environ), stdout=sub.PIPE, stderr=sub.STDOUT, stdin=DEV_NULL)
Should you add /usr/bin and the like to this environment? I see that qstat is not being used with the full path; how will qstat be resolved?
fixed
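For illustration, one way the fix could look (the directories added to PATH are assumptions, not taken from this PR):

```python
import os
import subprocess as sub

# Extend PATH in the child environment so qstat resolves without a full path.
env = dict(os.environ)
env["PATH"] = "/opt/sge/bin/lx-amd64:/usr/bin:/bin:" + env.get("PATH", "")
process = sub.Popen(["qstat", "-f"], env=env, stdout=sub.PIPE, stderr=sub.STDOUT)
output = process.communicate()[0]
```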
jobwatcher/plugins/sge.py
Outdated
process = sub.Popen(_command, env=dict(os.environ), stdout=sub.PIPE, stderr=sub.STDOUT, stdin=DEV_NULL)
_output = process.communicate()[0]
exitcode = process.poll()
if exitcode != 0:
This could be an issue if there is nothing written to stdout. I think in such cases exitcode is almost always 1. Could you please verify that?
that's just for grep. For qstat, it returns a 0 exit code even with no output.
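To illustrate the distinction (a hedged sketch; the command and handling are illustrative, not the PR's code): grep exits with status 1 when it finds no matches, so an empty result has to be treated as "no jobs" rather than as a failure, while qstat returns 0 even when it prints nothing.

```python
import subprocess as sub

process = sub.Popen(["grep", "pattern", "jobs.txt"], stdout=sub.PIPE, stderr=sub.STDOUT)
output = process.communicate()[0]
exitcode = process.poll()
if exitcode == 1 and not output.strip():
    output = b""  # grep found nothing: treat as "no jobs" rather than an error
elif exitcode != 0:
    raise sub.CalledProcessError(exitcode, "grep")
```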
Please take a look at the comments.
Changes the scaling logic from a cloudwatch metric produced every two minutes to a cronjob on the node that directly sets the asg values. Signed-off-by: Sean Smith <seaam@amazon.com>
New script, scaling.py, scales up based on the number of nodes requested in the scheduler's queue. This gets rid of slow scale-up times. I merged in the boto3 branch; the only files that need review are scaling/*.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
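For readers of this PR, the core of the new approach can be sketched roughly as follows (the function name, parameters, and region default are illustrative; the actual scaling.py may differ):

```python
import boto3


def set_asg_capacity(asg_name, running_nodes, required_nodes, region="us-east-1"):
    # Compute the desired node count from the scheduler queue and write it
    # straight to the Auto Scaling group, clamped to the group's MaxSize.
    client = boto3.client("autoscaling", region_name=region)
    group = client.describe_auto_scaling_groups(
        AutoScalingGroupNames=[asg_name])["AutoScalingGroups"][0]
    desired = min(running_nodes + required_nodes, group["MaxSize"])
    client.set_desired_capacity(AutoScalingGroupName=asg_name, DesiredCapacity=desired)
```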