Skip to content

Commit

Permalink
Cythonized GroupBy any (pandas-dev#19722)
Browse files Browse the repository at this point in the history
  • Loading branch information
WillAyd authored and jreback committed Mar 1, 2018
1 parent 96b8bb1 commit 4a27697
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 13 deletions.
16 changes: 14 additions & 2 deletions asv_bench/benchmarks/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
from .pandas_vb_common import setup # noqa


# Per-dtype sets of GroupBy method names that the benchmarks must skip
# (consulted by GroupByMethods.setup, which raises NotImplementedError
# for blacklisted combinations so ASV records them as skipped).
method_blacklist = {
    'object': {
        'median', 'prod', 'sem', 'cumsum', 'sum', 'cummin', 'mean',
        'max', 'skew', 'cumprod', 'cummax', 'rank', 'pct_change', 'min',
        'var', 'mad', 'describe', 'std',
    },
}


class ApplyDictReturn(object):
goal_time = 0.2

Expand Down Expand Up @@ -153,6 +160,7 @@ def time_frame_nth_any(self, df):
def time_frame_nth(self, df):
    """Benchmark taking the first row of each group, no dropna filtering."""
    grouped = df.groupby(0)
    grouped.nth(0)


def time_series_nth_any(self, df):
    """Benchmark Series ``nth(0)`` with ``dropna='any'`` filtering."""
    series = df[1]
    keys = df[0]
    series.groupby(keys).nth(0, dropna='any')

Expand Down Expand Up @@ -369,23 +377,27 @@ class GroupByMethods(object):
goal_time = 0.2

param_names = ['dtype', 'method']
params = [['int', 'float'],
params = [['int', 'float', 'object'],
['all', 'any', 'bfill', 'count', 'cumcount', 'cummax', 'cummin',
'cumprod', 'cumsum', 'describe', 'ffill', 'first', 'head',
'last', 'mad', 'max', 'min', 'median', 'mean', 'nunique',
'pct_change', 'prod', 'rank', 'sem', 'shift', 'size', 'skew',
'std', 'sum', 'tail', 'unique', 'value_counts', 'var']]

def setup(self, dtype, method):
    """Pre-build the bound groupby-method callable so the timed benchmark
    measures only the method call itself.

    Raises NotImplementedError (ASV's skip signal) for dtype/method
    combinations listed in ``method_blacklist``.
    """
    if method in method_blacklist.get(dtype, {}):
        raise NotImplementedError  # skip benchmark
    ngroups = 1000
    size = ngroups * 2
    rng = np.arange(ngroups)
    values = rng.take(np.random.randint(0, ngroups, size=size))
    if dtype == 'int':
        key = np.random.randint(0, size, size=size)
    elif dtype == 'float':
        key = np.concatenate([np.random.random(ngroups) * 0.1,
                              np.random.random(ngroups) * 10.0])
    elif dtype == 'object':
        key = ['foo'] * size
    else:
        # BUG FIX: previously an unknown dtype fell through and left `key`
        # unbound, producing a confusing NameError below; skip explicitly.
        raise NotImplementedError  # skip benchmark

    df = DataFrame({'values': values, 'key': key})
    self.df_groupby_method = getattr(df.groupby('key')['values'], method)
Expand Down
5 changes: 5 additions & 0 deletions doc/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2179,8 +2179,12 @@ Computations / Descriptive Stats
.. autosummary::
:toctree: generated/

GroupBy.all
GroupBy.any
GroupBy.bfill
GroupBy.count
GroupBy.cumcount
GroupBy.ffill
GroupBy.first
GroupBy.head
GroupBy.last
Expand All @@ -2192,6 +2196,7 @@ Computations / Descriptive Stats
GroupBy.nth
GroupBy.ohlc
GroupBy.prod
GroupBy.rank
GroupBy.size
GroupBy.sem
GroupBy.std
Expand Down
5 changes: 3 additions & 2 deletions doc/source/whatsnew/v0.23.0.txt
Original file line number Diff line number Diff line change
Expand Up @@ -729,9 +729,10 @@ Performance Improvements
- Improved performance of :func:`DataFrame.median` with ``axis=1`` when bottleneck is not installed (:issue:`16468`)
- Improved performance of :func:`MultiIndex.get_loc` for large indexes, at the cost of a reduction in performance for small ones (:issue:`18519`)
- Improved performance of pairwise ``.rolling()`` and ``.expanding()`` with ``.cov()`` and ``.corr()`` operations (:issue:`17917`)
- Improved performance of :func:`DataFrameGroupBy.rank` (:issue:`15779`)
- Improved performance of :func:`pandas.core.groupby.GroupBy.rank` (:issue:`15779`)
- Improved performance of variable ``.rolling()`` on ``.min()`` and ``.max()`` (:issue:`19521`)
- Improved performance of ``GroupBy.ffill`` and ``GroupBy.bfill`` (:issue:`11296`)
- Improved performance of :func:`pandas.core.groupby.GroupBy.ffill` and :func:`pandas.core.groupby.GroupBy.bfill` (:issue:`11296`)
- Improved performance of :func:`pandas.core.groupby.GroupBy.any` and :func:`pandas.core.groupby.GroupBy.all` (:issue:`15435`)

.. _whatsnew_0230.docs:

Expand Down
57 changes: 57 additions & 0 deletions pandas/_libs/groupby.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -310,5 +310,62 @@ def group_fillna_indexer(ndarray[int64_t] out, ndarray[int64_t] labels,
filled_vals = 0


@cython.boundscheck(False)
@cython.wraparound(False)
def group_any_all(ndarray[uint8_t] out,
                  ndarray[int64_t] labels,
                  ndarray[uint8_t] values,
                  ndarray[uint8_t] mask,
                  object val_test,
                  bint skipna):
    """Aggregated boolean values to show truthfulness of group elements

    Parameters
    ----------
    out : array of values which this method will write its results to
    labels : array containing unique label for each group, with its
        ordering matching up to the corresponding record in `values`
    values : array containing the truth value of each element
    mask : array indicating whether a value is na or not
    val_test : str {'any', 'all'}
        String object dictating whether to use any or all truth testing
    skipna : boolean
        Flag to ignore nan values during truth testing

    Notes
    -----
    This method modifies the `out` parameter rather than returning an object.
    The returned values will either be 0 or 1 (False or True, respectively).
    """
    cdef:
        Py_ssize_t i, N = len(labels)
        int64_t lab
        uint8_t flag_val

    if val_test == 'all':
        # Because the 'all' value of an empty iterable in Python is True we can
        # start with an array full of ones and set to zero when a False value
        # is encountered
        flag_val = 0
    elif val_test == 'any':
        # Because the 'any' value of an empty iterable in Python is False we
        # can start with an array full of zeros and set to one only if any
        # value encountered is True
        flag_val = 1
    else:
        # BUG FIX: message previously referenced a nonexistent 'bool_func'
        # argument; the parameter is named `val_test`.
        raise ValueError("'val_test' must be either 'any' or 'all'!")

    out.fill(1 - flag_val)

    with nogil:
        for i in range(N):
            lab = labels[i]
            # Negative labels mark records outside any group; NA values are
            # skipped only when `skipna` is set.
            if lab < 0 or (skipna and mask[i]):
                continue

            if values[i] == flag_val:
                out[lab] = flag_val


# generated from template
include "groupby_helper.pxi"
127 changes: 118 additions & 9 deletions pandas/core/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,53 @@ class GroupBy(_GroupBy):
"""
_apply_whitelist = _common_apply_whitelist

def _bool_agg(self, val_test, skipna):
    """Shared func to call any / all Cython GroupBy implementations

    Parameters
    ----------
    val_test : str {'any', 'all'}
        Truth test forwarded to the Cython ``group_any_all`` kernel.
    skipna : bool
        Whether NA values are ignored during truth testing.

    Returns
    -------
    Series or DataFrame with one boolean per group.
    """

    def objs_to_bool(vals):
        try:
            # BUG FIX: use the builtin ``bool`` instead of the deprecated
            # ``np.bool`` alias (an alias of builtin bool, removed in
            # NumPy 1.24); semantics are identical.
            vals = vals.astype(bool)
        except ValueError:  # for objects
            # object arrays can't be cast directly; coerce elementwise
            vals = np.array([bool(x) for x in vals])

        # Cython kernel expects uint8 storage
        return vals.view(np.uint8)

    def result_to_bool(result):
        return result.astype(bool, copy=False)

    return self._get_cythonized_result('group_any_all', self.grouper,
                                       aggregate=True,
                                       cython_dtype=np.uint8,
                                       needs_values=True,
                                       needs_mask=True,
                                       pre_processing=objs_to_bool,
                                       post_processing=result_to_bool,
                                       val_test=val_test, skipna=skipna)

@Substitution(name='groupby')
@Appender(_doc_template)
def any(self, skipna=True):
    """Returns True if any value in the group is truthful, else False

    Parameters
    ----------
    skipna : bool, default True
        Flag to ignore nan values during truth testing

    Returns
    -------
    Series or DataFrame
        True for each group containing at least one truthy element.
    """
    return self._bool_agg('any', skipna)

@Substitution(name='groupby')
@Appender(_doc_template)
def all(self, skipna=True):
    """Returns True if all values in the group are truthful, else False

    Parameters
    ----------
    skipna : bool, default True
        Flag to ignore nan values during truth testing

    Returns
    -------
    Series or DataFrame
        True for each group whose (non-skipped) elements are all truthy.
    """
    return self._bool_agg('all', skipna)

@Substitution(name='groupby')
@Appender(_doc_template)
def count(self):
Expand Down Expand Up @@ -1485,6 +1532,8 @@ def _fill(self, direction, limit=None):

return self._get_cythonized_result('group_fillna_indexer',
self.grouper, needs_mask=True,
cython_dtype=np.int64,
result_is_index=True,
direction=direction, limit=limit)

@Substitution(name='groupby')
Expand Down Expand Up @@ -1873,33 +1922,81 @@ def cummax(self, axis=0, **kwargs):

return self._cython_transform('cummax', numeric_only=False)

def _get_cythonized_result(self, how, grouper, needs_mask=False,
needs_ngroups=False, **kwargs):
def _get_cythonized_result(self, how, grouper, aggregate=False,
cython_dtype=None, needs_values=False,
needs_mask=False, needs_ngroups=False,
result_is_index=False,
pre_processing=None, post_processing=None,
**kwargs):
"""Get result for Cythonized functions
Parameters
----------
how : str, Cythonized function name to be called
grouper : Grouper object containing pertinent group info
aggregate : bool, default False
Whether the result should be aggregated to match the number of
groups
cython_dtype : default None
Type of the array that will be modified by the Cython call. If
`None`, the type will be inferred from the values of each slice
needs_values : bool, default False
Whether the values should be a part of the Cython call
signature
needs_mask : bool, default False
Whether boolean mask needs to be part of the Cython call signature
Whether boolean mask needs to be part of the Cython call
signature
needs_ngroups : bool, default False
Whether number of groups part of the Cython call signature
Whether number of groups is part of the Cython call signature
result_is_index : bool, default False
Whether the result of the Cython operation is an index of
values to be retrieved, instead of the actual values themselves
pre_processing : function, default None
Function to be applied to `values` prior to passing to Cython
Raises if `needs_values` is False
post_processing : function, default None
Function to be applied to result of Cython function
**kwargs : dict
Extra arguments to be passed back to Cython funcs
Returns
-------
`Series` or `DataFrame` with filled values
"""
if result_is_index and aggregate:
raise ValueError("'result_is_index' and 'aggregate' cannot both "
"be True!")
if post_processing:
if not callable(pre_processing):
raise ValueError("'post_processing' must be a callable!")
if pre_processing:
if not callable(pre_processing):
raise ValueError("'pre_processing' must be a callable!")
if not needs_values:
raise ValueError("Cannot use 'pre_processing' without "
"specifying 'needs_values'!")

labels, _, ngroups = grouper.group_info
output = collections.OrderedDict()
base_func = getattr(libgroupby, how)

for name, obj in self._iterate_slices():
indexer = np.zeros_like(labels, dtype=np.int64)
func = partial(base_func, indexer, labels)
if aggregate:
result_sz = ngroups
else:
result_sz = len(obj.values)

if not cython_dtype:
cython_dtype = obj.values.dtype

result = np.zeros(result_sz, dtype=cython_dtype)
func = partial(base_func, result, labels)
if needs_values:
vals = obj.values
if pre_processing:
vals = pre_processing(vals)
func = partial(func, vals)

if needs_mask:
mask = isnull(obj.values).view(np.uint8)
func = partial(func, mask)
Expand All @@ -1908,9 +2005,19 @@ def _get_cythonized_result(self, how, grouper, needs_mask=False,
func = partial(func, ngroups)

func(**kwargs) # Call func to modify indexer values in place
output[name] = algorithms.take_nd(obj.values, indexer)

return self._wrap_transformed_output(output)
if result_is_index:
result = algorithms.take_nd(obj.values, result)

if post_processing:
result = post_processing(result)

output[name] = result

if aggregate:
return self._wrap_aggregated_output(output)
else:
return self._wrap_transformed_output(output)

@Substitution(name='groupby')
@Appender(_doc_template)
Expand All @@ -1930,7 +2037,9 @@ def shift(self, periods=1, freq=None, axis=0):
return self.apply(lambda x: x.shift(periods, freq, axis))

return self._get_cythonized_result('group_shift_indexer',
self.grouper, needs_ngroups=True,
self.grouper, cython_dtype=np.int64,
needs_ngroups=True,
result_is_index=True,
periods=periods)

@Substitution(name='groupby')
Expand Down
25 changes: 25 additions & 0 deletions pandas/tests/groupby/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pandas import (date_range, bdate_range, Timestamp,
Index, MultiIndex, DataFrame, Series,
concat, Panel, DatetimeIndex, read_csv)
from pandas.core.dtypes.missing import isna
from pandas.errors import UnsupportedFunctionCall, PerformanceWarning
from pandas.util.testing import (assert_frame_equal, assert_index_equal,
assert_series_equal, assert_almost_equal)
Expand Down Expand Up @@ -2116,6 +2117,30 @@ def interweave(list_obj):
exp = DataFrame({'key': keys, 'val': _exp_vals})
assert_frame_equal(result, exp)

@pytest.mark.parametrize("agg_func", ['any', 'all'])
@pytest.mark.parametrize("skipna", [True, False])
@pytest.mark.parametrize("vals", [
    ['foo', 'bar', 'baz'], ['foo', '', ''], ['', '', ''],
    [1, 2, 3], [1, 0, 0], [0, 0, 0],
    [1., 2., 3.], [1., 0., 0.], [0., 0., 0.],
    [True, True, True], [True, False, False], [False, False, False],
    [np.nan, np.nan, np.nan]
])
def test_groupby_bool_aggs(self, agg_func, skipna, vals):
    # Two groups, each containing the same three values
    df = DataFrame({'key': ['a'] * 3 + ['b'] * 3, 'val': vals * 2})

    # Expected per-group value comes from the Python builtin of the same name
    expected = getattr(compat.builtins, agg_func)(vals)
    if skipna and all(isna(vals)) and agg_func == 'any':
        # all-NA with skipna leaves an empty iterable, and any([]) is False
        expected = False

    expected_df = DataFrame([expected] * 2, columns=['val'],
                            index=pd.Index(['a', 'b'], name='key'))
    result = getattr(df.groupby('key'), agg_func)(skipna=skipna)
    assert_frame_equal(result, expected_df)

def test_dont_clobber_name_column(self):
df = DataFrame({'key': ['a', 'a', 'a', 'b', 'b', 'b'],
'name': ['foo', 'bar', 'baz'] * 2})
Expand Down

0 comments on commit 4a27697

Please sign in to comment.