
[Idea] Handle large graphs by spilling tasks to disk on Scheduler #5630

Open
gjoseph92 opened this issue Dec 27, 2021 · 8 comments
Labels
discussion (Discussing a topic with no specific actions yet) · performance · stability (Issue or feature related to cluster stability, e.g. deadlock)

Comments

@gjoseph92
Collaborator

As clusters and larger compute jobs become more accessible, large task counts are becoming more common. The distributed scheduler currently struggles when task counts get high (xref https://docs.dask.org/en/stable/best-practices.html#avoid-very-large-graphs, #3898, others). Anecdotally, things often feel rough in the 100K-task range and may struggle to function beyond 1M tasks. There are likely many reasons for this, and we don't understand them well (one of my personal theories is that the sheer number of Python objects slows down GC, see #4987). But when there aren't lots of tasks in memory, things usually seem fine. So what if, rather than carefully profiling and understanding this problem, we just avoid ever having many tasks in memory?

At any given moment, usually only a small fraction of tasks are actually relevant to scheduling decisions or state updates. The scheduler is designed around O(1) scheduling—we don't look at the whole graph, just the immediate next step. In principle, that should mean we could strategically swap most TaskStates to disk without affecting scheduling performance, since we wouldn't look at most of them right now anyway.

The goal here would be for the number of tasks in memory to only depend on the number of tasks actually running/processable at the moment (closer to number of workers, or nthreads), not the total size of the graph. And to therefore support running nearly-arbitrarily-large graphs (billion tasks anyone?), swapping a small subset of tasks in and out of disk in a rolling fashion. (Obviously, actually doing a billion tasks would require streaming serialization of graphs and other such changes; I'm just talking high-level for now. We could implement this progressively, and it would still add a lot of value at the million-task scale with HLGs but no serialization changes.)

Additionally, moving the primary "database" of TaskStates out of Python memory could open doors for interesting (and tricky) work around scheduler restarting or fault-tolerance.

Also, just to note: I'm using "spilling" as a general term. I don't think this should happen transparently through a zict buffer like Worker.data; scheduler performance is too critical for implicit behavior. The loading/unloading from disk should happen explicitly, so we can reason about and guarantee behavior.
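To make the "explicit, not transparent" point concrete, here is a minimal sketch of what such a store could look like. Everything in it is hypothetical (the SpilledTaskStore name, the dbm backing, string task keys, the spill/load methods); it isn't a proposed API, just an illustration that spilling and loading would be explicit calls the scheduler makes at well-defined points:

import dbm
import pickle


class SpilledTaskStore:
    """Hypothetical store: active tasks stay as live Python objects,
    dormant tasks live on disk as pickled blobs and never touch the GC."""

    def __init__(self, path="scheduler-tasks"):
        self.active = {}                    # key -> live TaskState
        self.dormant = dbm.open(path, "c")  # key -> pickled TaskState on disk

    def spill(self, key):
        # Called explicitly when the scheduler knows a task won't be
        # looked at for a while
        self.dormant[key] = pickle.dumps(self.active.pop(key))

    def load(self, key):
        # Called explicitly right before a task becomes relevant again
        if key not in self.active:
            self.active[key] = pickle.loads(self.dormant[key])
            del self.dormant[key]
        return self.active[key]

A real implementation would also need to replace TaskState-to-TaskState references (dependencies, dependents, and so on) with keys before pickling; otherwise a single pickle.dumps would drag much of the graph along with it.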

cc @fjetter @crusaderky @jcrist @jakirkham @jrbourbeau

gjoseph92 added the performance, discussion, and stability labels on Dec 27, 2021
@jakirkham
Member

cc @madsbk @rjzamora @quasiben

@crusaderky
Collaborator

crusaderky commented Jan 7, 2022

I like the general idea, but I think that spilling to disk is unnecessarily expensive. If the target is hiding the objects from the GC, then converting the TaskState objects to pickled blobs in memory would be a lot faster - as long as RAM usage on the scheduler is not an issue (and I doubt it is for most people).

@crusaderky
Collaborator

...which brings out the key point:
If the tasks are not being accessed by the scheduler to begin with, then the only burden they impose is on the GC.
Which means we can get a good best-case estimate of the benefit of your proposal without touching any code: if you disable the GC, how much faster does a scheduler with 1m+ tasks on it run?
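For reference, a rough way to run that experiment (the workload below is just a placeholder; the interesting part is toggling cyclic GC on the scheduler process via Client.run_on_scheduler):

import gc
import time

import dask.array as da
from distributed import Client

client = Client(n_workers=4, threads_per_worker=2)


def timed_run():
    # Deliberately pathological: ~1 million single-element chunks
    x = da.zeros(1_000_000, chunks=1).map_blocks(lambda b: b + 1)
    start = time.perf_counter()
    x.sum().compute()
    return time.perf_counter() - start


print("GC enabled: ", timed_run())

client.run_on_scheduler(gc.disable)   # switch off cyclic GC on the scheduler
print("GC disabled:", timed_run())
client.run_on_scheduler(gc.enable)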

@gjoseph92
Collaborator Author

if you disable the GC, how much faster does a scheduler with 1m+ tasks on it run?

Great question. I'll try to test this at some point. We've experimented with ~100k tasks in #4987 (comment) and seen a speedup, but I haven't tried in the millions.
I'm also particularly curious about qualitatively how responsive the dashboard feels. The lagginess of the dashboard is something you can really feel going from the 10k task range to the 100k/1m task range.

I think that spilling to disk is unnecessarily expensive. If the target is hiding the objects from the GC, then converting the TaskState objects to pickled blobs in memory would be a lot faster

I like this idea too; I was trying to think of some way to hide objects from GC in memory, but this is much simpler than anything I came up with. I agree that actual scheduler RAM usage from tasks is usually not the issue, but part of that is probably because it falls over for other reasons if you give it a lot of tasks, so nobody is able to push that RAM limit right now. If these changes made the scheduler easily able to handle millions of tasks, then we might see RAM actually start to become a limiting factor. The good thing is that switching from a model of pickled TaskStates in memory to pickled TaskStates on disk seems like a pretty natural progression.

@jcrist
Member

jcrist commented Jan 7, 2022

Not all objects are tracked by the GC for cycle detection; only objects that are capable of creating reference cycles increase the load on the GC. Built-in objects have the following optimizations to reduce the number of items tracked by the GC:

  • Tuples containing only untracked objects are themselves untracked. These are untracked at creation time.
  • Dicts are untracked at creation time, added to the GC list when a tracked item is added to them, and removed from the GC list during a GC pass if no tracked items remain in the dict (so adding a tracked item will track the dict, but removing the tracked item won't untrack the dict until a GC pass).

Also note that the CPython GC is counter-based: every time a tracked object is created the counter increases, and every time a tracked object is destroyed it decreases. So creating and destroying a few tracked objects in a hot loop won't trigger the GC, but suddenly creating a lot of tracked objects (e.g. during deserialization) will.
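The dict rule in particular is easy to observe with gc.is_tracked (this mirrors the examples in the gc module docs; the printed values are shown in comments):

import gc

print(gc.is_tracked(42))       # False: atomic objects can't form cycles
print(gc.is_tracked("abc"))    # False
print(gc.is_tracked([]))       # True: lists are always tracked

d = {"a": 1}
print(gc.is_tracked(d))        # False: dicts start out untracked
d["b"] = []                    # adding a tracked value tracks the dict
print(gc.is_tracked(d))        # True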

With C extensions/Cython, you can also disable GC tracking for certain objects (if you're sure no reference cycles can ever be created by those objects alone). For example, in msgspec I added an optimization so that structs are untracked at creation time if none of their attributes contain a tracked object. This has a measurable impact on deserialization speed, as it avoids triggering additional unneeded GC passes.

So a few points we might take from this:

  • If we use Cython and are certain our TaskState objects can never create a cycle themselves (they may still participate in cycles, but no cycle made of only TaskState objects can exist), we could annotate them with @cython.no_gc. Trackable subfields (e.g. lists/dicts/sets) will still be tracked, but that might reduce some GC load. I believe this should be safe to do, should be easy (a one-line change), and may be effective.
  • We might increase the GC thresholds to reduce the frequency of collections.
  • We might disable GC temporarily during operations that are likely to create many, many trackable objects at once (which may needlessly trigger a collection while all of the new objects are still live). A sketch of both of these ideas follows this list.
  • We might take the above tuple/dict optimizations into account when designing our data structures to reduce GC load.
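As a rough illustration of the threshold and temporary-disable ideas (the numbers below are placeholders, not recommendations, and deserialize_task_states is a hypothetical stand-in for whatever hot path does the allocating):

import gc
from contextlib import contextmanager

# A generation-0 collection runs once (allocations - deallocations) of
# tracked objects exceeds threshold0; the CPython defaults are (700, 10, 10).
gc.set_threshold(100_000, 10, 10)


@contextmanager
def gc_paused():
    """Pause cyclic GC around code that creates many tracked objects at
    once (e.g. deserializing a large graph), then restore it."""
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if was_enabled:
            gc.enable()


# Hypothetical usage on a hot path:
# with gc_paused():
#     tasks = deserialize_task_states(blob)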

@gjoseph92
Collaborator Author

Yeah, there's been a bit of discussion about these GC strategies in #4987, especially from @jakirkham. My intent with this issue was to present an entirely different approach:

  • We don't know what the problem is with GC, or even if it's the problem.
  • Trying to figure out what the problem is with GC has been difficult and a huge time-sink in the past, without yielding anything useful.
  • Making any of these GC-related tweaks (@cython.no_gc, changing thresholds, tuple/dict optimizations, etc.) probably requires understanding what the problem is, in order not to be shooting in the dark.

However, we also notice:

  • When there aren't many tasks things are generally fine. If everybody just had 1000 tasks we wouldn't be talking about GC on the scheduler.
  • So, could we make there just be ~1000 tasks in memory even if there are more in play? Would we then have "solved" the "GC problem" without ever figuring out what the problem exactly was (or if GC was even the problem)?
  • Basically, we have a state that we know works (few tasks); can we turn the state that doesn't work (many tasks) into the state that we know does?

@jcrist
Member

jcrist commented Jan 7, 2022

Making any of these GC-related tweaks (@cython.no_gc, changing thresholds, tuple/dict optimizations, etc.) probably requires understanding what the problem is, in order not to be shooting in the dark.

Sure, and I don't want to derail that discussion. But if you already have a benchmark setup, I'd think that annotating TaskState with @cython.no_gc and recompiling would be a lot simpler to benchmark than writing all the code to store excess task states as in-memory bytes. If it makes no noticeable difference then I wouldn't keep going down this rabbit hole, but it might be a nice easy win in this case.

@chrisroat
Contributor

FWIW, in the issue mentioned at the start (#3898), the scheduler's memory is not released when the graph completes, even after a client restart. So after several large graphs the memory grows quite large, and I see performance degrade. It's not clear to me if this is task-related or potentially some I/O buffer.

Here is an example showing the memory growth on the scheduler, which I check by watching the time-series graph on the "System" tab.

import dask
import dask.array as da
import distributed

# Cap the scheduler's transition log so it isn't what's growing
with dask.config.set({"distributed.scheduler.transition-log-length": 100}):
    client = distributed.Client(threads_per_worker=1)

# 100,000 single-element chunks, plus one map_blocks task per chunk
_ = da.zeros(100_000, chunks=1).map_blocks(lambda x: x).compute()

In my application, I simply monitor the scheduler memory usage. When it gets large (a couple GB), I kill the cluster and then restart.
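For anyone wanting to automate that kind of check, here is a rough sketch (the scheduler address is a placeholder, scheduler_rss is a hypothetical helper, and psutil is already a dependency of distributed):

from distributed import Client

client = Client("tcp://scheduler:8786")  # placeholder address


def scheduler_rss():
    # Runs in the scheduler process and reports its resident memory in bytes
    import psutil
    return psutil.Process().memory_info().rss


if client.run_on_scheduler(scheduler_rss) > 2e9:  # "a couple GB"
    # Tear the cluster down and recreate it (a plain client restart
    # doesn't release the memory, per the observation above)
    ...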
