Improve robustness of PipInstall plugin #7111

Conversation
I'm not sure how much effort we should put into testing this, given that the testing is entirely artificial and based on mocks at the moment.
distributed/diagnostics/plugin.py (outdated)

```python
logger.error(
    "Pip install failed with '%s'", stderr.decode().strip()
)
return
```
I think that in addition to logging the error, we may want to raise a `RuntimeError` here instead of returning. This way, we can propagate the error to the user.
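For concreteness, a minimal sketch of the suggested change (the traceback further down shows the shape it ultimately took; the surrounding `proc`/`stderr` variables are assumed from the plugin's existing `subprocess.Popen` usage):

```python
returncode = proc.wait()
if returncode != 0:
    # Log and raise instead of silently returning, so the failure
    # propagates back to the client that registered the plugin.
    msg = f"Pip install failed with '{stderr.decode().strip()}'"
    logger.error(msg)
    raise RuntimeError(msg)
```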
Agreed. This would result in the worker failing to start, which, I believe, is a good default.
In fact, this will not result in the worker failing to start, due to `catch_errors` defaulting to `True` in `Worker.plugin_add`, but it will raise an error on the client-side:
distributed/worker.py, lines 1811 to 1816 in aaab17c:

```python
async def plugin_add(
    self,
    plugin: WorkerPlugin | bytes,
    name: str | None = None,
    catch_errors: bool = True,
) -> dict[str, Any]:
```
```
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Input In [5], in <cell line: 1>()
----> 1 client.register_worker_plugin(plugin)
File ~/projects/dask/distributed/distributed/client.py:4632, in Client.register_worker_plugin(self, plugin, name, nanny)
   4628     name = _get_plugin_name(plugin)
   4630 assert name
-> 4632 return self.sync(
   4633     self._register_worker_plugin, plugin=plugin, name=name, nanny=nanny
   4634 )
File ~/projects/dask/distributed/distributed/utils.py:339, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    337     return future
    338 else:
--> 339     return sync(
    340         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    341     )
File ~/projects/dask/distributed/distributed/utils.py:406, in sync(loop, func, callback_timeout, *args, **kwargs)
    404 if error:
    405     typ, exc, tb = error
--> 406     raise exc.with_traceback(tb)
    407 else:
    408     return result
File ~/projects/dask/distributed/distributed/utils.py:379, in sync.<locals>.f()
    377         future = asyncio.wait_for(future, callback_timeout)
    378     future = asyncio.ensure_future(future)
--> 379     result = yield future
    380 except Exception:
    381     error = sys.exc_info()
File /opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed-py3.9/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
    759     exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()
File ~/projects/dask/distributed/distributed/client.py:4558, in Client._register_worker_plugin(self, plugin, name, nanny)
   4554 if response["status"] == "error":
   4555     _, exc, tb = clean_exception(
   4556         response["exception"], response["traceback"]
   4557     )
-> 4558     raise exc.with_traceback(tb)
   4559 return responses
File ~/projects/dask/distributed/distributed/diagnostics/plugin.py:304, in setup()
    302     msg = f"Pip install failed with '{stderr.decode().strip()}'"
    303     logger.error(msg)
--> 304     raise RuntimeError(msg)
    305 else:
    306     logger.info(
    307         "The following packages have already been installed: %s",
    308         self.packages,
    309     )
RuntimeError: Pip install failed with 'ERROR: Could not find a version that satisfies the requirement pyarrov (from versions: none)
ERROR: No matching distribution found for pyarrov'
```
> `catch_errors` defaulting to `True` in `Worker.plugin_add` but it will raise an error on the client-side

That's unfortunate, but still OK.
```diff
@@ -282,34 +278,74 @@ def __init__(self, packages, pip_options=None, restart=False):
         self.packages = packages
         self.restart = restart
         self.pip_options = pip_options or []
         self.id = f"pip-install-{uuid.uuid4()}"
```
I think we should use the `worker.id` instead. I think that there is a one-to-one relationship between `PipInstall` plugin and `Worker` instances, but either way, the uniqueness property should link to the worker, shouldn't it?
This ID is unique per `PipInstall` instance. It's mainly there to avoid conflicts between multiple `PipInstall` plugins all trying to install a bunch of packages on the worker.
See `PipInstall._compose_{installed|restarted}_key()` for the namespacing. We achieve the per-worker uniqueness of `PipInstall._compose_restarted_key()` using `worker.nanny`, which should be the correct key, but we may want to add the PID to that.
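A hypothetical sketch of what that namespacing could look like (the helper names come from this PR, but their exact bodies are assumed here, not quoted):

```python
def _compose_installed_key(self) -> str:
    # One marker per PipInstall instance: the packages only need to be
    # installed once per plugin registration.
    return f"{self.id}-installed"

def _compose_restarted_key(self, worker) -> str:
    # worker.nanny (the nanny's address) makes the key unique per worker,
    # so each worker tracks its own restart state.
    return f"{self.id}-{worker.nanny}-restarted"
```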
`worker.nanny` is not perfect, but I believe it's good enough. I don't think we have the PID of the nanny, and I don't think it's necessary. Let's not overdo it here.
TIL: We tear down plugins that are overwritten, which allows us to nicely remove their metadata. We can also register plugins under custom names, ignoring their `name` attribute. However, we do not tell those plugins the name they are registered by. For `PipInstall` plugins, `plugin.name` will remain `pip`. We also do not pass that information on setup or teardown. Otherwise, we would be able to drop the ID and just use their name.
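To illustrate the point about custom names (the plugin arguments and registered name below are example values, not from the PR):

```python
from distributed import Client
from distributed.diagnostics.plugin import PipInstall

client = Client()  # local cluster, for illustration only
plugin = PipInstall(packages=["requests"])

# Registered under a custom name, ignoring plugin.name:
client.register_worker_plugin(plugin, name="pip-requests")

# Inside setup()/teardown() the plugin still sees plugin.name == "pip";
# it is never told the name it was registered under, hence the separate ID.
```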
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files +1, 15 suites +1, 6h 33m 35s ⏱️ +56m 40s

For more details on these failures, see this check. Results for commit d34c5c9. ± Comparison against base commit 6ac679e.

♻️ This comment has been updated with latest results.
distributed/diagnostics/plugin.py (outdated)

```python
if not await self._is_installed(worker):
    logger.info("Pip installing the following packages: %s", self.packages)
    await self._set_installed(worker)
    proc = subprocess.Popen(
```
To keep the `PipInstall` plugin from hogging the GIL if the installation of packages takes some time, we could switch to using asynchronous subprocesses here. One thing I'm a bit concerned about in that scenario is that we have no way of blocking a restart of the worker (triggered for some other reason) while the subprocess is running.
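A minimal sketch of the asynchronous-subprocess variant, assuming the same pip command the plugin builds today (the function name and error handling are illustrative, not the PR's code):

```python
import asyncio
import sys

async def _pip_install_async(packages: list[str], pip_options: list[str]) -> None:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-m", "pip", "install", *pip_options, *packages,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Unlike subprocess.Popen.communicate(), this awaits without
    # blocking the worker's event loop.
    _, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"Pip install failed with '{stderr.decode().strip()}'")
```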
Let's take one step at a time. This is only relevant on startup, and during plugin initialization the worker is not even connected to the scheduler yet. I don't think anything will happen if we block here.

(Also, does `Popen` really block the GIL or just the event loop? Same effect really, but I'm still curious.)
Sorry, I was not careful with the wording here. `subprocess.communicate()` hogs the event loop. After a small expedition down the GIL rabbit hole, it looks like `subprocess.communicate()` does indeed release the GIL:
https://github.com/python/cpython/blob/e39ae6bef2c357a88e232dcab2e4b4c0f367544b/Modules/posixmodule.c#L8489-L8491

The visible effect remains the same: the worker might become unresponsive for a while, which will likely be exacerbated in #7103, since `conda install` doesn't feel like the fastest thing in the world.
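A related alternative to fully async subprocesses would be keeping `Popen` but offloading the blocking `communicate()` call to a thread; a hypothetical sketch (requires Python 3.9+ for `asyncio.to_thread`):

```python
import asyncio
import subprocess
import sys

async def _pip_install_threaded(packages: list[str]) -> None:
    proc = subprocess.Popen(
        [sys.executable, "-m", "pip", "install", *packages],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() releases the GIL but would still block the event loop
    # if called directly; running it in a thread keeps the worker responsive.
    _, stderr = await asyncio.to_thread(proc.communicate)
    if proc.returncode != 0:
        raise RuntimeError(f"Pip install failed with '{stderr.decode().strip()}'")
```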
distributed/tests/test_worker.py (outdated)

```python
@gen_cluster(client=True, nthreads=[("", 1), ("", 1)])
async def test_pip_install_multiple_workers(c, s, a, b):
    with captured_logger(
        "distributed.diagnostics.plugin", level=logging.INFO
    ) as logger:
        mocked = mock.Mock()
        mocked.configure_mock(
            **{"communicate.return_value": (b"", b""), "wait.return_value": 0}
        )
        with mock.patch(
            "distributed.diagnostics.plugin.subprocess.Popen", return_value=mocked
        ) as Popen:
            await c.register_worker_plugin(
                PipInstall(packages=["requests"], pip_options=["--upgrade"])
            )

    args = Popen.call_args[0][0]
    assert "python" in args[0]
    assert args[1:] == ["-m", "pip", "install", "--upgrade", "requests"]
    assert Popen.call_count == 1
    logs = logger.getvalue()
    assert "Pip installing" in logs
    assert "already been installed" in logs
```
This test should ensure that the worker is restarted, shouldn't it?
Restarting only happens if

- we use a `Nanny`
- we set `restart=True`

This test does not restart since it is just using your average `Worker`, which can't restart. I've been trying to build a test that uses a `Nanny` and restarts, but I'm running into problems with mocking out the calls to `pip` and the subprocessing happening inside a `Nanny`.
> I've been trying to build a test that uses a `Nanny` and restarts, but I'm running into problems with mocking out the calls to `pip` and the subprocessing happening inside a `Nanny`.

I've been able to resolve this and added another test.
I'm surprised to see codecov complaining about all the missed lines. Just mocking the `Popen` call should be sufficient to test the entire plugin, shouldn't it?
Force-pushed from 3817177 to 74f71c5 (compare).
Note that without throwing another `SchedulerPlugin` into the mix to manage metadata, we do not remove metadata upon removal of the plugin.

That's perfectly fine.
Thanks @hendrikmakait
Closes #7102
Closes #7037
Supersedes #7086

`pre-commit run --all-files`