Clean up and document rearrange_column_by_tasks #4674

Merged
merged 1 commit into from
Apr 9, 2019
134 changes: 99 additions & 35 deletions dask/dataframe/shuffle.py
@@ -9,7 +9,6 @@
import pandas as pd
import toolz

from .methods import drop_columns
from .core import DataFrame, Series, _Frame, _concat, map_partitions
from .hashing import hash_pandas_object
from .utils import PANDAS_VERSION
@@ -205,7 +204,6 @@ def shuffle(df, index, shuffle=None, npartitions=None, max_branch=32,
set_index
set_partition
shuffle_disk
shuffle_tasks
"""
if not isinstance(index, _Frame):
index = df._select_columns_or_index(index)
@@ -219,20 +217,23 @@ def shuffle(df, index, shuffle=None, npartitions=None, max_branch=32,
df3 = rearrange_by_column(df2, '_partitions', npartitions=npartitions,
max_branch=max_branch, shuffle=shuffle,
compute=compute)
df4 = df3.map_partitions(drop_columns, '_partitions', df.columns.dtype)
return df4
del df3['_partitions']
return df3


def rearrange_by_divisions(df, column, divisions, max_branch=None, shuffle=None):
""" Shuffle dataframe so that column separates along divisions """
# Assign target output partitions to every row
partitions = df[column].map_partitions(set_partitions_pre,
divisions=divisions,
meta=pd.Series([0]))
df2 = df.assign(_partitions=partitions)

# Perform shuffle
df3 = rearrange_by_column(df2, '_partitions', max_branch=max_branch,
npartitions=len(divisions) - 1, shuffle=shuffle)
df4 = df3.map_partitions(drop_columns, '_partitions', df.columns.dtype)
return df4
del df3['_partitions']
return df3
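
For context on the ``set_partitions_pre`` step above, the following is a rough, hedged sketch (not part of this diff; the helper name is illustrative) of how sorted division boundaries map each row's value to an output partition number:

import numpy as np
import pandas as pd

def assign_partitions_sketch(s, divisions):
    # Sketch only: partition i is meant to hold values in [divisions[i], divisions[i + 1])
    parts = np.searchsorted(divisions, s.values, side='right') - 1
    # Values at or beyond the last boundary stay in the final partition
    return pd.Series(np.clip(parts, 0, len(divisions) - 2), index=s.index)

assign_partitions_sketch(pd.Series([1, 5, 9, 10]), divisions=[0, 5, 10])
# -> 0, 1, 1, 1: three boundaries define two output partitions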


def rearrange_by_column(df, col, npartitions=None, max_branch=None,
Expand All @@ -247,8 +248,9 @@ def rearrange_by_column(df, col, npartitions=None, max_branch=None,


class maybe_buffered_partd(object):
"""If serialized, will return non-buffered partd. Otherwise returns a
buffered partd"""
"""
If serialized, will return non-buffered partd. Otherwise returns a buffered partd
"""
def __init__(self, buffer=True, tempdir=None):
self.tempdir = tempdir or config.get('temporary_directory', None)
self.buffer = buffer
@@ -272,7 +274,14 @@ def __call__(self, *args, **kwargs):


def rearrange_by_column_disk(df, column, npartitions=None, compute=False):
""" Shuffle using local disk """
""" Shuffle using local disk

See Also
--------
rearrange_by_column_tasks:
Same function, but using tasks rather than partd.
Has a more informative docstring.
"""
if npartitions is None:
npartitions = df.npartitions

@@ -318,12 +327,40 @@ def rearrange_by_column_disk(df, column, npartitions=None, compute=False):
def rearrange_by_column_tasks(df, column, max_branch=32, npartitions=None):
""" Order divisions of DataFrame so that all values within column align

This enacts a task-based shuffle
This enacts a task-based shuffle. It contains most of the tricky logic
around the complex network of tasks. Typically, before this function is
called, a new column ``"_partitions"`` has been added to the dataframe,
containing the output partition number of every row. This function
produces a new dataframe where every row is in the proper partition. It
accomplishes this by splitting each input partition into several pieces,
and then concatenating pieces from different input partitions into output
partitions. If there are enough partitions then it does this work in
stages to avoid scheduling overhead.

Parameters
----------
df: dask.dataframe.DataFrame
column: str
A column name on which we want to split, commonly ``"_partitions"``
which is assigned by functions upstream
max_branch: int
The maximum number of splits per input partition. Defaults to 32.
If there are more partitions than this then the shuffling will occur in
stages in order to avoid creating npartitions**2 tasks.
Increasing this number increases scheduling overhead but decreases the
number of full-dataset transfers that we have to make.
npartitions: Optional[int]
The desired number of output partitions

Returns
-------
df3: dask.dataframe.DataFrame

See also:
rearrange_by_column_disk
set_partitions_tasks
shuffle_tasks
See Also
--------
rearrange_by_column_disk: same operation, but uses partd
rearrange_by_column: parent function that calls this or rearrange_by_column_disk
shuffle_group: does the actual splitting per-partition
"""
max_branch = max_branch or 32
n = df.npartitions
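
To make the staging described in the docstring above concrete, here is a minimal sketch (an assumption about the scheme, not dask's exact code) of how the number of stages and the per-stage split count ``k`` follow from ``npartitions`` and ``max_branch``:

import math

def staging_parameters(npartitions, max_branch=32):
    # Sketch: with few enough partitions, a single all-to-all stage suffices
    if npartitions <= max_branch:
        return 1, npartitions
    # Otherwise use roughly log_max_branch(npartitions) stages of n**(1/stages) splits each
    stages = int(math.ceil(math.log(npartitions) / math.log(max_branch)))
    k = int(math.ceil(npartitions ** (1.0 / stages)))
    return stages, k

staging_parameters(1000)  # -> (2, 32): two stages of 32 splits rather than ~10**6 single-stage tasks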
@@ -343,33 +380,40 @@ def rearrange_by_column_tasks(df, column, max_branch=32, npartitions=None):

token = tokenize(df, column, max_branch)

start = dict((('shuffle-join-' + token, 0, inp),
(df._name, i) if i < df.npartitions else df._meta)
for i, inp in enumerate(inputs))
start = {('shuffle-join-' + token, 0, inp): (df._name, i) if i < df.npartitions else df._meta
for i, inp in enumerate(inputs)}

for stage in range(1, stages + 1):
group = dict((('shuffle-group-' + token, stage, inp),
(shuffle_group, ('shuffle-join-' + token, stage - 1, inp),
column, stage - 1, k, n))
for inp in inputs)

split = dict((('shuffle-split-' + token, stage, i, inp),
(getitem, ('shuffle-group-' + token, stage, inp), i))
for i in range(k)
for inp in inputs)

join = dict((('shuffle-join-' + token, stage, inp),
(_concat,
[('shuffle-split-' + token, stage, inp[stage - 1],
insert(inp, stage - 1, j)) for j in range(k)]))
for inp in inputs)
group = { # Convert partition into dict of dataframe pieces
('shuffle-group-' + token, stage, inp):
(shuffle_group, ('shuffle-join-' + token, stage - 1, inp), column, stage - 1, k, n)
for inp in inputs
}

split = { # Get out each individual dataframe piece from the dicts
('shuffle-split-' + token, stage, i, inp):
(getitem, ('shuffle-group-' + token, stage, inp), i)
for i in range(k)
for inp in inputs
}

join = { # concatenate those pieces together, with their friends
('shuffle-join-' + token, stage, inp):
(_concat, [
('shuffle-split-' + token, stage, inp[stage - 1], insert(inp, stage - 1, j))
for j in range(k)
])
for inp in inputs
}
groups.append(group)
splits.append(split)
joins.append(join)

end = dict((('shuffle-' + token, i),
('shuffle-join-' + token, stages, inp))
for i, inp in enumerate(inputs))
end = {
('shuffle-' + token, i):
('shuffle-join-' + token, stages, inp)
for i, inp in enumerate(inputs)
}

dsk = toolz.merge(start, end, *(groups + splits + joins))
graph = HighLevelGraph.from_collections('shuffle-' + token, dsk, dependencies=[df])
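
The group/split/join comprehensions above encode a hypercube-style exchange: partition ids are tuples of ``stages`` digits base ``k``, and each stage moves data along a single digit only. A hedged sketch (the helper name ``stage_sources`` is illustrative, not from dask) of which inputs feed a given output at each stage:

from itertools import product

def stage_sources(inp, stage, k):
    # Inputs contributing piece number inp[stage - 1] to output `inp` at this stage:
    # they agree with `inp` on every digit except digit stage - 1
    return [inp[:stage - 1] + (j,) + inp[stage:] for j in range(k)]

inputs = list(product(range(2), repeat=2))  # 4 partitions, k=2, stages=2
stage_sources((0, 1), stage=1, k=2)         # -> [(0, 1), (1, 1)]
stage_sources((0, 1), stage=2, k=2)         # -> [(0, 0), (0, 1)]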
@@ -462,6 +506,26 @@ def shuffle_group(df, col, stage, k, npartitions):

The group is determined by each row's final partition and by which stage of
the shuffle we are in

Parameters
----------
df: DataFrame
col: str
Column name on which to split the dataframe
stage: int
We shuffle dataframes with many partitions in a few stages to avoid
a quadratic number of tasks. This number corresponds to which stage
we're in, starting from zero up to some small integer
k: int
Desired number of splits from this dataframe
npartitions: int
Total number of output partitions for the full dataframe

Returns
-------
out: Dict[int, DataFrame]
A dictionary mapping integers in {0, ..., k - 1} to dataframes such that the
hash values of ``df[col]`` are well partitioned.
"""
if col == '_partitions':
ind = df[col]
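To illustrate the behaviour the ``shuffle_group`` docstring describes, here is a small standalone sketch (not the dask implementation, which also hashes non-``_partitions`` columns and uses compact dtypes): at a given stage every partition is split into ``k`` pieces keyed by one base-``k`` digit of each row's target partition.

import pandas as pd

def shuffle_group_sketch(df, col, stage, k, npartitions):
    # Target output partition of each row, e.g. a precomputed "_partitions" column
    ind = df[col].values % npartitions
    # The base-k digit of the target partition that this stage routes on
    digit = ind // k ** stage % k
    return {i: df[digit == i] for i in range(k)}

pdf = pd.DataFrame({'x': range(8), '_partitions': [0, 1, 2, 3, 0, 1, 2, 3]})
pieces = shuffle_group_sketch(pdf, '_partitions', stage=0, k=4, npartitions=4)
# pieces[2] holds the rows whose _partitions value is 2
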
8 changes: 4 additions & 4 deletions dask/dataframe/tests/test_shuffle.py
@@ -249,9 +249,9 @@ def test_shuffle_sort(shuffle):
def test_rearrange(shuffle, scheduler):
df = pd.DataFrame({'x': np.random.random(10)})
ddf = dd.from_pandas(df, npartitions=4)
ddf2 = ddf.assign(y=ddf.x % 4)
ddf2 = ddf.assign(_partitions=ddf.x % 4)

result = rearrange_by_column(ddf2, 'y', max_branch=32, shuffle=shuffle)
result = rearrange_by_column(ddf2, '_partitions', max_branch=32, shuffle=shuffle)
assert result.npartitions == ddf.npartitions
assert set(ddf.dask).issubset(result.dask)

@@ -260,8 +260,8 @@ def test_rearrange(shuffle, scheduler):
get = dask.base.get_scheduler(scheduler=scheduler)
parts = get(result.dask, result.__dask_keys__())

for i in a.y.drop_duplicates():
assert sum(i in set(part.y) for part in parts) == 1
for i in a._partitions.drop_duplicates():
assert sum(i in set(part._partitions) for part in parts) == 1


def test_rearrange_by_column_with_narrow_divisions():