Add DataFrame and Series median method (#9483)
jrbourbeau committed Sep 13, 2022
1 parent d20c729 commit 142de26
Showing 3 changed files with 144 additions and 15 deletions.
50 changes: 50 additions & 0 deletions dask/dataframe/core.py
@@ -2162,6 +2162,34 @@ def mean(
result.divisions = (self.columns.min(), self.columns.max())
return handle_out(out, result)

def median_approximate(
self,
axis=None,
method="default",
):
"""Return the approximate median of the values over the requested axis.
Parameters
----------
axis : {0, 1, "index", "columns"} (default 0)
0 or ``"index"`` for row-wise, 1 or ``"columns"`` for column-wise
method : {'default', 'tdigest', 'dask'}, optional
What method to use. By default will use Dask's internal custom
algorithm (``"dask"``). If set to ``"tdigest"`` will use tdigest
for floats and ints and fallback to the ``"dask"`` otherwise.
"""
return self.quantile(q=0.5, axis=axis, method=method).rename(None)

@derived_from(pd.DataFrame)
def median(self, axis=None, method="default"):
if axis in (1, "columns") or self.npartitions == 1:
# Can provide an exact median in these cases
return self.median_approximate(axis=axis, method=method)
raise NotImplementedError(
"Dask doesn't implement an exact median in all cases as this is hard to do in parallel. "
"See the `median_approximate` method instead, which uses an approximate algorithm."
)

@_numeric_only
@derived_from(pd.DataFrame)
def var(
@@ -3631,6 +3659,28 @@ def quantile(self, q=0.5, method="default"):
"""
return quantile(self, q, method=method)

def median_approximate(self, method="default"):
"""Return the approximate median of the values over the requested axis.
Parameters
----------
method : {'default', 'tdigest', 'dask'}, optional
What method to use. By default will use Dask's internal custom
algorithm (``"dask"``). If set to ``"tdigest"`` will use tdigest
for floats and ints and fallback to the ``"dask"`` otherwise.
"""
return self.quantile(q=0.5, method=method)

@derived_from(pd.Series)
def median(self, method="default"):
if self.npartitions == 1:
# Can provide an exact median in these cases
return self.median_approximate(method=method)
raise NotImplementedError(
"Dask doesn't implement an exact median in all cases as this is hard to do in parallel. "
"See the `median_approximate` method instead, which uses an approximate algorithm."
)

def _repartition_quantiles(self, npartitions, upsample=1.0):
"""Approximate quantiles of Series used for repartitioning"""
from dask.dataframe.partitionquantiles import partition_quantiles
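For context, a minimal usage sketch of the new methods (illustrative only; the frame, column names, and partition counts below are made up, and median_approximate returns an approximate rather than exact result):

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({"x": range(100), "y": range(100, 200)})
ddf = dd.from_pandas(df, npartitions=10)

# Approximate median across partitions; internally this calls quantile(q=0.5)
ddf.median_approximate().compute()
ddf.x.median_approximate(method="dask").compute()

# Exact medians are only supported where they are cheap to compute:
# row-wise (axis=1) or when there is a single partition
ddf.median(axis=1).compute()
dd.from_pandas(df, npartitions=1).x.median().compute()

# Any other median() call raises NotImplementedError and points to
# median_approximate instead
# ddf.median()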
105 changes: 90 additions & 15 deletions dask/dataframe/tests/test_dataframe.py
@@ -41,6 +41,11 @@
from dask.utils import M, is_dataframe_like, is_series_like, put_lines
from dask.utils_test import _check_warning, hlg_layer

try:
import crick
except ImportError:
crick = None

dsk = {
("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}, index=[5, 6, 8]),
@@ -393,11 +398,17 @@ def test_rename_series_method_2():


@pytest.mark.parametrize(
"method,test_values", [("tdigest", (6, 10)), ("dask", (4, 20))]
"method,test_values",
[
pytest.param(
"tdigest",
(6, 10),
marks=pytest.mark.skipif(not crick, reason="Requires crick"),
),
("dask", (4, 20)),
],
)
def test_describe_numeric(method, test_values):
if method == "tdigest":
pytest.importorskip("crick")
# prepare test case which approx quantiles will be the same as actuals
s = pd.Series(list(range(test_values[1])) * test_values[0])
df = pd.DataFrame(
@@ -1365,11 +1376,16 @@ def test_nbytes():

@pytest.mark.parametrize(
"method,expected",
[("tdigest", (0.35, 3.80, 2.5, 6.5, 2.0)), ("dask", (0.0, 4.0, 1.2, 6.2, 2.0))],
[
pytest.param(
"tdigest",
(0.35, 3.80, 2.5, 6.5, 2.0),
marks=pytest.mark.skipif(not crick, reason="Requires crick"),
),
("dask", (0.0, 4.0, 1.2, 6.2, 2.0)),
],
)
def test_quantile(method, expected):
if method == "tdigest":
pytest.importorskip("crick")
# series / multiple
result = d.b.quantile([0.3, 0.7], method=method)

@@ -1408,10 +1424,16 @@ def test_quantile(method, expected):
assert result == expected[4]


@pytest.mark.parametrize("method", ["tdigest", "dask"])
@pytest.mark.parametrize(
"method",
[
pytest.param(
"tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick")
),
"dask",
],
)
def test_quantile_missing(method):
if method == "tdigest":
pytest.importorskip("crick")
df = pd.DataFrame({"A": [0, np.nan, 2]})
# TODO: Test npartitions=2
# (see https://github.com/dask/dask/issues/9227)
@@ -1425,10 +1447,16 @@ def test_quantile_missing(method):
assert_eq(result, expected)


@pytest.mark.parametrize("method", ["tdigest", "dask"])
@pytest.mark.parametrize(
"method",
[
pytest.param(
"tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick")
),
"dask",
],
)
def test_empty_quantile(method):
if method == "tdigest":
pytest.importorskip("crick")
result = d.b.quantile([], method=method)
exp = full.b.quantile([])
assert result.divisions == (None, None)
@@ -1445,7 +1473,7 @@ def test_empty_quantile(method):
@pytest.mark.parametrize(
"method,expected",
[
(
pytest.param(
"tdigest",
(
pd.Series([9.5, 29.5, 19.5], index=["A", "X", "B"]),
@@ -1455,6 +1483,7 @@ def test_empty_quantile(method):
columns=["A", "X", "B"],
),
),
marks=pytest.mark.skipif(not crick, reason="Requires crick"),
),
(
"dask",
@@ -1470,8 +1499,6 @@ def test_empty_quantile(method):
],
)
def test_dataframe_quantile(method, expected):
if method == "tdigest":
pytest.importorskip("crick")
# column X is for test column order and result division
df = pd.DataFrame(
{
@@ -4335,6 +4362,54 @@ def test_dataframe_mode():
assert_eq(ddf.mode(), df.mode(), check_index=False)


def test_median():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5], "y": [1.1, 2.2, 3.3, 4.4, 5.5]})
ddf = dd.from_pandas(df, npartitions=3)

# Exact medians work when `axis=1` or when there's only a single partition
assert_eq(ddf.median(axis=1), df.median(axis=1))
ddf_single = dd.from_pandas(df, npartitions=1)
assert_eq(ddf_single.median(axis=1), df.median(axis=1))
assert_eq(ddf_single.median(axis=0), df.median(axis=0))
assert_eq(ddf_single.x.median(), df.x.median())

# Ensure `median` redirects to `median_approximate` appropriately
for axis in [None, 0, "rows"]:
with pytest.raises(
NotImplementedError, match="See the `median_approximate` method instead"
):
ddf.median(axis=axis)

with pytest.raises(
NotImplementedError, match="See the `median_approximate` method instead"
):
ddf.x.median()


@pytest.mark.parametrize(
"method",
[
"dask",
pytest.param(
"tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick")
),
],
)
def test_median_approximate(method):
df = pd.DataFrame({"x": range(100), "y": range(100, 200)})
ddf = dd.from_pandas(df, npartitions=10)
if PANDAS_GT_110:
assert_eq(
ddf.median_approximate(method=method),
df.median(),
atol=1,
)
else:
result = ddf.median_approximate(method=method)
expected = df.median()
assert ((result - expected).abs() < 1).all().compute()


def test_datetime_loc_open_slicing():
dtRange = pd.date_range("01.01.2015", "05.05.2015")
df = pd.DataFrame(np.random.random((len(dtRange), 2)), index=dtRange)
4 changes: 4 additions & 0 deletions docs/source/dataframe-api.rst
@@ -73,6 +73,8 @@ Dataframe
DataFrame.mask
DataFrame.max
DataFrame.mean
DataFrame.median
DataFrame.median_approximate
DataFrame.melt
DataFrame.memory_usage
DataFrame.memory_usage_per_partition
@@ -203,6 +205,8 @@ Series
Series.mask
Series.max
Series.mean
Series.median
Series.median_approximate
Series.memory_usage
Series.memory_usage_per_partition
Series.min