
AutoRestrictor scheduler plugin #4864

Open · wants to merge 7 commits into main

Conversation

JSKenyon commented Jun 1, 2021

  • Closes #xxxx
  • Tests added / passed
  • Passes black distributed / flake8 distributed / isort distributed

Hi all! I have taken the time to slightly refine the scheduler plugin which I described in my contribution to the Dask Summit (the final talk in the Radio Astronomy Applications with Dask workshop). Whilst it may not yet be ready to merge, I believe it is now in a state where people can start playing with it and offering feedback.

Motivation

I am developing QuartiCal, an application for calibrating radio interferometer data. Whilst testing this on an HPC system, I would regularly see dashboard output resembling the following:

Screenshot from 2021-06-01 10-32-29

This includes a large number of transfers (red) as well as a large amount of time spent doing nothing (white-space). The transfers are particularly bothersome as the graph in question consists of many parallel, independent subgraphs i.e. each can be processed completely independently of its neighbours. I had previously assumed that the scheduler would realise this and avoid the unnecessary transfers. With the assistance of @sjperkins, I have attempted to design a scheduler plugin to coerce this behaviour by automatically restricting tasks to specific workers. The goal was to design something:

  • non-invasive
  • generic
  • low-overhead

How it works

  1. Identify all root nodes (no dependencies).
  2. Identify all terminal nodes (no dependents).
  3. If the number of terminal nodes (outputs) is smaller than the number of available workers, replace terminal nodes with their dependencies (partition the graph horizontally) until there are enough nodes in the partitioning layer to ensure every worker gets work.
  4. Associate each node in the partitioning layer with its roots (the root nodes on which it depends, directly or indirectly).
  5. Associate each unique set of roots with an abstract group number (essentially an index), taking care to ensure that tasks which share roots are assigned to the same group.
  6. Assign each group to a worker by setting the worker restrictions for all tasks in the group (a task may be assigned to multiple workers if it is, e.g., a shared dependency). Groups are iteratively assigned to the least-subscribed worker; a simplified sketch of this grouping logic is shown below.
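For illustration, the grouping in steps 1, 4, 5 and 6 could be sketched against a plain {key: set_of_dependencies} dict as follows (the names here are hypothetical and the sketch deliberately omits step 3's horizontal partitioning; the real plugin works on the scheduler's internal task state):

def group_tasks(dependencies, workers):
    """Map each task key to a restricted set of workers based on shared roots."""
    # Step 1: roots are tasks with no dependencies.
    roots = {key for key, deps in dependencies.items() if not deps}

    # Step 4: associate every task with the roots it depends on, directly or indirectly.
    root_sets = {}

    def roots_of(key):
        if key not in root_sets:
            if key in roots:
                root_sets[key] = frozenset({key})
            else:
                root_sets[key] = frozenset().union(
                    *(roots_of(dep) for dep in dependencies[key])
                )
        return root_sets[key]

    for key in dependencies:
        roots_of(key)

    # Step 5: each unique set of roots becomes a group.
    groups = {}
    for key, rset in root_sets.items():
        groups.setdefault(rset, []).append(key)

    # Step 6: assign each group to the least-subscribed worker.
    load = {worker: 0 for worker in workers}
    restrictions = {}
    for keys in sorted(groups.values(), key=len, reverse=True):
        worker = min(load, key=load.get)
        load[worker] += len(keys)
        for key in keys:
            restrictions[key] = {worker}
    return restrictions

# Toy usage: two independent chains end up pinned to different workers.
deps = {"a0": set(), "a1": {"a0"}, "b0": set(), "b1": {"b0"}}
print(group_tasks(deps, ["worker-1", "worker-2"]))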

How to use it

This example uses a LocalCluster, but it should work regardless of cluster setup.

from distributed import Client, LocalCluster
from distributed.plugins.autorestrictor import install_plugin

cluster = LocalCluster(...)  # Cluster setup.
client = Client(cluster)
client.run_on_scheduler(install_plugin)

Example

The following example demonstrates how the plugin alters the scheduler's default behaviour. I have chosen a sum over a single axis of a chunked dask array as I feel it is a representative example of common use-cases (reduction, non-negligible transfer sizes).

import dask.array as da
from distributed import Client, LocalCluster
from distributed.plugins.autorestrictor import install_plugin


def example():
    # Each chunk is 100MB of data. Entire array is 100GB.
    data = da.zeros((12500000, 1000), chunks=(12500000, 1))
    return da.sum(data, axis=1)


if __name__ == "__main__":

    cluster = LocalCluster(processes=True,
                           n_workers=4,
                           threads_per_worker=1,
                           memory_limit=0)
    client = Client(cluster)
    #client.run_on_scheduler(install_plugin)  # Uncomment to enable.

    da.compute(example())

This is a screenshot of the task stream with the scheduler plugin disabled:

Screenshot from 2021-06-01 10-44-35

This takes just under 90s on my laptop. Note the huge number of transfers (red). Running the example again, this time with the plugin installed, we see the following in the dashboard:

Screenshot from 2021-06-01 10-44-43

This took just under 60s on my laptop. Transfers have been all but eliminated; those that remain cannot be avoided when distributing the problem (at some point during this type of reduction, one has to combine results from different workers).

It is also interesting to look at memory usage with and without the plugin. The following results were obtained using memory_profiler. Without the plugin:

Screenshot from 2021-06-01 10-18-38
Here we see very inconsistent behaviour between workers, and the peak memory usage is around 27GB. Note that every run will differ when using the default behaviour, as the scheduling is non-deterministic (to the best of my knowledge).

With the plugin:

Screenshot from 2021-06-01 10-18-27

Here the behaviour is very consistent between workers and we would expect multiple runs to produce very similar results. The peak memory usage is just under 7GB. This demonstrates a fairly dramatic improvement in memory footprint. This is due to the fact that the plugin discourages greediness (which may lead to many large tasks being in memory) in favour of minimizing transfers.

Real-world example

The following is the same problem shown in the motivation section but with the scheduler plugin enabled:

Screenshot from 2021-06-01 10-32-32

Note how the transfers have been eliminated and that workers spend substantially less time idling (performance improved by approximately a factor of two).

Caveats

  • I do not know where this should live inside distributed.
  • Behaviour may be poor for very small graphs.
  • graph_metrics and get_node_depths are expensive (the plugin takes around 2s for a graph of around 100000 nodes), but I believe I can remove/improve them. I wanted to get feedback from others before optimizing.
  • Heterogeneous compute (e.g. da.compute(a, b, c)) is supported, but I have likely not considered every use case.
  • Failure should be graceful (revert to the default behaviour) but this could likely be improved.
  • I have not implemented proper tests, largely because I am unsure how to test something like this. Suggestions welcome.

fjetter (Member) commented Jun 2, 2021

This looks very interesting. I'll try to take a closer look at the code and think about where this should end up. At first glance, this is a mixture of dask.order and optimize. It also connects to our recent efforts to move to HighLevelGraphs. I haven't studied the code enough yet for a proper judgement.

In the meantime, people love performance_reports for these kinds of things; see https://distributed.dask.org/en/latest/diagnosing-performance.html#performance-reports.
These offer more insight into how we save time in transfers, compute, etc. Not mandatory, but usually very interesting :)

cc @mrocklin @madsbk @gjoseph92

JSKenyon (Author) commented Jun 2, 2021

I have uploaded some performance reports. The first is without the plugin, the second is with. Note that these are from new runs as I had accidentally overwritten the reports I had generated for the original PR.

https://gistcdn.githack.com/JSKenyon/9e2bb495de861301389099646e6b18e6/raw/60135474e2f127bf6a5d1ed1085db9a751444592/noplugin-report.html

https://gistcdn.githack.com/JSKenyon/d7177495a8eede1f3c31df4fbfdc368f/raw/40745319b0fb8a0878c4bf4bea9f04488e3b2737/plugin-report.html

mrocklin (Member) commented Jun 2, 2021

If I understand the situation correctly, Dask was moving data between nodes when it would have been faster to compute on the node where the data resided. It might be worth trying to figure out why Dask was making that decision.

A common cause of this is that Dask underestimates the cost of moving a piece of data. This is commonly the case when moving custom Python objects, for which Dask does not have a good way to estimate movement size. Was this the case in your problem situation? Were you moving around custom Python objects that held on to larger numpy arrays, for example? If so, then a more general solution to this problem might be for Dask to start looking at actual transmitted data sizes to correct the more theoretical expectation from dask.sizeof.

JSKenyon (Author) commented Jun 2, 2021

If I understand the situation correctly, Dask was moving data between nodes when it would have been faster to compute on the node where the data resided. It might be worth trying to figure out why Dask was making that decision.

A common cause of this is that Dask underestimates the cost of moving a piece of data. This is commonly the case when moving custom Python objects, for which Dask does not have a good way to estimate movement size. Was this the case in your problem situation? Were you moving around custom Python objects that held on to larger numpy arrays, for example? If so, then a more general solution to this problem might be for Dask to start looking at actual transmitted data sizes to correct the more theoretical expectation from dask.sizeof.

In the example (specifically the example section, rather than the motivating problem), I only make use of a Dask array with known chunk sizes. Even in this regime, the scheduler chooses to move data around (each chunk in the input is 100MB) when there is no need to.

In the motivating case, the problem is definitely more complicated. However, the vast majority of inputs and outputs (and certainly all the large ones) are also Dask arrays with known chunk sizes. My current theory is that this problem is caused by greedily consuming the task graph, i.e. prioritising ordering (do the next task on the next available worker) even when that decision necessitates moving large amounts of data.

This PR is an initial attempt at addressing this problem by automatically restricting tasks to specific workers based on their dependencies. There are certainly cases in which the plugin will fail (a shuffle being the prime example, I am considering how to approach this), but I feel that the profiling I did above clearly demonstrates that the scheduler can do a very poor job on a very simple problem. The memory profiling in particular should be a cause for concern as the demonstrated behaviour could impact resilience.

mrocklin (Member) commented Jun 2, 2021

In the motivating case, the problem is definitely more complicated. However, the vast majority of inputs and outputs (and certainly all the large ones) are also Dask arrays with known chunk sizes. My current theory is that this problem is caused by greedily consuming the task graph, i.e. prioritising ordering (do the next task on the next available worker) even when that decision necessitates moving large amounts of data.

Dask's current heuristic here is "earliest start time". It runs a task on the worker where that task will start running the soonest. This takes into account both the time of all the other tasks currently queued up on the worker, and an estimate of the time that it will take to move the dependencies to the new worker. My experience is that this usually results in good performance, but fails if the estimates of compute or transfer times are inaccurate. A lot of Dask's metadata collection is really just to inform this decision.
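For concreteness, here is a rough, hypothetical sketch of that objective using toy classes (not the actual scheduler code; the real logic lives in SchedulerState.worker_objective, which is also visible in the diff further down this thread):

from dataclasses import dataclass

@dataclass
class Dep:
    nbytes: int
    who_has: set       # names of workers that already hold this data

@dataclass
class Task:
    dependencies: list

@dataclass
class Worker:
    name: str
    occupancy: float   # seconds of work already queued on this worker
    nthreads: int

def estimated_start_time(task, worker, bandwidth=100e6):
    # Time until the worker clears its current queue...
    queue_time = worker.occupancy / worker.nthreads
    # ...plus the estimated time to move any dependencies it does not hold.
    comm_bytes = sum(dep.nbytes for dep in task.dependencies
                     if worker.name not in dep.who_has)
    return queue_time + comm_bytes / bandwidth  # lower is better

# The scheduler-style decision: run the task where it is estimated to start soonest.
dep = Dep(nbytes=100_000_000, who_has={"w1"})
task = Task(dependencies=[dep])
workers = [Worker("w1", occupancy=5.0, nthreads=1),
           Worker("w2", occupancy=0.0, nthreads=1)]
best = min(workers, key=lambda w: estimated_start_time(task, w))
print(best.name)  # "w2": a 1s transfer beats a 5s wait under these estimates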

So when I see Dask making bad decisions like this my first reaction is to see why it was making those bad decisions. You're right that aggressive fusion of tasks is also effective in that it avoids this problem altogether.

JSKenyon (Author) commented Jun 2, 2021

Dask's current heuristic here is "earliest start time". It runs a task on the worker where that task will start running the soonest. This takes into account both the time of all the other tasks currently queued up on the worker, and an estimate of the time that it will take to move the dependencies to the new worker. My experience is that this usually results in good performance, but fails if the estimates of compute or transfer times are inaccurate. A lot of Dask's metadata collection is really just to inform this decision.

That makes sense.

Do you feel that the behaviour demonstrated in the simple example (single function, known chunk sizes, no custom anything) above can be explained by an error in Dask's estimations? If so, where would that problem be in the code? How do those estimations impact memory footprint? I must admit I was rather surprised by the memory_profiler results.

mrocklin (Member) commented Jun 2, 2021

Yeah, I think that the example that you have here is great at showing a simple and reproducible case where we're not behaving optimally. That's really valuable.

At first glance I don't yet have a sense for what's going on. I'm certainly able to reproduce the behavior though, which is a great start.

gjoseph92 (Collaborator) commented

@JSKenyon thanks for sharing your work here, I'm excited to read through it.

I suspect whatever underlying problem your plugin is solving is affecting a lot of people. The experience where

the graph in question consists of many parallel, independent subgraphs i.e. each can be processed completely independently of its neighbours

and yet the scheduler chooses to transfer tasks is something I know I've seen often. In particular, the memory usage pattern without your plugin sounds exactly like what many people were commiserating about during the Pangeo forum at the summit: workers running out of memory on computations that should, in theory, be completable with only a trivial, constant amount of memory. For another example, see ocean-transport/coiled_collaboration#3: simply storing an xarray to zarr (an embarrassingly parallel operation) can run out of memory when the chunks are too small.

@mrocklin @jrbourbeau I personally think that understanding the root problem here (and considering aspects of this PR in a solution) would be a very valuable use of time. I think there's a real issue here, and we've seen it pop up from many angles now. Having this PR could provide a helpful point of comparison to broaden our thinking about how to order and schedule tasks.

mrocklin (Member) commented Jun 2, 2021

I agree that this is a high value problem to investigate. I am hopeful that we can resolve this with a small but nuanced change to internal heuristics.

I haven't gone through the solution proposed here in depth, but in general the idea of going through the graph, identifying common groups, setting annotations/restrictions, etc. sounds complex. Historically, adding complex solutions like this has been great short-term, but they're hard to compose with orthogonal efforts.

mrocklin (Member) commented Jun 2, 2021

Said a different way "rather than layer on band-aids, I think that we should figure out what is bleeding"

JSKenyon (Author) commented Jun 3, 2021

Said a different way "rather than layer on band-aids, I think that we should figure out what is bleeding"

I agree completely. If this can be fixed in a simple way elsewhere, I am all for it. However, I do not think that reasoning about graph structure is a band-aid; it is one of the few pieces of information which we have a priori.

I haven't gone through the solution proposed here in depth, but in general the idea of going through the graph, identifying common groups, setting annotations/restrictions, etc. sounds complex. Historically, adding complex solutions like this has been great short-term, but they're hard to compose with orthogonal efforts.

I agree that this solution is complex, but I think it is very important to note that it is opt-in and doesn't alter any existing functionality. That was part of the reason for implementing it as a plugin. As mentioned above, I know there are cases where it will fail, but my hope was that users could try it out and help me refine the approach. Of course, as you mentioned @mrocklin, if this can be fixed with improvements to Dask's internal heuristics, that would be ideal.

mrocklin (Member) commented Jun 3, 2021

One might also consider setting optimization.fuse.ave-width to a decently high value:

optimization:
  fuse:
    ave-width: 10
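For reference, the same setting can be applied programmatically; a minimal sketch (dask.config.set also works as a context manager around the compute call):

import dask

# Raise the average fusion width so that more of each independent chain is
# fused into a single task before it reaches the scheduler.
dask.config.set({"optimization.fuse.ave-width": 10})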

fjetter (Member) commented Jun 5, 2021

I also consider this plugin to be a bit too complex, in particular since a lot of work has recently been poured into the graph representation and I'm not entirely sure how stable those APIs actually are. Testing this will also be incredibly complex.

However, this is incredibly helpful. Apart from the complex logic, there are a few important lessons to learn here, and I could produce an almost-as-effective fix to our heuristics which performs well on your example. My fixes are not stable and should only be considered a POC.

What you are doing with this plugin is effectively short-circuiting our decide_worker logic. The decide_worker logic itself is relatively stupid (leaving out a few optimizations): for tasks without dependencies (the bulk of your example computation) it boils down to "pick the worker with the least occupancy". That's effectively a random round robin. It also means that the reducer will on average need to fetch (W-1)/W of its dependencies, where W is the number of workers (e.g. three out of four dependencies with four workers). That's a lot, especially since we haven't gained anything by using round robin other than a slightly faster scheduling decision. Paying the cost for a smarter decision here should be easily amortised, at least for this simple example.

The patch below modifies our decision by weighting whether the worker is already processing tasks that share a dependent with the task being assigned above occupancy (effectively, occupancy becomes only a tie-breaker). With this, using your example, I can reduce transfer time to about 20% and total runtime to about 60% of the default on my machine.

As I said, this is by no means stable and should be taken with a grain of salt, but I think it's an interesting lesson. This is, in a sense, a first step towards speculative task assignment: if we were to decide first where the dependents should be computed, assigning the dependencies would be an easier decision.

Final note about work stealing: work stealing is not really affected by this example since, until the very end of the computation, all workers are classified as saturated and no worker is classified as idle. Under these circumstances we do not steal.

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index de3843c1..a497e6eb 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -2327,34 +2327,12 @@ class SchedulerState:
             ts.state = "no-worker"
             return ws

-        if ts._dependencies or valid_workers is not None:
-            ws = decide_worker(
-                ts,
-                self._workers_dv.values(),
-                valid_workers,
-                partial(self.worker_objective, ts),
-            )
-        else:
-            worker_pool = self._idle or self._workers
-            worker_pool_dv = cast(dict, worker_pool)
-            wp_vals = worker_pool.values()
-            n_workers: Py_ssize_t = len(worker_pool_dv)
-            if n_workers < 20:  # smart but linear in small case
-                ws = min(wp_vals, key=operator.attrgetter("occupancy"))
-                if ws._occupancy == 0:
-                    # special case to use round-robin; linear search
-                    # for next worker with zero occupancy (or just
-                    # land back where we started).
-                    wp_i: WorkerState
-                    start: Py_ssize_t = self._n_tasks % n_workers
-                    i: Py_ssize_t
-                    for i in range(n_workers):
-                        wp_i = wp_vals[(i + start) % n_workers]
-                        if wp_i._occupancy == 0:
-                            ws = wp_i
-                            break
-            else:  # dumb but fast in large case
-                ws = wp_vals[self._n_tasks % n_workers]
+        ws = decide_worker(
+            ts,
+            self._workers_dv.values(),
+            valid_workers,
+            partial(self.worker_objective, ts),
+        )

         if self._validate:
             assert ws is None or isinstance(ws, WorkerState), (
@@ -3208,13 +3186,22 @@ class SchedulerState:
                 nbytes = dts.get_nbytes()
                 comm_bytes += nbytes

+        processing_counter = 0
+        for dependent in ts.dependents:
+            for dependency in dependent.dependencies:
+                if dependency is ts:
+                    continue
+                elif dependency in ws.processing:
+                    processing_counter -= 1
+
         stack_time: double = ws._occupancy / ws._nthreads
         start_time: double = stack_time + comm_bytes / self._bandwidth

         if ts._actor:
             return (len(ws._actors), start_time, ws._nbytes)
         else:
-            return (start_time, ws._nbytes)
+            prio = (processing_counter, start_time, ws._nbytes)
+            return prio


 class Scheduler(SchedulerState, ServerNode):

JSKenyon (Author) commented Jun 7, 2021

Thanks @fjetter - that is really awesome! I am impressed by how simple the change is. I have tested it out locally too and my results seem consistent with yours. The plugin is still marginally superior in terms of transfer reduction/speed (but not by much in this example), but this will likely be infinitely more robust.

For my own interest, I ran a memory profiler on the example with the change applied. I did this a few times, as the default scheduler (with the fix applied) is still not consistent between runs, i.e. spotting poor memory performance requires a bit of "fishing". Here are three sets of results:

Screenshot from 2021-06-07 09-00-54
Screenshot from 2021-06-07 09-00-41
Screenshot from 2021-06-07 09-00-30

This is a substantial improvement over the default, though, as you may notice, there is still occasional inconsistent behaviour (child 4 in the first image).

I appreciate that this sort of change probably needs a great deal of thought/testing before it can propagate into a release. But I feel that there is sufficient evidence in this PR to at least motivate its consideration.

When I have a moment/resources, I will also see how this change affects my application. Thanks again for your contribution @fjetter!

fjetter (Member) commented Jun 7, 2021

The plugin is still marginally superior in terms of transfer reduction/speed (but not by much in this example), but this will likely be infinitely more robust.

there is still the occasional inconsistent behaviour (child 4 in the first image).

That's in alignment with my observations. My simple fixes were not able to eliminate all wrong task assignments and still required some transfer. I do not know for certain, but I assume there is still some randomness involved, e.g. during the initial task assignments when there are no dependents yet to group by.
Another source of potential "randomness" is that my patch is subject to a relatively strong skew in key distribution, which would then trigger less predictable work stealing. Consider the extreme example of a simple map-reduce graph with 1000 tasks reducing to one: my logic would assign all tasks to one worker, and then work stealing would kick in and distribute them onto the cluster. I don't think the example is strongly affected by this, but I am not certain.

My patch would at least require some sort of load-balancing approach to be actually robust for real-world applications. I just wanted to point out where one can apply a lever to fix the problem.

mrocklin (Member) commented Jun 8, 2021

So, I suspect that @fjetter already knows that issues exist for this approach, but I'll bring up one here anyway for completeness.

Checking all dependencies of all dependents for all tasks is potentially very expensive. In this particular case we're only multiplying by four (one dependent for each task, four dependencies for each dependent, so 1 x 4), which is probably OK, but this can get worse, especially in situations like a full shuffle where we're more likely to have something like 16 x 16.

We could still do something like this, but we would need to restrict it to cases where there are very few dependents (probably one) and relatively few dependencies (fewer than five?) but lots of tasks more generally (otherwise simple examples like the inc-dec-add demo will fail).
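As a purely hypothetical illustration of that restriction (not code from this PR or the scheduler), the extra bookkeeping could be guarded by something like:

# Hypothetical guard: only pay for the dependent/dependency scan when the
# fan-out and fan-in around this task are small, as described above.
MAX_DEPENDENTS = 1     # "very few dependents (probably one)"
MAX_DEPENDENCIES = 5   # "relatively few dependencies (fewer than five?)"

def worth_grouping_by_dependents(ts):
    dependents = ts.dependents
    return (len(dependents) <= MAX_DEPENDENTS
            and all(len(dep.dependencies) <= MAX_DEPENDENCIES
                    for dep in dependents))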
