From 0220ef5f8c9d57b2b9ac6fe05b8be6d67efa225a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 20 May 2024 23:53:11 +0000 Subject: [PATCH 01/15] refactor: label all apis that use total ordering --- bigframes/_config/bigquery_options.py | 6 ++++ bigframes/core/api_helpers.py | 45 +++++++++++++++++++++++++++ bigframes/core/groupby/__init__.py | 18 +++++++++++ bigframes/dataframe.py | 36 +++++++++++++++++++++ bigframes/series.py | 30 ++++++++++++++++++ bigframes/session/__init__.py | 1 + 6 files changed, 136 insertions(+) create mode 100644 bigframes/core/api_helpers.py diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index 6f841a36b3..f6178b856f 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -70,6 +70,7 @@ def __init__( application_name: Optional[str] = None, kms_key_name: Optional[str] = None, skip_bq_connection_check: bool = False, + strict_ordering: bool = True, ): self._credentials = credentials self._project = project @@ -80,6 +81,7 @@ def __init__( self._kms_key_name = kms_key_name self._skip_bq_connection_check = skip_bq_connection_check self._session_started = False + self._strict_ordering = True @property def application_name(self) -> Optional[str]: @@ -235,3 +237,7 @@ def kms_key_name(self, value: str): raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="kms_key_name")) self._kms_key_name = value + + @property + def strict_ordering(self) -> bool: + return self._strict_ordering diff --git a/bigframes/core/api_helpers.py b/bigframes/core/api_helpers.py new file mode 100644 index 0000000000..5736244160 --- /dev/null +++ b/bigframes/core/api_helpers.py @@ -0,0 +1,45 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Helpers for guarding public APIs that require a totally ordered session."""
+
+from __future__ import annotations
+
+import functools
+from typing import Protocol, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from bigframes import Session
+
+
+class HasSession(Protocol):
+    @property
+    def _session(self) -> Session:
+        ...
+
+
+def requires_strict_ordering(where={}):
+    def decorator(meth):
+        # TODO :SUpport where
+        @functools.wraps(meth)
+        def guarded_meth(object: HasSession, *args, **kwargs):
+            if not object._session._strict_ordering:
+                raise NotImplementedError(
+                    f"Op {meth.__name__} not yet supported when strict ordering is disabled."
+ ) + return meth(object, *args, **kwargs) + + return guarded_meth + + return decorator diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index 41d0750030..d8d4fc4eb4 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -23,6 +23,7 @@ import bigframes.constants as constants from bigframes.core import log_adapter import bigframes.core as core +from bigframes.core.api_helpers import requires_strict_ordering import bigframes.core.block_transforms as block_ops import bigframes.core.blocks as blocks import bigframes.core.ordering as order @@ -203,20 +204,25 @@ def count(self) -> df.DataFrame: def nunique(self) -> df.DataFrame: return self._aggregate_all(agg_ops.nunique_op) + @requires_strict_ordering() def cumsum(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("cumsum") return self._apply_window_op(agg_ops.sum_op, numeric_only=True) + @requires_strict_ordering() def cummin(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.min_op, numeric_only=numeric_only) + @requires_strict_ordering() def cummax(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.max_op, numeric_only=numeric_only) + @requires_strict_ordering() def cumprod(self, *args, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.product_op, numeric_only=True) + @requires_strict_ordering() def shift(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -225,6 +231,7 @@ def shift(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window=window) + @requires_strict_ordering() def diff(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -233,6 +240,7 @@ def diff(self, periods=1) -> series.Series: ) return 
self._apply_window_op(agg_ops.DiffOp(periods), window=window) + @requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. window_spec = window_specs.rows( @@ -248,6 +256,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: block, window_spec, self._selected_cols, drop_null_groups=self._dropna ) + @requires_strict_ordering() def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = window_specs.cumulative_rows( grouping_keys=tuple(self._by_col_ids), @@ -563,26 +572,31 @@ def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]: aggregate = agg + @requires_strict_ordering() def cumsum(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.sum_op, ) + @requires_strict_ordering() def cumprod(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.product_op, ) + @requires_strict_ordering() def cummax(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.max_op, ) + @requires_strict_ordering() def cummin(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.min_op, ) + @requires_strict_ordering() def cumcount(self, *args, **kwargs) -> series.Series: return ( self._apply_window_op( @@ -592,6 +606,7 @@ def cumcount(self, *args, **kwargs) -> series.Series: - 1 ) + @requires_strict_ordering() def shift(self, periods=1) -> series.Series: """Shift index by desired number of periods.""" window = window_specs.rows( @@ -601,6 +616,7 @@ def shift(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window=window) + @requires_strict_ordering() def diff(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -609,6 +625,7 @@ def diff(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window=window) + 
@requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. window_spec = window_specs.rows( @@ -628,6 +645,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: is_series=True, ) + @requires_strict_ordering() def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = window_specs.cumulative_rows( grouping_keys=tuple(self._by_col_ids), diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 105588de2f..ca03eabeda 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -51,6 +51,7 @@ import bigframes.constants as constants import bigframes.core from bigframes.core import log_adapter +from bigframes.core.api_helpers import requires_strict_ordering import bigframes.core.block_transforms as block_ops import bigframes.core.blocks as blocks import bigframes.core.convert @@ -276,10 +277,12 @@ def loc(self) -> indexers.LocDataFrameIndexer: return indexers.LocDataFrameIndexer(self) @property + @requires_strict_ordering() def iloc(self) -> indexers.ILocDataFrameIndexer: return indexers.ILocDataFrameIndexer(self) @property + @requires_strict_ordering() def iat(self) -> indexers.IatDataFrameIndexer: return indexers.IatDataFrameIndexer(self) @@ -336,13 +339,19 @@ def _has_index(self) -> bool: return len(self._block.index_columns) > 0 @property + @requires_strict_ordering() def T(self) -> DataFrame: return DataFrame(self._get_block().transpose()) @requires_index + @requires_strict_ordering() def transpose(self) -> DataFrame: return self.T + @property + def _strict_ordering(self) -> bool: + return self._session._strict_ordering + def __len__(self): rows, _ = self.shape return rows @@ -1283,6 +1292,7 @@ def copy(self) -> DataFrame: def head(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, self.iloc[:n]) + @requires_strict_ordering() def tail(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, 
self.iloc[-n:]) @@ -1317,6 +1327,8 @@ def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame: ) return maybe_result.set_axis(self._block.column_labels, axis=1, copy=False) + # enforce keep == all + @requires_strict_ordering() def nlargest( self, n: int, @@ -1328,6 +1340,8 @@ def nlargest( column_ids = self._sql_names(columns) return DataFrame(block_ops.nlargest(self._block, n, column_ids, keep=keep)) + # enforce keep == all + @requires_strict_ordering() def nsmallest( self, n: int, @@ -1490,6 +1504,7 @@ def rename_axis( labels = [mapper] return DataFrame(self._block.with_index_labels(labels)) + @requires_strict_ordering() def equals(self, other: typing.Union[bigframes.series.Series, DataFrame]) -> bool: # Must be same object type, same column dtypes, and same label values if not isinstance(other, DataFrame): @@ -1887,6 +1902,7 @@ def _reindex_columns(self, columns): def reindex_like(self, other: DataFrame, *, validate: typing.Optional[bool] = None): return self.reindex(index=other.index, columns=other.columns, validate=validate) + @requires_strict_ordering() @requires_index def interpolate(self, method: str = "linear") -> DataFrame: if method == "pad": @@ -1912,10 +1928,12 @@ def replace( lambda x: x.replace(to_replace=to_replace, value=value, regex=regex) ) + @requires_strict_ordering() def ffill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = window_spec.rows(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) + @requires_strict_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = window_spec.rows(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) @@ -2181,13 +2199,16 @@ def agg( aggregate.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.agg) @requires_index + @requires_strict_ordering() def idxmin(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmin(self._block)) 
@requires_index + @requires_strict_ordering() def idxmax(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmax(self._block)) + @requires_strict_ordering() def melt( self, id_vars: typing.Optional[typing.Iterable[typing.Hashable]] = None, @@ -2292,6 +2313,7 @@ def _pivot( return DataFrame(pivot_block) @requires_index + @requires_strict_ordering() def pivot( self, *, @@ -2306,6 +2328,7 @@ def pivot( return self._pivot(columns=columns, index=index, values=values) @requires_index + @requires_strict_ordering() def pivot_table( self, values: typing.Optional[ @@ -2366,6 +2389,7 @@ def pivot_table( # Sort and reorder. return pivoted[pivoted.columns.sort_values()] + @requires_strict_ordering() def stack(self, level: LevelsType = -1): if not isinstance(self.columns, pandas.MultiIndex): if level not in [0, -1, self.columns.name]: @@ -2405,6 +2429,7 @@ def _stack_multi(self, level: LevelsType = -1): return DataFrame(block) @requires_index + @requires_strict_ordering() def unstack(self, level: LevelsType = -1): if not utils.is_list_like(level): level = [level] @@ -2615,6 +2640,7 @@ def _perform_join_by_index( block, _ = self._block.join(other._block, how=how, block_identity_join=True) return DataFrame(block) + @requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. 
window_def = window_spec.rows( @@ -2624,6 +2650,7 @@ def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window self._block, window_def, self._block.value_columns ) + @requires_strict_ordering() def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: window = window_spec.cumulative_rows(min_periods=min_periods) return bigframes.core.window.Window( @@ -2726,6 +2753,7 @@ def notna(self) -> DataFrame: notnull = notna notnull.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.notna) + @requires_strict_ordering() def cumsum(self): is_numeric_types = [ (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) @@ -2738,6 +2766,7 @@ def cumsum(self): window_spec.cumulative_rows(), ) + @requires_strict_ordering() def cumprod(self) -> DataFrame: is_numeric_types = [ (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) @@ -2750,18 +2779,21 @@ def cumprod(self) -> DataFrame: window_spec.cumulative_rows(), ) + @requires_strict_ordering() def cummin(self) -> DataFrame: return self._apply_window_op( agg_ops.min_op, window_spec.cumulative_rows(), ) + @requires_strict_ordering() def cummax(self) -> DataFrame: return self._apply_window_op( agg_ops.max_op, window_spec.cumulative_rows(), ) + @requires_strict_ordering() def shift(self, periods: int = 1) -> DataFrame: window = window_spec.rows( preceding=periods if periods > 0 else None, @@ -2769,6 +2801,7 @@ def shift(self, periods: int = 1) -> DataFrame: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window) + @requires_strict_ordering() def diff(self, periods: int = 1) -> DataFrame: window = window_spec.rows( preceding=periods if periods > 0 else None, @@ -2776,6 +2809,7 @@ def diff(self, periods: int = 1) -> DataFrame: ) return self._apply_window_op(agg_ops.DiffOp(periods), window) + @requires_strict_ordering() def pct_change(self, periods: int = 1) -> DataFrame: # Future versions of pandas will not perfrom ffill automatically df = self.ffill() @@ -2793,6 
+2827,7 @@ def _apply_window_op( ) return DataFrame(block.select_columns(result_ids)) + @requires_strict_ordering() def sample( self, n: Optional[int] = None, @@ -3528,6 +3563,7 @@ def _optimize_query_complexity(self): _DataFrameOrSeries = typing.TypeVar("_DataFrameOrSeries") + @requires_strict_ordering() def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries: if not isinstance(other, (DataFrame, bf_series.Series)): raise NotImplementedError( diff --git a/bigframes/series.py b/bigframes/series.py index 4595164e80..523b05ddfa 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -34,6 +34,7 @@ import bigframes.constants as constants import bigframes.core from bigframes.core import log_adapter +from bigframes.core.api_helpers import requires_strict_ordering import bigframes.core.block_transforms as block_ops import bigframes.core.blocks as blocks import bigframes.core.expression as ex @@ -91,10 +92,12 @@ def loc(self) -> bigframes.core.indexers.LocSeriesIndexer: return bigframes.core.indexers.LocSeriesIndexer(self) @property + @requires_strict_ordering() def iloc(self) -> bigframes.core.indexers.IlocSeriesIndexer: return bigframes.core.indexers.IlocSeriesIndexer(self) @property + @requires_strict_ordering() def iat(self) -> bigframes.core.indexers.IatSeriesIndexer: return bigframes.core.indexers.IatSeriesIndexer(self) @@ -159,6 +162,7 @@ def struct(self) -> structs.StructAccessor: return structs.StructAccessor(self._block) @property + @requires_strict_ordering() def T(self) -> Series: return self.transpose() @@ -170,6 +174,7 @@ def _info_axis(self) -> indexes.Index: def _session(self) -> bigframes.Session: return self._get_block().expr.session + @requires_strict_ordering() def transpose(self) -> Series: return self @@ -265,6 +270,7 @@ def equals( return False return block_ops.equals(self._block, other._block) + @requires_strict_ordering() def reset_index( self, *, @@ -447,11 +453,13 @@ def case_when(self, caselist) -> Series: ignore_self=True, ) + 
@requires_strict_ordering() def cumsum(self) -> Series: return self._apply_window_op( agg_ops.sum_op, bigframes.core.window_spec.cumulative_rows() ) + @requires_strict_ordering() def ffill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.window_spec.rows(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) @@ -459,25 +467,30 @@ def ffill(self, *, limit: typing.Optional[int] = None) -> Series: pad = ffill pad.__doc__ = inspect.getdoc(vendored_pandas_series.Series.ffill) + @requires_strict_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.window_spec.rows(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) + @requires_strict_ordering() def cummax(self) -> Series: return self._apply_window_op( agg_ops.max_op, bigframes.core.window_spec.cumulative_rows() ) + @requires_strict_ordering() def cummin(self) -> Series: return self._apply_window_op( agg_ops.min_op, bigframes.core.window_spec.cumulative_rows() ) + @requires_strict_ordering() def cumprod(self) -> Series: return self._apply_window_op( agg_ops.product_op, bigframes.core.window_spec.cumulative_rows() ) + @requires_strict_ordering() def shift(self, periods: int = 1) -> Series: window = bigframes.core.window_spec.rows( preceding=periods if periods > 0 else None, @@ -485,6 +498,7 @@ def shift(self, periods: int = 1) -> Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window) + @requires_strict_ordering() def diff(self, periods: int = 1) -> Series: window = bigframes.core.window_spec.rows( preceding=periods if periods > 0 else None, @@ -492,11 +506,13 @@ def diff(self, periods: int = 1) -> Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window) + @requires_strict_ordering() def pct_change(self, periods: int = 1) -> Series: # Future versions of pandas will not perfrom ffill automatically series = self.ffill() return 
Series(block_ops.pct_change(series._block, periods=periods)) + @requires_strict_ordering() def rank( self, axis=0, @@ -588,6 +604,7 @@ def _mapping_replace(self, mapping: dict[typing.Hashable, typing.Hashable]): ) return Series(block.select_column(result)) + @requires_strict_ordering() @requires_index def interpolate(self, method: str = "linear") -> Series: if method == "pad": @@ -610,12 +627,15 @@ def dropna( result = result.reset_index() return Series(result) + @requires_strict_ordering() def head(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[0:n]) + @requires_strict_ordering() def tail(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[-n:]) + @requires_strict_ordering(where={"keep": ["first", "last"]}) def nlargest(self, n: int = 5, keep: str = "first") -> Series: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") @@ -623,6 +643,7 @@ def nlargest(self, n: int = 5, keep: str = "first") -> Series: block_ops.nlargest(self._block, n, [self._value_column], keep=keep) ) + @requires_strict_ordering(where={"keep": ["first", "last"]}) def nsmallest(self, n: int = 5, keep: str = "first") -> Series: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") @@ -1063,6 +1084,7 @@ def clip(self, lower, upper): ) return Series(block.select_column(result_id).with_column_labels([self.name])) + @requires_strict_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -1075,6 +1097,7 @@ def argmax(self) -> int: scalars.Scalar, Series(block.select_column(row_nums)).iloc[0] ) + @requires_strict_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -1140,12 +1163,14 @@ def idxmin(self) -> blocks.Label: return indexes.Index(block).to_pandas()[0] @property + @requires_strict_ordering() def is_monotonic_increasing(self) -> bool: return 
typing.cast( bool, self._block.is_monotonic_increasing(self._value_column) ) @property + @requires_strict_ordering() def is_monotonic_decreasing(self) -> bool: return typing.cast( bool, self._block.is_monotonic_decreasing(self._value_column) @@ -1251,6 +1276,7 @@ def sort_index(self, *, axis=0, ascending=True, na_position="last") -> Series: block = block.order_by(ordering) return Series(block) + @requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. window_spec = bigframes.core.window_spec.rows( @@ -1260,6 +1286,7 @@ def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window self._block, window_spec, self._block.value_columns, is_series=True ) + @requires_strict_ordering() def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: window_spec = bigframes.core.window_spec.cumulative_rows( min_periods=min_periods @@ -1527,9 +1554,11 @@ def drop_duplicates(self, *, keep: str = "first") -> Series: block = block_ops.drop_duplicates(self._block, (self._value_column,), keep) return Series(block) + @requires_strict_ordering() def unique(self) -> Series: return self.drop_duplicates() + @requires_strict_ordering(where={"keep": ["first", "last"]}) def duplicated(self, keep: str = "first") -> Series: block, indicator = block_ops.indicate_duplicates( self._block, (self._value_column,), keep @@ -1696,6 +1725,7 @@ def map( result_df = self_df.join(map_df, on="series") return result_df[self.name] + @requires_strict_ordering() def sample( self, n: Optional[int] = None, diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index ccdc3c5eeb..f40c430cde 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -267,6 +267,7 @@ def __init__( # at the same time in the same region self._session_id: str = "session" + secrets.token_hex(3) self._table_ids: List[str] = [] + 
self._strict_ordering = context._strict_ordering # store table ids and delete them when the session is closed @property From db91e4b4b6f65d7c245bc8582d2d9f2a2cfd809b Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 19 Jun 2024 00:48:35 +0000 Subject: [PATCH 02/15] reorganize flags --- bigframes/_config/bigquery_options.py | 10 ++++---- bigframes/core/groupby/__init__.py | 2 +- bigframes/core/indexes/base.py | 7 ++++++ .../core/{api_helpers.py => validate.py} | 9 +++++--- bigframes/dataframe.py | 3 +-- bigframes/exceptions.py | 4 ++++ bigframes/series.py | 2 +- bigframes/session/__init__.py | 11 +++++---- tests/system/conftest.py | 4 +--- tests/system/small/test_unordered.py | 23 +++++++++++++++++++ 10 files changed, 57 insertions(+), 18 deletions(-) rename bigframes/core/{api_helpers.py => validate.py} (79%) diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index f6178b856f..24285cc931 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -70,7 +70,8 @@ def __init__( application_name: Optional[str] = None, kms_key_name: Optional[str] = None, skip_bq_connection_check: bool = False, - strict_ordering: bool = True, + *, + _strictly_ordered: bool = True, ): self._credentials = credentials self._project = project @@ -81,7 +82,7 @@ def __init__( self._kms_key_name = kms_key_name self._skip_bq_connection_check = skip_bq_connection_check self._session_started = False - self._strict_ordering = True + self._strictly_ordered_internal = _strictly_ordered @property def application_name(self) -> Optional[str]: @@ -239,5 +240,6 @@ def kms_key_name(self, value: str): self._kms_key_name = value @property - def strict_ordering(self) -> bool: - return self._strict_ordering + def _strictly_ordered(self) -> bool: + """Internal use only. 
Controls whether total row order is always maintained for DataFrame/Series.""" + return self._strictly_ordered_internal diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index 9bf903a146..764841c082 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -23,11 +23,11 @@ import bigframes.constants as constants from bigframes.core import log_adapter import bigframes.core as core -from bigframes.core.api_helpers import requires_strict_ordering import bigframes.core.block_transforms as block_ops import bigframes.core.blocks as blocks import bigframes.core.ordering as order import bigframes.core.utils as utils +from bigframes.core.validate import requires_strict_ordering import bigframes.core.window as windows import bigframes.core.window_spec as window_specs import bigframes.dataframe as df diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index 8df6155591..9571760b20 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -30,6 +30,7 @@ import bigframes.core.expression as ex import bigframes.core.ordering as order import bigframes.core.utils as utils +from bigframes.core.validate import requires_strict_ordering import bigframes.dtypes import bigframes.formatting_helpers as formatter import bigframes.operations as ops @@ -179,6 +180,7 @@ def empty(self) -> bool: return self.shape[0] == 0 @property + @requires_strict_ordering() def is_monotonic_increasing(self) -> bool: """ Return a boolean if the values are equal or increasing. @@ -192,6 +194,7 @@ def is_monotonic_increasing(self) -> bool: ) @property + @requires_strict_ordering() def is_monotonic_decreasing(self) -> bool: """ Return a boolean if the values are equal or decreasing. 
@@ -341,6 +344,7 @@ def max(self) -> typing.Any: def min(self) -> typing.Any: return self._apply_aggregation(agg_ops.min_op) + @requires_strict_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -353,6 +357,7 @@ def argmax(self) -> int: return typing.cast(int, series.Series(block.select_column(row_nums)).iloc[0]) + @requires_strict_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -423,6 +428,8 @@ def dropna(self, how: typing.Literal["all", "any"] = "any") -> Index: result = block_ops.dropna(self._block, self._block.index_columns, how=how) return Index(result) + # TODO: keep="all" does not require ordering + @requires_strict_ordering() def drop_duplicates(self, *, keep: str = "first") -> Index: block = block_ops.drop_duplicates(self._block, self._block.index_columns, keep) return Index(block) diff --git a/bigframes/core/api_helpers.py b/bigframes/core/validate.py similarity index 79% rename from bigframes/core/api_helpers.py rename to bigframes/core/validate.py index 5736244160..4122e79991 100644 --- a/bigframes/core/api_helpers.py +++ b/bigframes/core/validate.py @@ -19,6 +19,9 @@ import functools from typing import Protocol, TYPE_CHECKING +import bigframes.constants +import bigframes.exceptions + if TYPE_CHECKING: from bigframes import Session @@ -31,12 +34,12 @@ def _session(self) -> Session: def requires_strict_ordering(where={}): def decorator(meth): - # TODO :SUpport where + # TODO: Support where clause to guard certain parameterizations @functools.wraps(meth) def guarded_meth(object: HasSession, *args, **kwargs): if not object._session._strict_ordering: - raise NotImplementedError( - f"Op {meth.__name__} not yet supported when strict ordering is disabled." + raise bigframes.exceptions.OrderRequiredError( + f"Op {meth.__name__} not supported when strict ordering is disabled. 
{bigframes.constants.FEEDBACK_LINK}"
                 )
             return meth(object, *args, **kwargs)
 
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 5541c77ca0..f1394e4f2f 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -51,7 +51,6 @@
 import bigframes.constants as constants
 import bigframes.core
 from bigframes.core import log_adapter
-from bigframes.core.api_helpers import requires_strict_ordering
 import bigframes.core.block_transforms as block_ops
 import bigframes.core.blocks as blocks
 import bigframes.core.convert
@@ -62,6 +61,7 @@
 import bigframes.core.indexes as indexes
 import bigframes.core.ordering as order
 import bigframes.core.utils as utils
+from bigframes.core.validate import requires_strict_ordering
 import bigframes.core.window
 import bigframes.core.window_spec as window_spec
 import bigframes.dtypes
@@ -2397,7 +2397,6 @@ def pivot_table(
         # Sort and reorder.
         return pivoted[pivoted.columns.sort_values()]
 
-    @requires_strict_ordering()
     def stack(self, level: LevelsType = -1):
         if not isinstance(self.columns, pandas.MultiIndex):
             if level not in [0, -1, self.columns.name]:
diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py
index bae239b6da..e6e0141f13 100644
--- a/bigframes/exceptions.py
+++ b/bigframes/exceptions.py
@@ -47,5 +47,9 @@ class NullIndexError(ValueError):
     """Object has no index."""
 
 
+class OrderRequiredError(ValueError):
+    """Operation requires total row ordering to be enabled."""
+
+
 class TimeTravelDisabledWarning(Warning):
     """A query was reattempted without time travel."""
diff --git a/bigframes/series.py b/bigframes/series.py
index f60573a6ab..47e38cb3d5 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -34,7 +34,6 @@
 import bigframes.constants as constants
 import bigframes.core
 from bigframes.core import log_adapter
-from bigframes.core.api_helpers import requires_strict_ordering
 import bigframes.core.block_transforms as block_ops
 import bigframes.core.blocks as blocks
 import bigframes.core.expression as ex
@@ -44,6 +43,7 @@
 import bigframes.core.ordering as
order import bigframes.core.scalar as scalars import bigframes.core.utils as utils +from bigframes.core.validate import requires_strict_ordering import bigframes.core.window import bigframes.core.window_spec import bigframes.dataframe diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 4b7f19972b..eafab07843 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -275,7 +275,6 @@ def __init__( # at the same time in the same region self._session_id: str = "session" + secrets.token_hex(3) self._table_ids: List[str] = [] - self._strict_ordering = context._strict_ordering # store table ids and delete them when the session is closed self._objects: list[ @@ -295,9 +294,13 @@ def __init__( self._bytes_processed_sum = 0 self._slot_millis_sum = 0 self._execution_count = 0 - # Whether this session treats objects as totally ordered. - # Will expose as feature later, only False for internal testing - self._strictly_ordered = True + self._strictly_ordered: bool = context._strictly_ordered + # Sequential index needs total ordering to generate, so use null index with unstrict ordering. 
+        self._default_index_type: bigframes.enums.DefaultIndexKind = (
+            bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64
+            if context._strictly_ordered
+            else bigframes.enums.DefaultIndexKind.NULL
+        )
 
     @property
     def bqclient(self):
diff --git a/tests/system/conftest.py b/tests/system/conftest.py
index 06ad73a702..7259e403b5 100644
--- a/tests/system/conftest.py
+++ b/tests/system/conftest.py
@@ -152,9 +152,7 @@ def unordered_session() -> Generator[bigframes.Session, None, None]:
 
 @pytest.fixture(scope="session")
 def session_tokyo(tokyo_location: str) -> Generator[bigframes.Session, None, None]:
-    context = bigframes.BigQueryOptions(
-        location=tokyo_location,
-    )
+    context = bigframes.BigQueryOptions(location=tokyo_location)
     session = bigframes.Session(context=context)
     yield session
     session.close()  # close generated session at cleanup type
diff --git a/tests/system/small/test_unordered.py b/tests/system/small/test_unordered.py
index 12c0d6e259..e5dedfc471 100644
--- a/tests/system/small/test_unordered.py
+++ b/tests/system/small/test_unordered.py
@@ -12,7 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
import pandas as pd +import pytest +import bigframes.exceptions import bigframes.pandas as bpd from tests.system.utils import assert_pandas_df_equal @@ -26,3 +28,24 @@ def test_unordered_mode_cache_aggregate(unordered_session): pd_result = pd_df - pd_df.mean() assert_pandas_df_equal(bf_result, pd_result, ignore_order=True) + + +@pytest.mark.parametrize( + ("function"), + [ + pytest.param( + lambda x: x.cumsum(), + id="cumsum", + marks=pytest.mark.xfail(raises=bigframes.exceptions.OrderRequiredError), + ), + pytest.param( + lambda x: x.idxmin(), + id="idxmin", + marks=pytest.mark.xfail(raises=bigframes.exceptions.OrderRequiredError), + ), + ], +) +def test_unordered_mode_blocks_windowing(unordered_session, function): + pd_df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, dtype=pd.Int64Dtype()) + df = bpd.DataFrame(pd_df, session=unordered_session) + function(df) From 01888e5cf9a388180b308ae2dad6786e18b5d7d4 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 20 Jun 2024 17:20:54 +0000 Subject: [PATCH 03/15] fix config reference and add another test --- bigframes/core/validate.py | 2 +- tests/system/small/test_unordered.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/bigframes/core/validate.py b/bigframes/core/validate.py index 4122e79991..982ffa695d 100644 --- a/bigframes/core/validate.py +++ b/bigframes/core/validate.py @@ -37,7 +37,7 @@ def decorator(meth): # TODO: Support where clause to guard certain parameterizations @functools.wraps(meth) def guarded_meth(object: HasSession, *args, **kwargs): - if not object._session._strict_ordering: + if not object._session._strictly_ordered: raise bigframes.exceptions.OrderRequiredError( f"Op {meth.__name__} not supported when strict ordering is disabled. 
{bigframes.constants.FEEDBACK_LINK}" ) diff --git a/tests/system/small/test_unordered.py b/tests/system/small/test_unordered.py index 6648df7960..acda1d25a2 100644 --- a/tests/system/small/test_unordered.py +++ b/tests/system/small/test_unordered.py @@ -76,6 +76,11 @@ def test_unordered_mode_read_gbq(unordered_session): id="idxmin", marks=pytest.mark.xfail(raises=bigframes.exceptions.OrderRequiredError), ), + pytest.param( + lambda x: x.a.iloc[1::2], + id="series_iloc", + marks=pytest.mark.xfail(raises=bigframes.exceptions.OrderRequiredError), + ), ], ) def test_unordered_mode_blocks_windowing(unordered_session, function): From e54cec034a6f1b3f217d20dafdd8da87e3e443ea Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 20 Jun 2024 17:49:17 +0000 Subject: [PATCH 04/15] fix broken attribute reference --- bigframes/dataframe.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 3928f8a28c..17c3597528 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -349,10 +349,6 @@ def T(self) -> DataFrame: def transpose(self) -> DataFrame: return self.T - @property - def _strict_ordering(self) -> bool: - return self._session._strict_ordering - def __len__(self): rows, _ = self.shape return rows From 32bceacdbfe63205addc90b91c5dba66b7a27ae6 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 20 Jun 2024 20:21:01 +0000 Subject: [PATCH 05/15] add _session property to groupby objects --- bigframes/core/groupby/__init__.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index 764841c082..15b67446d5 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -73,6 +73,10 @@ def __init__( if col_id not in self._by_col_ids ] + @property + def _session(self) -> core.Session: + return self._block.session + def __getitem__( self, key: typing.Union[ @@ -519,6 +523,10 @@ def __init__( 
self._value_name = value_name self._dropna = dropna # Applies to aggregations but not windowing + @property + def _session(self) -> core.Session: + return self._block.session + def head(self, n: int = 5) -> series.Series: block = self._block if self._dropna: From d1a123eaa94b5692357001433e8d723c11ef8781 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 20 Jun 2024 20:25:31 +0000 Subject: [PATCH 06/15] restore session_tokyo fixture --- tests/system/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 6419b68948..df4ff9aff0 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -149,7 +149,7 @@ def unordered_session() -> Generator[bigframes.Session, None, None]: @pytest.fixture(scope="session") def session_tokyo(tokyo_location: str) -> Generator[bigframes.Session, None, None]: - context = bigframes.BigQueryOptions(location="US", _strictly_ordered=False) + context = bigframes.BigQueryOptions(location=tokyo_location) session = bigframes.Session(context=context) yield session session.close() # close generated session at cleanup type From d745ca2d07e146b1249b76622ffaf76ce912b112 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 20 Jun 2024 21:17:35 +0000 Subject: [PATCH 07/15] add docstring for OrderRequiredError --- bigframes/exceptions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index e6e0141f13..bc0d83b4f6 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -48,7 +48,7 @@ class NullIndexError(ValueError): class OrderRequiredError(ValueError): - """""" + """Operation requires total row ordering to be enabled.""" class TimeTravelDisabledWarning(Warning): From cac44aa6c6493dd104c97ca9fb2b3041baa266ad Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 20 Jun 2024 21:20:14 +0000 Subject: [PATCH 08/15] add _session property to index object --- 
bigframes/core/indexes/base.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index 34a685a6de..89590d3732 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -115,6 +115,10 @@ def from_frame( index._linked_frame = frame return index + @property + def _session(self): + return self._block.session + @property def name(self) -> blocks.Label: names = self.names From bfcdeb926fe6e1dedec2e53e5880882ce2a7dd8a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 25 Jun 2024 17:18:28 +0000 Subject: [PATCH 09/15] handled methods where only some parameterizations need total order --- bigframes/core/__init__.py | 10 ++++++---- bigframes/core/indexes/base.py | 6 +++--- bigframes/core/validate.py | 15 +++++++++------ bigframes/core/window_spec.py | 5 +++++ bigframes/dataframe.py | 14 +++++++++----- bigframes/operations/aggregations.py | 10 ++++++++++ bigframes/series.py | 13 +++++++++---- tests/system/small/test_unordered.py | 22 ++++++++++++++++++++++ 8 files changed, 73 insertions(+), 22 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 2508814894..3b656fdd8c 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -344,10 +344,12 @@ def project_window_op( never_skip_nulls: will disable null skipping for operators that would otherwise do so skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection """ - if not self.session._strictly_ordered: - # TODO: Support unbounded windows with aggregate ops and some row-order-independent analytic ops - # TODO: Support non-deterministic windowing - raise ValueError("Windowed ops not supported in unordered mode") + # TODO: Support non-deterministic windowing + if window_spec.row_bounded or not 
op.order_independent: + if not not self.session._strictly_ordered: + raise ValueError( + "Order-dependent windowed ops not supported in unordered mode" + ) return ArrayValue( nodes.WindowOpNode( child=self.node, diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index 89590d3732..a0e19006d0 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -30,7 +30,7 @@ import bigframes.core.expression as ex import bigframes.core.ordering as order import bigframes.core.utils as utils -from bigframes.core.validate import requires_strict_ordering +from bigframes.core.validate import enforce_ordered, requires_strict_ordering import bigframes.dtypes import bigframes.formatting_helpers as formatter import bigframes.operations as ops @@ -432,9 +432,9 @@ def dropna(self, how: typing.Literal["all", "any"] = "any") -> Index: result = block_ops.dropna(self._block, self._block.index_columns, how=how) return Index(result) - # TODO: keep="all" does not require ordering - @requires_strict_ordering() def drop_duplicates(self, *, keep: str = "first") -> Index: + if keep is not False: + enforce_ordered(self, "duplicated") block = block_ops.drop_duplicates(self._block, self._block.index_columns, keep) return Index(block) diff --git a/bigframes/core/validate.py b/bigframes/core/validate.py index 982ffa695d..dc22047e3b 100644 --- a/bigframes/core/validate.py +++ b/bigframes/core/validate.py @@ -32,17 +32,20 @@ def _session(self) -> Session: ... -def requires_strict_ordering(where={}): +def requires_strict_ordering(): def decorator(meth): - # TODO: Support where clause to guard certain parameterizations @functools.wraps(meth) def guarded_meth(object: HasSession, *args, **kwargs): - if not object._session._strictly_ordered: - raise bigframes.exceptions.OrderRequiredError( - f"Op {meth.__name__} not supported when strict ordering is disabled. 
{bigframes.constants.FEEDBACK_LINK}" - ) + enforce_ordered(object, meth.__name__) return meth(object, *args, **kwargs) return guarded_meth return decorator + + +def enforce_ordered(object: HasSession, opname: str) -> None: + if not object._session._strictly_ordered: + raise bigframes.exceptions.OrderRequiredError( + f"Op {opname} not supported when strict ordering is disabled. {bigframes.constants.FEEDBACK_LINK}" + ) diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index 71e88a4c3d..5617680919 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -152,3 +152,8 @@ class WindowSpec: ordering: Tuple[orderings.OrderingExpression, ...] = tuple() bounds: Union[RowsWindowBounds, RangeWindowBounds, None] = None min_periods: int = 0 + + @property + def row_bounded(self): + # relevant for determining if window requires total ordering for determinism. + return isinstance(self.bounds, RowsWindowBounds) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 17c3597528..7464a9019d 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -61,7 +61,7 @@ import bigframes.core.indexes as indexes import bigframes.core.ordering as order import bigframes.core.utils as utils -from bigframes.core.validate import requires_strict_ordering +from bigframes.core.validate import enforce_ordered, requires_strict_ordering import bigframes.core.window import bigframes.core.window_spec as window_spec import bigframes.dtypes @@ -1304,8 +1304,6 @@ def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame: ) return maybe_result.set_axis(self._block.column_labels, axis=1, copy=False) - # enforce keep == all - @requires_strict_ordering() def nlargest( self, n: int, @@ -1314,11 +1312,11 @@ def nlargest( ) -> DataFrame: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") + if keep != "all": + enforce_ordered(self, "nlargest") column_ids = 
self._sql_names(columns) return DataFrame(block_ops.nlargest(self._block, n, column_ids, keep=keep)) - # enforce keep == all - @requires_strict_ordering() def nsmallest( self, n: int, @@ -1327,6 +1325,8 @@ def nsmallest( ) -> DataFrame: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") + if keep != "all": + enforce_ordered(self, "nlargest") column_ids = self._sql_names(columns) return DataFrame(block_ops.nsmallest(self._block, n, column_ids, keep=keep)) @@ -3487,6 +3487,8 @@ def drop_duplicates( *, keep: str = "first", ) -> DataFrame: + if keep is not False: + enforce_ordered(self, "drop_duplicates") if subset is None: column_ids = self._block.value_columns elif utils.is_list_like(subset): @@ -3500,6 +3502,8 @@ def drop_duplicates( return DataFrame(block) def duplicated(self, subset=None, keep: str = "first") -> bigframes.series.Series: + if keep is not False: + enforce_ordered(self, "duplicated") if subset is None: column_ids = self._block.value_columns else: diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index 783abfd788..f85f53c55c 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -42,6 +42,11 @@ def uses_total_row_ordering(self): def can_order_by(self): return False + @property + def order_independent(self): + """Whether the output of the operator depends on the ordering of input rows. Navigation functions are a notable case.""" + return False + @abc.abstractmethod def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: ... @@ -78,6 +83,11 @@ def name(self) -> str: def arguments(self) -> int: ... 
+ @property + def order_independent(self): + # Almost all aggregation functions are order independent, excepting array and string agg + return not self.can_order_by + @dataclasses.dataclass(frozen=True) class NullaryAggregateOp(AggregateOp, NullaryWindowOp): diff --git a/bigframes/series.py b/bigframes/series.py index b6660772d7..9d03437889 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -43,7 +43,7 @@ import bigframes.core.ordering as order import bigframes.core.scalar as scalars import bigframes.core.utils as utils -from bigframes.core.validate import requires_strict_ordering +from bigframes.core.validate import enforce_ordered, requires_strict_ordering import bigframes.core.window import bigframes.core.window_spec import bigframes.dataframe @@ -642,18 +642,20 @@ def head(self, n: int = 5) -> Series: def tail(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[-n:]) - @requires_strict_ordering(where={"keep": ["first", "last"]}) def nlargest(self, n: int = 5, keep: str = "first") -> Series: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") + if keep != "all": + enforce_ordered(self, "nlargest") return Series( block_ops.nlargest(self._block, n, [self._value_column], keep=keep) ) - @requires_strict_ordering(where={"keep": ["first", "last"]}) def nsmallest(self, n: int = 5, keep: str = "first") -> Series: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") + if keep != "all": + enforce_ordered(self, "nsmallest") return Series( block_ops.nsmallest(self._block, n, [self._value_column], keep=keep) ) @@ -1571,6 +1573,8 @@ def reindex_like(self, other: Series, *, validate: typing.Optional[bool] = None) return self.reindex(other.index, validate=validate) def drop_duplicates(self, *, keep: str = "first") -> Series: + if keep is not False: + enforce_ordered(self, "drop_duplicates") block = block_ops.drop_duplicates(self._block, 
(self._value_column,), keep) return Series(block) @@ -1578,8 +1582,9 @@ def drop_duplicates(self, *, keep: str = "first") -> Series: def unique(self) -> Series: return self.drop_duplicates() - @requires_strict_ordering(where={"keep": ["first", "last"]}) def duplicated(self, keep: str = "first") -> Series: + if keep is not False: + enforce_ordered(self, "duplicated") block, indicator = block_ops.indicate_duplicates( self._block, (self._value_column,), keep ) diff --git a/tests/system/small/test_unordered.py b/tests/system/small/test_unordered.py index acda1d25a2..aaaeb12bdd 100644 --- a/tests/system/small/test_unordered.py +++ b/tests/system/small/test_unordered.py @@ -63,6 +63,28 @@ def test_unordered_mode_read_gbq(unordered_session): assert_pandas_df_equal(df.to_pandas(), expected) +@pytest.mark.parametrize( + ("keep"), + [ + pytest.param( + "first", + marks=pytest.mark.xfail(raises=bigframes.exceptions.OrderRequiredError), + ), + pytest.param( + False, + ), + ], +) +def test_unordered_drop_duplicates(unordered_session, keep): + pd_df = pd.DataFrame({"a": [1, 1, 3], "b": [4, 4, 6]}, dtype=pd.Int64Dtype()) + bf_df = bpd.DataFrame(pd_df, session=unordered_session) + + bf_result = bf_df.drop_duplicates(keep=keep) + pd_result = pd_df.drop_duplicates(keep=keep) + + assert_pandas_df_equal(bf_result.to_pandas(), pd_result, ignore_order=True) + + @pytest.mark.parametrize( ("function"), [ From c37c6835b63ca30ec3cc7396cce02bb103c884e1 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 25 Jun 2024 17:58:01 +0000 Subject: [PATCH 10/15] fix inverted validation --- bigframes/core/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 463f54f725..00a36b9c05 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -346,7 +346,7 @@ def project_window_op( """ # TODO: Support non-deterministic windowing if window_spec.row_bounded or not op.order_independent: - if not not 
self.session._strictly_ordered: + if not self.session._strictly_ordered: raise ValueError( "Order-dependent windowed ops not supported in unordered mode" ) From d933bda48488c544400b8f8f9855403495fe8d8f Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 26 Jun 2024 16:47:37 +0000 Subject: [PATCH 11/15] import validations module and not functions --- bigframes/core/groupby/__init__.py | 36 ++++++++--------- bigframes/core/indexes/base.py | 12 +++--- bigframes/dataframe.py | 62 +++++++++++++++--------------- bigframes/series.py | 62 +++++++++++++++--------------- 4 files changed, 86 insertions(+), 86 deletions(-) diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index 15b67446d5..b58a49e66b 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -27,7 +27,7 @@ import bigframes.core.blocks as blocks import bigframes.core.ordering as order import bigframes.core.utils as utils -from bigframes.core.validate import requires_strict_ordering +import bigframes.core.validate as validations import bigframes.core.window as windows import bigframes.core.window_spec as window_specs import bigframes.dataframe as df @@ -234,25 +234,25 @@ def count(self) -> df.DataFrame: def nunique(self) -> df.DataFrame: return self._aggregate_all(agg_ops.nunique_op) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cumsum(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("cumsum") return self._apply_window_op(agg_ops.sum_op, numeric_only=True) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cummin(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.min_op, numeric_only=numeric_only) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cummax(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: return 
self._apply_window_op(agg_ops.max_op, numeric_only=numeric_only) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cumprod(self, *args, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.product_op, numeric_only=True) - @requires_strict_ordering() + @validations.requires_strict_ordering() def shift(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -261,7 +261,7 @@ def shift(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window=window) - @requires_strict_ordering() + @validations.requires_strict_ordering() def diff(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -270,7 +270,7 @@ def diff(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window=window) - @requires_strict_ordering() + @validations.requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. 
window_spec = window_specs.rows( @@ -286,7 +286,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: block, window_spec, self._selected_cols, drop_null_groups=self._dropna ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = window_specs.cumulative_rows( grouping_keys=tuple(self._by_col_ids), @@ -644,31 +644,31 @@ def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]: aggregate = agg - @requires_strict_ordering() + @validations.requires_strict_ordering() def cumsum(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.sum_op, ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cumprod(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.product_op, ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cummax(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.max_op, ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cummin(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.min_op, ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cumcount(self, *args, **kwargs) -> series.Series: return ( self._apply_window_op( @@ -678,7 +678,7 @@ def cumcount(self, *args, **kwargs) -> series.Series: - 1 ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def shift(self, periods=1) -> series.Series: """Shift index by desired number of periods.""" window = window_specs.rows( @@ -688,7 +688,7 @@ def shift(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window=window) - @requires_strict_ordering() + @validations.requires_strict_ordering() def diff(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -697,7 +697,7 @@ def diff(self, 
periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window=window) - @requires_strict_ordering() + @validations.requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. window_spec = window_specs.rows( @@ -717,7 +717,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: is_series=True, ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = window_specs.cumulative_rows( grouping_keys=tuple(self._by_col_ids), diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index a0e19006d0..72833b5cd5 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -30,7 +30,7 @@ import bigframes.core.expression as ex import bigframes.core.ordering as order import bigframes.core.utils as utils -from bigframes.core.validate import enforce_ordered, requires_strict_ordering +import bigframes.core.validate as validations import bigframes.dtypes import bigframes.formatting_helpers as formatter import bigframes.operations as ops @@ -184,7 +184,7 @@ def empty(self) -> bool: return self.shape[0] == 0 @property - @requires_strict_ordering() + @validations.requires_strict_ordering() def is_monotonic_increasing(self) -> bool: """ Return a boolean if the values are equal or increasing. @@ -198,7 +198,7 @@ def is_monotonic_increasing(self) -> bool: ) @property - @requires_strict_ordering() + @validations.requires_strict_ordering() def is_monotonic_decreasing(self) -> bool: """ Return a boolean if the values are equal or decreasing. 
@@ -348,7 +348,7 @@ def max(self) -> typing.Any: def min(self) -> typing.Any: return self._apply_aggregation(agg_ops.min_op) - @requires_strict_ordering() + @validations.requires_strict_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -361,7 +361,7 @@ def argmax(self) -> int: return typing.cast(int, series.Series(block.select_column(row_nums)).iloc[0]) - @requires_strict_ordering() + @validations.requires_strict_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -434,7 +434,7 @@ def dropna(self, how: typing.Literal["all", "any"] = "any") -> Index: def drop_duplicates(self, *, keep: str = "first") -> Index: if keep is not False: - enforce_ordered(self, "duplicated") + validations.enforce_ordered(self, "drop_duplicates") block = block_ops.drop_duplicates(self._block, self._block.index_columns, keep) return Index(block) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 7464a9019d..6c8df92637 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -61,7 +61,7 @@ import bigframes.core.indexes as indexes import bigframes.core.ordering as order import bigframes.core.utils as utils -from bigframes.core.validate import enforce_ordered, requires_strict_ordering +import bigframes.core.validate as validations import bigframes.core.window import bigframes.core.window_spec as window_spec import bigframes.dtypes @@ -278,12 +278,12 @@ def loc(self) -> indexers.LocDataFrameIndexer: return indexers.LocDataFrameIndexer(self) @property - @requires_strict_ordering() + @validations.requires_strict_ordering() def iloc(self) -> indexers.ILocDataFrameIndexer: return indexers.ILocDataFrameIndexer(self) @property - @requires_strict_ordering() + @validations.requires_strict_ordering() def iat(self) -> indexers.IatDataFrameIndexer: return indexers.IatDataFrameIndexer(self) @@ -340,12 +340,12 @@ def _has_index(self) -> bool: return 
len(self._block.index_columns) > 0 @property - @requires_strict_ordering() + @validations.requires_strict_ordering() def T(self) -> DataFrame: return DataFrame(self._get_block().transpose()) @requires_index - @requires_strict_ordering() + @validations.requires_strict_ordering() def transpose(self) -> DataFrame: return self.T @@ -1269,7 +1269,7 @@ def copy(self) -> DataFrame: def head(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, self.iloc[:n]) - @requires_strict_ordering() + @validations.requires_strict_ordering() def tail(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, self.iloc[-n:]) @@ -1313,7 +1313,7 @@ def nlargest( if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") if keep != "all": - enforce_ordered(self, "nlargest") + validations.enforce_ordered(self, "nlargest") column_ids = self._sql_names(columns) return DataFrame(block_ops.nlargest(self._block, n, column_ids, keep=keep)) @@ -1326,7 +1326,7 @@ def nsmallest( if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") if keep != "all": - enforce_ordered(self, "nlargest") + validations.enforce_ordered(self, "nlargest") column_ids = self._sql_names(columns) return DataFrame(block_ops.nsmallest(self._block, n, column_ids, keep=keep)) @@ -1509,7 +1509,7 @@ def rename_axis( labels = [mapper] return DataFrame(self._block.with_index_labels(labels)) - @requires_strict_ordering() + @validations.requires_strict_ordering() def equals(self, other: typing.Union[bigframes.series.Series, DataFrame]) -> bool: # Must be same object type, same column dtypes, and same label values if not isinstance(other, DataFrame): @@ -1907,7 +1907,7 @@ def _reindex_columns(self, columns): def reindex_like(self, other: DataFrame, *, validate: typing.Optional[bool] = None): return self.reindex(index=other.index, columns=other.columns, validate=validate) - @requires_strict_ordering() + 
@validations.requires_strict_ordering() @requires_index def interpolate(self, method: str = "linear") -> DataFrame: if method == "pad": @@ -1933,12 +1933,12 @@ def replace( lambda x: x.replace(to_replace=to_replace, value=value, regex=regex) ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def ffill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = window_spec.rows(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) - @requires_strict_ordering() + @validations.requires_strict_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = window_spec.rows(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) @@ -2204,16 +2204,16 @@ def agg( aggregate.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.agg) @requires_index - @requires_strict_ordering() + @validations.requires_strict_ordering() def idxmin(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmin(self._block)) @requires_index - @requires_strict_ordering() + @validations.requires_strict_ordering() def idxmax(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmax(self._block)) - @requires_strict_ordering() + @validations.requires_strict_ordering() def melt( self, id_vars: typing.Optional[typing.Iterable[typing.Hashable]] = None, @@ -2318,7 +2318,7 @@ def _pivot( return DataFrame(pivot_block) @requires_index - @requires_strict_ordering() + @validations.requires_strict_ordering() def pivot( self, *, @@ -2333,7 +2333,7 @@ def pivot( return self._pivot(columns=columns, index=index, values=values) @requires_index - @requires_strict_ordering() + @validations.requires_strict_ordering() def pivot_table( self, values: typing.Optional[ @@ -2433,7 +2433,7 @@ def _stack_multi(self, level: LevelsType = -1): return DataFrame(block) @requires_index - @requires_strict_ordering() + @validations.requires_strict_ordering() 
def unstack(self, level: LevelsType = -1): if not utils.is_list_like(level): level = [level] @@ -2644,7 +2644,7 @@ def _perform_join_by_index( block, _ = self._block.join(other._block, how=how, block_identity_join=True) return DataFrame(block) - @requires_strict_ordering() + @validations.requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. window_def = window_spec.rows( @@ -2654,7 +2654,7 @@ def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window self._block, window_def, self._block.value_columns ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: window = window_spec.cumulative_rows(min_periods=min_periods) return bigframes.core.window.Window( @@ -2757,7 +2757,7 @@ def notna(self) -> DataFrame: notnull = notna notnull.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.notna) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cumsum(self): is_numeric_types = [ (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) @@ -2770,7 +2770,7 @@ def cumsum(self): window_spec.cumulative_rows(), ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cumprod(self) -> DataFrame: is_numeric_types = [ (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) @@ -2783,21 +2783,21 @@ def cumprod(self) -> DataFrame: window_spec.cumulative_rows(), ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cummin(self) -> DataFrame: return self._apply_window_op( agg_ops.min_op, window_spec.cumulative_rows(), ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cummax(self) -> DataFrame: return self._apply_window_op( agg_ops.max_op, window_spec.cumulative_rows(), ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def 
shift(self, periods: int = 1) -> DataFrame: window = window_spec.rows( preceding=periods if periods > 0 else None, @@ -2805,7 +2805,7 @@ def shift(self, periods: int = 1) -> DataFrame: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window) - @requires_strict_ordering() + @validations.requires_strict_ordering() def diff(self, periods: int = 1) -> DataFrame: window = window_spec.rows( preceding=periods if periods > 0 else None, @@ -2813,7 +2813,7 @@ def diff(self, periods: int = 1) -> DataFrame: ) return self._apply_window_op(agg_ops.DiffOp(periods), window) - @requires_strict_ordering() + @validations.requires_strict_ordering() def pct_change(self, periods: int = 1) -> DataFrame: # Future versions of pandas will not perfrom ffill automatically df = self.ffill() @@ -2831,7 +2831,7 @@ def _apply_window_op( ) return DataFrame(block.select_columns(result_ids)) - @requires_strict_ordering() + @validations.requires_strict_ordering() def sample( self, n: Optional[int] = None, @@ -3488,7 +3488,7 @@ def drop_duplicates( keep: str = "first", ) -> DataFrame: if keep is not False: - enforce_ordered(self, "drop_duplicates") + validations.enforce_ordered(self, "drop_duplicates") if subset is None: column_ids = self._block.value_columns elif utils.is_list_like(subset): @@ -3503,7 +3503,7 @@ def drop_duplicates( def duplicated(self, subset=None, keep: str = "first") -> bigframes.series.Series: if keep is not False: - enforce_ordered(self, "duplicated") + validations.enforce_ordered(self, "duplicated") if subset is None: column_ids = self._block.value_columns else: @@ -3597,7 +3597,7 @@ def _optimize_query_complexity(self): _DataFrameOrSeries = typing.TypeVar("_DataFrameOrSeries") - @requires_strict_ordering() + @validations.requires_strict_ordering() def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries: if not isinstance(other, (DataFrame, bf_series.Series)): raise NotImplementedError( diff --git a/bigframes/series.py b/bigframes/series.py index 
9d03437889..f800161d6a 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -43,7 +43,7 @@ import bigframes.core.ordering as order import bigframes.core.scalar as scalars import bigframes.core.utils as utils -from bigframes.core.validate import enforce_ordered, requires_strict_ordering +import bigframes.core.validate as validations import bigframes.core.window import bigframes.core.window_spec import bigframes.dataframe @@ -93,12 +93,12 @@ def loc(self) -> bigframes.core.indexers.LocSeriesIndexer: return bigframes.core.indexers.LocSeriesIndexer(self) @property - @requires_strict_ordering() + @validations.requires_strict_ordering() def iloc(self) -> bigframes.core.indexers.IlocSeriesIndexer: return bigframes.core.indexers.IlocSeriesIndexer(self) @property - @requires_strict_ordering() + @validations.requires_strict_ordering() def iat(self) -> bigframes.core.indexers.IatSeriesIndexer: return bigframes.core.indexers.IatSeriesIndexer(self) @@ -163,7 +163,7 @@ def struct(self) -> structs.StructAccessor: return structs.StructAccessor(self._block) @property - @requires_strict_ordering() + @validations.requires_strict_ordering() def T(self) -> Series: return self.transpose() @@ -175,7 +175,7 @@ def _info_axis(self) -> indexes.Index: def _session(self) -> bigframes.Session: return self._get_block().expr.session - @requires_strict_ordering() + @validations.requires_strict_ordering() def transpose(self) -> Series: return self @@ -271,7 +271,7 @@ def equals( return False return block_ops.equals(self._block, other._block) - @requires_strict_ordering() + @validations.requires_strict_ordering() def reset_index( self, *, @@ -460,13 +460,13 @@ def case_when(self, caselist) -> Series: ignore_self=True, ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cumsum(self) -> Series: return self._apply_window_op( agg_ops.sum_op, bigframes.core.window_spec.cumulative_rows() ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def 
ffill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.window_spec.rows(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) @@ -474,30 +474,30 @@ def ffill(self, *, limit: typing.Optional[int] = None) -> Series: pad = ffill pad.__doc__ = inspect.getdoc(vendored_pandas_series.Series.ffill) - @requires_strict_ordering() + @validations.requires_strict_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.window_spec.rows(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cummax(self) -> Series: return self._apply_window_op( agg_ops.max_op, bigframes.core.window_spec.cumulative_rows() ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cummin(self) -> Series: return self._apply_window_op( agg_ops.min_op, bigframes.core.window_spec.cumulative_rows() ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def cumprod(self) -> Series: return self._apply_window_op( agg_ops.product_op, bigframes.core.window_spec.cumulative_rows() ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def shift(self, periods: int = 1) -> Series: window = bigframes.core.window_spec.rows( preceding=periods if periods > 0 else None, @@ -505,7 +505,7 @@ def shift(self, periods: int = 1) -> Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window) - @requires_strict_ordering() + @validations.requires_strict_ordering() def diff(self, periods: int = 1) -> Series: window = bigframes.core.window_spec.rows( preceding=periods if periods > 0 else None, @@ -513,13 +513,13 @@ def diff(self, periods: int = 1) -> Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window) - @requires_strict_ordering() + @validations.requires_strict_ordering() def pct_change(self, periods: int = 1) -> 
Series: # Future versions of pandas will not perfrom ffill automatically series = self.ffill() return Series(block_ops.pct_change(series._block, periods=periods)) - @requires_strict_ordering() + @validations.requires_strict_ordering() def rank( self, axis=0, @@ -611,7 +611,7 @@ def _mapping_replace(self, mapping: dict[typing.Hashable, typing.Hashable]): ) return Series(block.select_column(result)) - @requires_strict_ordering() + @validations.requires_strict_ordering() @requires_index def interpolate(self, method: str = "linear") -> Series: if method == "pad": @@ -634,11 +634,11 @@ def dropna( result = result.reset_index() return Series(result) - @requires_strict_ordering() + @validations.requires_strict_ordering() def head(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[0:n]) - @requires_strict_ordering() + @validations.requires_strict_ordering() def tail(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[-n:]) @@ -646,7 +646,7 @@ def nlargest(self, n: int = 5, keep: str = "first") -> Series: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") if keep != "all": - enforce_ordered(self, "nlargest") + validations.enforce_ordered(self, "nlargest") return Series( block_ops.nlargest(self._block, n, [self._value_column], keep=keep) ) @@ -655,7 +655,7 @@ def nsmallest(self, n: int = 5, keep: str = "first") -> Series: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") if keep != "all": - enforce_ordered(self, "nsmallest") + validations.enforce_ordered(self, "nsmallest") return Series( block_ops.nsmallest(self._block, n, [self._value_column], keep=keep) ) @@ -1106,7 +1106,7 @@ def clip(self, lower, upper): ) return Series(block.select_column(result_id).with_column_labels([self.name])) - @requires_strict_ordering() + @validations.requires_strict_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() 
block = block.order_by( @@ -1119,7 +1119,7 @@ def argmax(self) -> int: scalars.Scalar, Series(block.select_column(row_nums)).iloc[0] ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -1185,14 +1185,14 @@ def idxmin(self) -> blocks.Label: return indexes.Index(block).to_pandas()[0] @property - @requires_strict_ordering() + @validations.requires_strict_ordering() def is_monotonic_increasing(self) -> bool: return typing.cast( bool, self._block.is_monotonic_increasing(self._value_column) ) @property - @requires_strict_ordering() + @validations.requires_strict_ordering() def is_monotonic_decreasing(self) -> bool: return typing.cast( bool, self._block.is_monotonic_decreasing(self._value_column) @@ -1298,7 +1298,7 @@ def sort_index(self, *, axis=0, ascending=True, na_position="last") -> Series: block = block.order_by(ordering) return Series(block) - @requires_strict_ordering() + @validations.requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. 
window_spec = bigframes.core.window_spec.rows( @@ -1308,7 +1308,7 @@ def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window self._block, window_spec, self._block.value_columns, is_series=True ) - @requires_strict_ordering() + @validations.requires_strict_ordering() def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: window_spec = bigframes.core.window_spec.cumulative_rows( min_periods=min_periods @@ -1574,17 +1574,17 @@ def reindex_like(self, other: Series, *, validate: typing.Optional[bool] = None) def drop_duplicates(self, *, keep: str = "first") -> Series: if keep is not False: - enforce_ordered(self, "drop_duplicates") + validations.enforce_ordered(self, "drop_duplicates") block = block_ops.drop_duplicates(self._block, (self._value_column,), keep) return Series(block) - @requires_strict_ordering() + @validations.requires_strict_ordering() def unique(self) -> Series: return self.drop_duplicates() def duplicated(self, keep: str = "first") -> Series: if keep is not False: - enforce_ordered(self, "duplicated") + validations.enforce_ordered(self, "duplicated") block, indicator = block_ops.indicate_duplicates( self._block, (self._value_column,), keep ) @@ -1750,7 +1750,7 @@ def map( result_df = self_df.join(map_df, on="series") return result_df[self.name] - @requires_strict_ordering() + @validations.requires_strict_ordering() def sample( self, n: Optional[int] = None, From c09f5f6234aeabc06d94d31ec1d2ea2f2bba8944 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 26 Jun 2024 16:56:50 +0000 Subject: [PATCH 12/15] mark some analytic ops as order independent --- bigframes/operations/aggregations.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index f85f53c55c..52bbb80d12 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -44,7 +44,11 @@ def 
can_order_by(self): @property def order_independent(self): - """Whether the output of the operator depends on the ordering of input rows. Navigation functions are a notable case.""" + """ + True if the output of the operator does not depend on the ordering of input rows. + + Navigation functions are a notable case that is not order independent. + """ return False @abc.abstractmethod @@ -85,7 +89,11 @@ def arguments(self) -> int: @property def order_independent(self): - # Almost all aggregation functions are order independent, excepting array and string agg + """ + True if results don't depend on the order of the input. + + Almost all aggregation functions are order independent, excepting ``array_agg`` and ``string_agg``. + """ return not self.can_order_by @@ -304,6 +312,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT ) return pd.ArrowDtype(pa_type) + @property + def order_independent(self): + return True + @dataclasses.dataclass(frozen=True) class QcutOp(UnaryWindowOp): @@ -322,6 +334,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT dtypes.is_orderable, dtypes.INT_DTYPE, "orderable" ).output_type(input_types[0]) + @property + def order_independent(self): + return True + @dataclasses.dataclass(frozen=True) class NuniqueOp(UnaryAggregateOp): @@ -359,6 +375,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT dtypes.is_orderable, dtypes.INT_DTYPE, "orderable" ).output_type(input_types[0]) + @property + def order_independent(self): + return True + @dataclasses.dataclass(frozen=True) class DenseRankOp(UnaryWindowOp): @@ -371,6 +391,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT dtypes.is_orderable, dtypes.INT_DTYPE, "orderable" ).output_type(input_types[0]) + @property + def order_independent(self): + return True + @dataclasses.dataclass(frozen=True) class FirstOp(UnaryWindowOp): From 
c999a145658cc4e1b9a972f063f4713361940b88 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 26 Jun 2024 17:13:52 +0000 Subject: [PATCH 13/15] rename validate.py to validations.py --- bigframes/core/groupby/__init__.py | 2 +- bigframes/core/indexes/base.py | 2 +- bigframes/core/{validate.py => validations.py} | 0 bigframes/dataframe.py | 2 +- bigframes/series.py | 2 +- tests/system/small/test_unordered.py | 9 +++++---- 6 files changed, 9 insertions(+), 8 deletions(-) rename bigframes/core/{validate.py => validations.py} (100%) diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index b58a49e66b..06ca4f9c7c 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -27,7 +27,7 @@ import bigframes.core.blocks as blocks import bigframes.core.ordering as order import bigframes.core.utils as utils -import bigframes.core.validate as validations +import bigframes.core.validations as validations import bigframes.core.window as windows import bigframes.core.window_spec as window_specs import bigframes.dataframe as df diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index 72833b5cd5..696742180b 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -30,7 +30,7 @@ import bigframes.core.expression as ex import bigframes.core.ordering as order import bigframes.core.utils as utils -import bigframes.core.validate as validations +import bigframes.core.validations as validations import bigframes.dtypes import bigframes.formatting_helpers as formatter import bigframes.operations as ops diff --git a/bigframes/core/validate.py b/bigframes/core/validations.py similarity index 100% rename from bigframes/core/validate.py rename to bigframes/core/validations.py diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 6c8df92637..68a66d5673 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -61,7 +61,7 @@ import bigframes.core.indexes 
as indexes import bigframes.core.ordering as order import bigframes.core.utils as utils -import bigframes.core.validate as validations +import bigframes.core.validations as validations import bigframes.core.window import bigframes.core.window_spec as window_spec import bigframes.dtypes diff --git a/bigframes/series.py b/bigframes/series.py index f800161d6a..ca4b071514 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -43,7 +43,7 @@ import bigframes.core.ordering as order import bigframes.core.scalar as scalars import bigframes.core.utils as utils -import bigframes.core.validate as validations +import bigframes.core.validations as validations import bigframes.core.window import bigframes.core.window_spec import bigframes.dataframe diff --git a/tests/system/small/test_unordered.py b/tests/system/small/test_unordered.py index aaaeb12bdd..36bf2a2585 100644 --- a/tests/system/small/test_unordered.py +++ b/tests/system/small/test_unordered.py @@ -91,21 +91,22 @@ def test_unordered_drop_duplicates(unordered_session, keep): pytest.param( lambda x: x.cumsum(), id="cumsum", - marks=pytest.mark.xfail(raises=bigframes.exceptions.OrderRequiredError), ), pytest.param( lambda x: x.idxmin(), id="idxmin", - marks=pytest.mark.xfail(raises=bigframes.exceptions.OrderRequiredError), ), pytest.param( lambda x: x.a.iloc[1::2], id="series_iloc", - marks=pytest.mark.xfail(raises=bigframes.exceptions.OrderRequiredError), ), ], ) def test_unordered_mode_blocks_windowing(unordered_session, function): pd_df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, dtype=pd.Int64Dtype()) df = bpd.DataFrame(pd_df, session=unordered_session) - function(df) + with pytest.raises( + bigframes.exceptions.OrderRequiredError, + match=r"Op.*not supported when strict ordering is disabled", + ): + function(df) From 50895de67be6a43746db044229eacf226c75da19 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 26 Jun 2024 17:15:57 +0000 Subject: [PATCH 14/15] docstring for WindowSpec.row_bounded --- 
bigframes/core/window_spec.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index 5617680919..57c57b451a 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -155,5 +155,10 @@ class WindowSpec: @property def row_bounded(self): - # relevant for determining if window requires total ordering for determinism. + """ + Whether the window is bounded by row offsets. + + This is relevant for determining whether the window requires a total order + to calculate deterministically. + """ return isinstance(self.bounds, RowsWindowBounds) From 57260199832280a4e452b9f2ddb5ce5db0de17df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Fri, 28 Jun 2024 09:50:45 -0500 Subject: [PATCH 15/15] Apply suggestions from code review --- bigframes/dataframe.py | 4 ++-- bigframes/series.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 68a66d5673..561141fdee 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3488,7 +3488,7 @@ def drop_duplicates( keep: str = "first", ) -> DataFrame: if keep is not False: - validations.enforce_ordered(self, "drop_duplicates") + validations.enforce_ordered(self, "drop_duplicates(keep != False)") if subset is None: column_ids = self._block.value_columns elif utils.is_list_like(subset): @@ -3503,7 +3503,7 @@ def drop_duplicates( def duplicated(self, subset=None, keep: str = "first") -> bigframes.series.Series: if keep is not False: - validations.enforce_ordered(self, "duplicated") + validations.enforce_ordered(self, "duplicated(keep != False)") if subset is None: column_ids = self._block.value_columns else: diff --git a/bigframes/series.py b/bigframes/series.py index 9e50b92cb6..c325783e96 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -680,7 +680,7 @@ def nlargest(self, n: int = 5, keep: str = "first") -> 
Series: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") if keep != "all": - validations.enforce_ordered(self, "nlargest") + validations.enforce_ordered(self, "nlargest(keep != 'all')") return Series( block_ops.nlargest(self._block, n, [self._value_column], keep=keep) ) @@ -689,7 +689,7 @@ def nsmallest(self, n: int = 5, keep: str = "first") -> Series: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") if keep != "all": - validations.enforce_ordered(self, "nsmallest") + validations.enforce_ordered(self, "nsmallest(keep != 'all')") return Series( block_ops.nsmallest(self._block, n, [self._value_column], keep=keep) ) @@ -1609,7 +1609,7 @@ def reindex_like(self, other: Series, *, validate: typing.Optional[bool] = None) def drop_duplicates(self, *, keep: str = "first") -> Series: if keep is not False: - validations.enforce_ordered(self, "drop_duplicates") + validations.enforce_ordered(self, "drop_duplicates(keep != False)") block = block_ops.drop_duplicates(self._block, (self._value_column,), keep) return Series(block) @@ -1619,7 +1619,7 @@ def unique(self) -> Series: def duplicated(self, keep: str = "first") -> Series: if keep is not False: - validations.enforce_ordered(self, "duplicated") + validations.enforce_ordered(self, "duplicated(keep != False)") block, indicator = block_ops.indicate_duplicates( self._block, (self._value_column,), keep )