New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle CancelledError
properly in ConnectionPool
#8110
Conversation
CancelledError
properly in ConnectionPool
@gen_cluster(client=True, nthreads=[("", 1)]) | ||
async def test_gather_dep_cancelled_error(c, s, a): | ||
"""Something somewhere in the networking stack raises CancelledError while | ||
gather_dep is running | ||
|
||
See Also | ||
-------- | ||
test_get_data_cancelled_error | ||
https://github.com/dask/distributed/issues/8006 | ||
""" | ||
async with BlockedGetData(s.address) as b: | ||
x = c.submit(inc, 1, key="x", workers=[b.address]) | ||
y = c.submit(inc, x, key="y", workers=[a.address]) | ||
await b.in_get_data.wait() | ||
tasks = { | ||
task for task in asyncio.all_tasks() if "gather_dep" in task.get_name() | ||
} | ||
assert tasks | ||
# There should be only one task but cope with finding more just in case a | ||
# previous test didn't properly clean up | ||
for task in tasks: | ||
task.cancel() | ||
|
||
b.block_get_data.set() | ||
assert await y == 3 | ||
|
||
assert_story( | ||
a.state.story("x"), | ||
[ | ||
("x", "fetch", "flight", "flight", {}), | ||
("x", "flight", "missing", "missing", {}), | ||
("x", "missing", "fetch", "fetch", {}), | ||
("x", "fetch", "flight", "flight", {}), | ||
("x", "flight", "memory", "memory", {"y": "ready"}), | ||
], | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is too artificial. It trigger a condition that is only possible during worker shutdown. Nothing is ever cancelling the actual gather_dep
task itself unless we're shutting down the worker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other PR is introducing also test_get_data_cancelled_error
which is, I believe, a much more represenative test (which also works on this PR).
The test is provoking a (still wrong and artifical) CancelledError
on write
. This is wrong and will never happen, I believe. However, due to our low level handshakes, this CancelledError
cancels the actual connect
and is therefore kind of representative of the real issue
try: | ||
return connect_attempt.result() | ||
except asyncio.CancelledError: | ||
reason = self._reasons.pop(connect_attempt, "ConnectionPool closing.") | ||
raise CommClosedError(reason) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@graingert I'm all ears if you know how to better test this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got test_remove_cancels_connect_before_task_running
which appears to produce this condition 🎉 (not a perfect test but it's something)
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 21 files ± 0 21 suites ±0 10h 57m 32s ⏱️ -38s For more details on these failures, see this check. Results for commit 6a2a838. ± Comparison against base commit 8aa04a8. This pull request removes 3 and adds 1 tests. Note that renamed tests count towards both.
|
This reverts #8013 and changes the way how we're handling cancellations in the connection pool.
I haven't figured out quite how we end up in this situation but somehow theconnect_attempt
task is raising aCancelledError
even though the task itself is catchingCancelledError
s and is supposed to reraise asCommClosed
. For whatever reason this is not happening and is therefore raising theCancelledError
If the task is cancelled before the task is actually scheduled, awaiting it can still cause a
CancelledError
to be raised even if the body handles this and reraises asCommClosed
. I don't know why this issue hasn't affected us before.Catching the CancelledError in
gather_dep
is causing other tests to flake. Explicitly, I found thatdistributed.tests.test_scheduler.test_tell_workers_when_peers_have_left
is affected by this change in gather_dep. If #8013 is reverted, that test is rock solid.Generally, these two tests have been sensitive to changes around this cancellation
distributed/tests/test_worker_metrics.py::test_gather_dep_network_error
distributed/tests/test_scheduler.py::test_tell_workers_when_peers_have_left
and with this PR, both tests are rock solid on my machine.
This may collide / supersede / be superseded by #8109
Related #8108