Add lock to scheduler for sensitive operations #3259
Conversation
Some operations like retiring workers or rebalancing data shouldn't happen concurrently. Here we add an asynchronous lock around these operations in order to protect them from each other. This doesn't yet have any tests.
If only we had reentrant locks
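asyncio ships no built-in equivalent of `threading.RLock`. As an illustration of what a reentrant async lock could look like, here is a minimal sketch; the `ReentrantAsyncLock` class and `demo` function are hypothetical and not part of this PR:

```python
import asyncio


class ReentrantAsyncLock:
    """Sketch of a reentrant asyncio lock: the task that holds the
    lock may re-acquire it without deadlocking; other tasks wait.
    (No error handling for mismatched releases -- illustration only.)"""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._owner = None  # task currently holding the lock
        self._depth = 0     # re-entry count for the owning task

    async def __aenter__(self):
        task = asyncio.current_task()
        if self._owner is not task:
            await self._lock.acquire()
            self._owner = task
        self._depth += 1
        return self

    async def __aexit__(self, *exc):
        self._depth -= 1
        if self._depth == 0:
            self._owner = None
            self._lock.release()


async def demo():
    lock = ReentrantAsyncLock()
    async with lock:
        async with lock:  # a plain asyncio.Lock would deadlock here
            return "re-entered"


print(asyncio.run(demo()))  # re-entered
```

With something like this, a scheduler method holding the lock could safely call another locked method in the same task.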
cc @jcrist if you have a moment
This PR looks helpful. Thanks for bringing it forward. I have three remarks and a small code question:
- Looking at it, I wonder whether there is potential for deadlocks: multiple scheduler methods share the same lock and then make calls to worker instances. If one of the workers decides to call back into the scheduler, we could be in trouble (e.g., because it decides to trigger a graceful shutdown).
- Have you considered how long the lock would be held? I fear that if substantial data is transferred between workers, the now-sequential shutdown of workers may stall, causing subsequent issues (e.g. timeouts in container/cluster schedulers, lost data due to stalled transfers).
- Do you see a risk in data that is still bouncing between workers that will soon be shut down? I am not sure how much of a problem that would be in practice.
With an approach such as #3248 this would not be necessary, but that introduces additional async/background behaviour which might also be complex to reason about in practice (e.g., how does a worker know that its data has finally been moved elsewhere?).
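The deadlock concern in the first remark can be made concrete with a runnable sketch. The names `retire_workers` and `graceful_shutdown` below are hypothetical stand-ins for the real scheduler methods, not the actual implementation: a task holding a plain (non-reentrant) `asyncio.Lock` awaits a call that tries to re-acquire the same lock, and the nested acquire never succeeds:

```python
import asyncio


async def retire_workers(lock):
    """Hypothetical scheduler method that holds the shared lock."""
    async with lock:
        # While the lock is held, a worker calls back into the
        # scheduler (e.g. to trigger its own graceful shutdown)...
        await graceful_shutdown(lock)


async def graceful_shutdown(lock):
    """...and that callback needs the same non-reentrant lock."""
    async with lock:
        pass


async def main():
    lock = asyncio.Lock()
    try:
        # The nested acquire blocks forever: a classic self-deadlock.
        # We use a short timeout so the demonstration terminates.
        await asyncio.wait_for(retire_workers(lock), timeout=0.1)
        return "completed"
    except asyncio.TimeoutError:
        return "deadlocked"


print(asyncio.run(main()))  # deadlocked
```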
```python
else:
    workers = set(self.workers.values())
workers_by_task = {ts: ts.who_has for ts in tasks}
async with self._lock:
```
I am new to async, so please excuse my ignorance: the documentation (https://docs.python.org/3.6/library/asyncio-sync.html) proposes using this snippet instead:

```python
lock = Lock()
...
with (yield from lock):
    ...
```

Is there a functional difference that made you prefer one over the other?
`async with` is newer syntax that wasn't available until Python 3.5, I think. It is usable within `async def` functions, while the `yield from` syntax is usable within `@asyncio.coroutine`-decorated functions. The Dask codebase switched from coroutines to `async def` functions a while ago, so we tend to prefer that syntax.
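For comparison, here is a minimal runnable sketch of the modern spelling, with the older generator-based form shown in a comment; the `guarded` helper is made up for illustration:

```python
import asyncio


async def guarded(lock, results, name):
    # Modern syntax: only usable inside `async def` functions.
    async with lock:
        results.append(name)

# The older generator-based spelling looked like:
#
#     @asyncio.coroutine
#     def guarded(lock, results, name):
#         with (yield from lock):
#             results.append(name)
#
# It was functionally equivalent, but `with (yield from lock)` was
# deprecated and later removed in modern Python, so `async with`
# is the form to use going forward.


async def main():
    lock = asyncio.Lock()
    results = []
    # Three concurrent tasks take the lock one at a time.
    await asyncio.gather(*(guarded(lock, results, i) for i in range(3)))
    return results


print(asyncio.run(main()))
```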
Yes, in fact this is the reason for the second commit, which fixes some deadlocks that were exposed by our tests. I hope the tests caught everything, but I don't know for certain.
Potentially a while, since some of these operations take a while. However, these operations aren't really designed to work well with each other, so I think this is sensible. If we want to improve speed here, then I think we need to re-engineer things more fundamentally.
That, and #3248 also seems narrower in what it solves. Its complexity-to-solution ratio seems higher than I'd personally like.
I have given this branch a trial run in a scenario where we use graceful downscaling. I observed neither crashes nor deadlocks. Of course this doesn't prove anything, but it is at least some evidence.