
scheduler.get_comm_cost a significant portion of runtime in merge benchmarks #6899

Closed
wence- opened this issue Aug 17, 2022 · 10 comments

@wence-
Contributor

wence- commented Aug 17, 2022

I've been profiling distributed workflows in an effort to understand where there are potential performance improvements to be made (this is ongoing with @gjoseph92 amongst others). I'm particularly interested in scale-out scenarios, where the number of workers is large. Alongside that, I've also been looking at cases where the number of workers is quite small but the dataframes have many partitions: this produces many tasks at a scale where debugging/profiling is a bit more manageable.

The benchmark setup I have builds two dataframes and then merges them on a key column with a specified matching fraction. Each worker gets P partitions with N rows per partition. I use 8 workers. I'm using cudf dataframes (so the merge itself is fast, which means that I notice sequential overheads sooner).
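
Schematically, the setup looks something like the sketch below (this isn't my actual harness, which uses cudf; the matching-fraction construction, column names, and the pandas/dask.dataframe building blocks here are just illustrative stand-ins):

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask import delayed
from dask.distributed import Client, wait

N_WORKERS = 8         # workers in the cluster
P = 100               # partitions per worker
N = 500_000           # rows per partition
MATCH_FRACTION = 0.3  # rough fraction of keys shared between the two frames


def make_chunk(i, n, frac, seed):
    # Draw keys from a pool sized so that roughly `frac` of them overlap
    # between the left and right dataframes.
    rng = np.random.default_rng(seed * 1_000_000 + i)
    keys = rng.integers(0, int(n / frac), size=n)
    return pd.DataFrame({"key": keys, "payload": rng.random(n)})


def make_ddf(seed):
    parts = [
        delayed(make_chunk)(i, N, MATCH_FRACTION, seed)
        for i in range(N_WORKERS * P)
    ]
    return dd.from_delayed(parts, meta=make_chunk(0, 8, MATCH_FRACTION, seed))


if __name__ == "__main__":
    client = Client(n_workers=N_WORKERS)
    left = make_ddf(seed=0).persist()
    right = make_ddf(seed=1).persist()
    wait([left, right])
    merged = left.merge(right, on="key").persist()  # the operation being timed
    wait(merged)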

Attached are two speedscope plots (and data) from py-spy-based profiling of the scheduler in a scenario with eight workers, P=100, and N=500,000. During a shuffle, the total number of tasks peaks at about 150,000 per the dashboard. The second profile is very noisy since I'm using benfred/py-spy#497 to avoid filtering out Python builtins (so that we can see in more detail what is happening). Interestingly, at this scale we don't see much of a pause in GC (but I am happy to try out more scenarios that might be relevant to #4987).

In this scenario, a single merge takes around 90s; if I do the minimal thing of making Scheduler.get_comm_cost return 0 immediately, this drops to around 50s (using pandas it drops from 170s to around 130s). From the detailed profile, we can see that the majority of this time is spent in set.difference. I'm sure there's a more reasonable fix that isn't quite such a large hammer.

[speedscope screenshot: py-spy-scheduler-100-chunks-per-worker]

[speedscope screenshot: py-spy-scheduler-100-chunks-per-worker-detailed]

merge-scheduler-100-chunks-per-worker-no-filter.json.gz
merge-scheduler-100-chunks-per-worker.json.gz

(cc @pentschev, @quasiben, and @rjzamora)

@fjetter
Member

fjetter commented Aug 17, 2022

That's clearly this line:

deps: set = ts.dependencies.difference(ws.has_what)

which dispatches to this property:

@property
def has_what(self) -> Set[TaskState]:
"""An insertion-sorted set-like of tasks which currently reside on this worker.
All the tasks here are in the "memory" state.
This is the reverse mapping of :attr:`TaskState.who_has`.
This is a read-only public accessor. The data is implemented as a dict without
values, because rebalance() relies on dicts being insertion-sorted.
"""
return self._has_what.keys()

Essentially this performs a set difference between an actual set and a key view of a dictionary. I assume the key view is converted internally to an actual set, such that all keys are rehashed. Just guessing here, though.
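
A quick standalone way to sanity-check the cost (toy sizes, not the scheduler's actual state objects; it only demonstrates the scaling, not the exact mechanism):

import timeit

huge = dict.fromkeys(range(1_000_000))  # stands in for WorkerState._has_what
small = set(range(5))                   # stands in for TaskState.dependencies

# What the current code does: set.difference against the dict's key view.
t_diff = timeit.timeit(lambda: small.difference(huge.keys()), number=100)

# Per-element membership checks instead: only ever touches the small set.
t_members = timeit.timeit(
    lambda: {x for x in small if x not in huge.keys()}, number=100
)

print(f"set.difference against key view: {t_diff:.4f}s")
print(f"membership checks per element:   {t_members:.4f}s")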

@fjetter
Member

fjetter commented Aug 17, 2022

Might actually be better off simply looping here

for dts in ts.dependencies:
    if dts not in ws.has_what:
        nbytes += dts.nbytes

len(ts.dependencies) is typically reasonably small, but ws.has_what can be huge, i.e. rehashing everything is a bad idea, whereas a per-item membership check should be fast.

@gjoseph92
Collaborator

@wence- off-topic, but you might find this handy for sharing speedscope profiles: https://gist.github.com/gjoseph92/7bfed4d5c372c619af03f9d22e260353

@wence-
Contributor Author

wence- commented Aug 18, 2022

Might actually be better off simply looping here

for dts in ts.dependencies:
    if dts not in ws.has_what:
        nbytes += dts.nbytes

len(ts.dependencies) is typically reasonably small, but ws.has_what can be huge, i.e. rehashing everything is a bad idea, whereas a per-item membership check should be fast.

I will try this out.

@gjoseph92
Collaborator

It could also be reasonable to have a cutoff for both sizes and, if they're too large, switch to some O(1) estimate.
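
Schematically, something like this (CUTOFF and the average-size fallback below are placeholders rather than a concrete proposal; ts and ws stand for the scheduler's TaskState and WorkerState):

CUTOFF = 10_000  # placeholder threshold


def estimated_comm_nbytes(ts, ws, avg_dep_nbytes):
    deps = ts.dependencies
    if len(deps) > CUTOFF and len(ws.has_what) > CUTOFF:
        # Cheap fallback: pretend every dependency is remote and of average size.
        return len(deps) * avg_dep_nbytes
    # Exact path: only count dependencies not already on this worker.
    return sum(dts.nbytes for dts in deps if dts not in ws.has_what)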

@wence-
Contributor Author

wence- commented Aug 22, 2022

OK, here are the results of my experiments (this is with --protocol ucx):

Total rows/worker: 50_000_000 = (ROWS_PER_CHUNK * CHUNKS_PER_WORKER)

Current distributed:

deps = ts.dependencies - ws.has_what

CHUNKS_PER_WORKER = 1

925.13 ms                 | 12.89 GiB/s
604.21 ms                 | 19.73 GiB/s
521.40 ms                 | 22.86 GiB/s
500.70 ms                 | 23.81 GiB/s
589.96 ms                 | 20.21 GiB/s

CHUNKS_PER_WORKER=10

1.97 s                    | 6.06 GiB/s
1.97 s                    | 6.05 GiB/s
2.08 s                    | 5.73 GiB/s
1.95 s                    | 6.12 GiB/s
2.10 s                    | 5.68 GiB/s

CHUNKS_PER_WORKER=100

88.97 s                   | 137.20 MiB/s
91.80 s                   | 132.97 MiB/s
99.05 s                   | 123.24 MiB/s
93.12 s                   | 131.09 MiB/s
91.67 s                   | 133.17 MiB/s

Removing get_comm_cost (return 0 immediately)

CHUNKS_PER_WORKER = 1

595.50 ms                 | 20.02 GiB/s
523.76 ms                 | 22.76 GiB/s
528.68 ms                 | 22.55 GiB/s
524.09 ms                 | 22.75 GiB/s
613.82 ms                 | 19.42 GiB/s

CHUNKS_PER_WORKER=10

2.08 s                    | 5.72 GiB/s
2.09 s                    | 5.71 GiB/s
2.07 s                    | 5.77 GiB/s
2.11 s                    | 5.64 GiB/s
2.09 s                    | 5.69 GiB/s

CHUNKS_PER_WORKER=100

53.13 s                   | 229.77 MiB/s
49.86 s                   | 244.84 MiB/s
50.96 s                   | 239.54 MiB/s
55.43 s                   | 220.24 MiB/s
51.33 s                   | 237.83 MiB/s

Assuming len(ts.dependencies) << len(ws.has_what)

deps = set(dep for dep in ts.dependencies if dep not in ws.has_what)

CHUNKS_PER_WORKER = 1

1.14 s                    | 10.49 GiB/s
518.40 ms                 | 23.00 GiB/s
647.83 ms                 | 18.40 GiB/s
506.97 ms                 | 23.51 GiB/s
508.15 ms                 | 23.46 GiB/s

CHUNKS_PER_WORKER=10

2.03 s                    | 5.87 GiB/s
2.06 s                    | 5.80 GiB/s
2.15 s                    | 5.55 GiB/s
1.96 s                    | 6.09 GiB/s
2.10 s                    | 5.68 GiB/s

CHUNKS_PER_WORKER=100

52.77 s                   | 231.34 MiB/s
52.40 s                   | 232.94 MiB/s
51.83 s                   | 235.50 MiB/s
53.03 s                   | 230.20 MiB/s
53.24 s                   | 229.26 MiB/s

@gjoseph92
Collaborator

@wence- probably should be a separate issue, but I'd be curious to see what the next thing is that pops out on the profile once get_comm_cost is resolved.

@wence-
Contributor Author

wence- commented Aug 23, 2022

but I'd be curious to see what the next thing is that pops out on the profile once get_comm_cost is resolved.

sizeof_pandas_dataframe is appreciable. This patch (which is not really safe, e.g. for object columns dtype.itemsize only counts the pointers and not the underlying objects, but WFM):

diff --git a/dask/sizeof.py b/dask/sizeof.py
index f31b0660e..a36874778 100644
--- a/dask/sizeof.py
+++ b/dask/sizeof.py
@@ -141,10 +141,12 @@ def register_pandas():
     @sizeof.register(pd.DataFrame)
     def sizeof_pandas_dataframe(df):
         p = sizeof(df.index)
-        for name, col in df.items():
-            p += col.memory_usage(index=False)
-            if col.dtype == object:
-                p += object_size(col._values)
+        mgr = df._mgr
+        blocks = mgr.blocks
+        n = len(df)
+        for i in mgr.blknos:
+            dtype = blocks[i].dtype
+            p += n * dtype.itemsize
         return int(p) + 1000
 
     @sizeof.register(pd.Series)

It produces the following results (this is now with the TCP rather than the UCX protocol, and all-pandas dataframes):

chunks_per_worker  rows_per_chunk  before  after  sizeof-opt
100                50000           75s     48s    39s
10                 500000          9s      9s     9s
1                  5000000         8s      8s     8s
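
For context, a standalone toy comparison of the per-column memory_usage loop against a block-manager-based estimate like the patch above (plain all-float pandas frame; illustrative only, not from the runs above):

import timeit

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.random((1_000, 500)))  # wide, all-float frame


def per_column(frame):
    # The per-column loop from sizeof_pandas_dataframe
    # (ignoring the index and the object-dtype path).
    return sum(col.memory_usage(index=False) for _, col in frame.items())


def per_block(frame):
    # Block-manager based estimate, as in the patch above.
    mgr = frame._mgr
    return sum(len(frame) * mgr.blocks[i].dtype.itemsize for i in mgr.blknos)


assert per_column(df) == per_block(df)  # identical answer for an all-numeric frame
print(timeit.timeit(lambda: per_column(df), number=100))
print(timeit.timeit(lambda: per_block(df), number=100))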

@jrbourbeau
Member

Just checking in here, did #6931 close this issue, or is there more that folks would like to do?

@wence-
Contributor Author

wence- commented Aug 23, 2022

Just checking in here, did #6931 close this issue, or is there more that folks would like to do?

Yes, it did; I'm about to follow up more coherently on @gjoseph92's last query with a separate issue.
