Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions bigframes/core/agg_expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import functools
import itertools
import typing
from typing import Callable, Mapping, TypeVar
from typing import Callable, Mapping, Tuple, TypeVar

from bigframes import dtypes
from bigframes.core import expression, window_spec
Expand Down Expand Up @@ -63,6 +63,10 @@ def inputs(
) -> typing.Tuple[expression.Expression, ...]:
...

@property
def children(self) -> Tuple[expression.Expression, ...]:
return self.inputs

@property
def free_variables(self) -> typing.Tuple[str, ...]:
return tuple(
Expand All @@ -73,6 +77,10 @@ def free_variables(self) -> typing.Tuple[str, ...]:
def is_const(self) -> bool:
return all(child.is_const for child in self.inputs)

@functools.cached_property
def is_scalar_expr(self) -> bool:
return False

@abc.abstractmethod
def replace_args(self: TExpression, *arg) -> TExpression:
...
Expand Down Expand Up @@ -176,8 +184,13 @@ def output_type(self) -> dtypes.ExpressionType:
def inputs(
self,
) -> typing.Tuple[expression.Expression, ...]:
# TODO: Maybe make the window spec itself an expression?
return (self.analytic_expr, *self.window.expressions)

@property
def children(self) -> Tuple[expression.Expression, ...]:
return self.inputs

@property
def free_variables(self) -> typing.Tuple[str, ...]:
return tuple(
Expand All @@ -188,12 +201,16 @@ def free_variables(self) -> typing.Tuple[str, ...]:
def is_const(self) -> bool:
return all(child.is_const for child in self.inputs)

@functools.cached_property
def is_scalar_expr(self) -> bool:
return False

def transform_children(
self: WindowExpression,
t: Callable[[expression.Expression], expression.Expression],
) -> WindowExpression:
return WindowExpression(
self.analytic_expr.transform_children(t),
t(self.analytic_expr), # type: ignore
self.window.transform_exprs(t),
)

Expand Down
28 changes: 25 additions & 3 deletions bigframes/core/array_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,24 @@
from dataclasses import dataclass
import datetime
import functools
import itertools
import typing
from typing import Iterable, List, Mapping, Optional, Sequence, Tuple

import google.cloud.bigquery
import pandas
import pyarrow as pa

from bigframes.core import agg_expressions, bq_data
from bigframes.core import (
agg_expressions,
bq_data,
expression_factoring,
join_def,
local_data,
)
import bigframes.core.expression as ex
import bigframes.core.guid
import bigframes.core.identifiers as ids
import bigframes.core.join_def as join_def
import bigframes.core.local_data as local_data
import bigframes.core.nodes as nodes
from bigframes.core.ordering import OrderingExpression
import bigframes.core.ordering as orderings
Expand Down Expand Up @@ -261,6 +266,23 @@ def compute_values(self, assignments: Sequence[ex.Expression]):
col_ids,
)

def compute_general_expression(self, assignments: Sequence[ex.Expression]):
named_exprs = [
expression_factoring.NamedExpression(expr, ids.ColumnId.unique())
for expr in assignments
]
# TODO: Push this to rewrite later to go from block expression to planning form
# TODO: Jointly fragmentize expressions to more efficiently reuse common sub-expressions
fragments = tuple(
itertools.chain.from_iterable(
expression_factoring.fragmentize_expression(expr)
for expr in named_exprs
)
)
target_ids = tuple(named_expr.name for named_expr in named_exprs)
new_root = expression_factoring.push_into_tree(self.node, fragments, target_ids)
return (ArrayValue(new_root), target_ids)

def project_to_id(self, expression: ex.Expression):
array_val, ids = self.compute_values(
[expression],
Expand Down
153 changes: 65 additions & 88 deletions bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,15 +399,18 @@ def pct_change(block: blocks.Block, periods: int = 1) -> blocks.Block:
window_spec = windows.unbound()

original_columns = block.value_columns
block, shift_columns = block.multi_apply_window_op(
original_columns, agg_ops.ShiftOp(periods), window_spec=window_spec
)
exprs = []
for original_col, shifted_col in zip(original_columns, shift_columns):
change_expr = ops.sub_op.as_expr(original_col, shifted_col)
pct_change_expr = ops.div_op.as_expr(change_expr, shifted_col)
for original_col in original_columns:
shift_expr = agg_expressions.WindowExpression(
agg_expressions.UnaryAggregation(
agg_ops.ShiftOp(periods), ex.deref(original_col)
),
window_spec,
)
change_expr = ops.sub_op.as_expr(original_col, shift_expr)
pct_change_expr = ops.div_op.as_expr(change_expr, shift_expr)
exprs.append(pct_change_expr)
return block.project_exprs(exprs, labels=column_labels, drop=True)
return block.project_block_exprs(exprs, labels=column_labels, drop=True)


def rank(
Expand All @@ -428,16 +431,11 @@ def rank(

columns = columns or tuple(col for col in block.value_columns)
labels = [block.col_id_to_label[id] for id in columns]
# Step 1: Calculate row numbers for each row
# Identify null values to be treated according to na_option param
rownum_col_ids = []
nullity_col_ids = []

result_exprs = []
for col in columns:
block, nullity_col_id = block.apply_unary_op(
col,
ops.isnull_op,
)
nullity_col_ids.append(nullity_col_id)
# Step 1: Calculate row numbers for each row
# Identify null values to be treated according to na_option param
window_ordering = (
ordering.OrderingExpression(
ex.deref(col),
Expand All @@ -448,87 +446,66 @@ def rank(
),
)
# Count_op ignores nulls, so if na_option is "top" or "bottom", we instead count the nullity columns, where nulls have been mapped to bools
block, rownum_id = block.apply_window_op(
col if na_option == "keep" else nullity_col_id,
agg_ops.dense_rank_op if method == "dense" else agg_ops.count_op,
window_spec=windows.unbound(
grouping_keys=grouping_cols, ordering=window_ordering
)
target_expr = (
ex.deref(col) if na_option == "keep" else ops.isnull_op.as_expr(col)
)
window_op = agg_ops.dense_rank_op if method == "dense" else agg_ops.count_op
window_spec = (
windows.unbound(grouping_keys=grouping_cols, ordering=window_ordering)
if method == "dense"
else windows.rows(
end=0, ordering=window_ordering, grouping_keys=grouping_cols
),
skip_reproject_unsafe=(col != columns[-1]),
)
)
result_expr: ex.Expression = agg_expressions.WindowExpression(
agg_expressions.UnaryAggregation(window_op, target_expr), window_spec
)
if pct:
block, max_id = block.apply_window_op(
rownum_id, agg_ops.max_op, windows.unbound(grouping_keys=grouping_cols)
result_expr = ops.div_op.as_expr(
result_expr,
agg_expressions.WindowExpression(
agg_expressions.UnaryAggregation(agg_ops.max_op, result_expr),
windows.unbound(grouping_keys=grouping_cols),
),
)
block, rownum_id = block.project_expr(ops.div_op.as_expr(rownum_id, max_id))

rownum_col_ids.append(rownum_id)

# Step 2: Apply aggregate to groups of like input values.
# This step is skipped for method=='first' or 'dense'
if method in ["average", "min", "max"]:
agg_op = {
"average": agg_ops.mean_op,
"min": agg_ops.min_op,
"max": agg_ops.max_op,
}[method]
post_agg_rownum_col_ids = []
for i in range(len(columns)):
block, result_id = block.apply_window_op(
rownum_col_ids[i],
agg_op,
window_spec=windows.unbound(grouping_keys=(columns[i], *grouping_cols)),
skip_reproject_unsafe=(i < (len(columns) - 1)),
# Step 2: Apply aggregate to groups of like input values.
# This step is skipped for method=='first' or 'dense'
if method in ["average", "min", "max"]:
agg_op = {
"average": agg_ops.mean_op,
"min": agg_ops.min_op,
"max": agg_ops.max_op,
}[method]
result_expr = agg_expressions.WindowExpression(
agg_expressions.UnaryAggregation(agg_op, result_expr),
windows.unbound(grouping_keys=(col, *grouping_cols)),
)
post_agg_rownum_col_ids.append(result_id)
rownum_col_ids = post_agg_rownum_col_ids

# Pandas masks all values where any grouping column is null
# Note: we use pd.NA instead of float('nan')
if grouping_cols:
predicate = functools.reduce(
ops.and_op.as_expr,
[ops.notnull_op.as_expr(column_id) for column_id in grouping_cols],
)
block = block.project_exprs(
[
ops.where_op.as_expr(
ex.deref(col),
predicate,
ex.const(None),
)
for col in rownum_col_ids
],
labels=labels,
)
rownum_col_ids = list(block.value_columns[-len(rownum_col_ids) :])

# Step 3: post processing: mask null values and cast to float
if method in ["min", "max", "first", "dense"]:
# Pandas rank always produces Float64, so must cast for aggregation types that produce ints
return (
block.select_columns(rownum_col_ids)
.multi_apply_unary_op(ops.AsTypeOp(pd.Float64Dtype()))
.with_column_labels(labels)
)
if na_option == "keep":
# For na_option "keep", null inputs must produce null outputs
exprs = []
for i in range(len(columns)):
exprs.append(
ops.where_op.as_expr(
ex.const(pd.NA, dtype=pd.Float64Dtype()),
nullity_col_ids[i],
rownum_col_ids[i],
)
# Pandas masks all values where any grouping column is null
# Note: we use pd.NA instead of float('nan')
if grouping_cols:
predicate = functools.reduce(
ops.and_op.as_expr,
[ops.notnull_op.as_expr(column_id) for column_id in grouping_cols],
)
result_expr = ops.where_op.as_expr(
result_expr,
predicate,
ex.const(None),
)
return block.project_exprs(exprs, labels=labels, drop=True)

return block.select_columns(rownum_col_ids).with_column_labels(labels)
# Step 3: post processing: mask null values and cast to float
if method in ["min", "max", "first", "dense"]:
# Pandas rank always produces Float64, so must cast for aggregation types that produce ints
result_expr = ops.AsTypeOp(pd.Float64Dtype()).as_expr(result_expr)
elif na_option == "keep":
# For na_option "keep", null inputs must produce null outputs
result_expr = ops.where_op.as_expr(
ex.const(pd.NA, dtype=pd.Float64Dtype()),
ops.isnull_op.as_expr(col),
result_expr,
)
result_exprs.append(result_expr)
return block.project_block_exprs(result_exprs, labels=labels, drop=True)


def dropna(
Expand Down
21 changes: 21 additions & 0 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,27 @@ def project_exprs(
index_labels=self._index_labels,
)

# This is a new experimental version of the project_exprs that supports mixing analytic and scalar expressions
def project_block_exprs(
self,
exprs: Sequence[ex.Expression],
labels: Union[Sequence[Label], pd.Index],
drop=False,
) -> Block:
new_array, _ = self.expr.compute_general_expression(exprs)
if drop:
new_array = new_array.drop_columns(self.value_columns)

new_array.node.validate_tree()
return Block(
new_array,
index_columns=self.index_columns,
column_labels=labels
if drop
else self.column_labels.append(pd.Index(labels)),
index_labels=self._index_labels,
)

def apply_window_op(
self,
column: str,
Expand Down
Loading