Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance regressions after queuing PR #295

Open
gjoseph92 opened this issue Sep 1, 2022 · 7 comments
Open

Performance regressions after queuing PR #295

gjoseph92 opened this issue Sep 1, 2022 · 7 comments

Comments

@gjoseph92
Copy link
Contributor

dask/distributed#6614 causes a significant performance improvement to test_vorticity:
image

And a significant performance regression to test_dataframe_align:
image

I think the key change is fixing dask/distributed#6597.

test_vorticity comes from dask/distributed#6571. This workload is the one where we discovered the co-assignment bug that's now fixed. So it's encouraging (and not surprising) that improving co-assignment significantly reduces transfers, and improves performance.

test_dataframe_align is a bit surprising. You'd think that assigning matching partitions of the two input dataframes to the same worker would reduce downstream transfers—which it indeed does.

Worth noting: in my original benchmarks, test_dataframe_align was probably the most affected by root task overproduction out of everything I tested:

I ran it manually a few times both before and after the queuing PR. I also tried turning work stealing off, but it didn't make a difference.

Before

before

After

after

If you stare at these GIFs for a while, you can notice that:

  1. In "after", progress bars 3, 4, and 5 take longer to start, then accelerate faster. In "before", it feels like there's more of a linear lag between the top two bars and the next three.
  2. The memory usage peaks higher "after" (we already knew this from the benchmark)

Here's a comparison between the two dashboards at the same point through the graph (785 make-timeseries tasks complete):

comparison2

This is also a critical point, because it's when we start spilling to disk in the "after" case. The "before" case never spills to disk.

You can see that:

  1. There were no transfers in the "after" case. Before, we had lots of transfers. So improved co-assignment is working correctly.
  2. There are way more repartition-merge tasks in memory (top progress bar) in the "after" case. Nearly every executed task is in memory. Compare that to the before case, where half have already been released from memory.
  3. ...because there are way fewer sub, dataframe-count, and dataframe-sum executed. These are the data-consuming tasks.
    1. A bunch of subs have just been scheduled. A moment before, none of those were in processing. So sadly, they arrive just a moment too late to prevent the workers from spilling to disk.

The simplest hypothesis is that the repartition-merges are completing way faster now, since they don't have to transfer data. Maybe that increase in speed gives them the chance to run further ahead of the scheduler before it can submit the sub tasks? This pushes the workers over the spill threshold, so then everything slows down.

I know I tend to blame things on root task overproduction, so I'm trying hard to come up with some other explanation here. But it does feel a bit like, because there's no data transfer holding the tasks back anymore, they are able to complete faster than the scheduler is able to schedule data-consuming tasks.

What's confusing is just that repartition-merge isn't a root task—and we see relatively few make-timeseries or repartition-splits hanging around in memory. So why is it that scheduling subs lags behind more?

@fjetter
Copy link
Member

fjetter commented Sep 1, 2022

I know I tend to blame things on root task overproduction, so I'm trying hard to come up with some other explanation here.

Root task overproduction is an observation but not a cause. As you are telling us already, repartition-merge is not a root task so this is rather a general "data producer overproduction".
What is causing this overproduction? Why are the ready subs not computed asap? There always seems to be a couple of sub ready, i.e. grayed out in the bar. IIUC this is the processing state on scheduler side, i.e. there is already a message queued up to the worker. Do we really believe that scheduler latency is causing this?
Especially after a disk-write I would expect the workers to immediately pick up sub tasks. If not sub, they should pick up repartition tasks. What I see more often than not is that they are executing yet another make-timeseries. It's really hard to tell, though with all the green shades. I think the only worker that executes stuff as I would expect it to is the last worker on the bottom. There is a lot of green stuff and there is barely any memory in RAM. That's what my intuition would tell is how a near perfect scheduling should look like in this situation.

  1. We may want to verify graph ordering. This may be a bit tricky but I would suggest looking at the ordering of the full graph. dask.order has many heuristics that sometimes choose different paths depending on (sub)graph sizes (at least this was the case last time I ventured into these depths).
  2. I would be really interested to see the instruction log of a worker that is entering this spilling nightmare. This way we could at least verify if the worker is doing the right thing. This may also serve as a decent proxy for the ordering problem.

@crusaderky
Copy link
Contributor

I was under the impression that the queuing PR was supposed to have no impact at all if you leave worker-saturation: .inf? If you take away the new code paths that are exclusively triggered when worker-saturation is a finite number, what changed?

@fjetter
Copy link
Member

fjetter commented Sep 1, 2022

I was under the impression that the queuing PR was supposed to have no impact at all if you leave worker-saturation: .inf? If you take away the new code paths that are exclusively triggered when worker-saturation is a finite number, what changed?

https://github.com/dask/distributed/pull/6614/files/36a60a5e358ea2a5d16597651126ac5892203b01#r949739382

@fjetter
Copy link
Member

fjetter commented Sep 1, 2022

First of all, a bit of magic after running the computation once

def get_stim_logs(dask_worker):
    return list(dask_worker.state.stimulus_log)
stim_logs = client.run(get_stim_logs)

then reconstruct the worker state at a given point in time and have fun!

addr = 'tls://10.0.1.197:37555'
def reconstruct_state(addr, ix_max=0):
    """
    Reconstruct the worker state at a given time ``ix_max``
    """
    ws = WorkerState(threads={}, address=addr, nthreads=2)
    ws.validate = False
    instructions = []
    try:
        log = stim_logs[addr]
        # print(f"Available stimuli: {len(log)}")
        for ix, stim in enumerate(log):
            if ix_max and ix > ix_max:
                break
            instructions.append(
                ws.handle_stimulus(stim)
            )
    except:
        # There is some assertion error during forgetting
        # Story looks as if we tried to forget twice
        pass
    return ws
ws = reconstruct_state(addr)

I reconstructed the worker state for two of the workers and ran the following. The following returns the top priority task on the workers ready heap immediately after a sub task was received.
Assuming task co-assignment works reasonably well, perfect scheduling would require us to have this exact task on the top of our heap since it is very deep in our graph and should run asap. The only exception are things like dataframe-sum-chunk which are even further down the graph. If it is not scheduled asap this means either the priority is off or it needs to fetch dependencies.

The following function inspects the worker state for all ComputeTask messages instructing the worker to compute a sub task. It collects a snapshot of the top of the heap and counts the states of the dependencies

from collections import Counter
def get_top_ready_heap(addr):
    from distributed.worker_state_machine import ComputeTaskEvent
    log = stim_logs[addr]
    sub_stimuli = []
    for ix, stim in enumerate(log):
        if isinstance(stim, ComputeTaskEvent):
            if "sub" in stim.key:
                sub_stimuli.append(ix)
    top_ready = []
    for sub_stim_ix in sub_stimuli:
        ws = reconstruct_state(addr, ix_max=sub_stim_ix)
        if ws.ready:
            stimulus = log[sub_stim_ix]
            sub_ts = ws.tasks[stimulus.key]
            ts = ws.ready.peek()
            missing_deps = Counter({dep.state for dep in sub_ts.dependencies})
            top_ready.append((ts, missing_deps))
    return top_ready

I'm inspecting two workers

The first one is basically the "doesn't us any memory" worker of the above gif. The other one is just a random other worker.

Worker without significant memory

At the very least the first couple of tasks are scheduled perfectly. There are a couple of bad apples but overall this worker is getting great task assignments

[(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 0)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 331)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 330)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 332)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 333)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 334)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 335)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 336)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 337)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 338)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 339)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 340)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 341)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 342)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 343)" ready>, Counter({'memory': 2})),
 (<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 344)" ready>, Counter({'memory': 2})),
 (<TaskState "('dataframe-count-chunk-9fe16c8cf9f65c347b1a644e2de9a32f-4eabd21b86af5c4984f612322e91d13c', 343)" ready>, Counter({'memory': 2})),
...
 # There are also a couple of "bad apples". Another task is scheduled because dependencies need to be fetched
(<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 370)" ready>,
  Counter({'memory': 1, 'flight': 1})),
 (<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 370)" ready>, Counter({'fetch': 1, 'memory': 1})),
 (<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 375)" ready>, Counter({'memory': 1, 'fetch': 1})),
 (<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 375)" ready>, Counter({'fetch': 1, 'memory': 1})),
 (<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 380)" ready>, Counter({'fetch': 1, 'memory': 1})),
 (<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 384)" ready>, Counter({'fetch': 1, 'memory': 1})),
 (<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 386)" ready>, Counter({'fetch': 1, 'memory': 1})),
...

Worker with significant memory

Right from the start, this worker is having a bad time. Every sub computation needs to fetch data and instead the worker schedules another task.

[(<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 501)" ready>, Counter({'flight': 1, 'memory': 1})),
 (<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 501)" ready>, Counter({'fetch': 1, 'memory': 1})),
 (<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 501)" ready>, Counter({'memory': 1, 'fetch': 1})),
 (<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 405)" ready>, Counter({'fetch': 1, 'memory': 1})),
 (<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 488)" ready>, Counter({'fetch': 1, 'memory': 1})),
 (<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 488)" ready>, Counter({'fetch': 1, 'memory': 1})),
 (<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 535)" ready>, Counter({'memory': 1, 'fetch': 1})),
 (<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 535)" ready>, Counter({'memory': 1, 'fetch': 1})),
 (<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 535)" ready>, Counter({'fetch': 1, 'memory': 1})),
 (<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 568)" ready>, Counter({'flight': 1, 'memory': 1})),

Edit: There was a bug in my script showing only a dependency count of one. I accidentally used a set on the states. It's corrected now.

@fjetter
Copy link
Member

fjetter commented Sep 1, 2022

If that's not interesting enough, the worker that is doing really well has the "smallest" IP address if we sort them all lexicographically. The workers dict on the scheduler is a SortedDict https://github.com/dask/distributed/blob/bfc5cfea80450954dba5b87a5858cb2e3bac1833/distributed/scheduler.py#L3327

somewhere in our decision making logic we prefer the "first worker" and we are apparently doing a great job of initial task placement for that worker but this quickly breaks down for other workers.


Edit: There was a bug in my script showing only a dependency count of one. I accidentally used a set on the states.

@gjoseph92
Copy link
Contributor Author

Why are the ready subs not computed asap? There always seems to be a couple of sub ready, i.e. grayed out in the bar. IIUC this is the processing state on scheduler side, i.e. there is already a message queued up to the worker. Do we really believe that scheduler latency is causing this?

This is the main thing I'm curious about.

Here's a screenshot from 2s before the side-by-side I posted. Both versions are at ~740 make-timeseries complete.

Before

Screen Shot 2022-08-31 at 7 19 18 PM

After

Screen Shot 2022-08-31 at 6 50 36 PM

Notice how 'before', 200 subs had already completed. 'after', only 31 have, and no more are in processing yet. I think this is the difference. Even though there are ~200 fewer repartition-merges complete, more of them are still in memory. And fewer of their downstream sub tasks have even been scheduled yet—even though clearly the scheduler knows that they're in memory, since it's displaying on the dashboard.

@gjoseph92
Copy link
Contributor Author

Replaying worker state like this is amazing. We should do this for the commit before and compare though. The difference in subs needing to fetch dependencies vs having them in memory is interesting, but see above—I think the problem might more be that we're traversing the graph in a slightly different order, which is making that so that repartition-merges are piling up which (somehow?) aren't input to subs.

I'm not sure it's only that subs are being scheduled on the "wrong" workers, requiring transfer—maybe they're not even being scheduled until too late?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants