Optimise scheduler.get_comm_cost set difference #6931

Merged: 1 commit merged into dask:main from wence/get-comm-cost-opt on Aug 23, 2022

Conversation

@wence- (Contributor) commented Aug 22, 2022

Computing set(A).difference(B) is O(max(len(A), len(B))). When
estimating the communication cost of a task's dependencies, the number
of dependencies (A) is usually small while the number of tasks the
worker holds (B) is large. In this case it is better to construct the
set difference manually, by iterating over A and checking whether each
element is in B.
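A minimal sketch of the effect (illustrative only: `has_what` and
`dependencies` below are stand-ins for the scheduler's `ws.has_what`
and `ts.dependencies`, with a dict keys view standing in for the
worker's task mapping):

import timeit

# Stand-ins: a large keys view of tasks held by a worker, and a small
# dependency set that mostly overlaps with it.
has_what = {f"task-{i}": None for i in range(1_000_000)}.keys()
dependencies = {f"task-{i}" for i in range(8)} | {"missing-0", "missing-1"}

# set.difference with a non-set, non-dict argument such as a keys view
# can end up scanning the whole argument: roughly O(len(has_what)).
t_diff = timeit.timeit(lambda: dependencies.difference(has_what), number=10)

# Explicit iteration performs only len(dependencies) hash lookups:
# O(len(dependencies)).
t_iter = timeit.timeit(
    lambda: {d for d in dependencies if d not in has_what}, number=10
)

print(f"difference: {t_diff:.4f}s  iterate: {t_iter:.6f}s")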

Performing a left.merge(right, on="key", how="inner") of a distributed
dataframe across eight workers, with chunks_per_worker * rows_per_chunk
held constant, I observe the following timings using the tcp
communication protocol (a rough reproduction sketch follows the
checklist below):

| chunks_per_worker | rows_per_chunk | before | after |
|-------------------|----------------|--------|-------|
| 100               | 50000          | 75s    | 48s   |
| 10                | 500000         | ~9s    | ~9s   |
| 1                 | 5000000        | ~8s    | ~8s   |
  • Tests added / passed
  • Passes pre-commit run --all-files
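
A rough reproduction sketch of the benchmark (the exact script is not
part of the PR, so the local cluster setup, data shapes, and uniform
key distribution below are assumptions):

import time

import numpy as np
import pandas as pd

import dask.dataframe as dd
from dask.distributed import Client, wait

client = Client(n_workers=8)  # assumption: 8 local workers over tcp

chunks_per_worker, rows_per_chunk = 100, 50_000
npartitions = 8 * chunks_per_worker
nrows = npartitions * rows_per_chunk

def make_frame(seed):
    # Uniformly distributed join keys so the inner merge shuffles data
    # between workers.
    rng = np.random.default_rng(seed)
    df = pd.DataFrame(
        {"key": rng.integers(0, nrows, nrows), "value": rng.random(nrows)}
    )
    return dd.from_pandas(df, npartitions=npartitions)

left, right = make_frame(0), make_frame(1)

start = time.perf_counter()
result = left.merge(right, on="key", how="inner").persist()
wait(result)  # block until all merge tasks have finished
print(f"merge took {time.perf_counter() - start:.1f}s")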

@wence- (Contributor, Author) commented Aug 22, 2022

This is one simple go at addressing #6899.

@github-actions bot commented Aug 22, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0    15 suites ±0    6h 44m 23s ⏱️ −1m 14s
3 041 tests ±0:  2 954 ✔️ passed −1    84 💤 skipped +1    3 failed ±0
22 493 runs ±0:  21 515 ✔️ passed −1    975 💤 skipped +1    3 failed ±0

For more details on these failures, see this check.

Results for commit ce87313, compared against base commit 2a2c3bb.

♻️ This comment has been updated with latest results.

@gjoseph92 (Collaborator) left a comment

This seems like a simple and sensible improvement!

The only test failure is the highly flaky #6896, so I think we could merge this.

Comment on lines +2625 to +2634
if 10 * len(ts.dependencies) < len(ws.has_what):
# In the common case where the number of dependencies is
# much less than the number of tasks that we have,
# construct the set of deps that require communication in
# O(len(dependencies)) rather than O(len(has_what)) time.
# Factor of 10 is a guess at the overhead of explicit
# iteration as opposed to just calling set.difference
deps = {dep for dep in ts.dependencies if dep not in ws.has_what}
else:
deps = ts.dependencies.difference(ws.has_what)
@fjetter (Member) commented Aug 23, 2022

Micro-benchmarking this, I get a factor of ~2 rather than 10:

import time
import uuid
from random import sample

for dict_size in [100, 1_000, 10_000, 100_000, 1_000_000]:
    a_large_dict = {
        f"{ix}-{uuid.uuid4()}": "foo"
        for ix in range(dict_size)
    }

    def timing(func):
        start = time.time_ns()
        iterations = 10
        for iteration in range(iterations):
            func()
        end = time.time_ns()
        return (end - start) / iterations

    for factor in [0.1, 0.4, 0.45, 0.5]:
        # sample() needs a sequence, so materialise the keys first
        small_set = set(sample(list(a_large_dict), int(factor * dict_size)))
        intersect = timing(lambda: small_set.intersection(a_large_dict))
        iterate = timing(lambda: {k for k in small_set if k in a_large_dict})
        if iterate < intersect:
            print(f"Iterating faster for {dict_size=} and {factor=}")
Iterating faster for dict_size=100 and factor=0.1
Iterating faster for dict_size=1000 and factor=0.1
Iterating faster for dict_size=1000 and factor=0.4
Iterating faster for dict_size=1000 and factor=0.5
Iterating faster for dict_size=10000 and factor=0.1
Iterating faster for dict_size=10000 and factor=0.4
Iterating faster for dict_size=10000 and factor=0.45
Iterating faster for dict_size=100000 and factor=0.1
Iterating faster for dict_size=100000 and factor=0.4
Iterating faster for dict_size=100000 and factor=0.45
Iterating faster for dict_size=100000 and factor=0.5
Iterating faster for dict_size=1000000 and factor=0.1

@wence- (Contributor, Author) replied:

Conversely, on my (admittedly slightly antediluvian) Broadwell box, with Python 3.9.13:

Iterating faster for dict_size=100 and factor=0.1
Iterating faster for dict_size=1000 and factor=0.1
Iterating faster for dict_size=1000 and factor=0.4
Iterating faster for dict_size=10000 and factor=0.1
Iterating faster for dict_size=100000 and factor=0.1
Iterating faster for dict_size=1000000 and factor=0.1

@wence- (Contributor, Author) commented Aug 23, 2022

Updated commit message/summary for timings with tcp rather than UCX comms protocol (otherwise no change in the force push).

I can adapt the heuristic for when to select between the two options, but as shown above, the threshold varies depending on hardware.

@wence- (Contributor, Author) commented Aug 23, 2022

> I can adapt the heuristic for when to select between the two options, but as shown above, the threshold varies depending on hardware.

In my benchmarking of the workflow, a factor of 10 or 2 didn't really make a difference, I guess because the dependencies set is really much smaller.
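
To illustrate with hypothetical sizes in the ballpark of the merge
benchmark above (a handful of dependencies per task, hundreds of tasks
held per worker), both constants pick the same branch:

# Hypothetical sizes: few dependencies, many held tasks.
len_deps, len_has_what = 4, 800

for factor in (2, 10):
    # The PR's heuristic: iterate explicitly when the dependency set is
    # much smaller than the worker's task mapping.
    print(f"{factor=}: iterate branch taken: {factor * len_deps < len_has_what}")
# factor=2: iterate branch taken: True
# factor=10: iterate branch taken: True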

@fjetter (Member) commented Aug 23, 2022

Thanks for checking on the factor again. I guess you are right and that's good enough.

@fjetter fjetter merged commit c15a10e into dask:main Aug 23, 2022
@wence- wence- deleted the wence/get-comm-cost-opt branch August 23, 2022 14:58
gjoseph92 pushed a commit to gjoseph92/distributed that referenced this pull request Oct 31, 2022