
ENH: Allow the groupby `by` param to handle columns and index levels #2636

Merged · 4 commits · Aug 31, 2017

Conversation

@jonmmease (Contributor) opened this pull request.

@mrocklin (Member) left a comment:

A couple of small comments. I suspect that @TomAugspurger might want to take a look here


```python
# Compute on dask DataFrame with divisions (no shuffling)
result = ddf.groupby(['idx', 'a']).apply(func)
assert_eq(expected, result)
```
@mrocklin (Member):

We might be able to test the no-shuffling assertion by checking the size of the task graph, `len(result.dask)`. This should be much smaller in the efficient case, particularly if you compare against a shuffle under a `with dask.set_options(shuffle='tasks')` block.
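A minimal sketch of that kind of graph-size check, assuming `ddf`, `ddf_no_divs`, and `func` are the fixtures from the surrounding tests (the comparison itself is illustrative, not the PR's actual test):

```python
import dask

# `ddf` has known divisions, so grouping on the index level needs no
# shuffle; `ddf_no_divs` does not, so the same groupby must shuffle.
small = ddf.groupby(['idx', 'a']).apply(func)

# Force the task-based shuffle so the extra work shows up in the graph.
with dask.set_options(shuffle='tasks'):
    large = ddf_no_divs.groupby(['idx', 'a']).apply(func)

# The shuffle-free graph should be much smaller.
assert len(small.dask) < len(large.dask)
```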

@mrocklin (Member):

This can probably apply above as well

```python
and/or the name of the DataFrame's index
:return: Dask DataFrame with columns corresponding to each column or
    index level in columns_or_index. If included, the column corresponding
    to the index level is named _index
```
@mrocklin (Member):

Dask tends to use numpydoc-style docstrings: http://dask.pydata.org/en/latest/develop.html#docstrings
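For reference, the `:return:` fragment above might read roughly like this in numpydoc form (a sketch; the exact wording is illustrative):

```python
"""
...

Returns
-------
dask.dataframe.DataFrame
    A DataFrame with one column per entry in ``columns_or_index``.
    If an index level is included, its column is named ``_index``.
"""
```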

@jonmmease (Author):

Well, looks like I managed to break everything :-) Looking into it. Some (but not all) of these failures are because these changes won't work with versions of pandas before 0.20. What's the best way to inform the test suite (and users) of this dependency?

@mrocklin (Member):

I guess we first have to ask ourselves "do we want to continue supporting Pandas 0.19?"

If the answer is yes then presumably things would still work in either case, but the checks about efficiency would only be run if the pandas version met some criterion. I think there are dask.dataframe tests that use `LooseVersion` for checks, although given the release cycle I suspect that a straight check on the value of `pd.__version__` would also work fine.

@jcrist (Member) commented Aug 29, 2017:

> I guess we first have to ask ourselves "do we want to continue supporting Pandas 0.19?"

0.19.0 was released in October of 2016. I'm not sure if we can drop it yet; we'd probably need to poll users. No strong opinions here, except that if we do drop 0.19.* it should be a major point release.

> although given the release cycle I suspect that a straight check on the value of `pd.__version__` would also work fine.

You can import `PANDAS_VERSION` from `dask.dataframe.utils`. For consistency, I'd prefer you use that. There are several places where we check for `PANDAS_VERSION >= '0.20.0'`.
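A minimal sketch of that gating pattern, assuming `PANDAS_VERSION` is the `LooseVersion` exposed by `dask.dataframe.utils` at the time (the test name and skip reason are illustrative):

```python
import pytest
from dask.dataframe.utils import PANDAS_VERSION

# LooseVersion comparisons accept plain version strings.
@pytest.mark.skipif(PANDAS_VERSION < '0.20.0',
                    reason="grouping on index levels needs pandas >= 0.20.0")
def test_groupby_index_levels():
    ...
```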

@TomAugspurger (Member) left a comment:

Only partway through, I'll take another look later, but overall this is looking good. Thanks!

"""
# Ensure columns_or_index is a list
columns_or_index = (columns_or_index
if isinstance(columns_or_index, list)
@TomAugspurger (Member):

Maybe rewrite this to check if `columns_or_index` is a sequence, or maybe more easily reverse the logic and check for `isinstance(columns_or_index, pd.compat.string_types)`.

@jonmmease (Author):

Working on some updates now. I did the check this way because strings, integers, and tuples can all be valid column keys for pandas, and I'm aiming to end up with a list of column keys. Does that make sense?

```python
                    if isinstance(columns_or_index, list)
                    else [columns_or_index])

column_names = [n for n in columns_or_index
```
@TomAugspurger (Member):

Does `self.columns & columns_or_index` achieve the desired result? I'm wondering about cases where `n` is not a scalar, and why those should be excluded from `column_names`.

@jonmmease (Author):

Yeah, good catch. The scalar check was there to filter out cases where you're grouping on something like a series. I'll extract the scalar check into a method that handles tuples and is a bit clearer.
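A hedged sketch of the kind of helper described here (the name and exact semantics are hypothetical, not the PR's final code):

```python
import numpy as np

def _is_column_label(key):
    # Hypothetical helper: strings, ints, and tuples of scalars are all
    # valid pandas column keys; Series-like groupers are not.
    return (np.isscalar(key) or
            (isinstance(key, tuple) and all(np.isscalar(k) for k in key)))
```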


```python
def _contains_index_name(self, columns_or_index):
    if isinstance(columns_or_index, list):
        return (self.index.name
```
@TomAugspurger (Member):

Can you check `self.index.name is not None`? Otherwise we exclude falsey index names like `False` or `0`.

```python
                for n in columns_or_index
                if np.isscalar(n)))
    else:
        return (columns_or_index
```
@TomAugspurger (Member):

Same thing about `is not None`.

```python
# Test aggregate strings
if agg_func in {'sum', 'mean', 'var', 'size', 'std', 'count'}:
    result = ddf_no_divs.groupby(['a', 'idx']).agg(agg_func)
    assert_eq(expected, result)
```
@TomAugspurger (Member):

Could you add a case that just groups by the index, and no columns?
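Something like the following could cover that case (a sketch; `pdf` as the pandas source frame is assumed from the surrounding test):

```python
# Group by the index level alone, with no columns involved.
expected = pdf.groupby('idx').agg(agg_func)   # 'pdf' assumed: the pandas frame
result = ddf_no_divs.groupby('idx').agg(agg_func)
assert_eq(expected, result)
```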

@jonmmease (Author):

Ok, just pushed changes that I believe address the review comments thus far. The new tests are all working against pandas master (I forgot that's what I was using when I was testing the changes yesterday).

Unfortunately, I'm hitting up against a bug in pandas 0.20.3 (pandas-dev/pandas#16843) that breaks metadata calculations in a lot of cases when grouping on combinations of columns and the index. This bug was fixed last week and will be in 0.21, though I'm not familiar with the pandas release-schedule timing.

To be clear, these changes don't (shouldn't!) break any existing tests with pandas 0.19 or 0.20. They just aren't going to be fully useful before pandas 0.21 lands.

Let me know how you'd like to proceed. Thanks!

@TomAugspurger (Member):

> This bug was fixed last week and will be in 0.21 though I'm not familiar with the pandas release schedule timing

0.21 will be released at the end of September.

It looks like the last commit had some formatting issues: https://travis-ci.org/dask/dask/jobs/270025325#L4608

@jonmmease (Author):

@mrocklin @TomAugspurger Looks like tests passed with the exception of the "PYTHON=3.5 NUMPY=1.12.1 PANDAS=0.19.2" configuration.

Wading through the log, the failures seem to be caused by exceptions stating "Exception: Tried sending message after closing. Status: closed". Next to the first failure in the summary I see:

```
[gw1] node down: Not properly terminated
[gw1] FAILED dask/dataframe/tests/test_multi.py::test_merge_by_index_patterns[left-disk]
Replacing crashed slave gw1
```

Do you think this is related to my changes, or did something flaky happen on the CI servers?

@TomAugspurger (Member) commented Aug 30, 2017:

Hmm, one thing I just thought of: the new `np.isscalar` calls may trigger some unnecessary computations when the grouper is a `dask.Series`.

```python
In [21]: df = pd.DataFrame({"A": [1] * 5 + [2] * 5, "B": ['a', 'b'] * 5, 'C': range(10)}, index=pd.Index(range(10), name='A'))

In [22]: a = dd.from_pandas(df, 2).set_index("A")

In [23]: a.groupby(a.B).C.apply(np.mean)  # will trigger an `np.isscalar(a.B)`, computing a.B
```

I don't know how big of a problem this is, but I think it could be avoided. I'll take another look later.

@jonmmease (Author):

Oh, good point, I see what you mean. I could always check for dask collections before calling `np.isscalar()`. Let me know if you think of anything more elegant.
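One plausible guard (a sketch; the helper name is hypothetical, and checking for `dd.Series`/`dd.DataFrame` is just one way to detect a dask grouper):

```python
import numpy as np
import dask.dataframe as dd

def _safe_isscalar(obj):
    # Skip np.isscalar for dask collections so classifying the grouper
    # can never trigger a computation.
    if isinstance(obj, (dd.Series, dd.DataFrame)):
        return False
    return np.isscalar(obj)
```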

@TomAugspurger (Member) commented Aug 30, 2017 via email.

@TomAugspurger merged commit d32e8b7 into dask:master on Aug 31, 2017.

@TomAugspurger (Member):

Thanks @jonmmease

@jonmmease (Author):

Sure thing. Thanks for the quick feedback @TomAugspurger and @mrocklin

@mrocklin (Member) commented Aug 31, 2017 via email.
