Compatibility fixes with distributed 1.21.3 #63
Conversation
Are you able to test this, @azjps? Can we enable any of the tests that are currently marked as skipped or xfailed?
- Support passing kwargs to `distributed.Adaptive.__init__`, which now takes keyword arguments like `minimum` and `maximum` [number of workers].
- Add an optional `workers` argument to `_retire_workers()` to match dask/distributed#1797; currently `Adaptive` raises a `TypeError`. (A sketch of both changes follows below.)
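A minimal sketch of both changes, assuming the subclass shape shown in the diffs further down (the `pass` body is illustrative, not this PR's exact code):

```python
from distributed.deploy import adaptive
from tornado import gen


class Adaptive(adaptive.Adaptive):
    def __init__(self, *args, **kwargs):
        # Forward keyword arguments such as minimum= and maximum=
        # (bounds on the number of workers) to the newer
        # distributed.Adaptive.__init__ instead of swallowing them.
        super(Adaptive, self).__init__(*args, **kwargs)

    @gen.coroutine
    def _retire_workers(self, workers=None):
        # Accept the optional `workers` argument added in
        # dask/distributed#1797 so the parent class can call
        # self._retire_workers(workers=...) without a TypeError.
        pass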
In dask/distributed#1594, the scheduler's internal task collections were changed from holding string keys to holding `TaskState` objects. However, `dask_drmaa.Adaptive` was still querying by key, so new workers never found the memory resource constraints of pending tasks, and consequently tasks never found workers with sufficient resources. This caused the unit test `test_adaptive_memory` to wait indefinitely. Try to fix this to support distributed both pre- and post-1.21.0, and un-skip `test_adaptive_memory`.
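A hedged sketch of what such a compatibility shim can look like; the helper name is hypothetical, and the `key_split` import location is assumed for distributed of that era:

```python
from six import string_types
from distributed.utils import key_split  # assumed import location circa 1.21


def task_key_and_prefix(task):
    # Hypothetical helper: dask/distributed#1594 changed the scheduler's
    # task collections from plain string keys to TaskState objects, so
    # support both shapes.
    if isinstance(task, string_types):
        # pre-1.21.0: the entry is the key itself
        return task, key_split(task)
    # post-1.21.0: the entry is a TaskState with .key and .prefix
    return task.key, task.prefix
```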
I don't use SGE @jakirkham, but I was able to make some small modifications to try running the unit tests. Then again, I'm not positive what Travis CI is using.
Well drat, this commit does work fine for me locally.
Travis CI just uses …
I ran into an error. Edit: This was because I was running docker from a Windows machine; I had to remove all of the carriage returns from the `*.sh` shell scripts and `docker-compose.yml`.
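For reference, a small sketch (not part of this PR) of stripping those carriage returns, assuming the scripts sit at the repository root:

```python
import glob

# Convert CRLF line endings to LF in the files the Linux containers
# execute or parse (git on Windows may check them out with \r\n).
for name in glob.glob("*.sh") + ["docker-compose.yml"]:
    with open(name, "rb") as f:
        data = f.read()
    with open(name, "wb") as f:
        f.write(data.replace(b"\r\n", b"\n"))
```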
(Was testing on python2; switching to python2/3-compatible code.)
Okay, so after fixing some python2-specific syntax, Travis seems happier. Even though the builds state they are using both python2 and python3, it seems both builds defer to a shared docker image that uses python3. Based on my local testing, this works correctly for python2 as well, so this should close #58. Unrelatedly, I am still not able to get the docker-compose build to work locally on my Windows desktop; I'll file a new issue for this.
dask_drmaa/adaptive.py
Outdated
```diff
@@ -6,6 +6,7 @@
 from distributed import Scheduler
 from distributed.utils import log_errors
 from distributed.deploy import adaptive
+from six import string_types
```
Please add `six` to `requirements.txt`.
FYI, `six` is in the `requirements.txt` for `distributed`, but I suppose there is no harm, and it is clearer to duplicate it here.
Sorry, I spend part of my days on package management issues. I have generally found that explicit requirements make things easier to manage.
```diff
@@ -93,7 +105,17 @@ def get_scale_up_kwargs(self):
         return kwargs

     @gen.coroutine
-    def _retire_workers(self):
+    def _retire_workers(self, workers=None):
```
Why add this option?
NVM, this comes from PR dask/distributed#1797.
We might want to revisit whether we should be carrying this function at all or just use the parent class's functionality.
@jakirkham Currently `dask_drmaa==0.1.2` raises a `TypeError` when run with `distributed==1.21.3` on this line of code in `Adaptive`. I guess the unit tests don't cover this, or the `distributed` version is not pinned high enough in Travis.
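A minimal, self-contained sketch of that failure mode with stand-in classes (not distributed's actual code):

```python
class ParentAdaptive(object):
    def _adapt(self):
        # Newer distributed passes a keyword argument here
        # (per dask/distributed#1797):
        self._retire_workers(workers=["worker-1"])


class DrmaaAdaptive(ParentAdaptive):
    def _retire_workers(self):  # old dask_drmaa signature: no `workers`
        pass


DrmaaAdaptive()._adapt()
# TypeError: _retire_workers() got an unexpected keyword argument 'workers'
```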
Yep, sorry, missed where this was coming from on the first pass.
Would we be able to reuse that method or do you see issues with that approach?
Yeah, agreed that we should try to move back to the base implementation. I am not really sure why the implementation diverged from the base class to begin with, although it definitely seemed intentional: c51a15a#diff-d2ee7bfcb2312cc404b8b4953eaa2576L47. I haven't had a chance to step through the dask/distributed internals to see if there's still any behavioral change here. Perhaps @nevermindewe can shed some light? Anyway, for now, as you suggested, it may be safer to move back to the base class in a separate PR.
Added as issue (#65).
dask_drmaa/adaptive.py
Outdated
```python
# instead of string keys in its task collections:
# https://github.com/dask/distributed/pull/1594
key = task.key
prefix = task.prefix
```
Any thoughts on bumping our `distributed` requirement?
I am not sure. I think eventually we will definitely want to, so that we don't have to deal with backwards compatibility, but as of right now `distributed==1.21.0` has only been out for a month and a half, so maybe the dust hasn't settled yet. cc @mrocklin if you have any quick recommendations.
I'm comfortable bumping up requirements. If people want to use old versions of distributed then they can use old versions of dask-drmaa as well. I think we should maintain a little bit of slack between the various dask-foo projects, but not too much.
```diff
@@ -8,8 +8,6 @@
 from distributed import Client
 from distributed.utils_test import loop, inc, slowinc


-@pytest.mark.skip(reason="currently times out for an unknown reason")
 def test_adaptive_memory(loop):
```
Does this seem to be working reliably now or is it still a little flaky?
As far as I've tried, it seems to work reliably now 😁
If it acts up again, we can always reopen issue (#58). Guessing you fixed it though, as the issue cropped up with Distributed 1.21. Thanks for working on it.
Thanks @azjps. This looks pretty good. Would just add `six` to `requirements.txt`. Ideally would like us to simplify this code a bit more and reuse things from `distributed`.
Also a couple of miscellaneous comments, including a Windows-specific comment about running the docker-based tests.
README.rst
Outdated
```diff
 Engine cluster with a master and two slaves. You can initialize this system
-as follows
+as follows (for Windows, make sure to remove any carriage returns from all
+`*.sh` shell scripts and `docker-compose.yml`):
```
Maybe we should add these to the `.gitattributes` file to standardize these by default?
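A sketch of what such `.gitattributes` entries could look like:

```
# Always check these out with LF endings, even on Windows clients
# with core.autocrlf enabled, since the containers run them as-is.
*.sh text eol=lf
docker-compose.yml text eol=lf
```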
Yeah, good idea -- I'll look into it and split it off into a separate PR.
Update requirements.txt to require distributed >= 1.21.0, since there are some internal changes in the way tasks are stored. Also drop the corresponding backwards-compatibility fixes. Feel free to revert if distributed 1.20.x support is desired.
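So the relevant `requirements.txt` pins presumably end up looking something like this (only the lines discussed in this thread; other entries omitted):

```
distributed >= 1.21.0
six
```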
Thanks @azjps!
)