Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 17 additions & 41 deletions datafusion/core/src/physical_plan/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ mod window_agg_exec;
pub use bounded_window_agg_exec::BoundedWindowAggExec;
pub use bounded_window_agg_exec::PartitionSearchMode;
use datafusion_common::utils::longest_consecutive_prefix;
use datafusion_physical_expr::equivalence::OrderingEquivalenceBuilder;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{convert_to_expr, get_indices_of_matching_exprs};
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};
use datafusion_physical_expr::{
normalize_expr_with_equivalence_properties, OrderedColumn,
OrderingEquivalenceProperties, PhysicalSortRequirement,
OrderedColumn, OrderingEquivalenceProperties, PhysicalSortRequirement,
};
pub use window_agg_exec::WindowAggExec;

Expand Down Expand Up @@ -254,30 +254,10 @@ pub(crate) fn window_ordering_equivalence(
) -> OrderingEquivalenceProperties {
// We need to update the schema, so we can not directly use
// `input.ordering_equivalence_properties()`.
let mut result = OrderingEquivalenceProperties::new(schema.clone());
result.extend(
input
.ordering_equivalence_properties()
.classes()
.iter()
.cloned(),
);
let mut normalized_out_ordering = vec![];
for item in input.output_ordering().unwrap_or(&[]) {
// To account for ordering equivalences, first normalize the expression:
let normalized = normalize_expr_with_equivalence_properties(
item.expr.clone(),
input.equivalence_properties().classes(),
);
// Currently we only support, ordering equivalences for `Column` expressions.
// TODO: Add support for ordering equivalence for all `PhysicalExpr`s
if let Some(column) = normalized.as_any().downcast_ref::<Column>() {
normalized_out_ordering
.push(OrderedColumn::new(column.clone(), item.options));
} else {
break;
}
}
let mut builder = OrderingEquivalenceBuilder::new(schema.clone())
.with_equivalences(input.equivalence_properties())
.with_existing_ordering(input.output_ordering().map(|elem| elem.to_vec()))
.extend(input.ordering_equivalence_properties());
for expr in window_expr {
if let Some(builtin_window_expr) =
expr.as_any().downcast_ref::<BuiltInWindowExpr>()
Expand All @@ -289,25 +269,21 @@ pub(crate) fn window_ordering_equivalence(
.as_any()
.is::<RowNumber>()
{
// If there is an existing ordering, add new ordering as an equivalence:
if !normalized_out_ordering.is_empty() {
if let Some((idx, field)) =
schema.column_with_name(expr.field().unwrap().name())
{
let column = Column::new(field.name(), idx);
let options = SortOptions {
descending: false,
nulls_first: false,
}; // ASC, NULLS LAST
let rhs = OrderedColumn::new(column, options);
result
.add_equal_conditions((&normalized_out_ordering, &vec![rhs]));
}
if let Some((idx, field)) =
schema.column_with_name(expr.field().unwrap().name())
{
let column = Column::new(field.name(), idx);
let options = SortOptions {
descending: false,
nulls_first: false,
}; // ASC, NULLS LAST
let rhs = OrderedColumn::new(column, options);
builder.add_equal_conditions(vec![rhs]);
}
}
}
}
result
builder.build()
}
#[cfg(test)]
mod tests {
Expand Down
78 changes: 77 additions & 1 deletion datafusion/physical-expr/src/equivalence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
// under the License.

use crate::expressions::Column;
use crate::{PhysicalSortExpr, PhysicalSortRequirement};
use crate::{
normalize_expr_with_equivalence_properties, PhysicalSortExpr, PhysicalSortRequirement,
};

use arrow::datatypes::SchemaRef;
use arrow_schema::SortOptions;
Expand Down Expand Up @@ -259,6 +261,80 @@ impl OrderingEquivalentClass {
}
}

/// This is a builder object facilitating incremental construction
/// for ordering equivalences.
pub struct OrderingEquivalenceBuilder {
eq_properties: EquivalenceProperties,
ordering_eq_properties: OrderingEquivalenceProperties,
existing_ordering: Vec<PhysicalSortExpr>,
}

impl OrderingEquivalenceBuilder {
pub fn new(schema: SchemaRef) -> Self {
let eq_properties = EquivalenceProperties::new(schema.clone());
let ordering_eq_properties = OrderingEquivalenceProperties::new(schema);
Self {
eq_properties,
ordering_eq_properties,
existing_ordering: vec![],
}
}

pub fn extend(
mut self,
new_ordering_eq_properties: OrderingEquivalenceProperties,
) -> Self {
self.ordering_eq_properties
.extend(new_ordering_eq_properties.classes().iter().cloned());
self
}

pub fn with_existing_ordering(
mut self,
existing_ordering: Option<Vec<PhysicalSortExpr>>,
) -> Self {
if let Some(existing_ordering) = existing_ordering {
self.existing_ordering = existing_ordering;
}
self
}

pub fn with_equivalences(mut self, new_eq_properties: EquivalenceProperties) -> Self {
self.eq_properties = new_eq_properties;
self
}

pub fn add_equal_conditions(&mut self, new_equivalent_ordering: Vec<OrderedColumn>) {
let mut normalized_out_ordering = vec![];
for item in &self.existing_ordering {
// To account for ordering equivalences, first normalize the expression:
let normalized = normalize_expr_with_equivalence_properties(
item.expr.clone(),
self.eq_properties.classes(),
);
// Currently we only support ordering equivalences for `Column` expressions.
// TODO: Add support for ordering equivalence for all `PhysicalExpr`s.
if let Some(column) = normalized.as_any().downcast_ref::<Column>() {
normalized_out_ordering
.push(OrderedColumn::new(column.clone(), item.options));
} else {
break;
}
}
// If there is an existing ordering, add new ordering as an equivalence:
if !normalized_out_ordering.is_empty() {
self.ordering_eq_properties.add_equal_conditions((
&normalized_out_ordering,
&new_equivalent_ordering,
));
}
}

pub fn build(self) -> OrderingEquivalenceProperties {
self.ordering_eq_properties
}
}

/// This function applies the given projection to the given equivalence
/// properties to compute the resulting (projected) equivalence properties; e.g.
/// 1) Adding an alias, which can introduce additional equivalence properties,
Expand Down