
Change Dask Bag partitioning scheme#10294

Merged
jacobtomlinson merged 5 commits into dask:main from jacobtomlinson:bag-partitioning
May 24, 2023

Conversation


@jacobtomlinson jacobtomlinson commented May 17, 2023

In #10291 we discuss two challenges we are seeing with scaling Dask Bag to large numbers of workers (200+). This PR addresses both of them.

Problem 1

The default partitioning scheme of dask.bag.from_sequence caps the number of potential tasks at 199 (reached when the bag contains exactly 199 items) and trends towards 100 as the bag grows.

[Plot: default task count vs. bag size, showing the sawtooth capped near 100-199 tasks]

As discussed with @fjetter, it would be nice if this default scheme continued to increase the number of tasks, just at a non-exponential rate.

This change uses math.sqrt to grow the number of tasks at a square-root rate.

This allows any number of workers to be saturated with tasks given enough items in the bag.
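As a rough model, the new default can be sketched as follows. This is a minimal stand-alone sketch; the constant 100 and the exact `100 * sqrt(n / 100)` form are assumptions consistent with the proposal later in this thread, not Dask's exact internals.

```python
import math

def default_npartitions(nitems: int) -> int:
    """Sketch of the new default scheme: one task per item up to 100
    items, then square-root growth. Constants are an assumption based
    on this PR discussion, not Dask's exact internals."""
    if nitems <= 100:
        return max(1, nitems)
    return max(1, math.floor(100 * math.sqrt(nitems / 100)))

# Task count keeps growing instead of plateauing around 100:
for n in (100, 400, 10_000, 1_000_000):
    print(n, default_npartitions(n))
```

The growth is monotone, so given enough items in the bag the task count eventually exceeds any fixed worker count.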

[Plot: task count vs. bag size under the new scheme, growing monotonically at a square-root rate]

Problem 2

Setting the number of partitions equal to the number of workers feels like it would be a good way to explicitly ensure Dask Bag scales to utilize all workers. However, due to the rounding done in the current scheme, this doesn't actually saturate all of the workers unless nitems % workers == 0.

[Plot: task count vs. bag size with npartitions equal to the worker count, undershooting unless nitems % workers == 0]

This change switches the partition_size calculation to use floor instead of ceil, so that the number of tasks overshoots the requested partition count and trends towards it, rather than undershooting by up to 50%. It feels better to have too many tasks than too few.
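The rounding effect can be shown with a small stand-alone sketch (the helper below is illustrative, not Dask's API): deriving partition_size with ceil undershoots the requested partition count, while floor overshoots it.

```python
import math

def resulting_npartitions(nitems: int, requested: int, rounding) -> int:
    """How many partitions you actually get when partition_size is
    derived from the requested count with a given rounding function.
    (Illustrative helper, not Dask's API.)"""
    partition_size = max(1, int(rounding(nitems / requested)))
    # partition_all-style chunking yields ceil(nitems / partition_size) chunks
    return math.ceil(nitems / partition_size)

# Requesting 200 partitions for a 300-item bag:
print(resulting_npartitions(300, 200, math.ceil))   # ceil undershoots: 150
print(resulting_npartitions(300, 200, math.floor))  # floor overshoots: 300
```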

[Plot: task count vs. bag size with the floor-based calculation, overshooting and trending towards the requested count]

@jacobtomlinson jacobtomlinson marked this pull request as draft May 17, 2023 14:32
@jacobtomlinson
Member Author

Marking as draft while I look at the test failures.

    evens = db.from_sequence(range(0, hi, 2), npartitions=npartitions)
    odds = db.from_sequence(range(1, hi, 2), npartitions=npartitions)
    pairs = db.zip(evens, odds)
    assert pairs.npartitions == npartitions
Member Author


This assertion was only true because the length of the bag happened to be a multiple of 100. Updated it to be a little more robust.

Comment on lines +1775 to +1778
    if len(seq) <= 100:
        partition_size = int(math.ceil(len(seq) / npartitions))
    else:
        partition_size = max(1, int(math.floor(len(seq) / npartitions)))
Member Author


I tweaked this so that the new heuristic is only applied to bags with more than 100 items, similar to automatic partitioning. We could drop the threshold and use floor/max for everything, but a number of tests would need to be changed because it would break some assumptions we hold about small bags today.
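To illustrate the behavior of the rule above, a minimal sketch (the helper name is hypothetical, and the chunk counts follow from partition_all-style chunking):

```python
import math

def partition_size_for(nitems: int, npartitions: int) -> int:
    """The rule from the diff above: small bags (<= 100 items) keep the
    old ceil-based size; larger bags switch to floor so the resulting
    partition count overshoots rather than undershoots the request."""
    if nitems <= 100:
        return int(math.ceil(nitems / npartitions))
    return max(1, int(math.floor(nitems / npartitions)))

# 90 items, 40 requested: old ceil behavior gives 30 chunks (undershoots)
print(math.ceil(90 / partition_size_for(90, 40)))
# 300 items, 200 requested: new floor behavior gives 300 chunks (overshoots)
print(math.ceil(300 / partition_size_for(300, 200)))
```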

@jacobtomlinson
Member Author

cc @rjzamora

Member

@rjzamora rjzamora left a comment


Thanks for the ping here @jacobtomlinson!

I spent some time thinking about this today, and found myself surprisingly distressed by the sawtooth behavior this PR is addressing.

My personal opinion is that the output partition count should always satisfy two criteria:

  1. It should be exactly equal to npartitions when the argument is not None and is <= len(seq).
  2. When neither npartitions nor partition_size is specified, we should calculate a default npartitions value that monotonically increases with respect to len(seq).

For example, I believe the following code (while a bit rough) will satisfy both of these criteria:

    remainder = 0
    seq = list(seq)
    seq_len = len(seq)
    if partition_size is None:

        if npartitions is None:
            if seq_len <= 100:
                npartitions = seq_len
            else:
                npartitions = math.floor(100.0 * math.sqrt(seq_len / 100.0))
            npartitions = max(1, npartitions)

        partition_size = int(math.floor(seq_len / npartitions))
        remainder = seq_len % npartitions

    if remainder:
        # The first `remainder` partitions each take one extra item
        start_size = partition_size + 1
        start_count = start_size * remainder
        parts = list(partition_all(start_size, seq[:start_count])) + list(
            partition_all(partition_size, seq[start_count:])
        )
    else:
        parts = list(partition_all(partition_size, seq))
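For reference, here is a self-contained version of the sketch above that runs without Dask, with a minimal stand-in for toolz.partition_all (the wrapper function name is illustrative, not Dask's actual signature):

```python
import math

def partition_all(size, seq):
    """Stand-in for toolz.partition_all: chunks of at most `size` items."""
    seq = list(seq)
    return [tuple(seq[i:i + size]) for i in range(0, len(seq), size)]

def partition(seq, npartitions=None, partition_size=None):
    """Self-contained version of the proposal above (the function name
    is illustrative; this is not Dask's actual signature)."""
    remainder = 0
    seq = list(seq)
    seq_len = len(seq)
    if partition_size is None:
        if npartitions is None:
            if seq_len <= 100:
                npartitions = seq_len
            else:
                npartitions = math.floor(100.0 * math.sqrt(seq_len / 100.0))
            npartitions = max(1, npartitions)
        partition_size = int(math.floor(seq_len / npartitions))
        remainder = seq_len % npartitions
    if remainder:
        # The first `remainder` partitions each take one extra item
        start_size = partition_size + 1
        start_count = start_size * remainder
        parts = partition_all(start_size, seq[:start_count]) + partition_all(
            partition_size, seq[start_count:]
        )
    else:
        parts = partition_all(partition_size, seq)
    return parts

# Criterion 1: the requested npartitions is honored exactly
print(len(partition(range(300), npartitions=200)))
```

Because the remainder is spread one item at a time over the first partitions, the output always has exactly npartitions chunks (when npartitions <= len(seq)), and the default count grows monotonically with the sequence length.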

@jacobtomlinson
Member Author

Thanks @rjzamora. I did look into trying to achieve that, but it does mean that partitions will have different sizes. I don't know whether that is a problem or not.

With your proposed code problem 1 becomes:

[Plot: problem 1 under the proposed code, with monotone square-root growth]

And problem 2 becomes:

[Plot: problem 2 under the proposed code, hitting the requested partition count exactly]

However, if you specify something like npartitions=200 for a bag with 300 items, you get 100 partitions with 2 items and 100 partitions with 1 item. Are we ok with this?

In [1]: import dask.bag as db
   ...: from collections import Counter
   ...: Counter(db.from_sequence(range(300), npartitions=200).map_partitions(len).compute())
Out[1]: Counter({2: 100, 1: 100})

I feel like, coming from dask.array, I would intuitively expect chunks to be equal in shape apart from the last one. But maybe that's me mapping my dask.array assumptions onto dask.bag where they don't make sense. WDYT?

@fjetter
Member

fjetter commented May 24, 2023

However, if you specify something like npartitions=200 for a bag with 300 items in you get 100 partitions with 2 items and 100 partitions with 1 item. Are we ok with this?

If the user specifies this, sure.

I feel like intuitively from dask.array I would expect chunks to be equal an shape apart from the last one. But maybe that's me mapping my dask.array assumptions onto dask.bag and that doesn't make sense here. WDTY?

I think this is not a hard requirement, just the ordinary case. You can reshape/rechunk an array into all sorts of weird shapes.

And problem 2 becomes:

I'm a bit confused why your plot ends at 200. Shouldn't this also grow without bounds?


I agree with @rjzamora that npartitions should be respected exactly if possible. I think there was some kind of misunderstanding in #10291.
However, I think we're debating two different problems here: one is the sawtooth growth, the other is the actively harmful default.
I'd be fine with fixing both separately. I think the current PR fixes the harmful default behavior but doesn't fix the awkward sawtooth growth. We can follow up on that, but the changes as proposed would already fix @jacobtomlinson's problem, wouldn't they?

@jacobtomlinson
Member Author

If the user specifies this, sure.

👍

I'm a bit confused why your plot ends at 200

That plot is the case where the user specifies 200 partitions. So if there are only 10 items in the bag it will have 10 partitions; nothing we can do about that. But once there are more than 200 items, it plateaus at 200 because that's what the user specified.
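In other words, the plateau is just the effective partition count being capped by both the item count and the user's request:

```python
def effective_npartitions(nitems: int, requested: int) -> int:
    """A trivial model of the plateau: a bag can never have more
    partitions than items, so the curve tracks nitems until it
    reaches the requested count, then flattens out."""
    return min(nitems, requested)

print(effective_npartitions(10, 200))    # only 10 items -> 10 partitions
print(effective_npartitions(1000, 200))  # plateaus at the requested 200
```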

I think the current PR fixes the harmful default behavior but doesn't fix the awkward sawtooth growth.

Agreed. I think we should merge this as is, then follow up with another PR implementing @rjzamora's suggestion.

@rjzamora
Member

Yes. Agree that this PR is an improvement “as is”, and that my suggestions can be considered in a follow-up.

@jacobtomlinson
Member Author

Thanks @rjzamora @fjetter!

In that case, I'll go ahead and merge.

@jacobtomlinson jacobtomlinson merged commit de2278c into dask:main May 24, 2023
@jacobtomlinson jacobtomlinson deleted the bag-partitioning branch May 24, 2023 12:34


Development

Successfully merging this pull request may close these issues.

Dask Bag limits parallelism when scaling beyond 100-200 workers
