Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not allow closing workers to be awaited again #5910

Merged
merged 3 commits into from
May 5, 2022

Conversation

fjetter
Copy link
Member

@fjetter fjetter commented Mar 7, 2022

This fixes some of our deadlock situations causing tests to time out while closing a worker.

This is not a ready fix but rather a preliminary one to communicate the issue.

What's happening:

  • A worker is closing
  • Worker.status is set to closing. This will ensure Worker.close is idempotent and let every other caller to await finished, see
    if self.status in (Status.closed, Status.closing):
    await self.finished()
  • While it is closing, an incoming RPC triggers an await self
  • The await self will attempt to start the worker again since closing is not part of Status.ANY_RUNNING
  • The restart will set the status to running such that a second close attempt will actually try to run a close concurrently. the second call is actually what is deadlocking then while trying to shutdown the threadpool executor

Open questions:

  • Is it justified that we have multiple close attempts?
  • Why did this ever work?
  • ...

cc @crusaderky @graingert

closes #5932

@github-actions
Copy link
Contributor

github-actions bot commented Mar 7, 2022

Unit Test Results

       16 files  +       4         16 suites  +4   7h 14m 9s ⏱️ + 1h 35m 40s
  2 744 tests +       4    2 662 ✔️ +     15       80 💤  -   11  2 +1 
21 853 runs  +5 444  20 805 ✔️ +5 183  1 046 💤 +263  2  - 1 

For more details on these failures, see this check.

Results for commit 0bbb27f. ± Comparison against base commit 70e1fca.

♻️ This comment has been updated with latest results.

distributed/core.py Outdated Show resolved Hide resolved
distributed/core.py Outdated Show resolved Hide resolved
@@ -247,7 +236,9 @@ def set_thread_ident():
self.thread_id = threading.get_ident()

self.io_loop.add_callback(set_thread_ident)
self._startup_lock = asyncio.Lock()
self._started = asyncio.Event()
self.__status = Status.init
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reusing the status attribute for workers, nannies, schedulers and the core server is not easily possible without some major changes. There are many different semantics encoded and I don't believe the server base class should react to any status change of the child. I don't mind renaming this attribute but I believe there should be different stati

@fjetter fjetter changed the title WIP Do not allow closing workers to be awaited again Do not allow closing workers to be awaited again Mar 10, 2022
Comment on lines -291 to -307
raise TimeoutError(
"{} failed to start in {} seconds".format(
type(self).__name__, timeout
)
)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raising this as a timeout error is just wrong

@fjetter
Copy link
Member Author

fjetter commented Mar 10, 2022

Working on this, I'm wondering if we should switch from an inheritance model to a composition model

@fjetter fjetter self-assigned this Mar 10, 2022
@@ -247,7 +236,9 @@ def set_thread_ident():
self.thread_id = threading.get_ident()

self.io_loop.add_callback(set_thread_ident)
self._startup_lock = asyncio.Lock()
self._started = asyncio.Event()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This event will be bound to an event loop too early on Python 3.8 and 3.9 - you'll need to use LateBoundEvent or assign the event in async def _start(

distributed/core.py Outdated Show resolved Hide resolved
@fjetter fjetter force-pushed the worker_close_deadlock branch 2 times, most recently from 01d73c7 to ed38a9a Compare March 22, 2022 14:07
Comment on lines +209 to +210
exc = OSError("Unable to contact Actor's worker")
return _Error(exc)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason test_failed_worker in test_actor.py hit this edge case now instead of the ValueError above

@fjetter
Copy link
Member Author

fjetter commented Mar 24, 2022

I chose a different path

  1. No longer await the object itself. We don't want to restart, we merely want to check that it is already up. I added an event to track this
  2. I still kept the refactorings about start I started with. To make things less entangled, I introduced start_unsafe which does not need to check for any concurrent startups, etc. and there is Server.start which wraps this and ensures that everything is handled properly.

@fjetter fjetter force-pushed the worker_close_deadlock branch 2 times, most recently from e92456f to 74a69a6 Compare March 25, 2022 11:50
@mrocklin
Copy link
Member

Checking in, what is the status here?

@mrocklin
Copy link
Member

I hope you don't mind, but I merged in main and pushed. I'm hoping to trigger CI.

@fjetter
Copy link
Member Author

fjetter commented Mar 25, 2022

Checking in, what is the status here?

waiting for CI. GH actions is in maintenance right now https://www.githubstatus.com/ I suppose this is why there are no builds scheduled

@fjetter
Copy link
Member Author

fjetter commented Mar 25, 2022

never mind, it's moving

@fjetter
Copy link
Member Author

fjetter commented Mar 25, 2022

test_nanny_death_timeout is failing all over the place due to a bug in ConnectionPool. The pool is improperly raising an asyncio.CancelledError as an CommClosed. I already started on a patch for this

@fjetter fjetter force-pushed the worker_close_deadlock branch 3 times, most recently from e4ef0fb to 4eca4bf Compare April 10, 2022 14:58
@fjetter fjetter force-pushed the worker_close_deadlock branch 2 times, most recently from 9d5f20d to 648d090 Compare April 28, 2022 13:45
mrocklin pushed a commit that referenced this pull request Apr 29, 2022
…6091)

This reinstates #5883
which was reverted in #5961 / #5932

I could confirm the flakyness of `test_missing_data_errant_worker` after this change and am reasonably certain this is caused by #5910 which causes a closing worker to be restarted such that, even after `Worker.close` is done, the worker still appears to be partially up. 

The only reason I can see why this change promotes this behaviour is that if we no longer block the event loop while the threadpool is closing, this opens a much larger window for incoming requests to come in and being processed while close is running.

Closes #6239
@@ -290,7 +290,6 @@ async def test_failed_worker(c, s, a, b):

assert "actor" in str(info.value).lower()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about moving this all into the pytest raises? with pytest.raises(ValueError, match=r"Worker holding Actor was lost\. Status: error'):

Comment on lines +2121 to +2122
if not isinstance(exc.__cause__, expected_cause):
raise exc
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if not isinstance(exc.__cause__, expected_cause):
raise exc
assert isinstance(exc.__cause__, expected_cause)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a slightly different thing, isn't it? To be honest, I'm not entirely sure which behavior is best. I suggest to keep it as is for now

@fjetter fjetter merged commit 2286896 into dask:main May 5, 2022
@jacobtomlinson
Copy link
Member

jacobtomlinson commented May 6, 2022

@fjetter since this was merged we are seeing CI failures in dask-kubernetes when calling client.wait_for_workers().

https://github.com/dask/dask-kubernetes/runs/6320080731?check_suite_focus=true

______________________________ test_simplecluster ______________________________

k8s_cluster = <pytest_kind.cluster.KindCluster object at 0x7fb44ad9c070>
kopf_runner = <kopf._kits.runner.KopfRunner object at 0x7fb44afdf280>
gen_cluster = <function gen_cluster.<locals>.cm at 0x7fb44afb11f0>

    @pytest.mark.timeout(180)
    @pytest.mark.asyncio
    async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster):
        with kopf_runner as runner:
            async with gen_cluster() as cluster_name:
                scheduler_pod_name = "simple-cluster-scheduler"
                worker_pod_name = "simple-cluster-default-worker-group-worker"
                while scheduler_pod_name not in k8s_cluster.kubectl("get", "pods"):
                    await asyncio.sleep(0.1)
                while cluster_name not in k8s_cluster.kubectl("get", "svc"):
                    await asyncio.sleep(0.1)
                while worker_pod_name not in k8s_cluster.kubectl("get", "pods"):
                    await asyncio.sleep(0.1)
    
                with k8s_cluster.port_forward(f"service/{cluster_name}", 8786) as port:
                    async with Client(
                        f"tcp://localhost:{port}", asynchronous=True
                    ) as client:
>                       await client.wait_for_workers(2)

dask_kubernetes/operator/tests/test_operator.py:112: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/opt/hostedtoolcache/Python/3.9.12/x64/lib/python3.9/site-packages/distributed/client.py:1328: in _wait_for_workers
    while n_workers and running_workers(info) < n_workers:
/opt/hostedtoolcache/Python/3.9.12/x64/lib/python3.9/site-packages/distributed/client.py:1321: in running_workers
    [
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

.0 = <dict_valueiterator object at 0x7fb44886e220>

        [
            ws
            for ws in info["workers"].values()
>           if ws["status"] == Status.running.name
        ]
    )
E   KeyError: 'status'

I'm investigating now, perhaps the version of distributed between the CI and the kubernetes cluster is getting out of sync or something. But wanted to raise this in case you spotted anything obvious related to this change.

@jacobtomlinson
Copy link
Member

Looks like it was our fault, this error happens when workers are running 2022.5.0 but the client is using @main. Fixed our CI in dask/dask-kubernetes/pull/461 so that the workers are on @main too. Sorry for the noise.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Flaky test_missing_data_errant_worker
5 participants