
[REVIEW] Initial version of non-groupby _take_last #4736

Merged

Conversation

@beckernick
Contributor

commented Apr 25, 2019

Summary of Changes

  • Refactors _take_last to remove the groupby.last() usage

    • Uses a two-tiered approach to find the last valid value in order to maintain speed. The new approach is significantly faster than the groupby.last() approach at large data sizes and comparable at smaller sizes (see the sketch at the end of this summary).
  • Replaces explicit pandas references with a check for whether the object is dataframe-like, and uses the appropriate Series constructor based on the type.

  • Existing tests passed

  • Added and passed test for empty partitions scenario

  • Passes flake8 dask

This closes #4732 and will also close #4743.
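
For reference, a minimal sketch of the two-tiered lookup (the _last_valid helper referenced later in this thread). The exact structure here is an assumption based on the description; it is not the merged code:

import pandas as pd

def _last_valid(s):
    # Tier 1: cheap Python loop over (at most) the last ten rows via .iloc
    for i in range(1, min(len(s), 10) + 1):
        val = s.iloc[-i]
        if not pd.isnull(val):
            return val
    # Tier 2: lots of trailing nulls, so fall back to a full scan
    non_null = s[s.notna()]
    if len(non_null):
        return non_null.iloc[-1]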

@mrocklin
Member

left a comment

In principle this looks fine to me. I did make a bunch of small nitpicky requests, though (hope you don't mind), mostly around reducing indirection for ease of future review.

@beckernick

Contributor Author

commented Apr 25, 2019

I'd still like to remove the explicit pd.Series if possible, as I think it may eventually cause cumulative aggregations to break for cudf when we create DataFrame methods. For now, I believe it shouldn't break anything in cudf, as cumulative aggregations are only at the Series level.

Is there a general way to do this in dask or do you think the best approach would be to look at _cum_agg (the only place this function is used) to see if it really needs this information as a Series?

Also making a change per your most recent suggestion.

@mrocklin

Member

commented Apr 25, 2019

I'd still like to remove the explicit pd.Series if possible as I think it may cause cumulative aggregations to break for cudf eventually when we create DataFrame methods

Right, so it sounds like we need a way to get a Series type from either a DataFrame or a Series. This might be something like:

if is_series_like(obj):
    typ = type(obj)
elif is_dataframe_like(obj):
    typ = type(obj.iloc[0])

Alternatively, we could ask cudf to support the addition of Pandas series objects, which probably isn't a bad idea anyway.

@beckernick

Contributor Author

commented Apr 25, 2019

Would the first approach you've listed make cudf a dependency of dask? I'd think this is not something we would want to do. I'm not sure if storing the type in a variable lets us do anything downstream to create the correct Series (cudf or pandas) without bringing in cudf, but I may be unaware of a function/method that would help here (some kind of constructor wrapper based on the dataframe backend, perhaps?). I'd love to learn more.

@mrocklin

Member

commented Apr 25, 2019

Would the first approach you've listed make cudf a dependency of dask? I'd think this is not something we would want to do. I'm not sure if storing the type in a variable lets us do anything downstream to create the correct Series (cudf or pandas) without bringing in cudf, but I may be unaware of a function/method that would help in this

No, it would not force a dependency on cudf. This would grab the type of the object we were given dynamically at runtime. The only way the type will be a cudf.Series object is if the input was a cudf object, so we'll obviously have already imported cudf by that time.

That being said, making cudf robust to interacting with pandas series objects is probably a good thing to do anyway.

@beckernick

Contributor Author

commented Apr 25, 2019

I agree. Making cudf robust to interacting with pandas does sound like a good idea.

To make sure I'm following what you mean, are you saying that something like:

if is_dataframe_like(obj):
    typ = type(obj.iloc[0])
    if typ is cudf-like:  # pseudocode
        cudf.Series(obj)

will be fine, because the only situation in which the type would be cudf.Series is one where we already have cudf and can thus use cudf.Series? That's also why we couldn't use isinstance(obj, cudf.Series) here: we won't have cudf.Series in every scenario. If so, we're now on the same page!

@mrocklin

Member

commented Apr 25, 2019

I'm saying this

if is_series_like(obj):
    typ = type(obj)
elif is_dataframe_like(obj):
    typ = type(obj.iloc[0])

return typ(out, index=a.columns)

@beckernick

Contributor Author

commented Apr 25, 2019

Wow. Learned something great here. I was not aware that type had this kind of behavior.

@mrocklin

Member

commented Apr 25, 2019

import pandas as pd

s = pd.Series([1, 2, 3])
typ = type(s)
assert typ is pd.Series

so this is the same:

pd.Series([1, 2, 3]) == typ([1, 2, 3])
increase range end by 1 to reach first row, switch explicit pandas reference to dynamically grab type and construct an appropriate Series

@beckernick beckernick changed the title [WIP] Initial version of non-groupby _take_last [REVIEW] Initial version of non-groupby _take_last Apr 25, 2019

# in each column
if is_dataframe_like(a):
    # create Series of appropriate backend dataframe library
    series_typ = type(a.iloc[0])

@mrocklin

mrocklin Apr 26, 2019

Member

Hrm, this will fail if there are empty partitions, as in this example.

import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame({'x': [1, 2, 3, 4, 5, 6, 7, 8]})
ddf = dd.from_pandas(df, npartitions=4)
ddf[ddf.x < 5].cumsum().compute()

(sorry for not noticing this before)

Maybe type(a.head(0).sum())? Actually no, cudf doesn't support this. We'll need to find some cheap way to get the Series type of a DataFrame object that always works :/

@beckernick

beckernick Apr 26, 2019

Author Contributor

Good catch. Will think about this.

@beckernick

beckernick Apr 26, 2019

Author Contributor

Could we leverage the fact that slices behave differently? Instead of grabbing with a.iloc[0], perhaps we could grab with a.iloc[:0] or a.iloc[0:1]? This appears to solve the indexing issue, but it results in a downstream issue with the meta check in is_dataframe_like. Looking into it. This actually gives us a DataFrame, so it's not quite what we need, and that's what causes the meta check issue.
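
For illustration, a quick demo of the difference (my own sketch, plain pandas):

import pandas as pd

df = pd.DataFrame({'x': [1, 2, 3]})
type(df.iloc[0])    # pandas Series, but raises IndexError on an empty frame
type(df.iloc[0:1])  # pandas DataFrame, even when df is empty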

@beckernick

beckernick Apr 26, 2019

Author Contributor

Also, as a note, it looks like the original _take_last also fails when there are empty partitions, so this will be good to fix.

...
/Users/nickbecker/NVIDIA/dask/dask/dataframe/core.py in _take_last(a, skipna)
   4213         last_row = a.groupby(group_dummy).last()
   4214         if isinstance(a, pd.DataFrame):  # TODO: handle explicit pandas reference
-> 4215             return pd.Series(last_row.values[0], index=a.columns)
   4216         else:
   4217             return last_row.values[0]

IndexError: index 0 is out of bounds for axis 0 with size 0

@beckernick

beckernick Apr 26, 2019

Author Contributor

@mrocklin , the following snippet appears to solve the empty partition issue:

if is_dataframe_like(a):
    # create Series of appropriate backend dataframe library
    series_typ = type(a.loc[0:1, a.columns[0]])  # if it's dataframe-like, it must have at least one column
    if a.empty:
        return series_typ([])  # we don't want to index this since it's empty
    return series_typ({col: _last_valid(a[col]) for col in a.columns}, index=a.columns)

@mrocklin

mrocklin Apr 26, 2019

Member

If this failed before then we can probably leave this as is and raise an issue.

@mrocklin

mrocklin Apr 26, 2019

Member

Ah, seems like you just did this in #4743

@beckernick

beckernick Apr 26, 2019

Author Contributor

Yep. I'll defer to you on whether you want to leave this as is (since it was already failing on dataframes with empty partitions) or try to fix it in this PR.

The above snippet does solve the problem for cumsum and cumprod, but still causes issues with cummin and cummax when it tries to compare a non-empty series with an empty series:

...

/dask/dask/dataframe/methods.py in cummin_aggregate(x, y)
    147 def cummin_aggregate(x, y):
    148     if isinstance(x, (pd.Series, pd.DataFrame)):
--> 149         return x.where((x < y) | x.isnull(), y, axis=x.ndim - 1)
    150     else:       # scalar
    151         return x if x < y else y

/conda/envs/cudf/lib/python3.7/site-packages/pandas/core/ops.py in wrapper(self, other, axis)
   1674
   1675         elif isinstance(other, ABCSeries) and not self._indexed_same(other):
-> 1676             raise ValueError("Can only compare identically-labeled "
   1677                              "Series objects")
   1678

ValueError: Can only compare identically-labeled Series objects
ipdb> self
x    0
dtype: int64
ipdb> other
Series([], dtype: float64)
ipdb>

Maybe we could name the empty series after the column name, but that doesn't solve the situation you described, where the dataframe is empty and has no columns.
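
For context, a minimal repro of that comparison failure (my own sketch, assuming plain pandas):

import pandas as pd

x = pd.Series([0], name='x')        # last value from a non-empty partition
y = pd.Series([], dtype='float64')  # result from an empty partition

# pandas refuses to align the two element-wise:
x.where((x < y) | x.isnull(), y)    # ValueError: Can only compare identically-labeled Series objects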

@mrocklin

mrocklin Apr 26, 2019

Member

I'd be happy if you wanted to include it. You might also include a test with the currently passing cum-reductions (and, if you wanted to get fancy, xfailed parameters for the failing ones).

@pytest.mark.parametrize('func', [
    M.cumsum,
    M.cumprod,
    pytest.param(M.cummin, marks=[pytest.mark.xfail(reason="...")]),
    ...
])
def test_cum_reduction_empty_partitions(func):
    df = ...
    ...

(I could be totally wrong about the pytest.param syntax, by the way. I would check existing tests.)

@beckernick

beckernick Apr 26, 2019

Author Contributor

I'll include the snippet and add tests for the empty partition case so that we can at least pass cumsum/cumprod with empty partitions, which is an improvement on the current situation. I'll match the xfail style for cummin/cummax on empty partitions to the style in the other tests.

@mrocklin

Member

commented Apr 26, 2019

Merging this tomorrow if there are no further comments.

@mrocklin mrocklin merged commit 8ce1ab7 into dask:master Apr 27, 2019

2 checks passed

continuous-integration/appveyor/pr AppVeyor build succeeded
continuous-integration/travis-ci/pr The Travis CI build passed
@mrocklin

Member

commented Apr 27, 2019

This is in. Thanks @beckernick !

jorge-pessoa pushed a commit to jorge-pessoa/dask that referenced this pull request May 14, 2019

Change cum-aggregation last-nonnull-value algorithm (dask#4736)
Previously we would use groupby.last in order to get the last non-null value of a partition.  This was suboptimal in three ways:

-  Less advanced dataframe libraries, like cudf, might not implement groupby-last, and so couldn't use this algorithm
-  It was somewhat expensive
-  It failed if there were empty partitions

Our new approach tries two paths:

-  First, use a normal Python for loop and `.iloc` on the last ten rows of a column.  This is faster than the groupby.last call and only uses iloc, which is somewhat more likely to be around.
-  If that didn't work (there are lots of nulls), then use `s[s.notna()][-1]`, which does a full scan but doesn't iterate in Python

Thomas-Z added a commit to Thomas-Z/dask that referenced this pull request May 17, 2019
