Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add fillna bfill pad ffill function in groupby #640

Merged
merged 11 commits into from Aug 20, 2019
26 changes: 11 additions & 15 deletions databricks/koalas/frame.py
Expand Up @@ -3142,15 +3142,14 @@ def fillna(self, value=None, method=None, axis=None, inplace=False, limit=None):
2 0.0 1.0 2.0 5
3 0.0 3.0 1.0 4
"""
if axis is None:
axis = 0
if not (axis == 0 or axis == "index"):
raise NotImplementedError("fillna currently only works for axis=0 or axis='index'")
if (value is None) and (method is None):
raise ValueError("Must specify a fill 'value' or 'method'.")

sdf = self._sdf
if value is not None:
if axis is None:
axis = 0
if not (axis == 0 or axis == "index"):
raise NotImplementedError("fillna currently only works for axis=0 or axis='index'")
if (value is None) and (method is None):
raise ValueError("Must specify a fillna 'value' or 'method' parameter.")
if not isinstance(value, (float, int, str, bool, dict, pd.Series)):
raise TypeError("Unsupported type %s" % type(value))
if isinstance(value, pd.Series):
Expand All @@ -3164,15 +3163,12 @@ def fillna(self, value=None, method=None, axis=None, inplace=False, limit=None):
sdf = sdf.fillna(value)
internal = self._internal.copy(sdf=sdf)
else:
if method not in ['pad', 'ffill', 'backfill', 'bfill']:
raise ValueError("Expecting pad, ffill, backfill or bfill.")
applied = []
for column in self._internal.data_columns:
applied.append(self[column].fillna(value=value, method=method,
axis=axis, limit=limit))
sdf = self._sdf.select(
self._internal.index_scols + [c._scol for c in applied])
internal = self._internal.copy(sdf=sdf, data_columns=[c.name for c in applied])
for col in self._internal.data_columns:
applied.append(self[col].fillna(value=value, method=method, axis=axis,
inplace=False, limit=limit))
sdf = self._sdf.select(self._internal.index_columns + [col._scol for col in applied])
internal = self._internal.copy(sdf=sdf, data_columns=[col.name for col in applied])
if inplace:
self._internal = internal
else:
Expand Down
183 changes: 183 additions & 0 deletions databricks/koalas/groupby.py
Expand Up @@ -1010,6 +1010,172 @@ def rank(self, method='average', ascending=True):
"""
return self._rank(method, ascending)

def fillna(self, value=None, method=None, axis=None, inplace=False, limit=None):
    """Fill NA/NaN values in group.

    Parameters
    ----------
    value : scalar, dict, Series
        Value to use to fill holes. Alternately a dict/Series of values
        specifying which value to use for each column.
        DataFrame is not supported.
    method : {'backfill', 'bfill', 'pad', 'ffill', None}, default None
        Method to use for filling holes in reindexed Series:

        * pad / ffill: propagate last valid observation forward to next valid
        * backfill / bfill: use NEXT valid observation to fill gap
    axis : {0 or `index`}
        1 and `columns` are not supported.
    inplace : boolean, default False
        Fill in place (do not create a new object)
    limit : int, default None
        If method is specified, this is the maximum number of consecutive NaN values to
        forward/backward fill. In other words, if there is a gap with more than this
        number of consecutive NaNs, it will only be partially filled. If method is not
        specified, this is the maximum number of entries along the entire axis where NaNs
        will be filled. Must be greater than 0 if not None

    Returns
    -------
    DataFrame or Series
        Object with NA entries filled. Grouping a single selected column yields a
        Series, as in the first example below.

    Examples
    --------
    >>> df = ks.DataFrame({
    ...     'A': [1, 1, 2, 2],
    ...     'B': [2, 4, None, 3],
    ...     'C': [None, None, None, 1],
    ...     'D': [0, 1, 5, 4]
    ...     },
    ...     columns=['A', 'B', 'C', 'D'])
    >>> df
       A    B    C  D
    0  1  2.0  NaN  0
    1  1  4.0  NaN  1
    2  2  NaN  NaN  5
    3  2  3.0  1.0  4

    We can also propagate non-null values forward or backward in group.

    >>> df.groupby(['A'])['B'].fillna(method='ffill')
    0    2.0
    1    4.0
    2    NaN
    3    3.0
    Name: B, dtype: float64

    >>> df.groupby(['A']).fillna(method='bfill')
         B    C  D
    0  2.0  NaN  0
    1  4.0  NaN  1
    2  3.0  1.0  5
    3  3.0  1.0  4
    """
    # The concrete groupby flavour supplies `_fillna`; this public entry point only
    # forwards the arguments in their declared order.
    return self._fillna(value, method, axis, inplace, limit)

def bfill(self, limit=None):
    """
    Backward-fill NA/NaN values within each group.

    Equivalent to calling ``fillna`` on this groupby object with ``method='bfill'``.

    Parameters
    ----------
    limit : int, default None
        The maximum number of consecutive NaN values to backward fill. In other words,
        if there is a gap with more than this number of consecutive NaNs, it will only
        be partially filled. Must be greater than 0 if not None

    Returns
    -------
    DataFrame or Series
        Object with NA entries filled.

    Examples
    --------
    >>> df = ks.DataFrame({
    ...     'A': [1, 1, 2, 2],
    ...     'B': [2, 4, None, 3],
    ...     'C': [None, None, None, 1],
    ...     'D': [0, 1, 5, 4]
    ...     },
    ...     columns=['A', 'B', 'C', 'D'])
    >>> df
       A    B    C  D
    0  1  2.0  NaN  0
    1  1  4.0  NaN  1
    2  2  NaN  NaN  5
    3  2  3.0  1.0  4

    Propagate non-null values backward.

    >>> df.groupby(['A']).bfill()
         B    C  D
    0  2.0  NaN  0
    1  4.0  NaN  1
    2  3.0  1.0  5
    3  3.0  1.0  4
    """
    return self._fillna(method='bfill', limit=limit)

# `backfill` is an alias of `bfill`.
backfill = bfill

def ffill(self, limit=None):
    """
    Forward-fill NA/NaN values within each group.

    Equivalent to calling ``fillna`` on this groupby object with ``method='ffill'``.

    Parameters
    ----------
    limit : int, default None
        The maximum number of consecutive NaN values to forward fill. In other words,
        if there is a gap with more than this number of consecutive NaNs, it will only
        be partially filled. Must be greater than 0 if not None

    Returns
    -------
    DataFrame or Series
        Object with NA entries filled.

    Examples
    --------
    >>> df = ks.DataFrame({
    ...     'A': [1, 1, 2, 2],
    ...     'B': [2, 4, None, 3],
    ...     'C': [None, None, None, 1],
    ...     'D': [0, 1, 5, 4]
    ...     },
    ...     columns=['A', 'B', 'C', 'D'])
    >>> df
       A    B    C  D
    0  1  2.0  NaN  0
    1  1  4.0  NaN  1
    2  2  NaN  NaN  5
    3  2  3.0  1.0  4

    Propagate non-null values forward.

    >>> df.groupby(['A']).ffill()
         B    C  D
    0  2.0  NaN  0
    1  4.0  NaN  1
    2  NaN  NaN  5
    3  3.0  1.0  4
    """
    return self._fillna(method='ffill', limit=limit)

# `pad` is an alias of `ffill`.
pad = ffill

# TODO: Series support is not implemented yet.
def transform(self, func):
"""
Apply function column-by-column to the GroupBy object.
Expand Down Expand Up @@ -1386,6 +1552,19 @@ def _cum(self, func):
internal = kdf._internal.copy(sdf=sdf, data_columns=[c.name for c in applied])
return DataFrame(internal)

def _fillna(self, *args, **kwargs):
    """Fill every non-key data column group-wise and reassemble a DataFrame.

    Each data column that is not one of the group keys is grouped by the same keys
    and filled through that per-column groupby's own ``_fillna``; ``*args`` and
    ``**kwargs`` are forwarded to it unchanged. The group-key columns themselves are
    left out of the result.
    """
    frame = self._kdf
    key_names = [key.name for key in self._groupkeys]

    filled = [
        frame[name].groupby(self._groupkeys)._fillna(*args, **kwargs)
        for name in frame._internal.data_columns
        if name not in key_names
    ]

    # Keep the index columns and replace the data columns with the filled ones.
    new_sdf = frame._sdf.select(
        frame._internal.index_scols + [series._scol for series in filled])
    new_internal = frame._internal.copy(
        sdf=new_sdf, data_columns=[series.name for series in filled])
    return DataFrame(new_internal)


class SeriesGroupBy(GroupBy):

Expand Down Expand Up @@ -1418,6 +1597,10 @@ def _rank(self, *args, **kwargs):
groupkey_scols = [s._scol for s in self._groupkeys]
return Series._rank(self._ks, *args, **kwargs, part_cols=groupkey_scols)

def _fillna(self, *args, **kwargs):
    """Fill the wrapped series, using the group keys as partition columns.

    Forwards ``*args`` and ``**kwargs`` to ``Series._fillna`` on ``self._ks`` and
    passes the Spark columns of the group keys as ``part_cols``.
    """
    partition_scols = [key._scol for key in self._groupkeys]
    return Series._fillna(self._ks, *args, part_cols=partition_scols, **kwargs)

@property
def _kdf(self) -> DataFrame:
return self._ks._kdf
Expand Down
10 changes: 0 additions & 10 deletions databricks/koalas/missing/groupby.py
Expand Up @@ -34,7 +34,6 @@ class _MissingPandasLikeDataFrameGroupBy(object):
corrwith = unsupported_property('corrwith')
cov = unsupported_property('cov')
dtypes = unsupported_property('dtypes')
fillna = unsupported_property('fillna')
groups = unsupported_property('groups')
hist = unsupported_property('hist')
idxmax = unsupported_property('idxmax')
Expand All @@ -51,20 +50,16 @@ class _MissingPandasLikeDataFrameGroupBy(object):
take = unsupported_property('take', deprecated=True)

# Functions
backfill = unsupported_function('backfill')
bfill = unsupported_function('bfill')
boxplot = unsupported_function('boxplot')
cumcount = unsupported_function('cumcount')
describe = unsupported_function('describe')
expanding = unsupported_function('expanding')
ffill = unsupported_function('ffill')
get_group = unsupported_function('get_group')
head = unsupported_function('head')
median = unsupported_function('median')
ngroup = unsupported_function('ngroup')
nth = unsupported_function('nth')
ohlc = unsupported_function('ohlc')
pad = unsupported_function('pad')
pct_change = unsupported_function('pct_change')
pipe = unsupported_function('pipe')
prod = unsupported_function('prod')
Expand All @@ -81,7 +76,6 @@ class _MissingPandasLikeSeriesGroupBy(object):
corr = unsupported_property('corr')
cov = unsupported_property('cov')
dtype = unsupported_property('dtype')
fillna = unsupported_property('fillna')
groups = unsupported_property('groups')
hist = unsupported_property('hist')
idxmax = unsupported_property('idxmax')
Expand All @@ -103,19 +97,15 @@ class _MissingPandasLikeSeriesGroupBy(object):
take = unsupported_property('take', deprecated=True)

# Functions
backfill = unsupported_function('backfill')
bfill = unsupported_function('bfill')
cumcount = unsupported_function('cumcount')
describe = unsupported_function('describe')
expanding = unsupported_function('expanding')
ffill = unsupported_function('ffill')
get_group = unsupported_function('get_group')
head = unsupported_function('head')
median = unsupported_function('median')
ngroup = unsupported_function('ngroup')
nth = unsupported_function('nth')
ohlc = unsupported_function('ohlc')
pad = unsupported_function('pad')
pct_change = unsupported_function('pct_change')
pipe = unsupported_function('pipe')
prod = unsupported_function('prod')
Expand Down
18 changes: 10 additions & 8 deletions databricks/koalas/series.py
Expand Up @@ -1247,13 +1247,17 @@ def fillna(self, value=None, method=None, axis=None, inplace=False, limit=None):
4 c
Name: x, dtype: object
"""
return self._fillna(value, method, axis, inplace, limit)

def _fillna(self, value=None, method=None, axis=None, inplace=False, limit=None, part_cols=()):
if axis is None:
axis = 0
if not (axis == 0 or axis == "index"):
raise NotImplementedError("fillna currently only works for axis=0 or axis='index'")
if (value is None) and (method is None):
raise ValueError("Must specify a fill 'value' or 'method'.")

raise ValueError("Must specify a fillna 'value' or 'method' parameter.")
if (method is not None) and (method not in ['ffill', 'pad', 'backfill', 'bfill']):
raise ValueError("Expecting 'pad', 'ffill', 'backfill' or 'bfill'.")
if self.isnull().sum() == 0:
if inplace:
self._internal = self._internal.copy()
Expand Down Expand Up @@ -1285,13 +1289,11 @@ def fillna(self, value=None, method=None, axis=None, inplace=False, limit=None):
end = Window.currentRow + limit
else:
end = Window.unboundedFollowing
else:
raise ValueError('Expecting pad, ffill, backfill or bfill.')
window = Window.orderBy(self._internal.index_scols).rowsBetween(begin, end)
scol = F.when(scol.isNull(), func(scol, True).over(window)).otherwise(scol)

kseries = Series(self._kdf._internal.copy(scol=scol), anchor=self._kdf)\
.rename(column_name)
window = Window.partitionBy(*part_cols).orderBy(self._internal.index_scols)\
.rowsBetween(begin, end)
scol = F.when(scol.isNull(), func(scol, True).over(window)).otherwise(scol)
kseries = Series(self._kdf._internal.copy(scol=scol), anchor=self._kdf).rename(column_name)
if inplace:
self._internal = kseries._internal
self._kdf = kseries._kdf
Expand Down
5 changes: 3 additions & 2 deletions databricks/koalas/tests/test_dataframe.py
Expand Up @@ -452,9 +452,10 @@ def test_fillna(self):
kdf.fillna(pd.DataFrame({'x': [-1], 'y': [-1], 'z': [-1]}))
with self.assertRaisesRegex(TypeError, "Unsupported.*numpy.int64"):
kdf.fillna({'x': np.int64(-6), 'y': np.int64(-4), 'z': -5})
with self.assertRaisesRegex(ValueError, "Expecting pad, ffill, backfill or bfill."):
with self.assertRaisesRegex(ValueError, "Expecting 'pad', 'ffill', 'backfill' or 'bfill'."):
kdf.fillna(method='xxx')
with self.assertRaisesRegex(ValueError, "Must specify a fill 'value' or 'method'."):
with self.assertRaisesRegex(ValueError,
"Must specify a fillna 'value' or 'method' parameter."):
kdf.fillna()

def test_isnull(self):
Expand Down