
Bag repartition partition size#6371

Merged
jrbourbeau merged 25 commits into dask:master from joshreback:bag-repartition-partition_size
Aug 7, 2020
Conversation

@joshreback
Contributor

@joshreback joshreback commented Jul 4, 2020

  • Tests added / passed
  • Passes black dask / flake8 dask

Closes #6197

Hope it's alright; it didn't look like anyone was going to take up that issue, so I just took a shot at it to get more familiar w/ the codebase.

Couple refactors (moving iter_chunks to utils, making repartition_size and repartition_npartitions share code), but generally the approach is adapted from how this is implemented for dataframes. Appreciate any feedback!

dask/bag/core.py Outdated

graph = HighLevelGraph.from_collections(new_name, dsk, dependencies=[self])
return Bag(graph, name=new_name, npartitions=npartitions)
if sum([npartitions is not None, partition_size is not None]) != 1:
Member

Suggested change
if sum([npartitions is not None, partition_size is not None]) != 1:
if npartitions is not None and partition_size is not None:

@sroet sroet Jul 7, 2020

@TomAugspurger I don't think that suggestion actually catches the same things as the current sum.

IIUC, the current if is a not-XOR between npartitions is not None and partition_size is not None (letting it be True when either both or neither are defined), while your suggestion would only catch the case when both are defined.

A possible shortening would be :

Suggested change
if sum([npartitions is not None, partition_size is not None]) != 1:
if (npartitions is None) == (partition_size is None):

(This casts each is None check to a bool with the parentheses and compares them with ==, which is identical to not-XOR for bools; the explicit not is unnecessary since it would appear on both sides of the XOR.)

I don't have enough knowledge on the codebase of dask to comment on this PR in general.
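The equivalence between the two guards can be checked exhaustively over the four set/unset cases (a quick sketch, not code from the PR; the value 7 is an arbitrary stand-in for "argument given"):

```python
# All four combinations of "given" / "not given" for the two keyword arguments.
for np_, ps in [(None, None), (7, None), (None, 7), (7, 7)]:
    old_guard = sum([np_ is not None, ps is not None]) != 1
    new_guard = (np_ is None) == (ps is None)
    # Both guards fire exactly when zero or two arguments are given.
    assert old_guard == new_guard
```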

Contributor Author

Yeah, that's right: the purpose of that check is to ensure only one of npartitions and partition_size is set. Checking only that both are not None allows for both to be set. @sroet's way works, or not (bool(npartitions) ^ bool(partition_size)) is another option.

return _split_partitions(bag, nsplits, new_name)


def repartition_size(bag, size):
Member

This seems to share a lot of code with dask.dataframe.core.repartition_size. Do you see opportunities for deduplication?

Contributor Author

Yeah that's a good point. At first glance it looks like they could share most of the code in that method. The only differences are:

  • total_mem_usage
  • _split_partitions (the guts of which are the same but the way the return value is built differs, this could be easily handled)
  • the df code uses pandas Series & NumPy in different places whereas bag uses functions from toolz; that could probably be handled easily as well.

So I think a way to deduplicate would be to move the repartition_size method to a shared space (is base.py the place for this?). The reason I didn't start w/that is that it seems like it might be awkward to have that one method be shared while it seems like most of the rest of the operations are not. Let me know what you think.

Contributor Author

@TomAugspurger there is definitely duplicated code b/t bag and dataframe w.r.t. repartitioning. I'm just not sure of the best way to make them share code in the existing project structure.

It feels odd to put the shared code in the DaskMethodsMixin (which is already shared b/t bag and dataframe) as that seems to house very generic methods. Putting it in utils, which is also already shared, doesn't seem quite right either since those seem like general purpose methods not tied to needing a dask object.

So I guess a third option would be to put the various partitioning methods in a separate utils file, called partition_utils.py or something like that, which only df and bag import. Does that sound like a good path forward, or am I missing some other way to share the code?

Member

FWIW, I'm comfortable with some code duplication here. The complexity of factoring this out might be more than the complexity of duplication given that this is only going to happen twice.

dask/bag/core.py Outdated
Comment on lines +2534 to +2535
def total_mem_usage(bag):
    return sys.getsizeof(bag)
Member

You might want to try dask.sizeof.sizeof instead here. I don't think that sys.getsizeof has the behavior that you want.

In [1]: import sys

In [2]: sys.getsizeof(list(range(1000000)))
Out[2]: 9000120

In [3]: sys.getsizeof({"x": list(range(1000000))})
Out[3]: 248

In [6]: from dask.sizeof import sizeof

In [7]: sizeof({"x": list(range(1000000))})
Out[7]: 37000482

@joshreback joshreback marked this pull request as draft July 14, 2020 12:21
dask/bag/core.py Outdated

graph = HighLevelGraph.from_collections(new_name, dsk, dependencies=[self])
return Bag(graph, name=new_name, npartitions=npartitions)
if not bool(npartitions) ^ bool(partition_size):
Member

0 is a valid argument for npartitions so I think this would be more accurate as:

Suggested change
if not bool(npartitions) ^ bool(partition_size):
if not (npartitions is None) ^ (partition_size is None):
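The difference matters for falsy-but-valid values like npartitions=0; a quick illustration (hypothetical values, not code from the PR):

```python
npartitions, partition_size = 0, None  # npartitions=0 is explicitly given

# Truthiness-based guard: bool(0) is False, so this wrongly concludes
# that neither argument was provided and would raise.
buggy = not (bool(npartitions) ^ bool(partition_size))

# Identity-based guard: 0 is not None, so exactly one argument is seen.
fixed = not ((npartitions is None) ^ (partition_size is None))

assert buggy is True and fixed is False
```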

Contributor Author

ah good point.

Member

I think this hasn't been done yet correct?

Member

FWIW in other places where we're checking that exactly one input is given we use something like:

dask/dask/dataframe/core.py

Lines 1164 to 1178 in 2411b0a

if (
    sum(
        [
            partition_size is not None,
            divisions is not None,
            npartitions is not None,
            freq is not None,
        ]
    )
    != 1
):
    raise ValueError(
        "Please provide exactly one of ``npartitions=``, ``freq=``, "
        "``divisions=``, ``partitions_size=`` keyword arguments"
    )

which I personally find more clear.
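The pattern generalizes to any number of mutually exclusive keywords; a small sketch (the helper name is hypothetical, not dask API):

```python
def exactly_one(*args):
    """Return True when exactly one argument is not None."""
    return sum(arg is not None for arg in args) == 1

assert exactly_one(10, None, None, None)        # one given: OK
assert not exactly_one(None, None, None, None)  # none given: should raise
assert not exactly_one(10, "1D", None, None)    # two given: should raise
```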

Contributor Author

Cool - I went with that here.

dask/bag/core.py Outdated
This can be used to reduce or increase the number of partitions
of the bag.
"""
new_name = "repartition-%d-%s" % (npartitions, tokenize(bag, npartitions))
Member

Can we move this down a few lines since it might not be needed?

Member

also this.

Contributor Author

Whoops, I thought I addressed that. I moved it down a couple lines as suggested.

@joshreback joshreback force-pushed the bag-repartition-partition_size branch 2 times, most recently from e9dd2b3 to 9635892 Compare July 17, 2020 01:42
@joshreback joshreback marked this pull request as ready for review July 19, 2020 01:26
dask/sizeof.py Outdated
)


@sizeof.register(chain)
Contributor Author

@joshreback joshreback Jul 19, 2020

I'm not sure if this belongs here (since it doesn't compute the current size of a chain object, but rather how much size the object would take up after compute is run), or in the total_mem_usage method in bag/core.py.

But the point of adding this was to make sure that repartitioning a bag multiple times in a row works; on subsequent repartitions, all the partitions were itertools.chain objects, and dask.sizeof.sizeof didn't have a way to accurately gauge their memory usage.
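The registration mechanism can be sketched with functools.singledispatch as a stand-in for dask.sizeof's dispatch registry (sizeof_sketch and its handlers are hypothetical names, not dask API):

```python
import sys
from functools import singledispatch
from itertools import chain

@singledispatch
def sizeof_sketch(obj):
    # Shallow fallback, analogous to sys.getsizeof.
    return sys.getsizeof(obj)

@sizeof_sketch.register(list)
def _(seq):
    # Container overhead plus the size of each element.
    return sys.getsizeof(seq) + sum(sizeof_sketch(x) for x in seq)

@sizeof_sketch.register(chain)
def _(it):
    # Reify into a list to measure; note this consumes the iterator,
    # which is exactly the hazard raised later in this thread.
    return sizeof_sketch(list(it))

nested = sizeof_sketch([1, 2, 3])
assert nested > sys.getsizeof([1, 2, 3])  # deep size exceeds shallow size
```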

Member

hmm. That is an interesting question. This seems like the right place to me. The point of the sizeof functions is to keep track of how many bytes the system would need to keep the object in memory.

Member

We'll want to make sure that this isn't burning a consumable, and that this data will still be around for future use. Probably before we repartition we'll want to reify all iterators into lists anyway.
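The hazard is easy to demonstrate: measuring an iterator by walking it destroys the data, while reifying into a list first keeps it available (a sketch, not the PR's code):

```python
from itertools import chain

it = chain([1, 2], [3, 4])
n = sum(1 for _ in it)   # "measuring" walks, and so consumes, the chain
assert n == 4
assert list(it) == []    # the data is gone

data = list(chain([1, 2], [3, 4]))  # reify first, then measure the list
assert len(data) == 4
assert data == [1, 2, 3, 4]         # still available after measuring
```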

Contributor Author

Yes, good point about not consuming the iterator. I decided to try just making a copy of the chain object to preserve the original, and using this same calculation.

@joshreback joshreback marked this pull request as draft July 22, 2020 02:00
@joshreback joshreback force-pushed the bag-repartition-partition_size branch from 33fd1f6 to 2dc48fa Compare July 22, 2020 12:43
@joshreback joshreback marked this pull request as ready for review July 22, 2020 20:03
@joshreback
Contributor Author

OK, I fixed what was making one of the tests flaky.

Separate question: the dask.sizeof method appears to overestimate (by approx 2x) the memory usage for lists (or any sequence). It computes the size of the sequence using getsizeof (which for a list of primitive types produces the right result) and then adds an estimate of the size of each of the elements of the list. Is that a bug, or is there a reason it was implemented that way?

@jsignell
Member

It looks like getsizeof doesn't get the size of objects that are nested within other objects, so dask is doing the right thing.

Here are the relevant docs: https://docs.python.org/3/library/sys.html#sys.getsizeof
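Concretely, sys.getsizeof reports only the container's own footprint, so a recursive sum over the elements is needed for nested objects (a quick sketch):

```python
import sys

xs = list(range(1000))
shallow = sys.getsizeof(xs)  # the list object itself: header + pointer array
deep = shallow + sum(sys.getsizeof(x) for x in xs)  # plus each int element

# Each small int costs ~28 bytes on CPython, so the deep size is far larger.
assert deep > shallow
```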

@joshreback
Contributor Author

Ah yeah, my bad, that's right. I got mixed up because I forgot integers inherited from object in Python.

@jsignell
Member

Ok I think this is good to go! @dask/maintenance

dask/bag/core.py Outdated
Comment on lines +2534 to +2535
def total_mem_usage(bag):
    return sizeof(bag)
Member

It looks like this can be removed and we can just use sizeof instead.

dask/bag/core.py Outdated
# 1. split each partition that is larger than partition size
nsplits = [1 + mem_usage // size for mem_usage in mem_usages]
if any((nsplit > 1 for nsplit in nsplits)):
    split_name = "repartition-split-{}-{}".format(size, tokenize(bag))
Member

I recommend that we exclude the full integer size here and instead put that in the tokenization call. These names are used in things like diagnostics and the dashboard. Having values like repartition-split-10248258572 show up in a progress bar is probably not ideal. If you want to include the text version I'm ok with that, but I would probably defer to just tokenizing everything.
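The suggestion can be sketched with a hashlib-based stand-in for dask.base.tokenize (the token helper and the bag label here are hypothetical, for illustration only):

```python
import hashlib

def token(*args):
    # Stand-in for dask.base.tokenize: a short deterministic hash.
    return hashlib.md5(repr(args).encode()).hexdigest()[:8]

size = 10248258572
# Raw integer in the task name: noisy in diagnostics and the dashboard.
noisy = "repartition-split-{}-{}".format(size, token("my-bag"))
# Folding the integer into the token keeps the name short and readable.
clean = "repartition-split-{}".format(token(size, "my-bag"))

assert str(size) in noisy
assert str(size) not in clean
```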

@mrocklin
Member

I added a couple of small comments. I haven't gone into the details of the algorithm though. I'm curious, has anyone tried this out in practice?

@joshreback
Contributor Author

The failures seem unrelated to the changes here.

@joshreback joshreback force-pushed the bag-repartition-partition_size branch from 6938121 to 96b4673 Compare July 28, 2020 01:16
@joshreback
Contributor Author

Hi - just wanted to check if there were any further comments or suggestions here?

@jsignell
Member

Sorry for the delay @joshreback I am going to test this out locally and then it should be good to go!

@jsignell
Member

Hmm. I am confused by this:

from dask.sizeof import sizeof
import dask.bag as db

b = db.from_sequence([1, 2, 3, 4, 5, 6])
size = sizeof(b)
# 48

new = b.repartition(partition_size=size)
new.npartitions
# 18

@joshreback
Contributor Author

joshreback commented Jul 31, 2020

Looks like sizeof(b) is just returning the size of the bag object, not the objects inside. Each original partition of b is a one-element list, which is 124 bytes, so it tries to split each of the original 6 partitions into 3 partitions (1 + 124 // 48).

I agree those results look pretty odd, but I would guess the situation where someone wants a partition size smaller than an indivisible partition is kind of rare (edit: actually I have no idea if that situation comes up a lot, but I'd guess that repartitioning in that way would not be that useful in practice?). Not sure what the best way forward is... I could try repartitioning followed by culling partitions that would come out empty? Let me know if that seems reasonable.

EDIT: From what I can tell, that seems to require calling compute to see which partitions would be empty, and then deleting them, which sounds kind of expensive
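The 18 partitions in the example fall out of the splitting rule quoted earlier (1 + mem_usage // size), plugging in the numbers from this thread:

```python
size = 48               # requested partition_size: sizeof(b) of the bag shell
mem_usages = [124] * 6  # each original one-element list partition: 124 bytes

nsplits = [1 + mem // size for mem in mem_usages]
assert nsplits == [3, 3, 3, 3, 3, 3]  # 1 + 124 // 48 == 3 splits apiece
assert sum(nsplits) == 18             # matches new.npartitions above
```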

@joshreback joshreback force-pushed the bag-repartition-partition_size branch from 8c5f84e to 69a647f Compare August 4, 2020 14:05
@jsignell
Member

jsignell commented Aug 4, 2020

It sounds like the sizeof for bag is wrong then. I don't have enough context to determine what is sane behavior, but maybe @jrbourbeau does.

@joshreback
Contributor Author

I don't think there is a sizeof implementation for bag, so it just falls back to sys.getsizeof. The code now computes the memory usage of a bag by summing the memory usage of each of its partitions (which is how repartition_size is implemented for dataframes; I basically just copied that).

@jsignell
Member

jsignell commented Aug 6, 2020

Ah! Thanks for pointing that out @joshreback. In that case I think this is good to go!

Member

@jrbourbeau jrbourbeau left a comment

Thanks for the PR @joshreback! This looks close to done and I'm looking forward to seeing it merge : ) In particular, it was good to see the several tests you added to ensure repartitioning is acting as expected

I've left a few small comments and it looks like there are a couple of comments from @jsignell that are still TODO



def test_repartition_partition_size_complex_dtypes():
    import numpy as np
Member

Since NumPy isn't a required dependency for Dask bag, we'll want to skip this test if NumPy isn't installed. Pytest has a convenient pytest.importorskip function for this. Here's an example of it in use:

@pytest.mark.parametrize("npartitions", [1, 10])
def test_non_splittable_reductions(npartitions):
    np = pytest.importorskip("numpy")
    data = list(range(100))
    c = db.from_sequence(data, npartitions=npartitions)
    assert_eq(c.mean(), np.mean(data))
    assert_eq(c.std(), np.std(data))

dask/bag/core.py Outdated
import operator
import uuid
import warnings
from dask.sizeof import sizeof
Member

Nitpick: Could you please make this a relative import and move it down a few lines to be with the rest of the Dask imports

dask/dask/bag/core.py

Lines 39 to 63 in 69a647f

from .. import config
from .avro import to_avro
from ..base import tokenize, dont_optimize, DaskMethodsMixin
from ..bytes import open_files
from ..context import globalmethod
from ..core import quote, istask, get_dependencies, reverse_dict, flatten
from ..delayed import Delayed, unpack_collections
from ..highlevelgraph import HighLevelGraph
from ..multiprocessing import get as mpget
from ..optimization import fuse, cull, inline
from ..utils import (
    apply,
    system_encoding,
    takes_multiple_arguments,
    funcname,
    digit,
    insert,
    ensure_dict,
    ensure_bytes,
    ensure_unicode,
    key_split,
    parse_bytes,
    iter_chunks,
)
from . import chunk

For reference, most modules in the codebase follow a pattern of stdlib imports, then third-party imports, then Dask imports. Not a big deal, but just wanted to point out this convention.

dask/bag/core.py Outdated

Notes
-----
Exactly one of `npartitions` or `partition_size` should be specified.
Member

Suggested change
Exactly one of `npartitions` or `partition_size` should be specified.
Exactly one of ``npartitions`` or ``partition_size`` should be specified.

dask/bag/core.py Outdated

graph = HighLevelGraph.from_collections(new_name, dsk, dependencies=[self])
return Bag(graph, name=new_name, npartitions=npartitions)
if not bool(npartitions) ^ bool(partition_size):
Member

FWIW in other places where we're checking that exactly one input is given we use something like:

dask/dask/dataframe/core.py

Lines 1164 to 1178 in 2411b0a

if (
    sum(
        [
            partition_size is not None,
            divisions is not None,
            npartitions is not None,
            freq is not None,
        ]
    )
    != 1
):
    raise ValueError(
        "Please provide exactly one of ``npartitions=``, ``freq=``, "
        "``divisions=``, ``partitions_size=`` keyword arguments"
    )

which I personally find more clear.

dask/bag/core.py Outdated
Comment on lines +2510 to +2511
if isinstance(bag, chain):
    bag = reify(deepcopy(bag))
Member

Could you add a small comment on why this isinstance check + deepcopy is needed. That'll help us later when we come across this : )

@joshreback
Contributor Author

@jsignell my mistake, I lost track of a couple of your comments. But I think I have addressed those now, plus the suggestions from @jrbourbeau. Let me know if I missed anything!

Member

@jrbourbeau jrbourbeau left a comment

Thanks for your work on this @joshreback! This is in

@jrbourbeau jrbourbeau merged commit ff653b9 into dask:master Aug 7, 2020
kumarprabhu1988 pushed a commit to kumarprabhu1988/dask that referenced this pull request Oct 29, 2020


Development

Successfully merging this pull request may close these issues.

Feature request: Bag Repartition "partition_size" keyword argument

6 participants