
Worker <-> Worker Communication Failures bring Cluster in inconsistent State #5951

Closed
nils-braun opened this issue Mar 16, 2022 · 9 comments · Fixed by #6112
Labels
bug (Something is broken) · deadlock (The cluster appears to not make any progress) · stability (Issue or feature related to cluster stability (e.g. deadlock))

Comments

@nils-braun
Contributor

Due to network issues, overloaded workers, or just bad luck, the worker-to-worker communication needed for fetching task dependencies from other workers might fail (distributed.worker.get_data_from_worker). This is (in principle) caught and handled by the worker and scheduler. During this handling, however, a race condition can be triggered that leaves the cluster in an inconsistent and stuck state.

Minimal Complete Verifiable Example:

The following code introduces a random failure in distributed.worker.get_data_from_worker with a higher probability than a real-world use case would have, just to demonstrate:

from unittest.mock import patch
from random import randint

import dask.bag as db
from distributed import Client

# Calling this function on the worker will temporarily replace
# the `get_data_from_worker` function with a version, which
# fails in 1 out of 4 cases. In the "real world" the 
# probability of a failure due to communication problems is
# of course a lot smaller, but it might happen.
def replace_get_data_from_worker_function(dask_worker):
    from distributed.worker import get_data_from_worker
    
    dask_worker.my_counter = 0

    async def get_data_from_worker_function(*args, **kwargs):
        # "Fail" in 1 out of 4 cases
        if randint(0, 3) == 1:
            dask_worker.my_counter += 1
            raise OSError

        return await get_data_from_worker(*args, **kwargs)
            
    p = patch("distributed.worker.get_data_from_worker", 
              get_data_from_worker_function)
    p.start()

if __name__ == "__main__":
    client = Client()
    client.run(replace_get_data_from_worker_function)

    PARTITIONS = 20

    # Just some arbitrary computation, which takes
    # reasonably long and (most importantly) creates tasks
    # with many dependencies, so that we need a lot of communication
    data = db.from_sequence(range(PARTITIONS), npartitions=PARTITIONS)\
             .map_partitions(lambda x: range(100_000))\
             .groupby(lambda x: x % PARTITIONS)\
             .map_partitions(lambda x: len(list(x)))
    data.compute()

Running this code will sometimes not finish the computation; instead, one (or multiple) workers get stuck waiting to fetch the dependencies of a specific task.
Note: it is a race condition, so you might need to run the code multiple times until it gets stuck...

Here are some observations I have made using the worker's and scheduler's properties and the transition log (a sketch of how this state can be inspected from the client follows the list below).
Let's say worker A is stuck while processing task T1, which depends on task T2 owned by worker B.

  • the scheduler task state shows that T2 is present on B
  • B has the task in its memory (dask_worker.data)
  • A, however, thinks that no one has the data for T2 (dask_worker.tasks[T2].who_has is empty), so A does not even start asking B for the data.
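
For reference, here is roughly how this state can be inspected from the client. This is only a sketch: dask_worker.tasks, TaskState.who_has and dask_worker.data are worker internals of that release, so the exact attributes may differ between versions, and the keys are whatever your reproducer produced.

from distributed import Client

client = Client()  # or connect to the already-running cluster

# What the *scheduler* believes: which workers hold each key
print(client.who_has())

# What each *worker* believes: the keys it holds in memory and the
# who_has bookkeeping of its TaskState objects
def dump_worker_view(dask_worker):
    return {
        "in_memory": list(dask_worker.data.keys()),
        "who_has": {key: list(ts.who_has) for key, ts in dask_worker.tasks.items()},
    }

print(client.run(dump_worker_view))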

From the transition log, this is what I think happens (but I would be happy if someone with more knowledge of how the worker works could confirm my observations):

  • some time before that, worker A wants to process another task, T3, which needs a dependency T4 also from B
  • it calls gather_dep, which calls get_data_from_worker. This fails (either due to a real network issue or due to our patched function above).
  • around the same time, the scheduler also tells A to compute T1, which depends on T2 (owned by B).
  • during the OSError handling of gather_dep, the local state of worker A is changed so that all tasks held by B are marked as no longer held by B. In our case, that is T2 and T4.
  • However (and I think this is where the bug is), only the initially requested dependencies are reported as missing to the scheduler, in this case T4 (see here).

The final state is that worker A thinks no one has the data for T2, while the scheduler will not redistribute the task (as it was never marked as missing).
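
To make the bookkeeping problem concrete, here is a toy model of that sequence. It is deliberately not the real worker code (the real state lives in Worker.has_what, TaskState.who_has and the missing-data messages sent to the scheduler); it only illustrates why T2 ends up orphaned:

# Toy model of the bookkeeping described above -- not distributed's code.
who_has = {"T2": {"worker-B"}, "T4": {"worker-B"}}   # worker A's local view
requested_from_B = ["T4"]                            # keys of the failed gather_dep call
reported_missing = []

# On OSError, everything worker B was believed to hold is dropped ...
for key in who_has:
    who_has[key].discard("worker-B")

# ... but only the keys of the failed request are reported as missing.
for key in requested_from_B:
    if not who_has[key]:
        reported_missing.append(key)

orphaned = [k for k, v in who_has.items() if not v and k not in reported_missing]
print(orphaned)  # ['T2'] -- nothing will ever re-announce who holds T2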

One last comment: using the setting DASK_DISTRIBUTED__COMM__RETRY__COUNT, it is possible to make failures of the get_data_from_worker function less likely. Unfortunately, this only decreases the probability of hitting the bug; it does not fix it.
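
For completeness, this is one way to raise that retry setting; the value 5 is arbitrary, and for remote deployments setting the environment variable on each worker process is the safer route, since dask.config.set in the client process may not reach already-running workers:

import dask

# Equivalent to exporting DASK_DISTRIBUTED__COMM__RETRY__COUNT=5 in the
# environment of the scheduler and workers. Set it before the cluster is
# created; 5 is an arbitrary choice and only makes the race less likely.
dask.config.set({"distributed.comm.retry.count": 5})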

Environment:

  • Dask version: 2022.2.1
  • Python version: 3.8
  • Operating System: macOS
  • Install method (conda, pip, source): pip
@fjetter added the bug and stability labels on Mar 22, 2022
@nils-braun
Contributor Author

Just wanted to confirm that I am also seeing this with version 2022.3.0 (the links to the code in the description above are now slightly off, but the overall gather_dep function still looks the same).

@fjetter (as I saw you made some changes to this logic :-)): do you think it would make sense to not mark all of the dependencies held by a worker as removed in the except OSError case of gather_dep (here), but instead only the ones we were asking for? Or would it be better to make the logic in the finally block more complex, so that tasks which were marked as removed during the except OSError are finally reported as missing to the scheduler?

I am wondering if there is something special in our setup (large scale, many memory-hungry tasks) which triggers this bug so often.

@fjetter
Member

fjetter commented Mar 30, 2022

Indeed, gather_dep was a very frequent source of deadlocks due to its exception handling.

Thank you for the reproducer. I can confirm this is happening on my end as well.

but instead only the ones we were asking for? Or would it be better to make the logic in the finally block more complex, so that tasks which were marked as removed during the except OSError are finally reported as missing to the scheduler?

Maybe. The system should be robust enough to recover from us removing too many tasks. I suspect something else is not working as intended but this may be a hotfix.
I'm a bit hesitant about introducing more complexity, but if it is necessary...

We're currently investing in some larger refactoring which should allow us to test these edge cases more thoroughly, see also #5736

@gjoseph92
Collaborator

Thanks for the reproducer and explanation @nils-braun. I think I've somewhat minimized this to

import dask
import distributed
from distributed.deploy.local import LocalCluster


class BreakingWorker(distributed.Worker):
    broke_once = False

    def get_data(self, comm, **kwargs):
        if not self.broke_once:
            self.broke_once = True
            raise OSError("fake error")
        return super().get_data(comm, **kwargs)


if __name__ == "__main__":
    print(distributed.__version__)

    df = dask.datasets.timeseries(
        start="2000-01-01",
        end="2000-01-15",
    )
    s = df.shuffle("id", shuffle="tasks")

    cluster = LocalCluster(
        n_workers=2, threads_per_worker=1, processes=True, worker_class=BreakingWorker
    )
    client = distributed.Client(cluster)

    f = s.persist()
    distributed.wait(f, timeout=5)

A git bisect points to #5653 as the first failing commit (cc @fjetter). I believe this is also implicated in #5960 (comment).

Interestingly, this also makes the reproducer succeed (not that I think it's a real fix though):

diff --git a/distributed/worker.py b/distributed/worker.py
index 13e5adef..f1e1563d 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -2960,14 +2960,7 @@ class Worker(ServerNode):
 
             except OSError:
                 logger.exception("Worker stream died during communication: %s", worker)
-                has_what = self.has_what.pop(worker)
                 self.pending_data_per_worker.pop(worker)
-                self.log.append(
-                    ("receive-dep-failed", worker, has_what, stimulus_id, time())
-                )
-                for d in has_what:
-                    ts = self.tasks[d]
-                    ts.who_has.remove(worker)
 
             except Exception as e:
                 logger.exception(e)

@gjoseph92
Collaborator

A few thoughts:

There's a fundamental question here: does an OSError mean we can safely assume the other worker has died and will never come back (vs temporarily disconnected)? That's how the current error handling logic is acting, but I don't know if that's a valid assumption. I think the current logic is trying to add an optimization that "an OSError means the worker is probably dead, so let's preemptively stop trying to talk to it". This is a nice idea, but I don't actually know if OSError is even the signal we should be acting on to make that choice. (#5678 may be relevant?)
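
As a small aside (this is not distributed code, and the port below is just assumed to have nothing listening on it), the ambiguity is visible at the socket level: a peer that is briefly restarting and one that is gone for good both surface as an OSError to the caller, so the exception type alone does not tell you which case you are in:

import socket

# ConnectionRefusedError, ConnectionResetError and friends are all
# subclasses of OSError, so a transiently unreachable worker looks the
# same to the caller as one that has died permanently.
try:
    socket.create_connection(("127.0.0.1", 59999), timeout=1)
except OSError as exc:
    print(type(exc).__name__, "-", exc)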

But regardless of that, as @nils-braun has pointed out, we're not correctly dealing with the TaskStates we mutate.

If we end up making a task's who_has empty, then by definition the task is missing, so an obvious thing to do would be just transition it to missing in that case:

diff --git a/distributed/worker.py b/distributed/worker.py
index 7a062876..5e72f007 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -2991,6 +2991,9 @@ class Worker(ServerNode):
                 for d in has_what:
                     ts = self.tasks[d]
                     ts.who_has.remove(worker)
+                    if not ts.who_has:
+                        # TODO send `missing-data` to scheduler?
+                        recommendations[ts] = "missing"
 
             except Exception as e:
                 logger.exception(e)

However, you'll then get an "Impossible transition from fetch to missing" error.

Should we just add a transition function for that? Or is there a good reason why that isn't currently a valid transition?


Another thought: in ensure_communicating, we check for tasks that are essentially missing (who_has is empty). But we seem to just ignore them, and assume that something else is dealing with them (the find_missing mechanism).

I think these assertions are the assumptions ensure_communicating is making:

diff --git a/distributed/worker.py b/distributed/worker.py
index 7a062876..f8401a80 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -2699,6 +2699,9 @@ class Worker(ServerNode):
             )
 
         for el in skipped_worker_in_flight:
+            # Assuming something else has already dealt with making these `missing`
+            assert el.state == "missing", el.state
+            assert el in self._missing_dep_flight
             self.data_needed.push(el)

Of course, in the reproducer, they fail. But arguably, ensure_communicating should be validating this (even if just in tests). And maybe it could be ensure_communicating's job to explicitly deal with tasks like this?


I think it comes down to:

  • If fetch->missing is a valid transition, then we should write that transition and move tasks through it
  • If fetch->missing is not a valid transition, then we shouldn't be effectively putting tasks into a missing state without transitioning them (a toy sketch of the transition-table idea follows below)
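
For readers unfamiliar with the worker's transition-table pattern, here is a self-contained toy illustration of the two options above. It is not the Worker class; the method names and handler bodies are made up, and only the shape of the table and the "Impossible transition" failure mode mirror what the thread is discussing:

# Toy state machine with a transition table -- illustrative only.
class ToyTaskState:
    def __init__(self):
        self.state = "fetch"
        self.transitions = {
            ("fetch", "flight"): self._fetch_to_flight,
            # ("fetch", "missing") is exactly the entry the fix adds back.
        }

    def transition(self, finish):
        handler = self.transitions.get((self.state, finish))
        if handler is None:
            raise RuntimeError(f"Impossible transition from {self.state} to {finish}")
        handler()
        self.state = finish

    def _fetch_to_flight(self):
        print("schedule a gather from a peer that has the data")

    def _fetch_to_missing(self):
        print("no known replica; ask the scheduler who has it")


ts = ToyTaskState()
try:
    ts.transition("missing")          # without the entry, this can only blow up
except RuntimeError as exc:
    print(exc)

ts.transitions[("fetch", "missing")] = ts._fetch_to_missing
ts.transition("missing")              # with the entry, it is a first-class move
print(ts.state)                       # "missing"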

@gjoseph92
Collaborator

For the record, @fjetter and I discussed offline, and we're going to add back a fetch->missing transition. We're also both skeptical about using OSError in this way, but that's a broader change that seems to throw off other things.

@mrocklin
Member

Thank you for the detailed summary @gjoseph92

mrocklin added a commit to mrocklin/distributed that referenced this issue on Apr 12, 2022:

Fixes dask#5951

In dask#5653 we removed the fetch -> missing transition. This caused deadlocks. Now we add it back in.
@mrocklin
Member

For the record, @fjetter and I discussed offline, and we're going to add back a fetch->missing transition.

This is done here: #6112

We're also both skeptical about using OSError in this way, but that's a broader change that seems to throw off other things.

My apologies for being absent with historical context here. We went through this decision several years ago. It's not a correct assumption, but it is an assumption which prioritizes stability over performance, and so is a win. We had odd issues when we tried to get smart around this, and found that it was just better to let clusters think that workers were gone, and celebrate when they magically returned. (I may not be fully understanding the situation though)

@gjoseph92
Collaborator

it is an assumption which prioritizes stability over performance

Maybe that was the case then, but these days I think it actually prioritizes performance over stability. It's short-circuiting other tasks from trying and failing to fetch from that worker in the future. It assumes the current error is predictive of what will happen if we try again, so we preemptively choose not to try again. This adds complexity compared to just letting those fetches fail sometime in the future, and just dealing with the failure per-key when it happens.

mrocklin added a commit that referenced this issue on Apr 13, 2022:

Fixes #5951

In #5653 we removed the fetch -> missing transition. This caused deadlocks. Now we add it back in.
@nils-braun
Contributor Author

Thanks @gjoseph92, @mrocklin and @fjetter!
I am happy to confirm that this has fixed the reproducer above, and I am quite confident that it will also fix the problematic large-scale use cases.

@gjoseph92 added the deadlock label on Apr 21, 2022