# Fix threaded scheduler memory backpressure regression #8040

## Conversation
I see test failures are related to ordering of the tasks. Someone from dask core is going to have to explain to me or point me to the issues/PRs where ordering was determined to be necessary.
dask/local.py (Outdated):

```diff
@@ -487,7 +489,7 @@ def fire_tasks(chunksize):
                     pack_exception,
                 )
             )
-        del state["ready"][:ntasks]
+        del state["ready"][nready - ntasks : nready]
```
Maybe this would be more natural as the following?

```python
while len(state["ready"]) > ...:
    key = state["ready"].pop()
```
This goes along with the assumption I mention above. Does `state` get updated by other threads/asynchronous routines? If so, then doing `pop` could be removing things that were being added by these other methods. That's why I did the fancy `nready`-based indexing. Otherwise, if that assumption is wrong, then the original `[:ntasks]` would work.

Regarding the `while` loop, that would perform worse compared to `del`, wouldn't it?
> Otherwise, if that assumption is wrong then the original `[:ntasks]` would work.

Sorry, no it wouldn't; it would have to be `del state["ready"][-ntasks:]` because of the reverse ordering.
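To make the two slicing forms concrete, a minimal illustration (plain Python, not the actual scheduler code):

```python
ready = list(range(10))
ntasks = 3
nready = len(ready)  # snapshot of the list length at submission time

a = ready.copy()
del a[nready - ntasks : nready]  # index-relative form from this PR

b = ready.copy()
del b[-ntasks:]  # simpler negative-slice form

# With no concurrent mutation the two are identical:
assert a == b == list(range(7))

# If another thread appended items after `nready` was recorded, the
# nready-based slice would still delete the original tail, while
# [-ntasks:] would delete the newly appended items instead.
```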
> Does `state` get updated by other threads/asynchronous routines?

The scheduler code is all run from within the same thread. While Dask enables multi-threaded execution, it has historically been hard to get any speedup in data-structure-heavy code like this.

> Regarding the `while` loop, that would perform worse compared to `del`, wouldn't it?

I doubt that it would perform worse in any measurable way. I think that it's also a more common pattern. I think that what you're trying to do is to pull tasks off the end of the ready stack until we're saturated again. I think that `while True: L.pop()` is a more common idiom there. When I see `ready[nready - ntasks:nready]` my brain has to think a lot more to figure out what's going on.
> When I see `ready[nready - ntasks:nready]` my brain has to think a lot more to figure out what's going on.

How does `del state["ready"][-ntasks:]` make you feel?

I'll see how I can clean this up... maybe time the while loop approach. I personally (Computer Engineering background) would see the `while` loop and wonder why `del` wasn't used, but not my project so 🤷
This also ignores the failing tests that don't like that I'm processing things in reverse order. I don't have the historical context to know why that test exists and why execution order matters.
Aw, I just finished timing it:

```
In [10]: %%timeit
    ...: state = {"ready": list(range(100))}
    ...: ntasks = 10
    ...: nready = len(state["ready"])
    ...: while len(state["ready"]) > nready - ntasks: state["ready"].pop()
    ...:
1.94 µs ± 46.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [11]: %%timeit
    ...: state = {"ready": list(range(100))}
    ...: ntasks = 10
    ...: nready = len(state["ready"])
    ...: del state["ready"][-ntasks:]
    ...:
685 ns ± 4.22 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
```
I don't use timeit a lot so I put all the setup code in the timed code to make sure that the list didn't go empty and change the results of the test.
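For reference, the stdlib `timeit` module can keep setup out of the timed region via its `setup` argument; a sketch (not what was run above):

```python
import timeit

# Deleting a slice of an already-empty list is a silent no-op, which is the
# pitfall mentioned above: without fresh setup each run, later runs would
# mostly time nothing. Rebuilding the list inside stmt keeps every run
# meaningful, at the cost of also timing the rebuild.
setup = "base = list(range(100)); ntasks = 10"
stmt = 'state = {"ready": base.copy()}\ndel state["ready"][-ntasks:]'
print(min(timeit.repeat(stmt, setup=setup, repeat=7, number=100_000)))
```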
> I personally (Computer Engineering background)

I just realized how arrogant this sounds. Not how I meant it, sorry. Just saying it may explain a differing opinion on the matter.
Maybe it was added/modified the last time someone edited this code. Regardless, we should add tests to serve as a strong signal that execution order should follow `dask.order` ordering. Maybe a `Callback` that ensures that the order/priority of executed tasks in a single-threaded situation matches `dask.order`?
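A sketch of what such a callback could look like (using `dask.diagnostics.Callback`; the array and scheduler choice here are placeholders):

```python
import dask.array as da
from dask.diagnostics import Callback

executed = []

# pretask fires right before each task runs on the local scheduler
with Callback(pretask=lambda key, dsk, state: executed.append(key)):
    da.random.random((4, 4), chunks=(2, 2)).sum().compute(scheduler="synchronous")

# `executed` can then be compared against the static dask.order priorities
```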
Last change to the `test_local.py` ordering test was you in #3271. So that means `dask.order` doesn't work well for this example case (220MB versus 10GB memory usage). I see priorities can be controlled (https://distributed.dask.org/en/latest/priority.html), but is there any other way to order things so reductions like this calculation work better?

```python
d = da.random.random((5000, 250000), chunks=(1, -1)).sum()
```
Let me try turning graph optimization back on...
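For context, toggling that optimization (task fusion) looks something like this, assuming the current config key name:

```python
import dask

# Fusion enabled is the default; the #7583 example disables it.
with dask.config.set({"optimization.fuse.active": False}):
    result = d.compute(scheduler="threads")  # `d` from the snippet above
```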
Yeah, 1 µs of difference doesn't matter much here. Getting an item off of a `Queue` takes 50 µs.
So this ends up not mattering since the whole point of this is to delete the tasks in reverse order, but since I should be following `dask.order`, that means `del state["ready"][:ntasks]` is the way it has to be done. That is, unless there is a common `while`-loop-based usage in dask for deleting things off the front of a list.
I just want to document this somewhere. I noticed that the example code from #7583 turns off graph optimization. Leaving it on (the default) produces the following profiles for the otherwise identical example code. I ran it with a few different variations of this PR:

- Available workers - dask.order order: max memory ~187MB
- All workers - dask.order order: max memory ~197MB
- Available workers - reverse dask.order order: max memory ~179MB

The cache difference is very interesting to me. I also did the dask.order versus reverse order with a real-world Satpy case and the difference was ~3GB of memory usage (~8GB -> ~5GB). ...just documenting some findings.
Also use while loop list pop to be consistent with other parts of dask.
To be clear with this last commit and my recent findings: …
Thanks for working on this @djhoese. Your changes look promising. However, I tried running the xarray example I posted in pydata/xarray#5165 on this PR and unfortunately memory still seems to blow up somewhere in the calculation, where it did not on 2021.03.1.

```python
import xarray as xr
import pandas as pd
import numpy as np
import dask.array as da

dates = pd.date_range('1980-01-01', '2019-12-31', freq='D')

ds = xr.Dataset(
    data_vars={
        'x': (
            ('time', 'lat', 'lon'),
            da.random.random(size=(dates.size, 360, 720), chunks=(1, -1, -1))),
        'clim': (
            ('dayofyear', 'lat', 'lon'),
            da.random.random(size=(366, 360, 720), chunks=(1, -1, -1))),
    },
    coords={
        'time': dates,
        'dayofyear': np.arange(1, 367, 1),
        'lat': np.arange(-90, 90, .5),
        'lon': np.arange(-180, 180, .5),
    }
)

# My original use case was pulling this data from disk, but it doesn't actually seem to matter
ds.to_zarr('test-data', mode='w')
ds = xr.open_zarr('test-data')

ds['anom'] = ds.x.groupby('time.dayofyear') - ds.clim
ds[['anom']].to_zarr('test-anom', mode='w')
```
Another option here would be to walk back things to pre-concurrent.futures, and then do a minimal change that swaps out …
Ok I was able to manually apply everything from the original concurrent.futures PR except for the batch job processing, and that seems to stay at a low memory usage. Now to investigate the batch processing more...

Edit: Side note: I noticed that …
FWIW I personally do not care about batch job processing. It's not at all a commonly used feature. I wouldn't optimize for it.
Got it... again! So the reverse order is needed. The 2021.03.1 code uses … in two places. One sorted ready sets backwards, but that was at the start of …. The second one was when finishing a task and adding any dependents that were waiting for that task to finish. These were being reversed, which as I said goes against what …. So my original code was actually correct with pulling things off the end of …. I noticed one last difference though that I need to make a commit for: in the older code the …

The point is: `get_async` for local should always do things in reverse.

Edit: Ugh, but then why do the tests fail!?!?!?
Found it! It's because the tests were changed in the concurrent.futures PR to match the behavior that was being done: https://github.com/dask/dask/pull/6322/files?file-filters%5B%5D=.py#r598060370

I disagree with @jakirkham here in this comment. Because we are changing … Here is the old version of the test:

```python
def test_local_parents_of_reduction(abcde):
    """
            c1
            |
        b1  c2
        |  /|
    a1  b2  c3
    |  /|
    a2  b3
    |
    a3

    Prefer to finish a1 stack before proceeding to b2
    """
    ...
    expected = [a3, a2, a1, b3, b2, b1, c3, c2, c1]
```

And the new expected:

```python
expected = [a3, b3, c3, a2, a1, b2, b1, c2, c1]
```

So the question is: which one is correct? Or is there another one that is more correct? I need to process the tasks in some kind of reverse order so that … If I put the …
Whichever one agrees with `dask.order` :)

(Also, I thought I had already responded to this by e-mail, but I'm not seeing that response come up here. My apologies if this is a duplicate.)
This fixes my xarray example above, nice work!
This seems wrong to me. We should be able to release `a3` before starting anything on the `c` branch.
Agreed. If you check out the comment I linked to on the related PR, @jakirkham justifies the difference because the overall groups of tasks (a, b, c) are still processed in the same order. In his PR it also meant that the sync scheduler behaved more closely to the threaded/multiprocess schedulers. I'm going to verify some stuff and try to add a more complex numpy test as you mentioned in the bug report. Still catching up on morning emails though.
Ok I did a little check and the old order (…), where the original concurrent.futures (current …) …

Side note: Learning all of the nuts and bolts of the threaded scheduler really makes me want to rewrite it to see if it could be made faster (less copying/moving things to different lists, multiple threads that are allowed to update the state for the main thread). Sadly I think that the …
We're talking a total overhead of 100 µs per task. Assuming that your tasks are taking longer than a millisecond, this overhead isn't that big of a deal. There are bigger fish to fry :)
Thank you for the test. That looks great and very reassuring. Is there anything left here from your perspective?
```python
def test_complex_ordering():
    da = pytest.importorskip("dask.array")
    from dask.diagnostics import Callback

    actual_order = []

    def track_order(key, dask, state):
        actual_order.append(key)

    x = da.random.normal(size=(20, 20), chunks=(-1, -1))
    res = (x.dot(x.T) - x.mean(axis=0)).std()
    dsk = dict(res.__dask_graph__())
    exp_order_dict = order(dsk)
    exp_order = sorted(exp_order_dict.keys(), key=exp_order_dict.get)
    with Callback(pretask=track_order):
        get_sync(dsk, exp_order[-1])
    assert actual_order == exp_order
```
@mrocklin So this is the first draft of a test like you talked about (I think). However, it only uses one large chunk, so even though the ordering is correct I'm not sure it is actually testing what you were hoping. If I make the array more than one chunk then the order does not match. `dask.order` expects all the `normal` and `transpose` tasks to be done for an entire row of chunks (I think) before it does the first `mean_chunk`.

The pre-concurrent.futures scheduler matches the current scheduler but does not match `dask.order` exactly. The scheduler prefers to work on a chunk as far as it can, which apparently means doing `normal` -> `transpose` -> `mean_chunk` and then going on to the next chunk. This is because it immediately processes any tasks that were recently added to the "ready" list (dependencies have all been processed and cached).
I looked at the local scheduler more, and the only way to resolve this that I can think of is to only add `waiting` tasks to `ready` when the task before it (order-wise) is in the `ready`/`running`/`finished` list already. However, this part of the code doesn't know anything about the `dask.order` order; it is only using the pre-sorted lists of tasks. It says "ah, this waiting task has no waiting dependencies, then I should run it". Eager execution, I guess.
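Roughly the shape of the decision being described, as a simplified sketch (not the actual `dask/local.py` code):

```python
def finish_task(key, state):
    # When `key` completes, any dependent with no remaining unmet
    # dependencies becomes "ready" immediately. Eligibility is purely
    # "all dependencies done"; dask.order priorities are not consulted here.
    for dep in state["dependents"][key]:
        state["waiting"][dep].remove(key)
        if not state["waiting"][dep]:
            del state["waiting"][dep]
            state["ready"].append(dep)
```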
cc @eriknw: should we trust order or should we trust runtime decisions like this?
I'll need to take a closer look to understand the current behavior to provide a more informed opinion.

Generally, I would expect the local scheduler to trust and defer to `dask.order` within reason, unless it has a good reason not to. A good reason not to would be that a dynamic scheduler can know the sizes of data, and can prioritize releasing large data. Another good reason not to could be for performance. A dynamic scheduler may also estimate runtime and data sizes, and is better able to react to the current state of execution.

Didn't the previous threaded scheduler basically run the best available task according to `dask.order`? If there is now under-informed greedy, LIFO, or FIFO behavior occurring, I do worry about some workloads behaving poorly. `dask.order` is complicated for good reasons! The local scheduler should probably be stress-tested similar to how I stress-test `dask.order`. I may be able to get some time to do this.
> Didn't the previous threaded scheduler basically run the best available task according to dask.order? If there is now under-informed greedy, LIFO, or FIFO behavior occurring, I do worry about some workloads behaving poorly.

The main thing in this PR is that it is restoring pre-2021.04.0 behavior. This PR basically takes away the "oh I think we can do this better" decisions that were made in the concurrent.futures PR, which maybe tried to do too much for a single PR (concurrent.futures, batch task submission, reordering tasks to avoid reverse sorting).

The threaded scheduler will keep processing down (up?) a branch of the tree until it can't any longer. This is against what `dask.order` would prefer, where `dask.order` would rather stop processing a certain chunk and balance out the processing of other chunks. Which dependencies get processed first is influenced by `dask.order`, so the overall order of execution for branches further up the tree (not directly depending on leaves) should remain intact.

- Could the threaded scheduler "listen" to `dask.order` more? Absolutely.
- Was it listening more before the concurrent.futures switch? No. This PR has equivalent behavior.
- Was it listening more after the concurrent.futures switch? Even less so. It was directly contradicting what `dask.order` was trying to do.
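For reference, the static priorities being discussed can be inspected directly; a small sketch:

```python
import dask.array as da
from dask.order import order

x = da.random.random((4, 4), chunks=(2, 2)).sum()
dsk = dict(x.__dask_graph__())
priorities = order(dsk)  # maps each key to a static priority; lower runs first
for key in sorted(priorities, key=priorities.get):
    print(priorities[key], key)
```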
See my comment on the test I added.
Looking at the changes this seems fine to me. I'm +1 to merge. I'm also happy to pause for a moment to see if @eriknw has suggestions or to see if we have any possible improvements on testing.
My opinion on this is that this should be merged and released (sooner rather than later) given that it is reproducing the previous behavior and reduces memory usage so dramatically for real-world cases. I think if …
I agree that this is a priority fix. We would normally release next Friday. However, we discussed this PR in the weekly maintainer meeting and brought up the possibility of doing an out-of-cycle release this Friday instead. When I say pause I really just mean pausing for the day to give Erik a chance to respond.
Thanks for working on this David! Sorry for the issues here. Just got back from PTO, so have only skimmed the conversation above and changes here. In general this looks sensible to me.

As to the ordering with the one test, I was seeing the same ordering when using threaded and multiprocessing prior to the …

The batching code was added based on user feedback. It also was needed to keep the multiprocessing case performant. Agree in retrospect that was maybe a bit too much and we should have handled that in a separate PR. It looks like we were able to preserve batching with these changes (which is good news!), but if there are still issues here please let me know.

Everything is still running in a single blocking thread. Though one of the hopes I had with the …

Not sure if that covered all of the questions. So please let me know if there are others.
Hi @jakirkham!

Do you remember if you were seeing that with a single worker? I agree that it doesn't make sense why the schedulers should run a different order for a single-worker case. For multiple workers it makes sense though, as you get to eat away at more of the leaves of the tree, causing more multi-dependent tasks to get added to the "ready" queue.

It is a great feature and I noticed differences when applying it to some of my Satpy real-world cases. You are right that it is preserved here and there are no issues. The main issue was just the order that the jobs were being pulled off the …

I had this feeling too. When playing around with the …
OK, I'm going to merge this in. I suspect that we can do better by adhering to `dask.order` more, although I also suspect that our deviations here are mild and local. Thank you @djhoese for stepping up and resolving this quickly. Thank you also to @crusaderky for raising the original issue and to everyone for reviewing. Unless any objections get raised on the community release issue, this is slated to go out on Friday.
I don't recall, unfortunately.
Glad it has been useful. Was a bit worried that might be getting too niche. Happy to hear that is not the case :)

Gotcha, thanks for identifying/fixing this. Sorry for missing this ordering issue earlier.
Yeah exactly. I had some ideas about how we could chain …
Regarding the batching, I should clarify that I don't actively use it in my work. I just tried it out while working on this PR and saw some difference. However, I think it could be really useful in the future if/when the local scheduler gets smarter. Like knowing that all the dependencies for a particular task are ready to be computed, so instead of scheduling all the dependencies and then on the next scheduling iteration scheduling the dependent task, send them as one batch (assuming they are nearby in …).

I like the idea of using …
The main debugging of this issue can be seen in #7583. I went through the entire series of steps for this, where I started with no knowledge of how this worked to now coming up with ideas for how this could perform better 😉
## The Issues
The issues fixed here were introduced in #6322 (CC @jakirkham). They can be tested by monitoring/profiling the memory usage from this example:
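(The example here is presumably the reduction quoted earlier in the thread; the following is a sketch of such a profiling run using `dask.diagnostics`, with graph fusion disabled as in #7583 — the `dt` value and config key are assumptions.)

```python
import dask
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler, visualize

d = da.random.random((5000, 250000), chunks=(1, -1)).sum()
with dask.config.set({"optimization.fuse.active": False}):
    with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof, CacheProfiler() as cprof:
        d.compute(scheduler="threads")
# visualize([prof, rprof, cprof])
```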
If you uncomment the visualize call then you can see the full profile output for it (see the related issue for screenshots). Ever since #6322 the memory for this example is huge at ~10GB max. Prior to that PR the max memory was ~220MB.
## Fixes
This PR fixes things by doing two main things:

1. In "Use `concurrent.futures` in local scheduler" (#6322) only the number of workers is used, not the available workers. This ends up with dask submitting all tasks and oversubscribing workers.
2. It was assumed in "Use `concurrent.futures` in local scheduler" (#6322) that the reverse order wasn't needed, but without it, in cases like the example above, you end up filling up memory by producing all the `np.random` chunks into memory before ever going on to the `sum` tasks. This is the main reason memory fills up.

## Assumptions
- `state['ready']` can be updated by other threads, so instead of doing the simpler syntax of grabbing and deleting the end of the `state['ready']` list, I keep track of the current index and do changes relative to that. If someone can confirm that this `state` is only updated in the current thread and isn't asynchronous (never gets updated outside of `get_async`), then this can be simplified.
- I added `if not len(state["running"]): continue` in the main loop because I wanted to avoid the case where we wait on the `queue.get` but there is nothing submitted by `fire_tasks`. However, after thinking about it more I don't think this is possible.

## Tests and Questions
- … the `state['nready']` list? I assume since everything else was done in-place in previous versions that I must also do things in-place here.
- … `sum` tasks with the `np.random` tasks that they depend on.
- Passes `black dask` / `flake8 dask` / `isort dask`