Parallel variance computation for dataframes #4865
Conversation
cc also @jcrist, who implemented the array version of this algorithm.
Sorry about the delay. Will take another look tonight or tomorrow.
@TomAugspurger no problem, thank you for your feedback.
For homogeneous dataframes, or for mixes of int / float, this looks quite good. It's trying to support Timedelta that's complicating things.
@almaleksia what do you think about continuing to (incorrectly) drop Timedelta columns? Then I think your implementation is just fine, and it doesn't make supporting timedeltas in the future any harder. The benefit of ignoring Timedeltas for now is that we maintain the previous behavior, rather than raising at graph construction time.
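For context, this is roughly what silently dropping the Timedelta columns looks like at the pandas level (a minimal sketch, not the PR's actual code; the include list here is an assumption):

import pandas as pd

df = pd.DataFrame({"A": [1, 2, 3],
                   "B": pd.to_timedelta([1, 2, 3], unit="s")})

# Keep only the dtypes the parallel algorithm already handles; the
# timedelta column "B" is silently excluded, matching the old behavior.
numeric_only = df.select_dtypes(include=["number", "bool"])
print(numeric_only.var())  # A    1.0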
dask/array/reductions.py
Outdated
denominator = n.sum(axis=axis, **kwargs) - ddof

# taking care of the edge case with empty or all-nans array with ddof > 0
if isinstance(denominator, np.ndarray):
IIUC, this may end up being something like a CuPy array (not necessarily a NumPy ndarray). Maybe flip the cases of the if block and do something like:

if isinstance(denominator, numbers.Number):
    denominator = np.nan
else:
    denominator[denominator < 0] = np.nan
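Spelled out as a self-contained function (a sketch of the suggestion above, with the < 0 check presumably kept in the scalar branch; not the code that was merged):

import numbers
import numpy as np

def mask_invalid_denominator(denominator):
    # With ddof > 0, an empty or all-NaN chunk can push the
    # denominator (count - ddof) below zero.
    if isinstance(denominator, numbers.Number):
        return np.nan if denominator < 0 else denominator
    # The array branch only assumes boolean-mask assignment, so it also
    # covers array-likes such as CuPy arrays, not just np.ndarray.
    denominator[denominator < 0] = np.nan
    return denominator

print(mask_invalid_denominator(np.array([2.0, -1.0])))  # [ 2. nan]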
@TomAugspurger thank you, didn't think about that.
dask/dataframe/core.py
Outdated
name = self._token_prefix + 'var'
result = map_partitions(methods.var_aggregate, x2, x, n,
                        token=name, meta=meta, ddof=ddof)
num = self.select_dtypes(include=['number', np.timedelta64, 'bool']) \
Sorry about this suggestion; it turned out to be more complicated than I thought. Glad you found a solution.
dask/dataframe/core.py
Outdated
result = map_partitions(methods.var_aggregate, x2, x, n,
                        token=name, meta=meta, ddof=ddof)
num = self.select_dtypes(include=['number', np.timedelta64, 'bool']) \
    if is_dataframe_like(self) else self
Maybe if self.ndim > 1? I had to pause for a second to think about why is_dataframe_like(self) was needed.
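As a quick illustration (plain pandas here; the dask collections expose the same ndim attribute):

import pandas as pd

# Series are 1-dimensional and dataframes 2-dimensional, so ndim > 1
# distinguishes the two without a helper function.
print(pd.Series([1.0]).ndim)            # 1
print(pd.DataFrame({"A": [1.0]}).ndim)  # 2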
dask/dataframe/core.py
Outdated
num = self.select_dtypes(include=['number', np.timedelta64, 'bool']) \
    if is_dataframe_like(self) else self

values_dtype = num.values.dtype
This is concerning me slightly. What happens when we have a heterogeneous dataframe?
In [6]: df = pd.DataFrame({"A": [1, 2, 3], "B": pd.to_timedelta([1, 2, 3])})
In [7]: ddf = dd.from_pandas(df, 2)
In [8]: df.var()
Out[8]:
A 1.0
B 1.0
dtype: float64
In [9]: ddf.var()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-9-d17ffc2129b0> in <module>
----> 1 ddf.var()
~/sandbox/dask/dask/dataframe/core.py in var(self, axis, skipna, ddof, split_every, dtype, out)
1467 cols = num._meta.columns if is_dataframe_like(num) else None
1468
-> 1469 var_shape = num._meta_nonempty.values.var(axis=0).shape
1470 array_var_name = (array_var._name,) + (0,) * len(var_shape)
1471
~/Envs/dask-dev/lib/python3.7/site-packages/numpy/core/_methods.py in _var(a, axis, dtype, out, ddof, keepdims)
119 x = um.multiply(x, um.conjugate(x), out=x).real
120 else:
--> 121 x = um.multiply(x, x, out=x)
122 ret = umr_sum(x, axis, dtype, out, keepdims)
123
TypeError: unsupported operand type(s) for *: 'Timedelta' and 'Timedelta'
On master, it seems we incorrectly dropped timedeltas.
I think the more correct thing to do is to avoid the .values and apply things columnwise. That has a negative performance impact for very wide dataframes, as we'll have one var computation per column rather than per dtype. I'll need to think a bit more about what to do here.
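To make the trade-off concrete, a toy comparison at the pandas level (illustrative only, not the PR's code):

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(1000, 50))

# Per-dtype: one vectorized reduction over a single homogeneous block.
block_var = df.values.var(axis=0, ddof=1)

# Per-column: fifty separate reductions, but each column keeps its own
# dtype, so a mixed frame never goes through a single .values array.
column_var = np.array([df[c].var(ddof=1) for c in df.columns])

assert np.allclose(block_var, column_var)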
@TomAugspurger omg, it seems I missed that test_reductions only includes tests for series and not for dataframes :'( Sorry for that. It is a good catch. I can do it the same way as it's done in describe: if there are timedelta columns in the dataframe, apply columnwise; if not, per dtype. That way it won't affect performance for numeric dataframes. Timedeltas can first be converted to int64 and then, after the computation is applied, converted back to timedelta.
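A minimal sketch of that int64 round-trip (illustrative, not the PR's code):

import pandas as pd

s = pd.Series(pd.to_timedelta([1, 2, 3]))

# View the timedelta64[ns] values as int64 nanoseconds before reducing;
# the variance itself stays a float, matching the pandas output above.
print(s.astype('i8').var(ddof=1))  # 1.0

For std, the square root of that result is again in nanoseconds and could be rewrapped with pd.Timedelta, which is where the conversion back to timedelta would come in.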
@TomAugspurger I've added per-column variance for dataframes with timedelta columns. For all-numeric dataframes everything remains the same.
Force-pushed from 42a496b to 9e8b539.
Looks quite close, I think.
dask/dataframe/core.py
Outdated
is_timedelta = is_timedelta64_dtype(column._meta)

if is_timedelta:
    column = column.dropna().astype('i8')
Is this working around pandas-dev/pandas#18880? Ideally we'd like to follow skipna, but that bug may prevent it.
@TomAugspurger looks like it. I've looked into your PR for this bug, and this is what ideally should be done here as well:

if is_timedelta64_dtype(values) and not skipna:
    values = values.astype('float64')
    values[mask] = np.nan

But I suspect it's going to slow things down. What do you think?
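For concreteness, the same idea as a self-contained function (a sketch that assumes mask marks the missing entries, which the snippet above leaves undefined):

import numpy as np
import pandas as pd
from pandas.api.types import is_timedelta64_dtype

def values_for_var(s, skipna=True):
    values = s.values
    if is_timedelta64_dtype(values):
        mask = s.isna().values
        # View the timedeltas as int64 nanoseconds, then cast to float
        # so missing entries can be represented as NaN.
        values = values.view('i8').astype('float64')
        if skipna:
            values = values[~mask]
        else:
            values[mask] = np.nan
    return values

s = pd.Series(pd.to_timedelta([1, 2, None], unit='s'))
print(np.var(values_for_var(s, skipna=True), ddof=1))   # 5e+17
print(np.var(values_for_var(s, skipna=False), ddof=1))  # nan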
I just realized we aren't using pandas' nanstd, so hopefully we don't need to worry about pandas-dev/pandas#18880.
I'm not too concerned about the cost of computing the mask. Pandas does that internally as well, so it won't be outrageously slower (and this branch is only for not skipna, so it's not the default). Let's do something like that.
Done.
Thanks @almaleksia!
@TomAugspurger thank you for the review :)
Fixes #4233
Passes flake8 dask
Re-uses the array variance computation logic in dataframes.