From 0a863849aa6ef5bb38d50704f3c03c7f7ca4e68d Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 4 Nov 2025 19:11:48 +0000 Subject: [PATCH 1/3] feat: Add bigframes.pandas.crosstab --- bigframes/core/reshape/api.py | 3 +- bigframes/core/reshape/pivot.py | 89 +++++++++++++++++++ bigframes/dataframe.py | 31 ++++++- bigframes/pandas/__init__.py | 3 +- bigframes/session/__init__.py | 15 ++++ tests/system/small/test_pandas.py | 66 ++++++++++++++ .../pandas/core/reshape/pivot.py | 57 ++++++++++++ 7 files changed, 261 insertions(+), 3 deletions(-) create mode 100644 bigframes/core/reshape/pivot.py create mode 100644 third_party/bigframes_vendored/pandas/core/reshape/pivot.py diff --git a/bigframes/core/reshape/api.py b/bigframes/core/reshape/api.py index 56dbdae77e..adb33427f9 100644 --- a/bigframes/core/reshape/api.py +++ b/bigframes/core/reshape/api.py @@ -15,6 +15,7 @@ from bigframes.core.reshape.concat import concat from bigframes.core.reshape.encoding import get_dummies from bigframes.core.reshape.merge import merge +from bigframes.core.reshape.pivot import crosstab from bigframes.core.reshape.tile import cut, qcut -__all__ = ["concat", "get_dummies", "merge", "cut", "qcut"] +__all__ = ["concat", "get_dummies", "merge", "cut", "qcut", "crosstab"] diff --git a/bigframes/core/reshape/pivot.py b/bigframes/core/reshape/pivot.py new file mode 100644 index 0000000000..8b83cb0fc7 --- /dev/null +++ b/bigframes/core/reshape/pivot.py @@ -0,0 +1,89 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from typing import Optional, TYPE_CHECKING + +import bigframes_vendored.pandas.core.reshape.pivot as vendored_pandas_pivot +import pandas as pd + +import bigframes +from bigframes.core import convert, utils +from bigframes.core.reshape import concat +from bigframes.dataframe import DataFrame + +if TYPE_CHECKING: + import bigframes.session + + +def crosstab( + index, + columns, + values=None, + rownames=None, + colnames=None, + aggfunc=None, + *, + session: Optional[bigframes.session.Session] = None, +) -> DataFrame: + if _is_list_of_lists(index): + index = [ + convert.to_bf_series(subindex, default_index=None, session=session) + for subindex in index + ] + else: + index = [convert.to_bf_series(index, default_index=None, session=session)] + if _is_list_of_lists(columns): + columns = [ + convert.to_bf_series(subcol, default_index=None, session=session) + for subcol in columns + ] + else: + columns = [convert.to_bf_series(columns, default_index=None, session=session)] + + df = concat.concat([*index, *columns], join="inner", axis=1) + # for uniqueness + tmp_index_names = [f"_crosstab_index_{i}" for i in range(len(index))] + tmp_col_names = [f"_crosstab_columns_{i}" for i in range(len(columns))] + df.columns = pd.Index([*tmp_index_names, *tmp_col_names]) + + values = ( + convert.to_bf_series(values, default_index=df.index, session=session) + if values is not None + else 0 + ) + + df["_crosstab_values"] = values + pivot_table = df.pivot_table( + values="_crosstab_values", + index=tmp_index_names, + columns=tmp_col_names, + aggfunc=aggfunc or "count", + sort=False, + ) + pivot_table.index.names = rownames or [i.name for i in index] + pivot_table.columns.names = colnames or [c.name for c in columns] + if aggfunc is None: + # TODO: Push this into pivot_table itself + pivot_table = pivot_table.fillna(0) + return pivot_table + + +def _is_list_of_lists(item) -> bool: + if not utils.is_list_like(item): + return False + return all(convert.can_convert_to_series(subitem) for subitem in item) + + +crosstab.__doc__ = vendored_pandas_pivot.crosstab.__doc__ diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index df8c87416f..8eaea56f34 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3479,7 +3479,34 @@ def pivot_table( ] = None, columns: typing.Union[blocks.Label, Sequence[blocks.Label]] = None, aggfunc: str = "mean", + fill_value=None, + margins: bool = False, + dropna: bool = True, + margins_name: Hashable = "All", + observed: bool = False, + sort: bool = True, ) -> DataFrame: + if fill_value is not None: + raise NotImplementedError( + "DataFrame.pivot_table fill_value arg not supported. {constants.FEEDBACK_LINK}" + ) + if margins: + raise NotImplementedError( + "DataFrame.pivot_table margins arg not supported. {constants.FEEDBACK_LINK}" + ) + if not dropna: + raise NotImplementedError( + "DataFrame.pivot_table dropna arg not supported. {constants.FEEDBACK_LINK}" + ) + if margins_name != "All": + raise NotImplementedError( + "DataFrame.pivot_table margins_name arg not supported. {constants.FEEDBACK_LINK}" + ) + if observed: + raise NotImplementedError( + "DataFrame.pivot_table observed arg not supported. {constants.FEEDBACK_LINK}" + ) + if isinstance(index, Iterable) and not ( isinstance(index, blocks.Label) and index in self.columns ): @@ -3521,7 +3548,9 @@ def pivot_table( columns=columns, index=index, values=values if len(values) > 1 else None, - ).sort_index() + ) + if sort: + pivoted = pivoted.sort_index() # TODO: Remove the reordering step once the issue is resolved. # The pivot_table method results in multi-index columns that are always ordered. diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 6fcb71f0d8..7b633f6dc8 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -31,7 +31,7 @@ import bigframes.core.blocks import bigframes.core.global_session as global_session import bigframes.core.indexes -from bigframes.core.reshape.api import concat, cut, get_dummies, merge, qcut +from bigframes.core.reshape.api import concat, crosstab, cut, get_dummies, merge, qcut import bigframes.core.tools import bigframes.dataframe import bigframes.enums @@ -372,6 +372,7 @@ def reset_session(): _functions = [ clean_up_by_session_id, concat, + crosstab, cut, deploy_remote_function, deploy_udf, diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 6418f2b78f..3cb9d2bb68 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -2312,6 +2312,21 @@ def cut(self, *args, **kwargs) -> bigframes.series.Series: **kwargs, ) + def crosstab(self, *args, **kwargs) -> dataframe.DataFrame: + """Compute a simple cross tabulation of two (or more) factors. + + Included for compatibility between bpd and Session. + + See :func:`bigframes.pandas.crosstab` for full documentation. + """ + import bigframes.core.reshape.pivot + + return bigframes.core.reshape.pivot.crosstab( + *args, + session=self, + **kwargs, + ) + def DataFrame(self, *args, **kwargs): """Constructs a DataFrame. diff --git a/tests/system/small/test_pandas.py b/tests/system/small/test_pandas.py index d2cde59729..a729fac4d3 100644 --- a/tests/system/small/test_pandas.py +++ b/tests/system/small/test_pandas.py @@ -454,6 +454,72 @@ def test_merge_raises_error_when_left_right_on_set(scalars_dfs): ) +def test_crosstab_aligned_series(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + + pd_result = pd.crosstab( + scalars_pandas_df["int64_col"], scalars_pandas_df["int64_too"] + ) + bf_result = bpd.crosstab( + scalars_df["int64_col"], scalars_df["int64_too"] + ).to_pandas() + + assert_pandas_df_equal(bf_result, pd_result, check_dtype=False) + + +def test_crosstab_nondefault_func(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + + pd_result = pd.crosstab( + scalars_pandas_df["int64_col"], + scalars_pandas_df["int64_too"], + values=scalars_pandas_df["float64_col"], + aggfunc="mean", + ) + bf_result = bpd.crosstab( + scalars_df["int64_col"], + scalars_df["int64_too"], + values=scalars_df["float64_col"], + aggfunc="mean", + ).to_pandas() + + assert_pandas_df_equal(bf_result, pd_result, check_dtype=False) + + +def test_crosstab_multi_cols(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + + pd_result = pd.crosstab( + [scalars_pandas_df["int64_col"], scalars_pandas_df["bool_col"]], + [scalars_pandas_df["int64_too"], scalars_pandas_df["string_col"]], + rownames=["a", "b"], + colnames=["c", "d"], + ) + bf_result = bpd.crosstab( + [scalars_df["int64_col"], scalars_df["bool_col"]], + [scalars_df["int64_too"], scalars_df["string_col"]], + rownames=["a", "b"], + colnames=["c", "d"], + ).to_pandas() + + assert_pandas_df_equal(bf_result, pd_result, check_dtype=False) + + +def test_crosstab_unaligned_series(scalars_dfs, session): + scalars_df, scalars_pandas_df = scalars_dfs + other_pd_series = pd.Series( + [10, 20, 10, 30, 10], index=[5, 4, 1, 2, 3], dtype="Int64", name="nums" + ) + other_bf_series = session.Series( + [10, 20, 10, 30, 10], index=[5, 4, 1, 2, 3], name="nums" + ) + + pd_result = pd.crosstab(scalars_pandas_df["int64_col"], other_pd_series) + bf_result = bpd.crosstab(scalars_df["int64_col"], other_bf_series).to_pandas() + + assert_pandas_df_equal(bf_result, pd_result, check_dtype=False) + + def _convert_pandas_category(pd_s: pd.Series): """ Transforms a pandas Series with Categorical dtype into a bigframes-compatible diff --git a/third_party/bigframes_vendored/pandas/core/reshape/pivot.py b/third_party/bigframes_vendored/pandas/core/reshape/pivot.py new file mode 100644 index 0000000000..8cc33525a4 --- /dev/null +++ b/third_party/bigframes_vendored/pandas/core/reshape/pivot.py @@ -0,0 +1,57 @@ +# Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/reshape/pivot.py +from __future__ import annotations + +from bigframes import constants + + +def crosstab( + index, + columns, + values=None, + rownames=None, + colnames=None, + aggfunc=None, +): + """ + Compute a simple cross tabulation of two (or more) factors. + + By default, computes a frequency table of the factors unless an + array of values and an aggregation function are passed. + + **Examples:** + >>> a = np.array(["foo", "foo", "foo", "foo", "bar", "bar", + ... "bar", "bar", "foo", "foo", "foo"], dtype=object) + >>> b = np.array(["one", "one", "one", "two", "one", "one", + ... "one", "two", "two", "two", "one"], dtype=object) + >>> c = np.array(["dull", "dull", "shiny", "dull", "dull", "shiny", + ... "shiny", "dull", "shiny", "shiny", "shiny"], + ... dtype=object) + >>> bpd.crosstab(a, [b, c], rownames=['a'], colnames=['b', 'c']) + b one two + c dull shiny dull shiny + a + bar 1 2 1 0 + foo 2 2 1 2 + + [2 rows x 4 columns] + + Args: + index (array-like, Series, or list of arrays/Series): + Values to group by in the rows. + columns (array-like, Series, or list of arrays/Series): + Values to group by in the columns. + values (array-like, optional): + Array of values to aggregate according to the factors. + Requires `aggfunc` be specified. + rownames (sequence, default None): + If passed, must match number of row arrays passed. + colnames (sequence, default None): + If passed, must match number of column arrays passed. + aggfunc (function, optional): + If specified, requires `values` be specified as well. + + Returns: + DataFrame: + Cross tabulation of the data. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) From 68e3d69c0559d3adfad5fdfc7ea2076250bd8cd4 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 7 Nov 2025 21:34:15 +0000 Subject: [PATCH 2/3] modify finicky test --- tests/unit/session/test_io_bigquery.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index 57ac3d88f7..6affa4fb49 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -158,15 +158,21 @@ def test_create_job_configs_labels_length_limit_met(): df.max() api_methods = log_adapter._api_methods + assert set(api_methods) == { + "dataframe-max", + "dataframe-head", + "dataframe-__init__", + "session-__init__", + } labels = io_bq.create_job_configs_labels( job_configs_labels=cur_labels, api_methods=api_methods ) assert labels is not None - assert len(labels) == 56 assert "dataframe-max" in labels.values() assert "dataframe-head" not in labels.values() assert "bigframes-api" in labels.keys() assert "source" in labels.keys() + assert len(labels) == 56 def test_add_and_trim_labels_length_limit_met(): From c2e445084a883684b108c9ce347ca107926fd45a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 7 Nov 2025 21:52:34 +0000 Subject: [PATCH 3/3] remove failing test --- tests/unit/session/test_io_bigquery.py | 36 -------------------------- 1 file changed, 36 deletions(-) diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index 6affa4fb49..585cdd5073 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -139,42 +139,6 @@ def test_create_job_configs_labels_length_limit_met_and_labels_is_none(): assert "dataframe-head" in labels.values() -def test_create_job_configs_labels_length_limit_met(): - log_adapter.get_and_reset_api_methods() - cur_labels = { - "bigframes-api": "read_pandas", - "source": "bigquery-dataframes-temp", - } - for i in range(53): - key = f"bigframes-api-test-{i}" - value = f"test{i}" - cur_labels[key] = value - # If cur_labels length is 62, we can only add one label from api_methods - df = bpd.DataFrame( - {"col1": [1, 2], "col2": [3, 4]}, session=mocks.create_bigquery_session() - ) - # Test running two methods - df.head() - df.max() - api_methods = log_adapter._api_methods - - assert set(api_methods) == { - "dataframe-max", - "dataframe-head", - "dataframe-__init__", - "session-__init__", - } - labels = io_bq.create_job_configs_labels( - job_configs_labels=cur_labels, api_methods=api_methods - ) - assert labels is not None - assert "dataframe-max" in labels.values() - assert "dataframe-head" not in labels.values() - assert "bigframes-api" in labels.keys() - assert "source" in labels.keys() - assert len(labels) == 56 - - def test_add_and_trim_labels_length_limit_met(): log_adapter.get_and_reset_api_methods() cur_labels = {