
an example that shows the need for memory backpressure #2602

Open
rabernat opened this issue Mar 18, 2019 · 78 comments · May be fixed by #2765

Comments


@rabernat rabernat commented Mar 18, 2019

In my work with large climate datasets, I often concoct calculations that cause my dask workers to run out of memory, start dumping to disk, and eventually grind my computation to a halt. There are many ways to mitigate this by e.g. using more workers, more memory, better disk-spilling settings, simpler jobs, etc. and these have all been tried over the years with some degree of success. But in this issue, I would like to address what I believe is the root of my problems within the dask scheduler algorithms.

The core problem is that the tasks early in my graph generate data faster than it can be consumed downstream, causing data to pile up, eventually overwhelming my workers. Here is a self-contained example:

import math

import dask.array as dsa

# create some random data
# assume chunk structure is not under my control, because it originates
# from the way the data is laid out in the underlying files
shape = (500000, 100, 500)
chunks = (100, 100, 500)
data = dsa.random.random(shape, chunks=chunks)

# now rechunk the data to permit me to do some computations along different axes
# this aggregates chunks along axis 0 and dis-aggregates along axis 1
data_rc = data.rechunk((1000, 1, 500))
FACTOR = 15


def my_custom_function(f):
    # a pretend custom function that would do a bunch of stuff along
    # axis 0 and 2 and then reduce the data heavily
    return f.ravel()[::FACTOR][None, :]


# apply that function to each chunk
# c1 is the number of elements each block yields after the reduction
# (c0 is taken here to be the total number of blocks in data_rc)
c0 = data_rc.npartitions
c1 = math.ceil(data_rc.ravel()[::FACTOR].size / c0)
res = data_rc.map_blocks(my_custom_function, dtype=data.dtype,
                         drop_axis=[1, 2], new_axis=[1], chunks=(1, c1))

res.compute()

(Perhaps this could be simplified further, but I have done my best to preserve the basic structure of my real problem.)

When I watch this execute on my dashboard, I see the workers just keep generating data until they reach their memory thresholds, at which point they start writing data to disk, before my_custom_function ever gets called to relieve the memory buildup. Depending on the size of the problem and the speed of the disks where they are spilling, sometimes we can recover and manage to finish after a very long time. Usually the workers just stop working.

This failure case is frustrating, because often I can achieve a reasonable result by just doing the naive thing:

for n in range(500):
    res[n].compute()

and evaluating my computation in serial.

I wish the dask scheduler knew to stop generating new data before the downstream data could be consumed. I am not an expert, but I believe the term for this is backpressure. I see this term has come up in #641, and also in this blog post by @mrocklin regarding streaming data.

I have a hunch that resolving this problem would resolve many of the pervasive but hard-to-diagnose problems we have in the xarray / pangeo sphere. But I also suspect it is not easy and requires major changes to core algorithms.

Dask version 1.1.4


@mrocklin mrocklin commented Mar 18, 2019

Thanks for the writeup and the motivation @rabernat. In general I agree with everything that you've written.

I'll try to lay out the challenge from a scheduling perspective. The worker notices that it is running low on memory. The things that it can do are:

  1. Run one of the many tasks it has sitting around
  2. Stop running those tasks
  3. Write data to disk
  4. Kick tasks back to the scheduler for rescheduling

Probably it should run some task, but it doesn't know which tasks generate data, and which tasks allow it to eventually release data. In principle we know that operations like from_hdf5 are probably bad for memory and operations like sum are probably good, but we probably can't hard-code those names into the worker itself.

One option that I ran into recently is that we could slowly try to learn which tasks cause memory to accumulate and which tasks cause memory to be released. This learning would happen on the worker. This isn't straightforward because many tasks run concurrently, so their effects on the system get conflated (there is no way to tie a system metric like CPU time or memory use to a particular Python function). Some simple model might give a decent idea over time though.

We do something similar (though simpler) with runtime. We maintain an exponentially weighted moving average of task run time, grouped by task prefix name (like from-hdf5), and use this for scheduling heuristics.

This approach would also be useful for other resource constraints, like network use (it'd be good to have a small number of network-heavy tasks like from-s3 running at once), and the use of accelerators like GPUs (the primary cause of my recent interest).

If someone wanted to try out the approach above my suggestion would be to ...

  1. Create a periodic callback on the worker that checked the memory usage of the process with some frequency
  2. Look at the tasks running (self.executing) and the memory growth since the last time and adjust some model for each of those tasks' prefixes (see key_split)
  3. That model might be very simple, like a running count of how often memory increased minus how often it decreased while that function was running. Greater than 0 means that memory increased more often than it decreased, and vice versa.
  4. Look at the policies in Worker.memory_monitor and maybe make a new one
    • Maybe we go through self.ready and reprioritize?
    • Maybe we set a flag so when we pop tasks from self.ready we only accept those that we think reduce memory use?
    • ...

There are likely other solutions to this whole problem. But this might be one.
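
As a rough illustration of steps 2-3 (purely hypothetical, not code from distributed): a small model that the periodic callback could update, scoring each task prefix by how often worker memory rose while tasks with that prefix were executing. The class name and the simplified key_split are made up for the sketch.

from collections import defaultdict

import psutil


def key_split(key):
    # crude stand-in for dask's key_split helper: "sum-abc123" -> "sum"
    return str(key).rsplit("-", 1)[0]


class PrefixMemoryModel:
    """Score task prefixes by how often process memory rose while they ran."""

    def __init__(self):
        self.scores = defaultdict(int)  # prefix -> net count of memory increases
        self._last_rss = psutil.Process().memory_info().rss

    def update(self, executing_keys):
        # call from a periodic callback with the keys currently executing
        rss = psutil.Process().memory_info().rss
        delta = 1 if rss > self._last_rss else -1
        for key in executing_keys:
            self.scores[key_split(key)] += delta
        self._last_rss = rss

    def likely_reduces_memory(self, key):
        # negative score: memory fell more often than it rose for this prefix
        return self.scores[key_split(key)] < 0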


@martindurant martindurant commented Mar 18, 2019

there is no way to tie a system metric like CPU time or memory use to a particular Python function

but we do measure the memory usage of the inputs and outputs of functions that have run (not the internal transient memory), and we also know whether running a function ought to free the memory used by its inputs. If measurements were done on the prefix basis mentioned, there could be a reasonable guess at the memory implications of running a given task.


@rabernat rabernat commented Mar 18, 2019

Thanks a lot for your quick response.

One quick clarification question... You say that a central challenge is that the scheduler

doesn't know which tasks generate data, and which tasks allow it to eventually release data

In my mental model, this is obvious from the input and output array shapes. If a task has an input which is 1000x1000 and an output which is 10x10, it is a net sink of memory. The initial nodes of the graph are always sources. I assumed that the graph must know about these input and output shapes, and the resulting memory footprint, even if it has no idea how long each task will take. But perhaps I misunderstand how much the scheduler knows about the tasks it is running.

Wouldn't it be easier to expose this information to the scheduler than it would be to rig up an adaptive monitoring solution?


@mrocklin mrocklin commented Mar 18, 2019

but we do measure the memory usage of the inputs and outputs of functions that have run (not the internal transient memory), and also know whether running of a function ought to free the memory used by its inputs. If measurements were done on the prefix basis mentioned, there could be a reasonable guess at the memory implications of running a given task.

That's a good point, and it's much easier to measure :)

In my mental model, this is obvious from the input and output array shapes. If a task has an input which is 1000x1000 and an output which is 10x10, it is a net sink of memory. The initial nodes of the graph are always sources. I assumed that the graph must know about these input and output shapes, and the resulting memory footprint, even if it has no idea how long each task will take. But perhaps I misunderstand how much the scheduler knows about the tasks it is running.

The scheduler doesn't know about shapes or dtypes of your arrays. It only knows that it is running a Python function, and that that function produces some outputs. You're thinking about Dask array, not Dask.

@martindurant 's suggestion is probably enough for your needs though.


@martindurant martindurant commented Mar 18, 2019

The scheduler doesn't know about shapes or dtypes of your arrays

I suppose the client does, at least for arrays, so there could be another route to pass along the expected output memory size of some tasks. In the dataframe case, the client might know enough, and in the general case, there could be a way for users to specify expected output size.


@guillaumeeb guillaumeeb commented Mar 18, 2019

Hi everyone,

This is a subject of interest to me too. But I thought that Dask already tried to minimize its memory footprint, as explained here. Obviously, this is not what @rabernat is observing, and I've seen the same behavior in similar use cases.

Couldn't we just give the Scheduler an overall strategy to use? Like working depth-first through the graph?


@mrocklin mrocklin commented Mar 18, 2019

But I thought that Dask tried already to minimize memory footprint as explained here.
Couldn't we just give an overall strategy for the Scheduler to use? Like work on the depth of the graph first?

The scheduler does indeed run the graph depth first to the extent possible (actually, it's a bit more complex than this, but your depth-first intuition is correct). Things get odd though if

  1. Some tasks that we would want to run are blocked by other things, like data transfer times. Dask might move on to other tasks
  2. Some tasks that are available we might not want to run, even if we can. We'd prefer to sacrifice parallelism and wait rather than allocate more memory
  3. Some tasks in our graph may be much more expensive in terms of memory than others, and so depth first may not be enough of a constraint to choose the optimal path

@sjperkins sjperkins commented Mar 19, 2019

These issues also impact my use cases.

One thing I've considered is rather than creating a graph purely composed of parallel reductions:

a   b   c   d   e   f   g   h
 \ /     \ /     \ /     \ /
  \       /       \       /
   \     /         \     /
    \   /           \   /
     \ /             \ /
       \             /
         \         /
           \     /
             \ /

is to define some degree of parallelism (2 in this example) and then create two "linked lists", where the results of a previous operation are aggregated with those of the current.

a    e
|    |
b    f
|    |
c    g
|    |
d    h
  \ /

I haven't tried this out yet, so I'm not sure to what extent it would actually help, but it's on my to-do list to mock the above up with dummy ops.
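
Something like the following dask.delayed sketch is what I have in mind; load and combine are dummy stand-ins, and the degree of parallelism of two is hard-coded just to mirror the picture above.

from dask import delayed


@delayed
def load(i):
    # dummy data-producing task (the a, b, c, ... leaves above)
    return [float(i)] * 1000


@delayed
def combine(acc, new):
    # aggregate the previous result with the current one, so only one
    # intermediate result per chain needs to be held at a time
    return [x + y for x, y in zip(acc, new)]


def chain(indices):
    acc = load(indices[0])
    for i in indices[1:]:
        acc = combine(acc, load(i))
    return acc


# degree of parallelism = 2: two independent chains, merged at the end
left = chain(range(0, 4))    # a -> b -> c -> d
right = chain(range(4, 8))   # e -> f -> g -> h
result = combine(left, right).compute()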

I suppose the client does, at least for arrays, so there could be another route to pass along the expected output memory size of some tasks. In the dataframe case, the client might know enough, and in the general case, there could be a way for users to specify expected output size.

In this context it's probably worth re-raising Task Annotations again: dask/dask#3783, which in a complete form would allow annotating a task with an estimate of its memory output. I started on it in #2180 but haven't found time to push it forward.


@abergou abergou commented Mar 19, 2019

This also affects my workflow, so I would be curious to see progress on it. On our end, writing out to disk doesn't help since disk is also a finite resource. We have a few workarounds, such as adding artificial dependencies into a graph or submitting a large graph in pieces.
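
For example (just an illustration of the "submit in pieces" workaround, not a fix for the underlying issue), with the array res from the example at the top of this issue one can compute a batch of output blocks at a time; the batch size is arbitrary.

# compute `res` in batches of output blocks, so only one batch's worth of
# inputs needs to be in memory at a time
batch_size = 50
results = []
for start in range(0, res.numblocks[0], batch_size):
    batch = res.blocks[start:start + batch_size]
    results.append(batch.compute())  # blocks here before starting the next batch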


@rabernat rabernat commented Mar 22, 2019

Thanks everyone for the helpful discussion. It sounds like @mrocklin and @martindurant have identified a concrete idea to try which could improve the situation. This issue is a pretty high priority for us in Pangeo, so we would be very happy to test any prototype implementation of this idea.

@mrocklin mrocklin transferred this issue from dask/dask Apr 10, 2019

@TomAugspurger TomAugspurger commented Apr 17, 2019

I’ll be on paternity leave for at least the next two weeks (maybe longer, depending on how things are going). If this is still open then I’ll tackle it then.


@rabernat rabernat commented May 22, 2019

Hi folks...I'm just pinging this issue to remind us that it remains a crucial priority for Pangeo.


@TomAugspurger TomAugspurger commented May 22, 2019

👍 it's on my list for tomorrow :)


@TomAugspurger TomAugspurger commented May 23, 2019

https://nbviewer.jupyter.org/gist/TomAugspurger/2df7828c22882d336ad5a0722fbec842 has a few initial thoughts. The first problem is that rechunking from chunks along just axis 0 to chunks along just axis 1 causes a tough, global communication pattern. The output of my_custom_function will need input from every original chunk.

[task graph image: all-to-all communication pattern from the rechunk]

So for this specific problem it may be an option to preserve the chunks along axis 0, and only add additional chunks along axis 1. This leads to a better communication pattern.

[task graph image: communication pattern with axis-0 chunks preserved]
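
A sketch of that alternative, reusing the toy example from the top of this issue (the exact chunk sizes here are illustrative):

# keep the original chunking along axis 0 and only split axis 1, instead of
# data.rechunk((1000, 1, 500)), which mixes every input chunk together
data_rc = data.rechunk((100, 1, 500))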

But, a few issues with that

  1. It may not work for the real problem (@rabernat do you have a "real world" example available publicly somewhere?). We should still attempt a proper solution to the problem.
  2. It increases the number of tasks, which may bump up against other scheduler limitations.

I'll keep looking into this, assuming that a different chunking pattern isn't feasible.

(FYI @mrocklin the HTML array repr came in handy).


@martindurant martindurant commented May 23, 2019

@TomAugspurger, the general problem remains interesting, though: using the expected output size of a given task, and the set of inputs it would allow to be dropped, as an additional heuristic in determining when/where to schedule that task.


@TomAugspurger TomAugspurger commented May 23, 2019

Yep.

I notice now that my notebook oversimplified things. The original example still has chunks along the first and second axis before the map_blocks (which means the communication shouldn't be entirely global). I'll update things.


@rabernat rabernat commented May 23, 2019

This is a "real world" example that can be run from ocean.pangeo.io which I believe illustrates the core issue. It is challenging because the chunks are big, leading to lots of memory pressure.

import intake
cat = intake.Catalog('https://raw.githubusercontent.com/pangeo-data/pangeo-datastore/master/intake-catalogs/ocean.yaml')
ds = cat.GODAS.to_dask()
print(ds)
salt_clim = ds.salt.groupby('time.month').mean(dim='time')
print(salt_clim)

# launch dask cluster
from dask.distributed import Client
from dask_kubernetes import KubeCluster
# using more workers might alleviate the problem, but that is not a satisfactory workaround
cluster = KubeCluster(n_workers=5)
client = Client(cluster)

# computation A
# compute just one month of the climatology
# it works fine, indicating that the computation could be done in serial
salt_clim[0].load()

# computation B
# now load the whole thing
# workers quickly run out of memory, spill to disk, and even get killed
salt_clim.load()

[dashboard screenshot]


@TomAugspurger TomAugspurger commented May 24, 2019

Thanks @rabernat. I'll try it out on the pangeo binder once I have something working locally.

I think (hope) I have a workable solution. My basic plan is

  1. Track the cumulative output .nbytes of tasks at the prefix-level (easy)
  2. When deciding how to schedule under load, look up / compute the relative size
    of the input to the output, again at the prefix level. Prioritize tasks that look
    like they'll reduce memory usage. (seems harder. I don't know if we use
    prefix-level relationships anywhere else in scheduling decisions).

So if we currently have high memory usage, and we see a set of tasks like

      a1   a2  # 5 bytes, 5 bytes   |   x1  x2   # 5 bytes, 5 bytes
        \ /                         |    \ /
         b     # 1 byte             |     y      # 20 bytes

We would (ideally) schedule b before y, since it tends to have a net
decrease in memory usage. Or if we have tasks running that we know will free up
some memory, we might wait to schedule y since it tends to increase memory.
I'm sure there are things I'm missing, but this should provide a good start.
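
As a purely hypothetical sketch of step 2 (none of these names exist in the scheduler): under memory pressure, prefer ready tasks whose prefix has historically been a net reducer of memory.

def key_split(key):
    # crude stand-in for dask's key_split helper: "sum-abc123" -> "sum"
    return str(key).rsplit("-", 1)[0]


def pick_next_task(ready, task_net_nbytes, memory_pressure):
    """ready: list of (priority, key); task_net_nbytes: prefix -> output minus input bytes."""
    if not memory_pressure:
        return min(ready)[1]  # normal priority order
    # under pressure, pick the prefix with the most negative net bytes
    return min(ready, key=lambda item: task_net_nbytes.get(key_split(item[1]), 0))[1]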


@martindurant martindurant commented May 24, 2019

@TomAugspurger , that's exactly how I was picturing things. Additionally, you may need to know whether calculating b, for example, actually releases a1 and a2.


@TomAugspurger TomAugspurger commented May 24, 2019

you may need to know whether calculating b, for example, actually releases a1 and a2.

Yeah, I've been looking through the recommendations -> release logic now. I'm wondering if we can also keep a counter of "completing this task recommended that we release this many bytes". That seems a bit easier to track than knowing whether a1 and a2 were actually released.


@rabernat rabernat commented May 24, 2019

As a user of dask.array, it confuses me that the graph itself doesn't have this information in it already. In my mind, all of my dask.array operations know about the shape and dtype of their inputs and outputs. Consequently, the memory impact of a task is known a-priori.

I do understand that this information is not encoded into the dask graph--all the graph knows about is tasks and their dependencies. But, to my naive interpretation, it seems harder to try to "learn" the memory impact of tasks based on past experience than it does to pass this information through the graph itself, in the form of task annotation.


@TomAugspurger TomAugspurger commented May 24, 2019

As a user of dask.array, it confuses me that the graph itself doesn't have this information in it already. In my mind, all of my dask.array operations know about the shape and dtype of their inputs and outputs. Consequently, the memory impact of a task is known a-priori.

As you say, TaskAnnotations mentioned by @sjperkins in #2602 (comment) would let us pass that information through. I think that approach should be considered, but I have two reasons for pursuing the "learning" approach within distributed for now

  1. I suspect it's easier to get a rough implementation of this in place. Task annotations will (I think) be a larger change to dask.
  2. There will always be cases where the user doesn't or can't provide the expected memory usage when the graph is constructed. Say as a result of a boolean filter, or when using delayed on a source that can't easily provide the memory usage information.

I will take another look at the task annotation issue though.


@TomAugspurger TomAugspurger commented May 24, 2019

On a promising note, the additional LOC for the tracking seems to be ~3 :) I'm just keeping a task_net_nbytes that tracks the difference between task input and output at the prefix level. With that, we correctly see that

  1. random sample is a net increaser of memory
  2. rechunking has no effect on memory (again, we aren't measuring transient memory here, just final).
  3. my_custom_function is a net reducer of memory
--------------------------------------------------------------------------------
# task_net_nbytes after all the computation is done.
{'my_custom_function': -18666656,
 'random_sample': 20000000,
 'rechunk-merge': 0,
 'rechunk-split-rechunk-merge': 0}

The scheduling changes will be harder, but this seems promising so far.
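
For reference, a hypothetical sketch of roughly what that bookkeeping amounts to (not the actual patch):

task_net_nbytes = {}

def record_finished_task(key, input_nbytes, output_nbytes):
    # accumulate output minus input bytes per task prefix,
    # e.g. "rechunk-merge-<token>" -> "rechunk-merge"
    prefix = str(key).rsplit("-", 1)[0]  # crude stand-in for key_split
    task_net_nbytes[prefix] = task_net_nbytes.get(prefix, 0) + output_nbytes - input_nbytes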


@mrocklin mrocklin commented May 24, 2019

@TomAugspurger said

We would (ideally) schedule b before y, since it tends to have a net
decrease in memory usage

Sounds reasonable. As a warning, there might be other considerations fighting for attention here. Scheduling is an interesting balancing game.

@martindurant said

Additionally, you may need to know whether calculating b, for example, actually releases a1 and a2.

This is a nice point. One naive approach would be to divide the effect by the number of dependents of the dependency-to-be-released.
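
In code form, that naive estimate might look like the following (illustrative only; dependencies, dependents, and nbytes stand for the scheduler's bookkeeping, and task c is a made-up extra consumer of a2):

def estimated_release(task, dependencies, dependents, nbytes):
    """Bytes we might free by finishing `task`: each input's size divided by
    how many tasks still depend on it."""
    return sum(nbytes[dep] / len(dependents[dep]) for dep in dependencies[task])


# with the a1/a2/b example above, if a2 is also needed by some other task c:
dependencies = {"b": ["a1", "a2"]}
dependents = {"a1": ["b"], "a2": ["b", "c"]}
nbytes = {"a1": 5, "a2": 5}
estimated_release("b", dependencies, dependents, nbytes)  # 5/1 + 5/2 = 7.5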


@mrocklin mrocklin commented May 24, 2019

Also, I'm very glad to have more people thinking about the internal scheduling logic!


@TomAugspurger TomAugspurger commented May 24, 2019

@mrocklin do you know off hand whether the changes to the scheduling logic are likely to be on the worker, scheduler, or both? I can imagine both situations occurring, so I suspect the answer is "both".

@mrocklin mrocklin commented May 24, 2019

I could also imagine us not asking the client to run dask.order at all, and doing all task prioritization on the workers. This would allow us to integrate the byte sizes into the dask.order computation (and remove it from the single-threaded client-scheduler processing). This is probably something to explore as future work though.


@TomAugspurger TomAugspurger commented May 28, 2019

I could also imagine us not asking the client to run dask.order at all, and doing all task prioritization on the workers.

FWIW, dask.order does correctly order this workload. The final task of each parallel branch comes before the first task of the next branch.

[dask.order task ordering visualization]

So this looks more like a problem with reordering / waiting to execute some tasks, if we notice that we

  1. are at high memory
  2. Have a set of ready tasks that are likely to reduce memory usage.

@TomAugspurger TomAugspurger commented Aug 5, 2019

worker_objective might be too late, at least for the kinds of problems I'm playing with right now.

For problems that start with many inputs, which can be processed (mostly) in parallel, we go down the not-ideal path in the elif self.idle here

if ts.dependencies or valid_workers is not True:
    worker = decide_worker(
        ts,
        self.workers.values(),
        valid_workers,
        partial(self.worker_objective, ts),
    )
elif self.idle:
    if len(self.idle) < 20:  # smart but linear in small case
        worker = min(self.idle, key=operator.attrgetter("occupancy"))
    else:  # dumb but fast in large case
        worker = self.idle[self.n_tasks % len(self.idle)]
else:
    if len(self.workers) < 20:  # smart but linear in small case
        worker = min(
            self.workers.values(), key=operator.attrgetter("occupancy")
        )
    else:  # dumb but fast in large case
        worker = self.workers.values()[self.n_tasks % len(self.workers)]

Naively that makes sense. But for certain problems we're willing to pay a bit of time to find a better worker for each task.

@mrocklin do you know if we have any scheduling logic that looks at the "siblings" of a task? If I have (top to bottom)

        a-1   a-2  a-3   a-4  a-5  a-6
          \    |    /     \    |    /
              b-1             b-2

When I'm scheduling a-2, I'd like to look at where a-1 and a-3 are perhaps scheduled to run (in testing on a toy problem, assigning this task to the worker with the most sibling tasks works).
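
That toy heuristic might look something like this sketch (all names are hypothetical; assigned maps tasks that already have a placement decision to a worker):

from collections import Counter


def worker_with_most_siblings(task, dependencies, dependents, assigned, workers):
    # siblings of `task`: tasks that share a dependent with it (a-1 and a-3 above)
    siblings = {
        s
        for child in dependents.get(task, ())
        for s in dependencies.get(child, ())
        if s != task
    }
    counts = Counter(assigned[s] for s in siblings if s in assigned)
    if counts:
        return counts.most_common(1)[0][0]
    return next(iter(workers))  # no information yet: fall back to any worker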


@mrocklin mrocklin commented Aug 5, 2019

It gets even stranger, right? Not just siblings, but cousins of various levels of removed-ness :) And at some point you're going to want to cut the family tree.

I don't know of any scheduling logic that we have currently. If I were to start solving this problem I would start with pen and paper, write down a variety of situations, figure out how I would go about scheduling them in a few situations, and hopefully build out some intuition for the problem.


@dougiesquire dougiesquire commented Sep 25, 2019

Any updates or progress here? I’ve been experiencing symptoms that may be related to the same issue.

When running a groupby.mean operation on a relatively large dataset using a dask cluster the memory usage of each worker increases very rapidly, to values substantially larger than what I would expect based on the chunking. The issue is particularly severe when accessing and processing data stored on an ultra-fast parallel file storage system.

My example also uses xarray:
[dashboard screenshot showing per-worker memory usage]

Interestingly, the amount of memory used per worker is a function of the number of workers I provide. For a single worker, the maximum memory load is about what I would expect for this persist (~45GB). For two workers, each maxes out at 80GB before the calculation finishes. For three workers, they start to spill to disk before the calculation completes (120GB per worker is the maximum I can request on the system I am using).


@rabernat rabernat commented Sep 25, 2019

Just chiming in to say this remains a persistent and serious problem for many pangeo users. We are keen to help resolve it but not sure the best way to engage.


@mrocklin mrocklin commented Sep 25, 2019

We are keen to help resolve it but not sure the best way to engage.

I get the sense that at this point the right way to engage is to find people who are interested in low memory task scheduling and have them dive into Dask's heuristics.

@TomAugspurger you looked at this problem most recently. Are you able to summarize the current state of things and avenues for work as you see them?


@TomAugspurger TomAugspurger commented Sep 25, 2019

I'm still poking at it, but all my attempts (#2847, #2940, and others that haven't made it to PR stage) have failed.

Thus far, it's seemed like the most common issues involve communication-heavy workloads. So the two solutions I've attempted are better scheduling, so less communication is needed, and better handling of communication when we are already high on memory.

Collecting workloads that fail is still helpful.


@TomAugspurger TomAugspurger commented Sep 25, 2019

Turning to @dougiesquire's example (thanks for posting that), I'm roughly trying to replicate it with the following random data. For debugging, the arrays and number of chunks are much smaller.

>>> size = (3, 24, 96, 5, 9, 10)
>>> chunks = (1, 1, 96, 5, 9, 10)
>>> arr = da.random.random(size, chunks=chunks)
>>> arr
>>> 
>>> items = dict(
>>>     ensemble = np.arange(size[0]),
>>>     init_date = pd.date_range(start='1960', periods=size[1]),
>>>     lat = np.arange(size[2]).astype(float),
>>>     lead_time = np.arange(size[3]),
>>>     level = np.arange(size[4]).astype(float),
>>>     lon = np.arange(size[5]).astype(float),
>>> )
>>> dims, coords = zip(*list(items.items()))
>>> 
>>> array = xr.DataArray(arr, coords=coords, dims=dims)
>>> dset = xr.Dataset({'data': array})
>>> dset
<xarray.Dataset>
Dimensions:    (ensemble: 3, init_date: 24, lat: 96, lead_time: 5, level: 9, lon: 10)
Coordinates:
  * ensemble   (ensemble) int64 0 1 2
  * init_date  (init_date) datetime64[ns] 1960-01-01 1960-01-02 ... 1960-01-24
  * lat        (lat) float64 0.0 1.0 2.0 3.0 4.0 ... 91.0 92.0 93.0 94.0 95.0
  * lead_time  (lead_time) int64 0 1 2 3 4
  * level      (level) float64 0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0
  * lon        (lon) float64 0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0
Data variables:
    data       (ensemble, init_date, lat, lead_time, level, lon) float64 dask.array<chunksize=(1, 1, 96, 5, 9, 10), meta=np.ndarray>

I see two issues. First, fuse_ave_width is again a very important configuration parameter to tune, and the presence of a single root task that's the dependency of all the open_zarr tasks causes issues with optimization. Consider the following four task graphs visualizing dset['data'].groupby("init_date.month").mean(dim="init_date"):

  1. From open_zarr, default fusion: [task graph image: zarr-unfused]
  2. From open_zarr, with fuse_ave_width=200 (some large number): [task graph image: zarr-fused]
  3. The random data (no to / from Zarr), default fusion: [task graph image: random-unfused]
  4. The random data, with fusion: [task graph image: random-fused]

The Zarr vs. random issue looks like #3032. I'll see what xarray is doing in open_zarr. If it's using from_array I'll see what things look like with concat.

I'm not sure yet how that scales up to the larger datasets @dougiesquire is working with. Presumably too much fusing there would mean way too large of chunks, but I haven't done the math yet.

See https://nbviewer.jupyter.org/gist/TomAugspurger/8c487aeef83fc018f6fed2b345b366c9


@dougiesquire dougiesquire commented Sep 30, 2019

Stupid question:
My solution to issues of this type at the moment is to use a single, multi-core worker. This works great for fixing the memory issue, but obviously means I'm limited by the size of a single node. Is there a way to "turn off" inter-worker communication?


@TomAugspurger TomAugspurger commented Sep 30, 2019

For now, your best bet may be to set the distributed.worker.connections.outgoing config value to some small number (3-10?).

#3071 will effectively do that automatically when the worker is under high memory pressure, so that too much communication won't end up killing a high-memory worker, but you still get the benefit of communicating with many workers when memory is normal.
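
For reference, one way to set that value through dask's config system (it needs to be set in the process that creates the workers, or via the worker's YAML/environment configuration; the value 4 is only a guess to tune):

import dask

# limit how many outgoing transfer connections each worker keeps open
dask.config.set({"distributed.worker.connections.outgoing": 4})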


@dougiesquire dougiesquire commented Oct 1, 2019

For now, your best bet may be set the distributed.worker.connections.outgoing config value to some small number (3-10?)

Thanks for this, @TomAugspurger. Unfortunately tweaking distributed.worker.connections.outgoing doesn't seem to make much difference at all.


@TomAugspurger TomAugspurger commented Oct 1, 2019

@dougiesquire what does the dashboard show just before the workers are dying? High memory usage and a lot of communication (overlaid in red)?


@dougiesquire dougiesquire commented Oct 1, 2019

@dougiesquire what does the dashboard show just before the workers are dying? High memory usage and a lot of communication (overlaid in red)?

Yup - this is a screen shot of the dashboard as the workers start to die
[dashboard screenshot: high per-worker memory and heavy inter-worker communication]


@rabernat rabernat commented Oct 2, 2019

the presence of a single root task that's the dependency of all the open_zarr tasks causes issues with optimization

What is this "root task"? Is it something we could eliminate on the xarray (or dask) side?


@mrocklin mrocklin commented Oct 8, 2019

What is this "root task"?

By this I mean a task on which many other things depend. It is probably a reference to the Zarr file. You could probably optionally recreate the Zarr file in every task. In some cases that might slow things down. I'm not sure.


@mrocklin mrocklin commented Oct 8, 2019

Folks here might want to take a look at dask/dask#5451

Tom and I were playing with high level fusion, and this was an interesting outcome.


@Thomas-Moore-Creative Thomas-Moore-Creative commented Nov 13, 2019

I believe @dougiesquire has provided some example(s) but is there anything additional someone facing this issue could log or document that could assist with a solution?


@mrocklin mrocklin commented Nov 13, 2019

Nothing comes to mind at the moment. This probably needs someone to think hard about the problem and general task scheduling policies for a while. Most of the people who do that regularly are pretty backed up these days.


@TomAugspurger TomAugspurger commented Dec 13, 2019

Short summary of a call with @dougiesquire and @Thomas-Moore-Creative yesterday, looking at the problem described in #2602 (comment).

This is a common

  1. read from zarr
  2. group by month
  3. reduce

workload.

We eventually got things working well by setting fuse_ave_width to a sufficiently large value for the workload. We tried with Dask 2.9 and unfortunately the root fusion wasn't enough to fuse things correctly for this workload. I'm going to explore that a bit more today.
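
For anyone hitting the same thing, a rough sketch of that tuning; the flat fuse_ave_width key is the name used in this thread (newer dask versions expose it as "optimization.fuse.ave-width"), and the store path, variable name, and the value 200 are placeholders.

import dask
import xarray as xr

# raise the fusion width so the zarr-read / groupby / reduce graph fuses
with dask.config.set(fuse_ave_width=200):
    ds = xr.open_zarr("path/to/store.zarr")                   # placeholder store
    clim = ds["data"].groupby("time.month").mean(dim="time")  # placeholder variable
    clim.load()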


@Thomas-Moore-Creative Thomas-Moore-Creative commented Dec 13, 2019

@TomAugspurger - thank you for taking the time to look at the problem with us. Greatly appreciated and part of why we love this Pangeo community.


@eriknw eriknw commented Dec 20, 2019

I can narrowly comment on @TomAugspurger's example of "open_zarr with fuse_ave_width=200" that resulted in this DAG:
[task graph image: zarr, fused with fuse_ave_width=200]

Specifically, this doesn't fuse down the way "random data with fusion" does
[task graph image: random data, fused]
because of the hold_keys optimization introduced in dask/dask#2080.

Without hold_keys, the optimized DAG looks like
[task graph image: zarr, fused without hold_keys]

To be honest, I still don't fully grok what we want to accomplish with hold_keys, if it's still necessary, or if it can be improved.


@mrocklin mrocklin commented Dec 21, 2019

To be honest, I still don't fully grok what we want to accomplish with hold_keys, if it's still necessary, or if it can be improved.

Often we don't want to inline the array object into every task. This is the case with numpy arrays (they're faster to serialize on their own, and we don't want to copy the full array for every individual task), and for objects like h5py.Datasets which don't work well with pickle. It may be that with Zarr we do want to include the Zarr object within every task individually. This would be the case if it was very cheap to serialize.


@eriknw eriknw commented Dec 23, 2019

I don't see why hold_keys is necessary for any of what you just said. This may mean I don't understand what you just said, nor do I understand the previous discussions around hold_keys. I understand what the code hold_keys is doing, but I don't get why.

Furthermore, I suspect the graph from open_zarr with fuse_ave_width=200 likely represents a failure of graph optimization, and, for this, hold_keys is the culprit. Whatever the goal of hold_keys, I suspect it's better done as part of fuse. I would prefer to fix this optimization as a more general solution instead of repeating open_zarr in every task.

open_zarr above won't get fused, and it wouldn't get fused if it were a numpy array either.

It seems there are a couple considerations regarding what hold_keys tries to do with getters such as getitem.

  1. getters often return much smaller objects, so we'd sometimes rather send around the object after the getters are applied than before, and
  2. the objects getters are applied to may not be serializable, so, once again, we'd rather send the object after the getters are applied.

Is this correct? Is (2) a primary concern? The consequence of being unserializable is that all direct dependents must be executed on the same worker, right?

I believe the open_zarr example above is a failure case, because it results in sending the original objects to many, many tasks, and then the results of those tasks need to be sent around as part of a reduction. If the user knows their workflow and sets fuse_ave_width very high, then dask should respect that and use the graph with three branches that I shared in my previous post.

I think fuse can be improved if it's given two sets of keys that:

  1. are likely to reduce the size of data, and
  2. are unable to be serialized.

Thoughts?


@mrocklin mrocklin commented Dec 23, 2019

Whatever the goal of hold_keys, I suspect it's better done as part of fuse

That could be. Looking at the docstring of hold_keys, one of the main objectives was to avoid fusing things like "x" below:

{"x": np.array([1, 2, 3, ...]),
 "y": (inc, "x"),
 "z": (dec, "x"),
  ...
}

We don't want "x" to be fused anywhere because it is easier to serialize as a raw value, rather than tucked away into a task.

Furthermore, I suspect the graph from open_zarr with fuse_ave_width=200 likely represents a failure of graph optimization, and, for this, hold_keys is the culprit.

I definitely agree that a width of 200 is a failure case. What are we trying to achieve with that example? I'm not sure that there is an optimization to be made?

For root fusion (I'm not sure that this is what we're talking about here or not) we would want to benefit in cases like the following:

x = da.from_zarr(...)
y = da.from_zarr(...)
z = x + y
z.sum().compute()

Ideally the matching chunks from x and y would be loaded on the same machine. This is easy to guarantee if they are part of the same task. Root fusion does this when the tasks have no dependencies; in this case they do, which blocks things. I'm not sure that any of the problems presented above have this structure though, so this might not be what we're talking about here.

So, do we know what the ask is here? Do we have confidence that the problem presented above is likely to be resolved by improved optimization?

Is (2) a primary concern? The consequence of being unserializable is that all direct dependents must be executed on the same worker, right?

Not quite. We can serialize these things, but only if they're bare tasks, because then our custom serialization can take over, rather than having to rely on pickle.
