diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index d4732cc1f604a..d7eedf7f18ad8 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -47,14 +47,14 @@ mod window_agg_exec; pub use bounded_window_agg_exec::BoundedWindowAggExec; pub use bounded_window_agg_exec::PartitionSearchMode; use datafusion_common::utils::longest_consecutive_prefix; +use datafusion_physical_expr::equivalence::OrderingEquivalenceBuilder; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::{convert_to_expr, get_indices_of_matching_exprs}; pub use datafusion_physical_expr::window::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, }; use datafusion_physical_expr::{ - normalize_expr_with_equivalence_properties, OrderedColumn, - OrderingEquivalenceProperties, PhysicalSortRequirement, + OrderedColumn, OrderingEquivalenceProperties, PhysicalSortRequirement, }; pub use window_agg_exec::WindowAggExec; @@ -254,30 +254,10 @@ pub(crate) fn window_ordering_equivalence( ) -> OrderingEquivalenceProperties { // We need to update the schema, so we can not directly use // `input.ordering_equivalence_properties()`. - let mut result = OrderingEquivalenceProperties::new(schema.clone()); - result.extend( - input - .ordering_equivalence_properties() - .classes() - .iter() - .cloned(), - ); - let mut normalized_out_ordering = vec![]; - for item in input.output_ordering().unwrap_or(&[]) { - // To account for ordering equivalences, first normalize the expression: - let normalized = normalize_expr_with_equivalence_properties( - item.expr.clone(), - input.equivalence_properties().classes(), - ); - // Currently we only support, ordering equivalences for `Column` expressions. - // TODO: Add support for ordering equivalence for all `PhysicalExpr`s - if let Some(column) = normalized.as_any().downcast_ref::() { - normalized_out_ordering - .push(OrderedColumn::new(column.clone(), item.options)); - } else { - break; - } - } + let mut builder = OrderingEquivalenceBuilder::new(schema.clone()) + .with_equivalences(input.equivalence_properties()) + .with_existing_ordering(input.output_ordering().map(|elem| elem.to_vec())) + .extend(input.ordering_equivalence_properties()); for expr in window_expr { if let Some(builtin_window_expr) = expr.as_any().downcast_ref::() @@ -289,25 +269,21 @@ pub(crate) fn window_ordering_equivalence( .as_any() .is::() { - // If there is an existing ordering, add new ordering as an equivalence: - if !normalized_out_ordering.is_empty() { - if let Some((idx, field)) = - schema.column_with_name(expr.field().unwrap().name()) - { - let column = Column::new(field.name(), idx); - let options = SortOptions { - descending: false, - nulls_first: false, - }; // ASC, NULLS LAST - let rhs = OrderedColumn::new(column, options); - result - .add_equal_conditions((&normalized_out_ordering, &vec![rhs])); - } + if let Some((idx, field)) = + schema.column_with_name(expr.field().unwrap().name()) + { + let column = Column::new(field.name(), idx); + let options = SortOptions { + descending: false, + nulls_first: false, + }; // ASC, NULLS LAST + let rhs = OrderedColumn::new(column, options); + builder.add_equal_conditions(vec![rhs]); } } } } - result + builder.build() } #[cfg(test)] mod tests { diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index a1f2df9208f07..659d159c62145 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -16,7 +16,9 @@ // under the License. use crate::expressions::Column; -use crate::{PhysicalSortExpr, PhysicalSortRequirement}; +use crate::{ + normalize_expr_with_equivalence_properties, PhysicalSortExpr, PhysicalSortRequirement, +}; use arrow::datatypes::SchemaRef; use arrow_schema::SortOptions; @@ -259,6 +261,80 @@ impl OrderingEquivalentClass { } } +/// This is a builder object facilitating incremental construction +/// for ordering equivalences. +pub struct OrderingEquivalenceBuilder { + eq_properties: EquivalenceProperties, + ordering_eq_properties: OrderingEquivalenceProperties, + existing_ordering: Vec, +} + +impl OrderingEquivalenceBuilder { + pub fn new(schema: SchemaRef) -> Self { + let eq_properties = EquivalenceProperties::new(schema.clone()); + let ordering_eq_properties = OrderingEquivalenceProperties::new(schema); + Self { + eq_properties, + ordering_eq_properties, + existing_ordering: vec![], + } + } + + pub fn extend( + mut self, + new_ordering_eq_properties: OrderingEquivalenceProperties, + ) -> Self { + self.ordering_eq_properties + .extend(new_ordering_eq_properties.classes().iter().cloned()); + self + } + + pub fn with_existing_ordering( + mut self, + existing_ordering: Option>, + ) -> Self { + if let Some(existing_ordering) = existing_ordering { + self.existing_ordering = existing_ordering; + } + self + } + + pub fn with_equivalences(mut self, new_eq_properties: EquivalenceProperties) -> Self { + self.eq_properties = new_eq_properties; + self + } + + pub fn add_equal_conditions(&mut self, new_equivalent_ordering: Vec) { + let mut normalized_out_ordering = vec![]; + for item in &self.existing_ordering { + // To account for ordering equivalences, first normalize the expression: + let normalized = normalize_expr_with_equivalence_properties( + item.expr.clone(), + self.eq_properties.classes(), + ); + // Currently we only support ordering equivalences for `Column` expressions. + // TODO: Add support for ordering equivalence for all `PhysicalExpr`s. + if let Some(column) = normalized.as_any().downcast_ref::() { + normalized_out_ordering + .push(OrderedColumn::new(column.clone(), item.options)); + } else { + break; + } + } + // If there is an existing ordering, add new ordering as an equivalence: + if !normalized_out_ordering.is_empty() { + self.ordering_eq_properties.add_equal_conditions(( + &normalized_out_ordering, + &new_equivalent_ordering, + )); + } + } + + pub fn build(self) -> OrderingEquivalenceProperties { + self.ordering_eq_properties + } +} + /// This function applies the given projection to the given equivalence /// properties to compute the resulting (projected) equivalence properties; e.g. /// 1) Adding an alias, which can introduce additional equivalence properties,