Merge 01f9dcb into 2f308e7
gsakkis committed May 24, 2019
2 parents 2f308e7 + 01f9dcb commit 3062ce9
Showing 2 changed files with 192 additions and 47 deletions.
202 changes: 157 additions & 45 deletions dask/dataframe/core.py
@@ -23,7 +23,7 @@
from .. import array as da
from .. import core

from ..utils import partial_by_order, Dispatch, IndexCallable
from ..utils import parse_bytes, partial_by_order, Dispatch, IndexCallable
from .. import threaded
from ..compatibility import (apply, operator_div, bind_method, string_types,
                             Iterator, Sequence)
@@ -966,17 +966,28 @@ def partitions(self):

    # Note: iloc is implemented only on DataFrame

    def repartition(self, divisions=None, npartitions=None, freq=None, force=False):
    def repartition(self, divisions=None, npartitions=None, partition_size=None,
                    freq=None, force=False):
""" Repartition dataframe along new divisions
Parameters
----------
divisions : list, optional
List of partitions to be used. If specified npartitions will be
ignored.
List of partitions to be used. Only used if npartitions and
partition_size isn't specified.
npartitions : int, optional
Number of partitions of output. Only used if divisions isn't
specified.
Number of partitions of output. Only used if partition_size
isn't specified.
partition_size: int or string, optional
Max number of bytes of memory for each partition. Use numbers or
strings like 5MB. If specified npartitions and divisions will be
ignored.
.. warning::
This keyword argument triggers computation to determine
the memory size of each partition, which may be expensive.
freq : str, pd.Timedelta
A period on which to partition timeseries data like ``'7D'`` or
``'12h'`` or ``pd.Timedelta(hours=12)``. Assumes a datetime index.
@@ -985,25 +996,37 @@ def repartition(self, divisions=None, npartitions=None, freq=None, force=False):
            If False then the new divisions lower and upper bounds must be
            the same as the old divisions.

        Notes
        -----
        Exactly one of `divisions`, `npartitions`, `partition_size`, or `freq`
        should be specified. A ``ValueError`` will be raised when that is
        not the case.

        Examples
        --------
        >>> df = df.repartition(npartitions=10)  # doctest: +SKIP
        >>> df = df.repartition(divisions=[0, 5, 10, 20])  # doctest: +SKIP
        >>> df = df.repartition(freq='7d')  # doctest: +SKIP
        """
        if npartitions is not None and divisions is not None:
            warnings.warn("When providing both npartitions and divisions to "
                          "repartition only npartitions is used.")
        if sum([
            partition_size is not None,
            divisions is not None,
            npartitions is not None,
            freq is not None,
        ]) != 1:
            raise ValueError(
                "Please provide exactly one of ``npartitions=``, ``freq=``, "
                "``divisions=``, ``partition_size=`` keyword arguments"
            )

        if npartitions is not None:
        if partition_size is not None:
            return repartition_size(self, partition_size)
        elif npartitions is not None:
            return repartition_npartitions(self, npartitions)
        elif divisions is not None:
            return repartition(self, divisions, force=force)
        elif freq is not None:
            return repartition_freq(self, freq=freq)
        else:
            raise ValueError(
                "Provide either divisions= or npartitions= to repartition")

    @derived_from(pd.DataFrame)
    def fillna(self, value=None, method=None, limit=None, axis=None):
@@ -4544,6 +4567,70 @@ def repartition_freq(df, freq=None):
    return df.repartition(divisions=divisions)


def repartition_size(df, size):
    """
    Repartition dataframe so that new partitions have approximately `size` memory usage each
    """
    if isinstance(size, string_types):
        size = parse_bytes(size)
    size = int(size)

    mem_usages = df.map_partitions(total_mem_usage).compute()

    # 1. split each partition that is larger than partition_size
    nsplits = 1 + mem_usages // size
    if np.any(nsplits > 1):
        split_name = 'repartition-split-{}-{}'.format(size, tokenize(df))
        df = _split_partitions(df, nsplits, split_name)
        # update mem_usages to account for the split partitions
        split_mem_usages = []
        for n, usage in zip(nsplits, mem_usages):
            split_mem_usages.extend([usage / n] * n)
        mem_usages = pd.Series(split_mem_usages, dtype=mem_usages.dtype)

    # 2. now that all partitions are less than size, concat them up to size
    assert np.all(mem_usages <= size)
    new_npartitions = list(map(len, iter_chunks(mem_usages, size)))
    new_partitions_boundaries = np.cumsum(new_npartitions)
    new_name = 'repartition-{}-{}'.format(size, tokenize(df))
    return _repartition_from_boundaries(df, new_partitions_boundaries, new_name)
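
To make the two steps concrete, a standalone sketch that mirrors the arithmetic
above with hypothetical per-partition sizes (plain pandas only, no dask graph):

    import pandas as pd

    size = 100                                 # hypothetical target bytes per partition
    mem_usages = pd.Series([250, 60, 40, 90])  # hypothetical measured usages

    # Step 1: any partition above `size` is split into roughly equal pieces.
    nsplits = 1 + mem_usages // size           # -> [3, 1, 1, 1]
    split_usages = [u / n for u, n in zip(mem_usages, nsplits) for _ in range(n)]
    # -> roughly [83.3, 83.3, 83.3, 60.0, 40.0, 90.0]; each piece now fits under `size`

    # Step 2: iter_chunks greedily packs adjacent pieces back together, giving
    # [[83.3], [83.3], [83.3], [60.0, 40.0], [90.0]] -> 5 output partitions,
    # so the concat boundaries become cumsum([1, 1, 1, 2, 1]) == [1, 2, 3, 5, 6].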


def total_mem_usage(df):
    mem_usage = df.memory_usage(deep=True)
    if is_series_like(mem_usage):
        mem_usage = mem_usage.sum()
    return mem_usage


def iter_chunks(sizes, max_size):
    """Split sizes into chunks whose totals are at most max_size each

    Parameters
    ----------
    sizes : iterable of numbers
        The sizes to be chunked
    max_size : number
        Maximum total size per chunk.
        It must be greater than or equal to each size in sizes
    """
    chunk, chunk_sum = [], 0
    iter_sizes = iter(sizes)
    size = next(iter_sizes, None)
    while size is not None:
        assert size <= max_size
        if chunk_sum + size <= max_size:
            chunk.append(size)
            chunk_sum += size
            size = next(iter_sizes, None)
        else:
            assert chunk
            yield chunk
            chunk, chunk_sum = [], 0
    if chunk:
        yield chunk
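
A quick illustration of the greedy packing, using arbitrary sizes that do not
appear elsewhere in this change:

    >>> list(iter_chunks([3, 7, 2, 5, 4], 9))
    [[3], [7, 2], [5, 4]]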


def repartition_npartitions(df, npartitions):
    """ Repartition dataframe to a smaller number of partitions """
    new_name = 'repartition-%d-%s' % (npartitions, tokenize(df))
@@ -4553,18 +4640,7 @@ def repartition_npartitions(df, npartitions):
        npartitions_ratio = df.npartitions / npartitions
        new_partitions_boundaries = [int(new_partition_index * npartitions_ratio)
                                     for new_partition_index in range(npartitions + 1)]
        dsk = {}
        for new_partition_index in range(npartitions):
            value = (methods.concat,
                     [(df._name, old_partition_index) for old_partition_index in
                      range(new_partitions_boundaries[new_partition_index],
                            new_partitions_boundaries[new_partition_index + 1])])
            dsk[new_name, new_partition_index] = value
        divisions = [df.divisions[new_partition_index]
                     for new_partition_index in new_partitions_boundaries]

        graph = HighLevelGraph.from_collections(new_name, dsk, dependencies=[df])
        return new_dd_object(graph, new_name, df._meta, divisions)
        return _repartition_from_boundaries(df, new_partitions_boundaries, new_name)
    else:
        original_divisions = divisions = pd.Series(df.divisions)
        if (df.known_divisions and (np.issubdtype(divisions.dtype, np.datetime64) or
@@ -4593,26 +4669,62 @@ def repartition_npartitions(df, npartitions):

            return df.repartition(divisions=divisions)
        else:
            ratio = npartitions / df.npartitions
            split_name = 'split-%s' % tokenize(df, npartitions)
            dsk = {}
            last = 0
            j = 0
            for i in range(df.npartitions):
                new = last + ratio
                if i == df.npartitions - 1:
                    k = npartitions - j
                else:
                    k = int(new - last)
                dsk[(split_name, i)] = (split_evenly, (df._name, i), k)
                for jj in range(k):
                    dsk[(new_name, j)] = (getitem, (split_name, i), jj)
                    j += 1
                last = new

            divisions = [None] * (npartitions + 1)
            graph = HighLevelGraph.from_collections(new_name, dsk, dependencies=[df])
            return new_dd_object(graph, new_name, df._meta, divisions)
            div, mod = divmod(npartitions, df.npartitions)
            nsplits = [div] * df.npartitions
            nsplits[-1] += mod
            return _split_partitions(df, nsplits, new_name)
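
The arithmetic of the new upsplit branch, traced with hypothetical partition
counts:

    # Going from df.npartitions == 3 to npartitions == 7: every old partition
    # is split into npartitions // df.npartitions pieces and the remainder is
    # added to the last one, so the splits sum to the requested count.
    div, mod = divmod(7, 3)   # div == 2, mod == 1
    nsplits = [div] * 3       # [2, 2, 2]
    nsplits[-1] += mod        # [2, 2, 3] -> 2 + 2 + 3 == 7 output partitions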


def _repartition_from_boundaries(df, new_partitions_boundaries, new_name):
    if not isinstance(new_partitions_boundaries, list):
        new_partitions_boundaries = list(new_partitions_boundaries)
    if new_partitions_boundaries[0] > 0:
        new_partitions_boundaries.insert(0, 0)
    if new_partitions_boundaries[-1] < df.npartitions:
        new_partitions_boundaries.append(df.npartitions)
    dsk = {}
    for i, (start, end) in enumerate(zip(new_partitions_boundaries,
                                         new_partitions_boundaries[1:])):
        dsk[new_name, i] = (methods.concat,
                            [(df._name, j) for j in range(start, end)])
    divisions = [df.divisions[i] for i in new_partitions_boundaries]
    graph = HighLevelGraph.from_collections(new_name, dsk, dependencies=[df])
    return new_dd_object(graph, new_name, df._meta, divisions)
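
As a sanity check on the boundary bookkeeping, a small self-contained sketch
with hypothetical boundaries for a six-partition input:

    # Boundaries [0, 2, 5, 6] over 6 old partitions produce 3 concat groups.
    boundaries = [0, 2, 5, 6]
    groups = [list(range(start, end))
              for start, end in zip(boundaries, boundaries[1:])]
    # -> [[0, 1], [2, 3, 4], [5]]: old partitions 0-1, 2-4 and 5 become new
    # partitions 0, 1 and 2; new divisions are taken at indices 0, 2, 5 and 6.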


def _split_partitions(df, nsplits, new_name):
    """ Split a Dask dataframe into new partitions

    Parameters
    ----------
    df : DataFrame or Series
    nsplits : List[int]
        Number of target dataframes for each partition.
        The length of nsplits should be the same as df.npartitions
    new_name : str

    See Also
    --------
    repartition_npartitions
    repartition_size
    """
    if len(nsplits) != df.npartitions:
        raise ValueError('nsplits should have len={}'.format(df.npartitions))

    dsk = {}
    split_name = 'split-{}'.format(tokenize(df, nsplits))
    j = 0
    for i, k in enumerate(nsplits):
        if k == 1:
            dsk[new_name, j] = (df._name, i)
            j += 1
        else:
            dsk[split_name, i] = (split_evenly, (df._name, i), k)
            for jj in range(k):
                dsk[new_name, j] = (getitem, (split_name, i), jj)
                j += 1

    divisions = [None] * (1 + sum(nsplits))
    graph = HighLevelGraph.from_collections(new_name, dsk, dependencies=[df])
    return new_dd_object(graph, new_name, df._meta, divisions)
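
A brief usage note with hypothetical values: for a two-partition frame,
nsplits = [1, 3] passes the first partition through unchanged and splits the
second evenly into three pieces:

    nsplits = [1, 3]
    n_out = sum(nsplits)              # 4 output partitions
    divisions = [None] * (1 + n_out)  # 5 boundary slots, unknown after splitting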


def repartition(df, divisions=None, force=False):
37 changes: 35 additions & 2 deletions dask/dataframe/tests/test_dataframe.py
@@ -17,7 +17,7 @@
from dask.utils import put_lines, M

from dask.dataframe.core import (repartition_divisions, aca, _concat, Scalar,
                                 has_parallel_type)
                                 has_parallel_type, iter_chunks, total_mem_usage)
from dask.dataframe import methods
from dask.dataframe.utils import (assert_eq, make_meta, assert_max_deps,
                                  PANDAS_VERSION)
@@ -1543,7 +1543,7 @@ def test_repartition_on_pandas_dataframe():
@pytest.mark.parametrize('use_index', [True, False])
@pytest.mark.parametrize('n', [1, 2, 4, 5])
@pytest.mark.parametrize('k', [1, 2, 4, 5])
@pytest.mark.parametrize('dtype', [int, float, 'M8[ns]'])
@pytest.mark.parametrize('dtype', [float, 'M8[ns]'])
@pytest.mark.parametrize('transform', [lambda df: df, lambda df: df.x])
def test_repartition_npartitions(use_index, n, k, dtype, transform):
    df = pd.DataFrame({'x': [1, 2, 3, 4, 5, 6] * 10,
@@ -1558,6 +1558,30 @@ def test_repartition_npartitions(use_index, n, k, dtype, transform):
    assert all(map(len, parts))


@pytest.mark.parametrize('use_index', [True, False])
@pytest.mark.parametrize('n', [1, 5])
@pytest.mark.parametrize('partition_size', ['1kiB'])
@pytest.mark.parametrize('transform', [lambda df: df, lambda df: df.x])
def test_repartition_partition_size(use_index, n, partition_size, transform):
    df = pd.DataFrame({'x': [1, 2, 3, 4, 5, 6] * 10,
                       'y': list('abdabd') * 10},
                      index=pd.Series([10, 20, 30, 40, 50, 60] * 10))
    df = transform(df)
    a = dd.from_pandas(df, npartitions=n, sort=use_index)
    b = a.repartition(partition_size=partition_size)
    assert_eq(a, b, check_divisions=False)
    assert np.alltrue(b.map_partitions(total_mem_usage).compute() <= 1024)
    parts = dask.get(b.dask, b.__dask_keys__())
    assert all(map(len, parts))


def test_iter_chunks():
    sizes = [14, 8, 5, 9, 7, 9, 1, 19, 8, 19]
    assert list(iter_chunks(sizes, 19)) == [[14], [8, 5], [9, 7], [9, 1], [19], [8], [19]]
    assert list(iter_chunks(sizes, 28)) == [[14, 8, 5], [9, 7, 9, 1], [19, 8], [19]]
    assert list(iter_chunks(sizes, 67)) == [[14, 8, 5, 9, 7, 9, 1], [19, 8, 19]]


def test_repartition_npartitions_same_limits():
    df = pd.DataFrame({'x': [1, 2, 3]},
                      index=[pd.Timestamp('2017-05-09 00:00:00.006000'),
@@ -1632,6 +1656,15 @@ def test_repartition_freq_month():
    assert 2 < ddf.npartitions <= 6


def test_repartition_input_errors():
    df = pd.DataFrame({'x': [1, 2, 3]})
    ddf = dd.from_pandas(df, npartitions=1)
    with pytest.raises(ValueError):
        ddf.repartition(npartitions=5, divisions=[None, None])
    with pytest.raises(ValueError):
        ddf.repartition(npartitions=5, partition_size='5MiB')


def test_embarrassingly_parallel_operations():
    df = pd.DataFrame({'x': [1, 2, 3, 4, None, 6], 'y': list('abdabd')},
                      index=[10, 20, 30, 40, 50, 60])
