Overhaul transitions for the resumed state #6699
Conversation
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
15 files ±0   15 suites ±0   6h 30m 37s ⏱️ -35m 18s
For more details on these failures and errors, see this check.
Results for commit f190b4b. ± Comparison against base commit 1d0701b.
♻️ This comment has been updated with latest results.
Force-pushed from 64100c4 to 04cc7ac.
There are two failing tests.
distributed/worker_state_machine.py
Outdated
ts.coming_from = None
ts.state = "released"
ts.done = False
ts.previous = None
ts.next = None
return {ts: "waiting"}, []
I think we're missing an in_flight_tasks.remove/discard(ts) here.
I moved them all into _gather_dep_done_common.
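For context, here is a minimal, self-contained sketch of what such a shared cleanup helper could look like; the class and event names below are simplified stand-ins, not the actual implementation in distributed/worker_state_machine.py.

```python
from dataclasses import dataclass
from collections.abc import Iterator

# Simplified stand-ins for the real TaskState / WorkerState / event classes.
@dataclass(eq=False)
class TaskState:
    key: str
    done: bool = False

@dataclass
class GatherDepDoneEvent:
    keys: set[str]

class WorkerStateSketch:
    def __init__(self) -> None:
        self.tasks: dict[str, TaskState] = {}
        self.in_flight_tasks: set[TaskState] = set()

    def _gather_dep_done_common(self, ev: GatherDepDoneEvent) -> Iterator[TaskState]:
        """Cleanup shared by all gather_dep termination paths (sketch)."""
        for key in ev.keys:
            ts = self.tasks[key]
            self.in_flight_tasks.discard(ts)  # the remove/discard discussed above
            ts.done = True
            yield ts
```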
There are a couple of small things but overall this looks great. I think spelling out the resumed transitions explicitly was long overdue
so we're entering cancelled state and waiting until it completes.
"""
assert ts.state in ("executing", "long-running")
ts.previous = cast(Literal["executing", "long-running"], ts.state)
I'm curious, what does this literal cast give us?
ts.state is a Literal[executing, long-running, flight, <all other states>], while ts.previous is a Literal[executing, long-running, flight, None]. mypy is not smart enough to realise that the assertion on the line above guarantees the intersection of the two domains.
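To make the point concrete, here is a minimal sketch of the narrowing problem; the type aliases are simplified assumptions, not the actual annotations in worker_state_machine.py.

```python
from typing import Literal, Optional, cast

# Simplified stand-ins for the real state annotations.
TaskStateState = Literal["executing", "long-running", "flight", "memory", "released"]
PreviousState = Optional[Literal["executing", "long-running", "flight"]]

class TaskState:
    state: TaskStateState
    previous: PreviousState = None

def remember_previous(ts: TaskState) -> None:
    assert ts.state in ("executing", "long-running")
    # mypy still sees ts.state as the full TaskStateState union here, so without
    # the cast it rejects the assignment to the narrower ts.previous type.
    ts.previous = cast(Literal["executing", "long-running"], ts.state)
```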
distributed/worker_state_machine.py
Outdated
ts.state = "released"
msg = error_message(e)
recs = {ts: tuple(msg.values())}
recs2 = {ts: tuple(msg.values())}
instr2: Instructions = []
IIUC this would create a released->error transition, which is not defined. By not setting ts.state = "released" we'd get an ordinary executing->error transition. The serialization failure should be picked up as a task compute / user error, which I think is OK.
released->error is defined, and we need it anyway in case the serialization problem is caused by scatter (although I'd bet the use case is untested).
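For illustration, a hypothetical example of a task whose result cannot be serialized, which is the kind of failure being discussed here (names and values are made up):

```python
import threading
from distributed import Client

client = Client()  # assumes a running cluster

def returns_unpicklable():
    # A lock cannot be pickled, so shipping the result back fails to serialize.
    return threading.Lock()

future = client.submit(returns_unpicklable)
# future.result() would typically raise, surfacing the serialization problem
# as a task / user error rather than crashing the worker.
```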
Force-pushed from 03554df to fbbeb29.
@fjetter e.g. in this case, there's going to be a transition executing->memory while ts.done is False, followed by a transition memory->memory when execute() completes. You'll end up with the task permanently in WorkerState.executing, permanently one fewer thread to run other tasks on, and permanently reduced resources. I'm deleting
distributed/worker_state_machine.py
Outdated
if ts.previous in ("executing", "long-running"):
    # Task dependencies may have been released;
    # can't call _validate_task_executing
    assert not ts.waiting_for_data
If you call _validate_task_executing() from here, only one test in the whole suite becomes flaky, test_cancel_fire_and_forget (dependencies can be released if the dependent is cancelled, but they must be in memory if it's executing), and on my computer only 0.1% of the time (more frequently on CI). I added test_cancel_with_dependencies_in_memory to reproduce the use case deterministically. Still, I find it very scary how little coverage we have for cancelled/resumed tasks with dependencies.
> I find it very scary how little coverage we have for cancelled/resumed tasks with dependencies.

I do not disagree. At the time we started patching up the worker we barely had any coverage at all for any of these internals. We've come a long way and there is still a lot to do to reach 100% coverage. We have to be a bit pragmatic here; I do not believe it is feasible or effective for us right now to push for absolute coverage.
Follow-up: #6893
Why is scattering even transitioning via executing? That sounds wrong. I remember concretely having a released->memory transition for this at some point in time.

There's nothing that stops a user from scattering an object while a task with the same key is already running. This is most likely to happen while prototyping in a Jupyter notebook.
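For illustration, a hypothetical client-side sequence that can produce this situation (the key name and values are made up):

```python
import time
from distributed import Client

client = Client()  # assumes a running cluster

# A task with key "x" starts executing on a worker...
future = client.submit(lambda: time.sleep(10) or 123, key="x")

# ...and while it is still running, data for the same key arrives via scatter,
# e.g. from a notebook cell that was re-run while prototyping.
client.scatter({"x": 123})
```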
class BrokenWorker(Worker):
    block_get_data = True

def test_flight_cancelled_error(ws):
The previous test did not trip on #6877. I chose not to bother investigating why and just rewrote it.
why not keep both?
Because a test that does not test its declared use case is useless and misleading
@@ -533,7 +514,7 @@ async def release_all_futures():

     await lock_compute.release()

-    if not raise_error:
+    if not wait_for_processing and not raise_error:
test_resumed_cancelled_handle_compute defeated me. I could not make any sense of it. I found it so unfathomable that it originally made me can this PR and restart from scratch with #6716, and then again with #6844. This change makes it green again, but I am not sure that the tested stories are not highlighting any problems.
I opened #6905 with some explanations. I admit the test is a bit convoluted but I believe it is valuable
Looks like the test is flaky now. I'll try investigating.
My best guess for the flakiness is that the ordering of wait_for_state(f3.key, "processing", s) and lock_compute.release is very timing-sensitive. Previously, the state machine was wired in such a way that this wouldn't matter, but that is no longer the case.
You're right: if I add a sleep(1) just before await lock_compute.release(), the two tests with wait_for_processing=False fail deterministically. It's a race condition between client.submit (client->scheduler comms) and distributed.Lock release (client->scheduler->worker). Pushing a fix.
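Roughly, the timing experiment described above looks like this (a sketch, not the actual test code; lock_compute and wait_for_processing come from the test under discussion):

```python
import asyncio

async def release_after_delay(lock_compute, delay: float = 1.0):
    # Delaying the release lets the client->scheduler submit message win the race
    # against the client->scheduler->worker release path; with this delay in place
    # the wait_for_processing=False variants fail deterministically.
    await asyncio.sleep(delay)
    await lock_compute.release()
```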
@@ -1902,41 +1911,6 @@ def _transition_waiting_ready(

        return self._ensure_computing()

    def _transition_cancelled_error(
Now uses _transition_cancelled_released
@@ -1960,7 +1934,7 @@ def _transition_generic_error(

        return {}, [smsg]

    def _transition_executing_error(
Now uses _transition_generic_error
            return {}, []
        else:
            assert ts.previous == "flight"
            ts.state = "resumed"
            ts.next = "waiting"
            return {}, []

    def _transition_cancelled_forgotten(
This is impossible
        ts.done = False
        return {}, []

    def _transition_generic_memory(
Moved below and refactored
        return self._transition_generic_fetch(ts, stimulus_id=stimulus_id)

    def _transition_flight_error(
Now uses _transition_generic_error
        for ts in self.in_flight_tasks:
            assert ts.state == "flight" or (
                ts.state in ("cancelled", "resumed") and ts.previous == "flight"
            ), ts
These assertions can actually fail with a poorly-timed scatter()
@fjetter ready for final review and merge.
Looks great. Most comments are informative and should not block.
    elif not wait_for_processing and raise_error:
        assert await f4 == 4 + 2

    assert_story(
Note: instead of asserting on the story, we could just as well assert on the events triggered. I consider the events much more readable and concise.
In fact, I'm actually very curious what kind of events we previously ignored. It makes sense that the worker should not behave identically with different scheduler timings but apparently we ignored something before this change
            (f3.key, "ready", "executing", "executing", {}),
            (f3.key, "executing", "released", "cancelled", {}),
            (f3.key, "cancelled", "fetch", "resumed", {}),
            (f3.key, "resumed", "error", "released", {f3.key: "fetch"}),
This is actually a change in behavior. Previously we stayed in the error state. That was arguably a false behavior given the definitions of the cancelled and resumed states, but I don't think this will matter in practice. Just pointing it out; I'm fine with the new behavior.
            (f3.key, "resumed", "waiting", "executing", {}),
            (f3.key, "executing", "memory", "memory", {}),
+1 looks much cleaner. I vaguely remember that I intentionally did not reset to executing for some reason but I'm glad if we can do it properly. This makes the states much more deterministic
        self._release_resources(ts)
        ts.previous = None
        ts.done = False
This is not necessary. The generic_release triggers a purge_state, which resets this.
ts.done = False
Yes, this was a conscious decision. I'd like to be as explicit as possible in the specific code.
distributed/worker_state_machine.py
Outdated
# Note: this is not the same as recommending {ts: "released"} on the
# previous line, as it would instead transition the task to cancelled - but
# a task that raised the Reschedule() exception is finished!
Is this comment still relevant? I don't really understand it and it seems to reference a line that no longer exists
The comment is still valid. transition_executing_released and transition_resumed_released contain assert not ts.done. I could of course change them with a special code path that is exclusive to the rescheduled state, but I thought that keeping the logic within transition_*_rescheduled was a lot more readable.
follow-up: #6685
_transition_from_resumed contains legacy code and documentation #6693
AssertionError in WorkerState._transition_cancelled_error #6877