diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 61dcd778ef..be4211a2fc 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -1000,6 +1000,31 @@ def test_apply_series_scalar_callable( pandas.testing.assert_series_equal(bf_result, pd_result) +def test_df_pipe( + scalars_df_index, + scalars_pandas_df_index, +): + columns = ["int64_too", "int64_col"] + + def foo(x: int, y: int, df): + return (df + x) % y + + bf_result = ( + scalars_df_index[columns] + .pipe((foo, "df"), x=7, y=9) + .pipe(lambda x: x**2) + .to_pandas() + ) + + pd_result = ( + scalars_pandas_df_index[columns] + .pipe((foo, "df"), x=7, y=9) + .pipe(lambda x: x**2) + ) + + pandas.testing.assert_frame_equal(bf_result, pd_result) + + def test_df_keys( scalars_df_index, scalars_pandas_df_index, diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 37b4f8c1de..f5c5b1c216 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -3203,3 +3203,28 @@ def test_apply_not_supported(scalars_dfs, col, lambda_, exception): bf_col = scalars_df[col] with pytest.raises(exception): bf_col.apply(lambda_, by_row=False) + + +def test_series_pipe( + scalars_df_index, + scalars_pandas_df_index, +): + column = "int64_too" + + def foo(x: int, y: int, df): + return (df + x) % y + + bf_result = ( + scalars_df_index[column] + .pipe((foo, "df"), x=7, y=9) + .pipe(lambda x: x**2) + .to_pandas() + ) + + pd_result = ( + scalars_pandas_df_index[column] + .pipe((foo, "df"), x=7, y=9) + .pipe(lambda x: x**2) + ) + + assert_series_equal(bf_result, pd_result) diff --git a/third_party/bigframes_vendored/pandas/core/common.py b/third_party/bigframes_vendored/pandas/core/common.py new file mode 100644 index 0000000000..ded5a22b8f --- /dev/null +++ b/third_party/bigframes_vendored/pandas/core/common.py @@ -0,0 +1,42 @@ +# Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/common.py +from __future__ import annotations + +from typing import Callable, TYPE_CHECKING + +if TYPE_CHECKING: + from bigframes_vendored.pandas.pandas._typing import T + + +def pipe( + obj, func: Callable[..., T] | tuple[Callable[..., T], str], *args, **kwargs +) -> T: + """ + Apply a function ``func`` to object ``obj`` either by passing obj as the + first argument to the function or, in the case that the func is a tuple, + interpret the first element of the tuple as a function and pass the obj to + that function as a keyword argument whose key is the value of the second + element of the tuple. + + Args: + func (callable or tuple of (callable, str)): + Function to apply to this object or, alternatively, a + ``(callable, data_keyword)`` tuple where ``data_keyword`` is a + string indicating the keyword of ``callable`` that expects the + object. + args (iterable, optional): + Positional arguments passed into ``func``. + kwargs (dict, optional): + A dictionary of keyword arguments passed into ``func``. + + Returns: + object: the return type of ``func``. + """ + if isinstance(func, tuple): + func, target = func + if target in kwargs: + msg = f"{target} is both the pipe target and a keyword argument" + raise ValueError(msg) + kwargs[target] = obj + return func(*args, **kwargs) + else: + return func(obj, *args, **kwargs) diff --git a/third_party/bigframes_vendored/pandas/core/generic.py b/third_party/bigframes_vendored/pandas/core/generic.py index 01d8f7a174..7f8e1f7b53 100644 --- a/third_party/bigframes_vendored/pandas/core/generic.py +++ b/third_party/bigframes_vendored/pandas/core/generic.py @@ -1,12 +1,16 @@ # Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/generic.py from __future__ import annotations -from typing import Iterator, Literal, Optional +from typing import Callable, Iterator, Literal, Optional, TYPE_CHECKING from bigframes_vendored.pandas.core import indexing +import bigframes_vendored.pandas.core.common as common from bigframes import constants +if TYPE_CHECKING: + from bigframes_vendored.pandas.pandas._typing import T + class NDFrame(indexing.IndexingMixin): """ @@ -963,6 +967,105 @@ def expanding(self, min_periods=1): """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def pipe( + self, + func: Callable[..., T] | tuple[Callable[..., T], str], + *args, + **kwargs, + ) -> T: + """ + Apply chainable functions that expect Series or DataFrames. + + **Examples:** + + Constructing a income DataFrame from a dictionary. + + >>> import bigframes.pandas as bpd + >>> import numpy as np + >>> bpd.options.display.progress_bar = None + + >>> data = [[8000, 1000], [9500, np.nan], [5000, 2000]] + >>> df = bpd.DataFrame(data, columns=['Salary', 'Others']) + >>> df + Salary Others + 0 8000 1000.0 + 1 9500 + 2 5000 2000.0 + + [3 rows x 2 columns] + + Functions that perform tax reductions on an income DataFrame. + + >>> def subtract_federal_tax(df): + ... return df * 0.9 + >>> def subtract_state_tax(df, rate): + ... return df * (1 - rate) + >>> def subtract_national_insurance(df, rate, rate_increase): + ... new_rate = rate + rate_increase + ... return df * (1 - new_rate) + + Instead of writing + + >>> subtract_national_insurance( + ... subtract_state_tax(subtract_federal_tax(df), rate=0.12), + ... rate=0.05, + ... rate_increase=0.02) # doctest: +SKIP + + You can write + + >>> ( + ... df.pipe(subtract_federal_tax) + ... .pipe(subtract_state_tax, rate=0.12) + ... .pipe(subtract_national_insurance, rate=0.05, rate_increase=0.02) + ... ) + Salary Others + 0 5892.48 736.56 + 1 6997.32 + 2 3682.8 1473.12 + + [3 rows x 2 columns] + + If you have a function that takes the data as (say) the second + argument, pass a tuple indicating which keyword expects the + data. For example, suppose ``national_insurance`` takes its data as ``df`` + in the second argument: + + >>> def subtract_national_insurance(rate, df, rate_increase): + ... new_rate = rate + rate_increase + ... return df * (1 - new_rate) + >>> ( + ... df.pipe(subtract_federal_tax) + ... .pipe(subtract_state_tax, rate=0.12) + ... .pipe( + ... (subtract_national_insurance, 'df'), + ... rate=0.05, + ... rate_increase=0.02 + ... ) + ... ) + Salary Others + 0 5892.48 736.56 + 1 6997.32 + 2 3682.8 1473.12 + + [3 rows x 2 columns] + + Args: + func (function): + Function to apply to this object. + ``args``, and ``kwargs`` are passed into ``func``. + Alternatively a ``(callable, data_keyword)`` tuple where + ``data_keyword`` is a string indicating the keyword of + ``callable`` that expects this object. + args (iterable, optional): + Positional arguments passed into ``func``. + kwargs (mapping, optional): + A dictionary of keyword arguments passed into ``func``. + + Returns: + same type as caller + """ + return common.pipe(self, func, *args, **kwargs) + def __nonzero__(self): raise ValueError( f"The truth value of a {type(self).__name__} is ambiguous. "