Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion bigframes/core/agg_expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from typing import Callable, Mapping, TypeVar

from bigframes import dtypes
from bigframes.core import expression
from bigframes.core import expression, window_spec
import bigframes.core.identifiers as ids
import bigframes.operations.aggregations as agg_ops

Expand Down Expand Up @@ -149,3 +149,68 @@ def replace_args(
self, larg: expression.Expression, rarg: expression.Expression
) -> BinaryAggregation:
return BinaryAggregation(self.op, larg, rarg)


@dataclasses.dataclass(frozen=True)
class WindowExpression(expression.Expression):
analytic_expr: Aggregation
window: window_spec.WindowSpec

@property
def column_references(self) -> typing.Tuple[ids.ColumnId, ...]:
return tuple(
itertools.chain.from_iterable(
map(lambda x: x.column_references, self.inputs)
)
)

@functools.cached_property
def is_resolved(self) -> bool:
return all(input.is_resolved for input in self.inputs)

@property
def output_type(self) -> dtypes.ExpressionType:
return self.analytic_expr.output_type

@property
def inputs(
self,
) -> typing.Tuple[expression.Expression, ...]:
return (self.analytic_expr, *self.window.expressions)

@property
def free_variables(self) -> typing.Tuple[str, ...]:
return tuple(
itertools.chain.from_iterable(map(lambda x: x.free_variables, self.inputs))
)

@property
def is_const(self) -> bool:
return all(child.is_const for child in self.inputs)

def transform_children(
self: WindowExpression,
t: Callable[[expression.Expression], expression.Expression],
) -> WindowExpression:
return WindowExpression(
self.analytic_expr.transform_children(t),
self.window.transform_exprs(t),
)

def bind_variables(
self: WindowExpression,
bindings: Mapping[str, expression.Expression],
allow_partial_bindings: bool = False,
) -> WindowExpression:
return self.transform_children(
lambda x: x.bind_variables(bindings, allow_partial_bindings)
)

def bind_refs(
self: WindowExpression,
bindings: Mapping[ids.ColumnId, expression.Expression],
allow_partial_bindings: bool = False,
) -> WindowExpression:
return self.transform_children(
lambda x: x.bind_refs(bindings, allow_partial_bindings)
)
2 changes: 1 addition & 1 deletion bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ def apply_analytic(
block = self
if skip_null_groups:
for key in window.grouping_keys:
block = block.filter(ops.notnull_op.as_expr(key.id.name))
block = block.filter(ops.notnull_op.as_expr(key))
expr, result_id = block._expr.project_window_expr(
agg_expr,
window,
Expand Down
Loading