Deadlocks and infinite loops connected to failed dependencies #4360
Conversation
Force-pushed from 56b6ccb to 9e29ace.
As always, thanks for identifying these tricky issues and reporting upstream. I haven't yet had time to go through things deeply here (and that time may not come), but if you're confident in things here I'm happy to defer to you. However, currently tests are failing, so something is going wrong, I think.
Indeed, I will need to look deeper into the tests. I opened the PR a bit prematurely, but I figured this is an area where a lot is happening lately and somebody else might have an opinion.
+1 on opening PRs before they're polished
distributed/worker.py (outdated diff):

```python
ts.runspec = runspec
if ts.state == "erred":
    ts.exception = None
    ts.traceback = None
else:
    ts.state = "waiting"
self.transition(ts, "waiting", runspec=runspec)
```
Not setting the runspec for these cases can actually lead to a deadlock. I've been hunting this deadlock for a while now and I am very confident this is the root cause for #4133
I also have a suspicion that this is causing #4173 but I cannot reproduce locally
Fixes are still incomplete and I will need more time fixing CI since I cannot reproduce everything. I'm a bit confused about what the "waiting" state is actually supposed to flag, since this is slightly different for dependencies and runnable tasks (see the in-code comment). Any pointers and opinions appreciated.
cc @gforsyth, would you be comfortable reviewing this?
Seems like the infinite loop I initially wanted to fix is appearing on CI (but not locally). Will need to investigate a bit further. There are also some broken tests which are failing on master as well (#4374).
Thanks for diving into this @fjetter! The changes here make sense to me overall -- just a few comments. And we can certainly add a `transition_new_waiting` function -- most of that functionality is currently wrapped up in `add_task`.
Ok, I can reproduce the CI failure locally... just took me 589 tries :/
Ok, the (hopefully) last race condition is pretty bad and I don't have a resolution for it yet, but I believe I figured out what's happening. We have two competing mechanisms to deal with missing dependencies (the cluster doesn't distinguish between actually missing deps, e.g. because a worker died, and deps which could not be fetched due to some sort of exception, comm related or otherwise; everything is a missing dep) -- a rough sketch of both mechanisms follows below.

1. Worker internal: If the worker is unsuccessful in fetching the dependencies after 5 attempts, it flags the dependency as bad and gives up. This is what my test intended to capture (distributed/worker.py, lines 2167 to 2170 at 3fb3d8c).
2. Escalate to scheduler: The competing mechanism escalates immediately to the scheduler (distributed/worker.py, lines 2128 to 2130 at 3fb3d8c, and distributed/scheduler.py, lines 3334 to 3351 at 3fb3d8c), which transitions the faulty dependency on the scheduler side.

I am trying to find a solution which does not require me to rewrite this entire thing, but it feels to me as if this mechanism requires a redesign and cleanup w/ appropriate tests (testing this is hard).
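To make the race easier to picture, here is a rough sketch of the two mechanisms under simplified, hypothetical names (this is not the actual `Worker` implementation):

```python
class DepState:
    """Stand-in for the per-dependency state the worker keeps."""

    def __init__(self, key):
        self.key = key
        self.suspicious_count = 0


def gather_dependency(dep, fetch, notify_scheduler, max_suspicious=5):
    """Mechanism 1: retry locally and give up once the dep looks 'bad'."""
    while True:
        try:
            return fetch(dep.key)
        except OSError:
            dep.suspicious_count += 1
            if dep.suspicious_count > max_suspicious:
                # Flag the dependency as bad and give up.
                raise RuntimeError(f"bad dependency: {dep.key!r}")
            # Mechanism 2: in parallel, the worker reports the missing data
            # to the scheduler, which may release and reschedule the task.
            notify_scheduler(dep.key)
```

The race is that the scheduler-side path can recreate the task state before the local counter ever reaches the give-up threshold.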
Just merged the main branch to resolve some unrelated CI failures
Working on a few more changes and will update by tomorrow. I hope I have stabilised this by now, but I'm not certain yet.
Force-pushed from 121ce5e to 6c1028c.
Changes got a bit bigger than I was hoping for. Here are the highlights
In terms of changed behaviour, that should be it. Most code changes were some kind of missing or improper transitions, or places I consider dead code. Some of these dead-code places I just deleted, others raise exceptions. Apologies for the big PR, but as you can imagine this is very hard to break apart.
```python
dep_ts = self.tasks[dependency]
self.log.append((dependency, "new-dep", dep_ts.state))
dep_ts.state = "waiting"
```
I'm certainly not sure that my approach was the correct one, but if we call `release_key` on a task that has any dependents, then the key will end up popped out of `self.tasks` without also clearing the corresponding item out of `self.data` -- so if we set the state to `waiting` here without checking for presence in `self.data`, it can fail the check in `validate_task_waiting`.
Sure, will check this section again. In general I'm not very happy with the approach of modifying the state attribute manually, and maybe this should also be done via a proper transition step. Tbh, I haven't fully understood your comment yet, but if multiple tasks/dependencies must be adjusted, this sounds like something which should be handled by a transition method anyhow, I suppose.
My comment was very unclear (having just re-read it). It is possible at this point to have a task whose result is present in `self.data` even though there is no corresponding `TaskState` object in `self.tasks`. If that is the case, then the correct state is `memory`. If we blanket set it to `waiting`, it will fail the assertions in `validate_task_waiting`.
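A tiny sketch of the check being described, with `data` standing in for `Worker.data` (a hypothetical helper, not the actual code):

```python
def state_for_dependency(key, data):
    """Pick the correct state when (re-)registering a dependency."""
    if key in data:
        # The result is already on this worker (e.g. the TaskState was
        # dropped by release_key but the value was never evicted), so the
        # correct state is "memory", not "waiting".
        return "memory"
    # Otherwise the worker still has to fetch or compute it.
    return "waiting"
```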
How can data end up in memory without it being a task? (other than manual manipulation of the dict)
I indeed am seeing this error case you are describing but I'm wondering if this is an allowed state altogether. Even when effectively setting data manually (i.e. using the respective API), a `TaskState` object is created if it doesn't exist.
distributed/worker.py, lines 1387 to 1398 at 49e19c0:
```python
def update_data(self, comm=None, data=None, report=True, serializers=None):
    for key, value in data.items():
        ts = self.tasks.get(key)
        if getattr(ts, "state", None) is not None:
            self.transition(ts, "memory", value=value)
        else:
            self.tasks[key] = ts = TaskState(key)
            self.put_key_in_memory(ts, value)
            ts.priority = None
            ts.duration = None
        self.log.append((key, "receive-from-scatter"))
```
I was working under the assumption that `data` is always a subset of `tasks`.
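That assumption can be phrased as a validation check along these lines (a hypothetical helper, not part of the Worker API):

```python
def validate_data_subset_of_tasks(worker):
    """Assert that every key held in worker.data has a corresponding TaskState."""
    stray = set(worker.data) - set(worker.tasks)
    assert not stray, f"keys in data without a TaskState: {stray!r}"
```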
That confused me quite a bit. I don't necessarily agree with the implementation right now but I'll restore everything as it was to reduce the number of changes.
I'm definitely game to sync up at some point and sketch out a cleaner implementation for this.
> I'm definitely game to sync up at some point and sketch out a cleaner implementation for this.
Sounds good. Once this PR is finally stable, I'll open an issue and summarise the points I've seen where I think we can improve and we can sync up on the topic.
I updated the PR with my current state. Unfortunately, reinstating the full dependency/release/refcounting as described above seemed to make things worse :(
Doesn't this introduce memory leakage?
Seeing the question about a memory leak, I'm curious if this could be related to #3898.
If my argument about the memory leak were true, the leak would happen on the worker, since the worker would not release the data anymore. From my understanding, #3898 is about a potential leak on the scheduler.
Also, the leak I was referring to does not exist and I was mistaken. The worker applies some refcounting by tracking the `dependents` of each task and releases the data properly once all dependents are gone, i.e. the data is no longer needed. I missed this mechanism since it's a bit spread out in this module, but from my understanding it works as intended.
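For illustration, here is a simplified sketch of that refcounting-by-dependents idea (not the actual Worker bookkeeping):

```python
from collections import defaultdict


class DependencyStore:
    """Keep dependency data only as long as some dependent still needs it."""

    def __init__(self):
        self.data = {}                      # dep key -> value
        self.dependents = defaultdict(set)  # dep key -> keys still needing it

    def add(self, dep, value, dependent):
        self.data[dep] = value
        self.dependents[dep].add(dependent)

    def dependent_done(self, dependent, deps):
        for dep in deps:
            self.dependents[dep].discard(dependent)
            if not self.dependents[dep]:
                # The last dependent is gone -> the data is no longer needed.
                del self.dependents[dep]
                self.data.pop(dep, None)
```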
Force-pushed from 6c1028c to cdd303f.
Force-pushed from 2b49c73 to 42690f0.
Looking over the changes now -- I've run the two failing tests locally. The test_stress failure also fails on my laptop, but my work laptop has several networking issues due to security stuff, so it may not be relevant.
The changes overall look good to me. I'm seeing a few assertion errors when I run `test_failed_workers.py::test_broken_worker_during_computation`. Every 5th time or so, there's an assertion error when a worker tries to transition a task from `ready` -> `waiting`, which results in a `KeyError`.
I'm fairly certain this only happens before the worker in question is killed, so it may just be some messiness as the worker is coming down before a new worker takes its place.
```
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f873d8dc5b0>>, <Task finished name='Task-722' coro=<Worker.gather_dep() done, defined at /Users/gil/github.com/dask/distributed/distributed/worker.py:2005> exception=KeyError(('ready', 'waiting'))>)
Traceback (most recent call last):
  File "/Users/gil/miniforge3/envs/dask-distributed/lib/python3.8/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/Users/gil/miniforge3/envs/dask-distributed/lib/python3.8/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/Users/gil/github.com/dask/distributed/distributed/worker.py", line 2143, in gather_dep
    self.transition(ts, "waiting", worker=worker)
  File "/Users/gil/github.com/dask/distributed/distributed/worker.py", line 1551, in transition
    func = self._transitions[start, finish]
KeyError: ('ready', 'waiting')
distributed.nanny - WARNING - Restarting worker
```
and occasional assertion errors after the new workers come up (these two problems never occur during the same test run, it's either one or the other)
```
distributed.worker - ERROR -
Traceback (most recent call last):
  File "/Users/gil/github.com/dask/distributed/distributed/worker.py", line 2965, in validate_state
    assert ts.key in self.data or ts.key in self.actors
AssertionError
distributed.worker - ERROR -
Traceback (most recent call last):
  File "/Users/gil/github.com/dask/distributed/distributed/worker.py", line 2274, in release_key
    self.validate_state()
  File "/Users/gil/github.com/dask/distributed/distributed/worker.py", line 2965, in validate_state
    assert ts.key in self.data or ts.key in self.actors
AssertionError
distributed.core - ERROR -
Traceback (most recent call last):
  File "/Users/gil/github.com/dask/distributed/distributed/core.py", line 563, in handle_stream
    handler(**merge(extra, msg))
  File "/Users/gil/github.com/dask/distributed/distributed/worker.py", line 1430, in delete_data
    self.release_key(key, reason="delete")
  File "/Users/gil/github.com/dask/distributed/distributed/worker.py", line 2274, in release_key
    self.validate_state()
  File "/Users/gil/github.com/dask/distributed/distributed/worker.py", line 2965, in validate_state
    assert ts.key in self.data or ts.key in self.actors
AssertionError
distributed.worker - ERROR -
Traceback (most recent call last):
  File "/Users/gil/github.com/dask/distributed/distributed/worker.py", line 985, in handle_scheduler
    await self.handle_stream(
  File "/Users/gil/github.com/dask/distributed/distributed/core.py", line 563, in handle_stream
    handler(**merge(extra, msg))
  File "/Users/gil/github.com/dask/distributed/distributed/worker.py", line 1430, in delete_data
    self.release_key(key, reason="delete")
  File "/Users/gil/github.com/dask/distributed/distributed/worker.py", line 2274, in release_key
    self.validate_state()
  File "/Users/gil/github.com/dask/distributed/distributed/worker.py", line 2965, in validate_state
    assert ts.key in self.data or ts.key in self.actors
AssertionError
```
I'm trying to figure out the root cause (of mostly the second error) but I don't think it's any worse than the existing behavior and I wouldn't consider it a blocker.
Rechecked the mentioned tests myself and I can also see a failure there. Will get back to it...
I believe the first AssertionError mentioned above, about the state transition ready -> waiting, is merely a wrong transition and should be easily fixed. The second looks a bit more troublesome. While I agree that it's probably not worse than before, having a task in state "memory" without any data sounds quite serious, even if it is very rare. I'll have another look.
Force-pushed from f90381f to 99ad636.
Currently, I am observing a few failures on travis which seem unrelated. I'll retrigger the runs to see if they are persistent.

Py3.6

- distributed/tests/test_stress::test_cancel_stress with a RuntimeError: ('Unclosed Comms', []) error
- distributed/tests/test_stress::test_close_connections with a simple timeout
- distributed/tests/test_worker::test_wait_for_outgoing with RuntimeError: cannot schedule new futures after shutdown

Py3.7

- test_stop_doing_unnecessary_work timeout
- distributed/tests/test_stress::test_close_connections with a simple timeout
- distributed/tests/test_worker::test_wait_for_outgoing with RuntimeError: cannot schedule new futures after shutdown

Py3.8

- a few failures which look similar to the above but eventually segfault (#4387)
I recommend opening an empty PR against master to test CI
Wow, I'm really falling into a rabbit hole with this. I was investigating the remaining failures, and looking at the logs reveals that a counter ends up with a negative value. This negative value comes from a kind of "doublefree": the counter is decreased twice for the same event, once on the normal code path and once more elsewhere. This connects to distributed/worker.py, lines 2584 to 2585 at 658fccc.
I'm slowly wondering how we want to proceed with this since I continue to find issues and am trying to plug all the holes at once in this PR. That makes the entire thing very messy and almost impossible to review, I assume. Merging anything might make things worse in the meantime, though. Any suggestions on how to proceed here?
That is a fair concern. Ideally incremental PRs would be better, but we're also fixing bugs after a larger change. Of course, that sort of relaxed behavior is also what caused this problem in the first place.
@fjetter do you have this fix pushed up to a branch somewhere?
Force-pushed from e1b92e4 to d378c84.
Ok, the last one was connected to work stealing. I mentioned this as a potential issue over in #4413 already. My solution is, for now, to put a stolen task back into "waiting for dependencies", which translates to the corresponding internal worker state. That was the cause for the breakage of at least one of the failing tests.
Force-pushed from d378c84 to 0c59e63.
I think the CI failures are finally "expected". However, doing some real stress tests with this still reveals instabilities. There are some failures which are probably invalid in-memory task states:
```
Traceback (most recent call last):
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/distributed/worker.py", line 2572, in execute
    data[k] = self.data[k]
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/zict/buffer.py", line 80, in __getitem__
    raise KeyError(key)
KeyError: "('repartition-415-5a8ec5d8ce9f7fa507c212dd1c024d87', 83)"

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
```

and some invalid state transitions triggered by some scheduler <-> worker interaction/rescheduling (I suspect again work stealing of some kind):
```
('executing', 'ready')
Traceback (most recent call last):
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/distributed/worker.py", line 1538, in add_task
    self.transition(ts, "ready")
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/distributed/worker.py", line 1560, in transition
    func = self._transitions[start, finish]
KeyError: ('executing', 'ready')
```
Unfortunately, these are causing hard deadlocks and I don't trust this to be merged just yet.
Hi @fjetter, is there something that I can run in order to expose these issues? With regards to unexpected transitions, what I would probably do in this situation is drop into a debugger at the point of the unexpected transition.
@mrocklin I ran into the first issue @fjetter listed running the example at #4430 (comment)
Yeah, I was hoping to find something reproducible locally, but I'll try taking that and putting that into a stress test.
I think Florian fixed all of my local reproducers -- I've run the previous failing tests 50 times each and can't get them to fail, so it may require something at scale.
@mrocklin Yes, I am aware of that option. I'm fairly certain that most, if not all, problems are connected to the same underlying mechanism. What's most frustrating is that most issues cannot be reliably reproduced, at least not by unit tests. Unit tests hit these cases, but only every few hundred runs (bash script with an endless loop...).
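For reference, the same "endless loop" idea in Python rather than bash; the test id is just an example taken from earlier in this thread:

```python
import subprocess
import sys

# Re-run a flaky test until it fails -- useful when a race only shows up
# every few hundred runs.
TEST = "distributed/tests/test_failed_workers.py::test_broken_worker_during_computation"

attempt = 0
while True:
    attempt += 1
    result = subprocess.run([sys.executable, "-m", "pytest", "-x", TEST])
    if result.returncode != 0:
        print(f"failed on attempt {attempt}")
        break
```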
What I noticed is that a lot of the internal state machine is effectively tested only indirectly, using the higher-level integration tests.
You might also consider iterating from within an ipython session. For fast unit tests you can often run the test several times a second.

I have some time to look into this. I'm curious how best I can spend this time. Do you have any suggestions?

Also, I know that it's late on a Friday for you, but if you want to connect over video on this let me know. I would be curious to learn more about what is going on and your thoughts on how to fix it. I'm free 6-7pm Central European time if that's interesting to you.
I would expect the validation tests to stop a worker that ends up in an invalid state.
I opened another smaller PR #4432 with a minimal set of changes. With this I can still see assertion errors (but the tests are not failing ;))

Where this PR escalated was when I tried reworking the missing-keys mechanism, which I omitted in #4432. I believe there is a lot of ambiguity in how tasks are managed and released, in particular in cases of missing keys, comm failures or missing workers (which also happen in stress situations a lot). This afternoon I was investigating the scheduler handlers and the way the worker escalates missing or erroneous tasks, where I noticed a few things about the handlers involved.

@mrocklin I would love to connect, but my internet is dropping every few minutes today and it's getting late already. Maybe we can find a spot next week, assuming nobody else fixed it by then :)
Sounds good. In the meantime I'm curious: are there particular things that you're doing to replicate failures? Or is this mostly just the normal test suite? If so, which tests in particular?
In #4432 you can get some errors (although the test passes) running the test mentioned above.
Thanks @gforsyth. That's helpful. Reproduced.
I'll close this PR now since it escalated pretty badly. The deadlock issue was (at least partially) addressed in #4432 and I suggest to open new PRs for follow-up issues. Thanks everyone for review and input!
Two fixes in here which are unfortunately a bit intertwined:

1. If `send_rcv` fails for whatever reason (e.g. a deserialization error) but doesn't close the comm, the other end might still wait for a response. In the meantime this comm is reused, causing some weird behaviour; see also "WIP Proper handling of application errors during get_data" #4316. I'll still need to write a test capturing the behaviour described in there.
2. If we are in the case of a `bad_dep`, the current implementation never detects this and we're stuck in an infinite loop, since the condition `dep.suspicious_count > 5` is never hit. It is never hit because there are multiple `release-keys` calls issued, causing the worker to lose all its memory; therefore the suspicious count never climbs up because the task instance is reinitialized (a minimal sketch of this follows below).
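A minimal illustration of that reset loop, with assumed, simplified names (this is not the worker's actual bookkeeping):

```python
class TaskState:
    def __init__(self, key):
        self.key = key
        self.suspicious_count = 0


tasks = {}


def on_fetch_failure(key):
    # A fresh TaskState is created if the old one was released in between.
    ts = tasks.setdefault(key, TaskState(key))
    ts.suspicious_count += 1
    return ts.suspicious_count > 5  # the "give up" condition


def on_release_keys(key):
    tasks.pop(key, None)  # throws away the TaskState and its counter


# failure -> release -> retry: the counter restarts every cycle, so the
# give-up condition is never reached and the worker loops forever
for _ in range(100):
    assert on_fetch_failure("x") is False
    on_release_keys("x")
```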