
Add resilient example #75

Merged (5 commits, Jun 13, 2019)

Conversation

@willirath (Contributor) commented on Jun 5, 2019


See #74

ToDos (so far):

- Merge cells where it makes sense
- Increase memory limit
- Use Dask's config API to increase resilience
- Simplify worker-killing logic

@mrocklin (Member) commented on Jun 5, 2019

Some small notes:

  1. Let's merge cells together when we can, for example the first three that import, make a cluster, and make a client. The user doesn't gain anything by pressing Shift-Enter three times rather than once.

  2. The memory limit is small. If they just import pandas then we'll probably be close to the limit, which will raise lots of annoying error messages.

  3. cluster.scheduler.allowed_failures = int(1e32) is internal API. Maybe use dask.config.set({'distributed.scheduler.allowed-failures': 1e32}) before creating the cluster instead? (See the sketch after this list.)

  4. The function _get_worker_pids seems to be a one-liner that is called once. Let's just call the code directly instead of hiding it in a function:

    pids = [w.pid for w in cluster.scheduler.workers.values()]
    pids
  5. Same with _get_preemptible_worker_pids. Also, let's change the filter into a list comprehension, which seems to be more broadly understood.

  6. It's taking me a while to understand maybe_kill_n_perc_of_workers_and_wait, which means that a new reader probably doesn't have much of a chance. The call chain is too many levels deep for me to trace through what is happening directly.

    I wonder if we might be able to do something with fewer steps of indirection like the following.

    import os
    import random
    import signal
    from time import sleep

    def kill_a_worker():
        # pick any current worker and terminate it
        worker = random.choice(list(cluster.scheduler.workers.values()))
        os.kill(worker.pid, signal.SIGTERM)

    summed = client.compute(summed)

    while not summed.done():
        kill_a_worker()
        sleep(...)

    This is subjective (code is always way simpler to the author than the reader) but I think that I could put this in front of someone and they would have a better chance of tracking things through in a 10-20s attention span.
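
For reference, a minimal sketch of what points 1 and 3 might look like when combined; the worker count, memory limit, and threshold value here are illustrative, not taken from the notebook:

    import dask
    from dask.distributed import Client, LocalCluster

    # Raise the allowed-failures threshold via the config API *before*
    # creating the cluster (100 is an illustrative value).
    dask.config.set({"distributed.scheduler.allowed-failures": 100})

    # Imports, cluster, and client merged into a single cell, as suggested in point 1.
    cluster = LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="1GB")
    client = Client(cluster)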

@willirath (Contributor, author):

Thanks @mrocklin for these remarks. I have addressed them as far as possible.

There are a few things, however, that I couldn't simplify as much as I'd have liked:

  • While somewhat more streamlined now, the logic for finding a worker to kill while always leaving some workers running is this (a driving loop is sketched after this list):

        import os
        import random

        _all_current_workers = [w.pid for w in cluster.scheduler.workers.values()]
        non_preemptible_workers = _all_current_workers[:2]

        def kill_a_worker():
            preemptible_workers = [
                w.pid for w in cluster.scheduler.workers.values()
                if w.pid not in non_preemptible_workers]
            if preemptible_workers:  # guard: random.choice([]) would raise
                os.kill(random.choice(preemptible_workers), 15)
  • The memory limits are at 400e6 bytes times 4 workers now, but I'm not sure Binder will be fine if we really use all of this at the same time. (They limit to 1 or 2 GB depending on load, IIRC.)
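
For completeness, a hedged sketch of how kill_a_worker might be driven; summed stands in for the notebook's computation and the pause is an arbitrary placeholder:

    from time import sleep

    # Hypothetical driving loop: `summed` stands in for the notebook's computation.
    summed = client.compute(summed)

    while not summed.done():
        kill_a_worker()
        sleep(5)  # illustrative pause between kills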

@willirath (Contributor, author):

Add example dealing with Exceptions in user code? Relevant docs: http://distributed.dask.org/en/latest/resilience.html#user-code-failures

I think this deserves a separate example (could add more than one).
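
A minimal sketch of what such a user-code-failure example could show, assuming a plain local client; the flaky function is made up for illustration:

    from dask.distributed import Client

    client = Client()  # assumes a running local cluster

    def flaky(x):
        # deliberately fail inside user code
        if x == 3:
            raise ValueError("user code failed on purpose")
        return x + 1

    futures = client.map(flaky, range(5))

    # Exceptions raised in user code are re-raised locally when results are
    # gathered; the workers themselves stay healthy.
    try:
        client.gather(futures)
    except ValueError as err:
        print("caught:", err)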

TomAugspurger added a commit to TomAugspurger/distributed that referenced this pull request on Jun 7, 2019: "This allows for setting the config after importing distributed" (xref dask/dask-examples#75 (comment)).
@TomAugspurger (Member) left a comment:

This generally looks good to me. I would remove the comment about needing to set the config before importing distributed, since that may change.

@willirath (Contributor, author):

I've removed the comment. Thanks for having a look, @TomAugspurger.

@guillaumeeb (Member) left a comment:

This is really interesting from a dask-jobqueue perspective!

I've made some small comments.

More generally, I executed the whole notebook before understanding that the proposed solution to the problem was implemented in the first executable cell! Would it be possible to demonstrate the problem first and then fix it, to make the notebook more educational?

What about an example with as_completed? But this may not be the point here, and may be better suited to the dask-jobqueue documentation. Speaking of which, the solution proposed in this notebook should be in the dask-jobqueue docs, or even the distributed docs.
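
For context, a rough sketch of the as_completed pattern that comment refers to; the squaring workload is purely illustrative:

    from dask.distributed import Client, as_completed

    client = Client()  # assumes an existing cluster
    futures = client.map(lambda x: x ** 2, range(10))

    # Handle each result as soon as its future finishes, in completion order.
    for future in as_completed(futures):
        print(future.result())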

resilience.ipynb (outdated cell under discussion):

    ## Increase resilience

    Whenever a worker shuts down, the scheduler will increment the suspiciousness counter of _all_ tasks that were assigned (not necessarily computing) to the worker in question. Whenever the suspiciousness of a task exceeds a certain threshold (3 by default), the task will be considered broken. We want to compute many tasks on only a few workers, with workers shutting down randomly, so we expect the suspiciousness of all tasks to grow rapidly. Let's effectively disable the threshold by setting it to a large number:
Member:

Interesting! Isn't there another way? Could we add a per-task counter in Dask somehow?

Member:

If there are multiple tasks executing at once when a worker is killed (say, from a segfault), it may be hard to know exactly which task caused the segfault.

Member:

Hm, I agree, but shouldn't it count as one global failure? And we should be allowed 3 of these failures. As far as I understand, currently if there are more than 3 tasks failing due to this one task error, the computation is halted?

@willirath (author):

Ah, now I understand where the misunderstanding is: this is a per-task counter, but all tasks belonging to a given worker when that worker dies will be marked as suspicious. With a large number of tasks per worker, these counters still grow fast. There's no way for the scheduler to know which task was active at the time of failure, so all are marked.

(And it's also not always clear that the task currently computing is the problem. Think of a task that needs more memory or disk than the cluster can cope with. This task might not directly lead to failure but just let reasonably sized tasks trigger OOM kills further down the road.)
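
A hedged way to observe these per-task counters on a LocalCluster; this relies on internal scheduler state, so attribute names may change between versions:

    # Internal scheduler state (attribute names may change between versions):
    # each TaskState carries a `suspicious` counter that is incremented whenever
    # a worker holding that task dies.
    suspicious_counts = {
        key: ts.suspicious for key, ts in cluster.scheduler.tasks.items()
    }
    print(max(suspicious_counts.values(), default=0))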

Member:

So you confirm there is one counter for each task, and these counters take a +1 failure for all the tasks that were on a given worker when it dies? That means that, by default, we should be robust to 3 workers going down in one computation. Is this the case?

mrocklin pushed a commit to dask/distributed that referenced this pull request on Jun 8, 2019: "This allows for setting the config after importing distributed" (xref dask/dask-examples#75 (comment)).
@willirath (Contributor, author):

@guillaumeeb, part of the reason for creating this example was to encourage others to explore Dask's resilience. There's a lot of potential!

I agree that this should be expanded on in a jobqueue or Kubernetes context, but I'd argue for keeping this notebook short and simple.

@guillaumeeb (Member):

And thanks for that @willirath!

And yes, let's just keep this notebook simple and elaborate more in a dask-jobqueue context. That's a good example for a dedicated notebook gallery.

@mrocklin (Member) left a comment:

Two small suggested changes.

Also, when I run this on Binder I find that we make progress only slightly faster than we kill workers, which is a little frustrating. I wonder if we might reduce the total execution time and also increase the interval between killing workers. I think we still get the point across that "yes, workers can die" without having them die ten times during the computation. Thoughts?

@willirath (Contributor, author):

I've added links to Dask Kubernetes and Dask Jobqueue. And after making the example computation smaller and faster and letting fewer workers die, I've reduced the allowed_failures threshold to a still comfortable 100.

@TomAugspurger (Member):

Restarted the failed build. LMK if you see that it fails / succeeds before I do.

@TomAugspurger (Member):

All green. Thanks @willirath!

@TomAugspurger merged commit f6485d2 into dask:master on Jun 13, 2019.
@willirath (Contributor, author):

Let's revisit this as soon as distributed v2 is out. With dask/distributed#2761 and later modification of allowed_failures this will be easier to understand.
