Handle errors on individual workers in run/broadcast #5590
distributed/client.py
Outdated
kwargs=dumps(kwargs, protocol=4),
function=dumps(function),
args=dumps(args),
kwargs=dumps(kwargs),
Bump to protocol 5
I think using protocol=4 here was to support cases where different nodes are running Python 3.7 (where protocol=5 isn't available) and 3.8+. To be clear, there may be other places where things break in this situation, but historically we've tried to be accommodating where possible (within reason). We'll probably drop Python 3.7 support soon -- it might be worth keeping protocol=4 for now and dropping it when we drop Python 3.7.
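To make the compatibility concern concrete, here is a minimal sketch using only the standard-library pickle module. It stands in for distributed's own dumps wrapper, whose signature is not shown in this thread, and the payload is made up for illustration. Pickle protocol 5 was added in Python 3.8, so bytes pinned to protocol 4 remain loadable on a 3.7 interpreter.

```python
import pickle

# Illustrative payload; not taken from the PR.
payload = {"args": (1, 2), "kwargs": {"x": 3}}

# Pin protocol 4 so the bytes stay loadable on Python 3.7 peers.
blob_v4 = pickle.dumps(payload, protocol=4)

# For protocol >= 2 the stream starts with the PROTO opcode (0x80),
# followed by one byte holding the protocol number.
protocol_used = blob_v4[1]

# Any interpreter that supports protocol 4 can read the payload back.
roundtrip = pickle.loads(blob_v4)
```

Omitting the protocol argument lets the serializing interpreter choose its own default, which is the situation the comment above warns about for mixed 3.7 / 3.8+ clusters.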
@@ -3546,7 +3604,7 @@ def dump_cluster_state(
 filename: str = "dask-cluster-dump",
 exclude: Collection[str] = (),
 format: Literal["msgpack", "yaml"] = "msgpack",
-) -> Awaitable | None:
+):
The previous version would confuse mypy (see mypy guidelines re. return Any). I don't think there's an easy way to annotate sync/async client functions short of some major refactoring work.
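For readers unfamiliar with the pattern, here is a minimal sketch of why a single annotation struggles to cover both modes. MiniClient and its methods are hypothetical stand-ins, not the distributed API. The same public method hands back a coroutine in asynchronous mode and a plain value in synchronous mode, so any precise return annotation would be a union that every caller has to narrow.

```python
import asyncio


class MiniClient:
    """Hypothetical client that can run in sync or async mode."""

    def __init__(self, asynchronous: bool):
        self.asynchronous = asynchronous

    async def _dump(self) -> str:
        # Stand-in for the real asynchronous implementation.
        return "state"

    def dump(self):
        # Left unannotated on purpose: the return value is a coroutine
        # in async mode and a plain str in sync mode.
        coro = self._dump()
        if self.asynchronous:
            return coro
        return asyncio.run(coro)


sync_result = MiniClient(asynchronous=False).dump()
async_result = asyncio.run(MiniClient(asynchronous=True).dump())
```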
Thanks @crusaderky. @ncclementi, @scharlottej13 and I just took a look at this and overall it looks good. We left a couple of small comments below.
serializers=None,
):
on_error: "Literal['raise', 'return', 'return_pickle', 'ignore']" = "raise",
Can we remove these quotes like in other places where we're using Literal?
-on_error: "Literal['raise', 'return', 'return_pickle', 'ignore']" = "raise",
+on_error: Literal['raise', 'return', 'return_pickle', 'ignore'] = "raise",
Not in this module, because Cython does not support from __future__ import annotations
Ah, I see -- thanks for clarifying. I should have known you would have done this if you could : )
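As background on the quoted form, here is a small sketch of general Python behaviour. It is not a reproduction of the Cython-compiled module, and the function below is a hypothetical stand-in that models only the on_error parameter. A quoted annotation is stored as a plain string and is evaluated only when a tool asks for it, which is what from __future__ import annotations would otherwise arrange automatically.

```python
from typing import Literal, get_type_hints


# Hypothetical stand-in for the reviewed signature.
def broadcast(
    on_error: "Literal['raise', 'return', 'return_pickle', 'ignore']" = "raise",
):
    return on_error


# The raw annotation is kept as a string; nothing is evaluated
# when the function is defined.
raw_annotation = broadcast.__annotations__["on_error"]

# Type checkers, and get_type_hints() at runtime, resolve it lazily.
resolved = get_type_hints(broadcast)["on_error"]
```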
warnings.warn(
"workers=True is deprecated; pass workers=None or omit instead",
category=FutureWarning,
)
Could you add a small pytest.warns test for this?
Done
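The test that was added is not shown in this thread. As a rough sketch of what such a check looks like, the snippet below uses a hypothetical helper, normalize_workers, that emits the same warning as the diff above. It relies only on the standard library so that it runs without pytest installed; the comment shows the pytest.warns form the reviewer asked for.

```python
import warnings


def normalize_workers(workers):
    """Hypothetical helper mirroring the deprecation in the diff."""
    if workers is True:
        warnings.warn(
            "workers=True is deprecated; pass workers=None or omit instead",
            category=FutureWarning,
        )
        workers = None
    return workers


def check_deprecation():
    # With pytest, the same check is usually written as:
    #     with pytest.warns(FutureWarning, match="workers=True is deprecated"):
    #         normalize_workers(True)
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")  # record every warning, no dedup
        result = normalize_workers(True)
    return result, [w.category for w in caught]


result, categories = check_deprecation()
```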
Hrm, gpuCI is failing with
11:12:05 E TypeError: broadcast() got an unexpected keyword argument 'on_error'
which is related because this PR is where
EDIT: We're getting a version mismatch warning for the failing test (see below). The scheduler in that test isn't running with the changes in this PR, which is why we're getting the
11:12:05 =============================== warnings summary ===============================
11:12:05 distributed/comm/tests/test_ucx_config.py::test_ucx_config_w_env_var
11:12:05 /workspace/distributed/client.py:1096: VersionMismatchWarning: Mismatched versions found
11:12:05
11:12:05 +-------------+------------------------+-----------+---------+
11:12:05 | Package | client | scheduler | workers |
11:12:05 +-------------+------------------------+-----------+---------+
11:12:05 | distributed | 2021.12.0+11.g957be0fc | 2021.12.0 | None |
11:12:05 +-------------+------------------------+-----------+---------+
11:12:05 warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
Reverted to protocol=4. All review comments have been incorporated.
Thanks for bringing this up @jrbourbeau! I unfortunately don't have much experience with this test and I believe @pentschev is offline; @quasiben does anything about these failures stand out to you?
@@ -3872,7 +3930,8 @@ async def _get_versions(self, check=False, packages=[]):
 scheduler = await self.scheduler.versions()  # this raises

 workers = await self.scheduler.broadcast(
-    msg={"op": "versions", "packages": packages}
+    msg={"op": "versions", "packages": packages},
+    on_error="ignore",
Sorry, one last comment after thinking about this a bit more. We often ask users to inspect the output of Client.get_versions when debugging package version mismatches. With on_error="ignore" here we could run into cases where there's a problematic version mismatch on an unresponsive worker which will be silently ignored (and hard to debug). What happens today? My guess is a connection error is raised.
Thoughts on emitting a warning instead of silently ignoring?
Now logging an error in the scheduler log
I can't reproduce the error locally with either main or this PR:
$ pytest -vs distributed/comm/tests/test_ucx_config.py
================================================== test session starts ==================================================
platform linux -- Python 3.8.12, pytest-6.2.5, py-1.11.0, pluggy-1.0.0 -- /datasets/pentschev/miniconda3/envs/rn-115-22.02.211213/bin/python
cachedir: .pytest_cache
rootdir: /datasets/pentschev/src/distributed, configfile: setup.cfg
plugins: asyncio-0.16.0, repeat-0.8.0, rerunfailures-10.2, timeout-2.0.1
timeout: 300.0s
timeout method: thread
timeout func_only: False
collected 2 items
distributed/comm/tests/test_ucx_config.py::test_ucx_config PASSED
distributed/comm/tests/test_ucx_config.py::test_ucx_config_w_env_var PASSED
================================================= slowest 20 durations ==================================================
2.65s call distributed/comm/tests/test_ucx_config.py::test_ucx_config_w_env_var
0.01s call distributed/comm/tests/test_ucx_config.py::test_ucx_config
0.01s setup distributed/comm/tests/test_ucx_config.py::test_ucx_config
0.00s setup distributed/comm/tests/test_ucx_config.py::test_ucx_config_w_env_var
0.00s teardown distributed/comm/tests/test_ucx_config.py::test_ucx_config_w_env_var
0.00s teardown distributed/comm/tests/test_ucx_config.py::test_ucx_config
=================================================== 2 passed in 3.07s ===================================================
$ ipython
Python 3.8.12 | packaged by conda-forge | (default, Oct 12 2021, 21:59:51)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.30.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: import distributed
In [2]: distributed.__version__
Out[2]: '2021.12.0+13.g8a085126'
I don't quite get why there are mismatched versions here, but perhaps that's the issue then?
Also as @charlesbluca mentioned, I'm indeed mostly off through the rest of the year, so my response may be severely delayed.
Thanks @charlesbluca @pentschev. #5595 should resolve the version warning (Also, hope you enjoy your time off @pentschev 🙂)
All test failures (besides gpuCI) seem unrelated
…cast_error_handling
Just merged
All test failures are unrelated.
…cast_error_handling
Add sensible handling of failures on individual workers:
Client.run
By default, continue raising as before. Add the option to return or ignore errors raised by the function on individual workers, so that the output from other workers is not lost. Deal with RPC errors in the same way as with exceptions raised by functions.
Scheduler.broadcast
In case of RPC errors, by default, continue raising as before. Add option to return or ignore the raised OSErrors instead, so that the output from other workers is not lost.
Client.dump_cluster_state
If a worker is unresponsive, dump the string repr of the RPC error alongside the _to_dict output of healthy workers
Client.version_info
If a worker is unresponsive, omit it and return the output of healthy workers
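The behaviour summarised above can be sketched with plain asyncio. This is an illustration under stated assumptions, not the scheduler's implementation: the broadcast function, the worker addresses and the two fake workers are all hypothetical, and only three of the modes ('raise', 'return', 'ignore') are modelled.

```python
import asyncio


async def broadcast(calls, on_error="raise"):
    """calls maps a worker address to a zero-argument coroutine function."""
    if on_error not in ("raise", "return", "ignore"):
        raise ValueError(f"unknown on_error mode: {on_error!r}")

    async def call_one(addr, fn):
        try:
            return addr, await fn()
        except OSError as exc:
            if on_error == "raise":
                raise                 # previous behaviour: fail the whole call
            if on_error == "return":
                return addr, exc      # hand the error back in place of a result
            return None               # "ignore": omit this worker entirely

    results = await asyncio.gather(*(call_one(a, f) for a, f in calls.items()))
    return dict(r for r in results if r is not None)


async def healthy():
    return "ok"


async def broken():
    raise OSError("connection lost")


# Made-up addresses, for illustration only.
calls = {"tcp://a": healthy, "tcp://b": broken}

ignored = asyncio.run(broadcast(calls, on_error="ignore"))
returned = asyncio.run(broadcast(calls, on_error="return"))
```

With 'ignore' only the healthy worker appears in the result, while 'return' keeps an entry for the failed worker holding the OSError, so output from other workers is not lost in either case.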