
NannyPlugins may not be registered on all workers #7035

@gjoseph92


If workers are connecting to the scheduler at the same time that a NannyPlugin is registered, it's possible that some workers will never receive the plugin.

Specifically, if this sequence happens:

  1. Nanny starts and calls the scheduler `add_nanny` RPC to retrieve plugins. There are none.
  2. A nanny plugin is registered. The new Nanny isn't in `self.workers`, because its worker process hasn't started up and connected yet, so no message is broadcast to it.
  3. The worker process starts up and connects to the scheduler.

Then that worker will never be informed of the plugin.

This is quite problematic, since nanny plugins are typically used to set up important state on workers before they process any tasks (environment variables, installing packages, etc.). Worker state becoming non-deterministically inconsistent (some workers randomly missing the right packages, variables, etc.) will be confusing to debug.
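The three steps above can be modeled with a toy scheduler to show exactly where the broadcast is lost. This is a minimal sketch, not the real distributed API: `ToyScheduler`, `ToyWorker`, and the method names are illustrative stand-ins for the handshake described above.

```python
# Toy model of the race: the scheduler only broadcasts a newly registered
# plugin to addresses present in `scheduler.workers`, so a nanny that has
# already fetched the (empty) plugin list but whose worker has not yet
# connected misses the broadcast entirely.

class ToyScheduler:
    def __init__(self):
        self.workers = {}        # connected worker address -> worker
        self.nanny_plugins = {}  # plugin name -> payload

    def add_nanny(self):
        # Step 1: a starting Nanny asks for the current plugins.
        return dict(self.nanny_plugins)

    def register_nanny_plugin(self, name, plugin):
        # Step 2: store the plugin and broadcast to *connected* workers only.
        self.nanny_plugins[name] = plugin
        for worker in self.workers.values():
            worker.plugins[name] = plugin

    def add_worker(self, address, worker):
        # Step 3: the worker process finally connects -- too late.
        self.workers[address] = worker


class ToyWorker:
    def __init__(self):
        self.plugins = {}


scheduler = ToyScheduler()
late_worker = ToyWorker()

late_worker.plugins.update(scheduler.add_nanny())      # step 1: nothing yet
scheduler.register_nanny_plugin("env", {"ABC": 123})   # step 2: broadcast misses it
scheduler.add_worker("tcp://late:1234", late_worker)   # step 3: connects after

print(late_worker.plugins)  # → {} -- the plugin is never delivered
```

The worker is dropped through the gap between steps 1 and 3: it is neither in the plugin list it fetched nor in the set of workers the registration broadcast reaches.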

```python
import asyncio
import multiprocessing as mp

from distributed import WorkerPlugin


class SlowWorkerStart(WorkerPlugin):
    def __init__(self) -> None:
        self.started = mp.Event()
        self.continue_start = mp.Event()

    async def setup(self, worker):
        # Signal that the worker process has started, then block worker
        # startup until the test allows it to continue.
        self.started.set()
        while not self.continue_start.is_set():
            await asyncio.sleep(0.1)
```


```python
from distributed import Nanny
from distributed.utils_test import async_wait_for, gen_cluster


@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny)
async def test_nanny_plugin_race(c, s, a):
    from dask.distributed import Environ

    worker_plugin = SlowWorkerStart()
    # Note `plugins=` is passed to worker
    n = Nanny(s.address, plugins=[worker_plugin], nthreads=1)
    start_task = None
    try:
        # Can't use `async with Nanny`, because `Nanny.start_unsafe` blocks on the worker
        # having connected to the scheduler.
        start_task = asyncio.create_task(n.start())

        # Wait until the worker process has started (but not connected to the scheduler)
        await async_wait_for(lambda: worker_plugin.started.is_set(), 5)
        assert len(s.workers) == 1

        # Now, register a plugin after the Nanny has asked the scheduler about plugins,
        # but before the worker process has connected.
        assert not a.plugins
        assert not n.plugins
        await c.register_worker_plugin(Environ({"ABC": 123}))

        # Wait for the worker to connect
        worker_plugin.continue_start.set()
        try:
            await start_task
        finally:
            start_task = None
        await c.wait_for_workers(2)

        assert a.plugins
        assert n.plugins  # this will fail, it didn't get the plugin
    finally:
        if start_task:
            start_task.cancel()
            try:
                await start_task
            finally:
                start_task = None
        await n.close()
```

To solve this, I think we'd have to maintain state on the scheduler tracking all the nannies, separate from `SchedulerState.workers`. Since nannies and workers have different lifetimes, it's not enough to assume that `.workers` is always equivalent to the set of nannies. Doing this would probably improve cleanliness overall anyway.
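A separate nanny registry could close the gap, since registration would then broadcast to every known nanny rather than only to connected workers. Again a toy sketch under the same illustrative names as before (`ToyScheduler`, a hypothetical `self.nannies` dict), not the real scheduler implementation:

```python
# Sketch of the proposed fix: track nannies independently of connected
# workers, so a plugin registered between a nanny's handshake and its
# worker's connection still reaches that nanny.

class ToyScheduler:
    def __init__(self):
        self.workers = {}        # connected worker address -> worker
        self.nannies = {}        # hypothetical: nanny address -> nanny
        self.nanny_plugins = {}  # plugin name -> payload

    def add_nanny(self, address, nanny):
        # Register the nanny itself, independent of its worker's lifetime,
        # and hand back whatever plugins already exist.
        self.nannies[address] = nanny
        return dict(self.nanny_plugins)

    def register_nanny_plugin(self, name, plugin):
        # Broadcast to every known nanny, not just connected workers.
        self.nanny_plugins[name] = plugin
        for nanny in self.nannies.values():
            nanny.plugins[name] = plugin


class ToyNanny:
    def __init__(self):
        self.plugins = {}


scheduler = ToyScheduler()
nanny = ToyNanny()

# Same ordering as the race: handshake first, plugin registered second,
# worker process never yet connected.
nanny.plugins.update(scheduler.add_nanny("tcp://late:1234", nanny))
scheduler.register_nanny_plugin("env", {"ABC": 123})

print(nanny.plugins)  # → {'env': {'ABC': 123}} -- the nanny is reached
```

Because the nanny is recorded at handshake time, the registration broadcast no longer depends on the worker process having completed its own connection.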
