WIP: Feature/dataframe aggregate (implements #1619) #1678
mrocklin merged 23 commits into dask:master from chmp:feature/dataframe-aggregate
Conversation
dask/dataframe/groupby.py
Outdated
        next_id, simple_impl[func])
elif func == 'var':
    raise NotImplementedError()
The functions in dask/dataframe/groupby.py like _var_chunk may be of value here.
Thanks for the pointer. I will probably reimplement it to be able to reuse intermediates across different reductions, if that is OK with you?
As long as single-groupby-aggregation performance isn't seriously affected or made significantly more complex then, yes, modifying existing code sounds great.
dask/dataframe/groupby.py
Outdated
AggArgs = collections.namedtuple('AggArgs', ['chunk_funcs', 'aggregate_funcs',
                                             'finalizers'])
Would a dictionary do ok here? Most dask development has relatively few custom classes. Namedtuples are pretty benign, I agree, but the majority of the codebase is pretty barebones.
I'll replace the namedtuple with a dict.
dask/dataframe/groupby.py
Outdated
    partial(_apply_func_to_column, int_sum, int_sum, M.sum),
    partial(_apply_func_to_column, int_count, int_count, M.sum)
],
finalizers=[(result_column, partial(_finalize_mean, int_sum, int_count))],
There is a non-trivial cost to using partial functions in a distributed setting. They are harder to serialize and harder to reason about when people need to debug the graphs themselves (which sometimes core developers do need to do).
This isn't a blocking request (fine if it's too hard), but if it's possible to put these arguments directly into the graph rather than implicitly with a partial, then that would be better.
Thanks for the explanation. To be honest, I was wondering about this design choice when reading the dask code base; it makes perfect sense.
I'll put the arguments directly into the graph. My proposal would be to reverse the arguments to _apply_func_to_column, convert all partial arguments to keyword arguments, and to replace the partials with pairs of function and kwargs.
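To illustrate the proposal, here is a minimal sketch of the two styles. The helper name and its signature are hypothetical stand-ins, not the actual code from this PR; the point is only that a `(function, kwargs)` pair keeps the arguments visible when inspecting graph tasks, whereas a partial hides them.

```python
from functools import partial

def apply_func_to_column(df_like, column=None, func=None):
    # hypothetical stand-in: apply ``func`` to one column of a frame-like object
    return func(df_like[column])

# before: arguments baked into an opaque partial
task_before = partial(apply_func_to_column, column='x', func=sum)

# after: function and keyword arguments stored explicitly as a pair
task_after = (apply_func_to_column, {'column': 'x', 'func': sum})

data = {'x': [1, 2, 3]}
func, kwargs = task_after
assert task_before(data) == func(data, **kwargs) == 6
```

Both forms compute the same result; the explicit pair is just easier to serialize and debug.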
dask/dataframe/tests/test_groupby.py
Outdated
assert eq(pdf.groupby('a').agg(spec), ddf.groupby('a').agg(spec))

spec = 'mean'
assert eq(pdf.groupby('a').agg(spec), ddf.groupby('a').agg(spec))
It might be nice to put all of the specs into a list and then loop over them. Or perhaps use a @pytest.parametrize testing fixture.
I will go with pytest.parametrize (and fix the failing tests due to the recent changes on master).
'max': (M.max, M.max),
'count': (M.count, M.sum),
'size': (M.size, M.sum),
}
You might want to see the very recent work of tree reductions here: #1663
This ends up having a significant performance impact for some groupbys. It would be nice (eventually) to have the same logic in aggregate. This would require adding another intermediate combine function to each of these tuples.
Perfect, I wasn't aware that it had already been merged. The dask development has quite a rapid pace to it, if I may say.
I'll rebase and add tree reductions to aggregate.
I'm wondering: after the rebase, am I not already using tree reductions? Since I do not set split_every=False in apply_concat_apply, the tree reduction code should run as is. In any case, I will expose the split_every argument on aggregate and double-check whether the default of combine == aggregate works for the reductions used in agg.
Right, according to the docstring it looks like the intermediate combine function defaults to the final aggregate function if none is provided. @jcrist thoughts on this default? An alternative default would be to use split_every=False if users don't provide a combine function, which might be safer for user-defined functions.
Regardless, I think it would be safer here to explicitly include a combine function if one is known, so for example:
# 'count': (M.count, M.sum)        # before
'count': (M.count, M.sum, M.sum)   # after
I went with that default because it matches the interface of tree reductions in bag and array. Many of the reductions defined here also have identical aggregate and combine functions, so it's slightly more concise to use this default :). My thought is that custom reductions are advanced enough that we can put the burden on users to properly use this method.
For all supported aggregations, combine == aggregate. While I would be happy to add the corresponding tuple elements and dictionary entries in the _build_ ... functions, I feel it would needlessly clutter the code. However, I think adding the corresponding keyword arguments for apply_concat_apply would make the assumption of combine == aggregate explicit.
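To make the chunk/combine/aggregate split concrete, here is a pure-Python sketch of a two-level tree reduction for 'count' (the dask analogue would use M.count as chunk and M.sum as both combine and aggregate; the function names and data here are illustrative only):

```python
def chunk(partition):
    # per-partition step: count the elements (analogue of M.count)
    return len(partition)

def combine(intermediates):
    # merge intermediate counts (analogue of M.sum)
    return sum(intermediates)

# combine == aggregate holds for all the aggregations discussed here
aggregate = combine

partitions = [[1, 2], [3], [4, 5, 6]]
level1 = [chunk(p) for p in partitions]              # per-partition counts
level2 = [combine(level1[:2]), combine(level1[2:])]  # pairwise combine step
total = aggregate(level2)                            # final aggregate: 6
```

With split_every, dask inserts as many combine levels as needed; when combine and aggregate coincide, only the final call differs in position, not in logic.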
I rebased to master and addressed your comments (namedtuples, partial, pytest parametrize). Further, I changed the semantics of the funcs argument. Next, I will tackle tree reductions.
dask/dataframe/groupby.py
Outdated
# generated it is reused in subsequent id requests.
ids = it.count()
# TODO: add func, input_column to intermediate name for easier debugging
id_map = collections.defaultdict(lambda: 'intermediate-{}'.format(next(ids)))
Currently, the skipping of recomputation is implemented by a somewhat arcane combination of itertools.count and defaultdict. What do you think about implementing it as a separate class? This would also make it simpler to generate easy-to-interpret intermediate names.
Are these names used as keys in the task graph or just as column names? Is there a convenient deterministic way to create these names?
Generally I avoid classes, probably somewhat irrationally.
Rightly so: now that I think about it, this functionality can also be easily implemented with a closure.
The keys are only used as column names in intermediate data frames.
With regard to generating these names deterministically: I am somewhat certain that prefixing the repr of the column key with the function name should yield a unique key, as long as the separator is not used inside the function name. However, I thought it better to be safe by adding a unique component to the name.
A possible implementation could be:
def _make_agg_id_factory():
    integers = it.count()
    ids = {}

    def factory(func, column):
        try:
            return ids[func, column]
        except KeyError:
            new_id = '{}-{!s}-{!r}'.format(next(integers), func, column)
            return ids.setdefault((func, column), new_id)

    return factory
Regarding the failed travis check: I cannot see any place where I would have changed code that is relevant to that test. Are there any known issues? Maybe it is some very unlikely combination of the random numbers generated. Update: it seems to be a stochastic failure of the test.
assert len(no_mean_aggs) == len(with_mean_aggs)

assert len(no_mean_finalizers) == len(no_mean_spec)
assert len(with_mean_finalizers) == len(with_mean_spec)
It would be nice to check some additional things about the graphs produced here:
- Do they work well with split_every? The dask.dataframe.utils.assert_max_deps function might be useful here.
- Do they produce deterministic key names? Is df.groupby(...).aggregate(...).dask == df.groupby(...).aggregate(...).dask for equivalent values of ...?
- Are intermediates shared? Perhaps by checking that the lengths of the resulting dask graphs are shorter than one might expect from a naive handling.
I will add these tests. Check 3 should always be satisfied: irrespective of the chosen aggregations, only a single dataframe is generated per partition. Therefore, the number of elements inside the dask graphs should always be the same. Only the columns added to the dataframes depend on the argument to aggregate.
Check 3 should always be satisfied. Irrespective of the chosen aggregations only a single dataframe will be generated per partition. Therefore, the number of elements inside the dask graphs should always be the same.
Very cool. Besides being more convenient I anticipate that this will have some very nice performance benefits :)
assert_eq(pdf.groupby(['a', 'd']).agg(spec),
          ddf.groupby(['a', 'd']).agg(spec))
After adapting the tests, I found a couple of problems:
- using series as groupby arguments fails miserably
- pandas uses additional magic where it tries to cast the result back to its original type. The relevant code can be found in _GroupBy._try_cast (see also "Intermittent failure in pivot_table test" #1685). While it is definitely possible to ensure the final data frame has the correct types after .compute(), such a step would also mean that there is no way of knowing the correct meta object before performing the actual aggregation.
- .groupby(...).agg('size') returns a series, for inconsistency's sake
My recent commits address issues 1 and 3. For Issue 2 however, i.e., the recasting of the result, I don't see any way to fix it.
dask/dataframe/groupby.py
Outdated
else:
    levels = 0

# TODO: add normed spec as an additional part to the token?
Are there any specifics I need to watch out for when generating the token, or does apply-concat-apply ensure that the resulting dask keys are unique?
The apply_concat_apply function should handle things here
index = list(index.columns)

self.index = index
self.index = _normalize_index(df, index)
Here, I added support for lists of series as a grouper, which was previously not supported. This logic is required to handle df.groupby([df['a'], df['b']]) consistently with pandas. The recursion in _normalize_index is not truly necessary (since lists of lists should not be valid groupers), but it was the easiest way to implement it.
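The idea can be sketched roughly as follows. This is a hedged illustration, not the actual dask implementation: the function name `normalize_index` and the rule "a series that is a column of the frame is replaced by its name" are simplifying assumptions.

```python
import pandas as pd

def normalize_index(df, index):
    # hypothetical sketch of normalizing groupby arguments
    if isinstance(index, list):
        # recursion is not strictly necessary (lists of lists are not valid
        # groupers), but it is the easiest way to reuse the scalar logic
        return [normalize_index(df, item) for item in index]
    elif isinstance(index, pd.Series) and index.name in df.columns:
        # a series that is one of df's own columns reduces to its name
        return index.name
    else:
        return index

df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
assert normalize_index(df, [df['a'], df['b']]) == ['a', 'b']
assert normalize_index(df, 'a') == 'a'
```

A real implementation would additionally need to keep series that do not correspond to columns of the frame (e.g. transformed series) as-is, which the fallthrough branch does here.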
dask/dataframe/groupby.py
Outdated
meta = _aggregate_meta(self.obj._meta, meta_groupby, 0, chunk_funcs,
                       aggregate_funcs, finalizers)

if not isinstance(self.index, list):
Here, I'm adding multiple series as varargs to the chunk functions. This way no additional logic for aligning the different series is required. The chunk functions have been adapted accordingly.
agg_dask1 = get_agg_dask(result1)
agg_dask2 = get_agg_dask(result2)

core_agg_dask1 = {k: v for (k, v) in agg_dask1.dask.items()
map_partitions used in the finalize step passes the meta dataframe. Therefore, == can no longer be used for comparison.
In travis.ci it looks like there is an encoding issue with one of the files and some flake8 issues generally. Nothing major though.
dask/dataframe/groupby.py
Outdated
meta_groupby = pd.Series([], dtype=bool, index=self.obj._meta.index)
meta = _aggregate_meta(self.obj._meta, meta_groupby, 0, chunk_funcs,
                       aggregate_funcs, finalizers)
It looks like this function is only a few lines and is only used here. It might be easier to read if this function was just inlined here instead.
dask/dataframe/groupby.py
Outdated
elif isinstance(self.index, list):
    group_columns = {i for i in self.index
                     if isinstance(i, tuple) or np.isscalar(i)}
Does the input order not matter here?
The group_columns columns are only used to create the list of non_group_columns. Since only __contains__ is checked, the order of group_columns is irrelevant.
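A tiny illustration of this point (column names are made up): since group_columns is only probed with `in`, an unordered set suffices, and the order of the derived non_group_columns follows the original column order, not the set.

```python
# group_columns is only used for membership tests, so a set works fine
group_columns = {'a', 'd'}
all_columns = ['a', 'b', 'c', 'd']

# the resulting order comes from all_columns, not from the set
non_group_columns = [c for c in all_columns if c not in group_columns]
assert non_group_columns == ['b', 'c']
```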
dask/dataframe/groupby.py
Outdated
# TODO: add normed spec as an additional part to the token?
token = 'aggregate'
finalize_token = '{}-finalize'.format(token)
Also, it might be nicer to just inline 'aggregate-finalize' directly in the map_partitions call.
Removing small logic like this should make it easier for other developers to maintain this function in the future.
I assumed additional logic was required. Since this is not the case, I removed both tokens and inlined them directly into the aca and map_partitions calls.
def _normalize_spec(spec, non_group_columns):
    """
    Return a list of ``(result_column, func, input_column)`` tuples.
Can I ask for a quick Examples section here, with a couple of worked examples to help future developers quickly understand how this function operates?
Examples
--------
>>> _normalize_spec(simple, inputs)
... simple outputs ...
I added further explanation to the docstring and different examples that should cover the functionality.
dask/dataframe/groupby.py
Outdated
#
###############################################################

def _make_agg_id_factory():
Should this function be tested independently?
I added an extra test.
dask/dataframe/groupby.py
Outdated
def _groupby_apply_funcs(df, *index, **kwargs):
    """
    TODO: document args
Add full description of this function with all its arguments.
dask/dataframe/groupby.py
Outdated
--------

>>> _normalize_spec('mean', ['a', 'b', 'c'])
[('a', 'mean', 'a'), ('b', 'mean', 'b'), ('c', 'mean', 'c')]
These should be unindented to line up with the normal text on the left. We should also drop the newline after the ----.
http://dask.readthedocs.io/en/latest/develop.html#docstrings
dask/dataframe/groupby.py
Outdated
...                 'b': {'e': 'count', 'f': 'var'}},
...                 ['a', 'b', 'c'])
[(('a', 'mean'), 'mean', 'a'), (('a', 'size'), 'size', 'a'),
 (('b', 'e'), 'count', 'b'), (('b', 'f'), 'var', 'b')]
These are super-informative by the way. Thanks!
dask/dataframe/groupby.py
Outdated
funcs: list of result-column, function, keyword-argument triples
    The list of functions that are applied on the grouped data frame.
    Has to be passed as a keyword argument.
This is looking pretty good to me. It would be nice to have someone more familiar with dask.dataframe take a look at it before merging. @jcrist if you have time tomorrow this could use your review.
Example:
# previously supported:
df['a'] -> 'a'
# additionally supported
[df['a'], df['b']] -> ['a', 'b']
- inline _aggregate_meta
- add arg docs, examples for _normalize_spec
- add arg docs for _groupby_apply_funcs
- inline token arguments in aggregate
First (test) usage seemed to work nicely. Is it intentional to support only DataFrameGroupBy ATM? (fine by me); maybe put in a …
So the rename [32] is not propagating. I am not sure this is directly related to this PR.
Regarding the missing aggregate for …
dask/dataframe/groupby.py
Outdated
integers = it.count()
ids = {}

def factory(func, column):
Why not just use dask.base.tokenize? This provides deterministic keys, without the need for a cache.
tokenize(func, column)
That was missing knowledge about the dask internals on my part. Using tokenize is a good idea; I will change this part.
The rename is not related to my code. dask does not seem to handle multilevel columns too well (for example, df['A', 'mean'] does not work either). A failing test for the rename functionality could be:

def test_renames_multilevelcolumns():
    pdf = pd.DataFrame({('A', '0'): [1, 2, 2], ('B', 1): [1, 2, 3]})
    ddf = dd.from_pandas(pdf, npartitions=3)
    pdf.columns = ['foo', 'bar']
    ddf.columns = ['foo', 'bar']
    assert_eq(pdf, ddf)
    # E  AssertionError: Index are different
    # E
    # E  Index classes are not equivalent
    # E  [left]:  Index(['foo', 'bar'], dtype='object')
    # E  [right]: MultiIndex(levels=[['A', 'B'], [1, '0']],
    # E                      labels=[[0, 1], [1, 0]])

The culprit is the rename in core._rename; it can be reproduced with pandas alone:

def test_renames_pandas():
    pdf = pd.DataFrame({('A', '0'): [1, 2, 2], ('B', '1'): [1, 2, 3]})
    expected = pdf.copy()
    expected.columns = ['foo', 'bar']
    # df.rename(columns=dict(zip(df.columns, columns))) in core._rename
    actual = pdf.rename(columns=dict(zip(pdf.columns, ['foo', 'bar'])))
    assert_eq(expected, actual)
    # result:
    # E  AssertionError: DataFrame.columns are different
    # E
    # E  DataFrame.columns classes are not equivalent
    # E  [left]:  Index(['foo', 'bar'], dtype='object')
    # E  [right]: MultiIndex(levels=[['A', 'B'], ['0', '1']],
    # E                      labels=[[0, 1], [0, 1]])

However, as of now, I have no good idea how to fix this.
@chmp in pandas you do this as …
I posted a comment on the …
I think that we can safely ignore the multi-named columns issue for this PR. It looks to me like the only outstanding issue is to use tokenize rather than the factory naming solution. Any other comments?
My most recent change addresses the two open comments. However, currently series-aggregate does not work with multiple groupers, i.e., …. For reference: when I remove all checks in …
#
###############################################################

def _make_agg_id(func, column):
    return '{!s}-{!s}-{}'.format(func, column, tokenize(func, column))
Are func and column always strings? If so then perhaps just func + '-' + column?
Not necessarily. In principle, columns can be any valid python type (that is hashable, as far as I know). In particular, integers, booleans, and tuples come to mind.
mrocklin left a comment:
This looks pretty good to me
ddf = dd.from_pandas(pdf, npartitions=10)

assert_eq(pdf.groupby(grouper(pdf)).agg(spec),
          ddf.groupby(grouper(ddf)).agg(spec, split_every=split_every))
Thanks. After your little nudge towards pytest.mark.parametrize, I became a fan of it :)
dask/dataframe/tests/test_groupby.py
Outdated
['sum', 'mean', 'min', 'max', 'count', 'size', 'std', 'var'],
'sum', 'size',
])
@pytest.mark.parametrize('split_every', [None])  # [False, None])
Why not test split_every = False here?
An oversight: I commented it out to increase the test speed and forgot to re-enable it. I'll run the travis tests on a branch in my repo and push the changes after everything is green.
Feel free to use travis.ci here. There isn't a big need to isolate your …
After I enabled travis for my fork, I only have to push into a branch that is not connected to this PR. Then travis runs the tests in isolation from the dask repo. Since dask is already integrated with travis, this whole procedure is very convenient :)
Any further comments on this PR?
Alrighty. Merging. Thanks for all of the effort on this one @chmp! Nice work.
After the great experience with #1666, I started to implement dd.DataFrame(...).groupby(...).agg(...) on a recent, particularly long, train ride. The implementation is pretty much feature-complete; however, support for var and std is still missing. As of now, there are no changes to the existing dask code base for implementing this feature. Since this results in some code duplication and there are further design considerations to be discussed, I wanted to open a PR early to ask for feedback. I hope opening such a big issue as a PR without prior announcement is OK.

Implementation notes

The implementation supports all documented argument values of pandas aggregate:
- .agg('mean')
- .agg(['mean', 'sum'])
- .agg({'b': 'mean', 'c': 'max'})
- .agg({'b': {'c': 'mean'}, 'c': {'a': 'max', 'a': 'min'}})

The aggregation functions sum, min, max, count, size, and mean are currently supported. When function objects are used, they are matched by name against known aggregates. Support for std and var is still missing.

I followed closely the implementation of the existing aggregate functions. However, I added an additional finalize step to make it easy to support tree aggregations (since this topic seems to be under discussion). This design results in three stages: chunk functions, aggregate functions, and finalizers. In aggregate, aca (chunk functions + aggregate functions) is followed by map_partitions (finalizers).

Each stage applies multiple function objects to the (grouped) input dataframe and returns a fragment of the result dataframe. The full result is obtained by concatenating all fragments (_groupby_apply_funcs and _agg_finalize).

Most of the heavy lifting is performed by generating function objects that are applied to the dataframe and return fragments of the result dataframe (_build_agg_args). Since the aggregate and finalize steps work on intermediate results of prior phases, agg forms an implicit expression graph. As of now, each aggregate generates its own independent graph by writing anonymous columns in intermediate dataframes (next_id). An alternative would be to use fixed names for intermediates and thereby reuse them between aggregates. For example, .agg(['sum', 'mean', 'count']) would only compute one sum and one count per input column.

Currently, all aggregations are implemented by calling individual aggregate functions on grouped dataframes (following existing code). Another option would be to call the pandas agg implementation to generate intermediates. Depending on the performance of the pandas implementation, this change could improve overall performance.

Implements #1619.