diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index abba191f047b..dd23e39e2774 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -44,13 +44,14 @@ use datafusion_common::{ plan_err, DataFusionError, JoinType, Result, ScalarValue, SharedResult, }; use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::utils::{ + normalize_ordering_equivalence_classes, normalize_sort_exprs, +}; use datafusion_physical_expr::{ EquivalentClass, LexOrdering, LexOrderingRef, OrderingEquivalenceProperties, OrderingEquivalentClass, PhysicalExpr, PhysicalSortExpr, }; -use datafusion_physical_expr::utils::normalize_sort_exprs; - use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; use itertools::Itertools; @@ -186,24 +187,13 @@ pub fn calculate_join_output_ordering( assert_eq!(maintains_input_order.len(), 2); let left_maintains = maintains_input_order[0]; let right_maintains = maintains_input_order[1]; - let (mut right_ordering, on_columns) = match join_type { + let mut right_ordering = match join_type { // In the case below, right ordering should be offseted with the left // side length, since we append the right table to the left table. JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { - let updated_on_columns = on_columns - .iter() - .map(|(left, right)| { - ( - left.clone(), - Column::new(right.name(), right.index() + left_columns_len), - ) - }) - .collect::>(); - let updated_right_ordering = - add_offset_to_lex_ordering(right_ordering, left_columns_len)?; - (updated_right_ordering, updated_on_columns) + add_offset_to_lex_ordering(right_ordering, left_columns_len)? 
} - _ => (right_ordering.to_vec(), on_columns.to_vec()), + _ => right_ordering.to_vec(), }; let output_ordering = match (left_maintains, right_maintains) { (true, true) => { @@ -215,7 +205,7 @@ pub fn calculate_join_output_ordering( // Special case, we can prefix ordering of right side with the ordering of left side. if join_type == JoinType::Inner && probe_side == Some(JoinSide::Left) { replace_on_columns_of_right_ordering( - &on_columns, + on_columns, &mut right_ordering, left_columns_len, ); @@ -227,6 +217,11 @@ pub fn calculate_join_output_ordering( (false, true) => { // Special case, we can prefix ordering of left side with the ordering of right side. if join_type == JoinType::Inner && probe_side == Some(JoinSide::Right) { + replace_on_columns_of_right_ordering( + on_columns, + &mut right_ordering, + left_columns_len, + ); merge_vectors(&right_ordering, left_ordering) } else { right_ordering @@ -317,25 +312,39 @@ pub fn cross_join_equivalence_properties( new_properties } -/// Update right table ordering equivalences so that they point to valid indices -/// at the output of the join schema. To do so, we increment column indices by left table size -/// when join schema consist of combination of left and right schema (Inner, Left, Full, Right joins). +/// Update right table ordering equivalences so that: (1) they point to valid +/// indices at the output of the join schema, and (2) they are normalized w.r.t. +/// the given equivalence properties. To do so, we first increment column indices by +/// the left table size when join schema consists of a combination of left and +/// right schemas (Inner, Left, Full, Right joins). +/// Then, we normalize the sort expressions of ordering equivalences one by one. +/// We make sure that each expression in the ordering equivalence either: +/// - Is the head of an equivalent class, or +/// - Doesn't have an equivalent column.
+/// This way, once we normalize an expression according to equivalence properties, +/// then it can be safely used for ordering equivalence normalization. fn get_updated_right_ordering_equivalence_properties( join_type: &JoinType, right_oeq_classes: &[OrderingEquivalentClass], left_columns_len: usize, + join_eq_properties: &EquivalenceProperties, ) -> Result> { - match join_type { + let updated_oeqs = match join_type { // In these modes, indices of the right schema should be offset by // the left table size. JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { add_offset_to_ordering_equivalence_classes( right_oeq_classes, left_columns_len, - ) + )? } - _ => Ok(right_oeq_classes.to_vec()), - } + _ => right_oeq_classes.to_vec(), + }; + + Ok(normalize_ordering_equivalence_classes( + &updated_oeqs, + join_eq_properties, + )) } /// Merge left and right sort expressions, checking for duplicates. @@ -356,18 +365,19 @@ fn prefix_ordering_equivalence_with_existing_ordering( oeq_classes: &[OrderingEquivalentClass], eq_classes: &[EquivalentClass], ) -> Vec { + let existing_ordering = normalize_sort_exprs(existing_ordering, eq_classes, &[]); oeq_classes .iter() .map(|oeq_class| { let normalized_head = normalize_sort_exprs(oeq_class.head(), eq_classes, &[]); - let updated_head = merge_vectors(existing_ordering, &normalized_head); + let updated_head = merge_vectors(&existing_ordering, &normalized_head); let updated_others = oeq_class .others() .iter() .map(|ordering| { let normalized_ordering = normalize_sort_exprs(ordering, eq_classes, &[]); - merge_vectors(existing_ordering, &normalized_ordering) + merge_vectors(&existing_ordering, &normalized_ordering) }) .collect(); OrderingEquivalentClass::new(updated_head, updated_others) @@ -411,6 +421,7 @@ pub fn combine_join_ordering_equivalence_properties( join_type, right_oeq_properties.classes(), left_columns_len, + &join_eq_properties, )?; let left_output_ordering = left.output_ordering().unwrap_or(&[]); // 
Right side ordering equivalence properties should be prepended with @@ -435,6 +446,7 @@ pub fn combine_join_ordering_equivalence_properties( join_type, right_oeq_properties.classes(), left_columns_len, + &join_eq_properties, )?; new_properties.extend(right_oeq_classes); // In this special case, left side ordering can be prefixed with right side ordering. @@ -442,8 +454,10 @@ pub fn combine_join_ordering_equivalence_properties( && left.output_ordering().is_some() && *join_type == JoinType::Inner { - let left_oeq_classes = right_oeq_properties.classes(); + let left_oeq_classes = left_oeq_properties.classes(); let right_output_ordering = right.output_ordering().unwrap_or(&[]); + let right_output_ordering = + add_offset_to_lex_ordering(right_output_ordering, left_columns_len)?; // Left side ordering equivalence properties should be prepended with // those of the right side while constructing output ordering equivalence // properties since stream side is the right side. @@ -454,7 +468,7 @@ pub fn combine_join_ordering_equivalence_properties( // to the ordering equivalences of the join. 
let updated_left_oeq_classes = prefix_ordering_equivalence_with_existing_ordering( - right_output_ordering, + &right_output_ordering, left_oeq_classes, join_eq_properties.classes(), ); @@ -1368,6 +1382,7 @@ mod tests { use arrow::datatypes::Fields; use arrow::error::Result as ArrowResult; use arrow::{datatypes::DataType, error::ArrowError}; + use arrow_schema::SortOptions; use datafusion_common::ScalarValue; use std::pin::Pin; @@ -1898,4 +1913,182 @@ mod tests { Ok(()) } + + #[test] + fn test_get_updated_right_ordering_equivalence_properties() -> Result<()> { + let join_type = JoinType::Inner; + + let options = SortOptions::default(); + let right_oeq_classes = OrderingEquivalentClass::new( + vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("x", 0)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("y", 1)), + options, + }, + ], + vec![vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("z", 2)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("w", 3)), + options, + }, + ]], + ); + + let left_columns_len = 4; + + let fields: Fields = ["a", "b", "c", "d", "x", "y", "z", "w"] + .into_iter() + .map(|name| Field::new(name, DataType::Int32, true)) + .collect(); + + let mut join_eq_properties = + EquivalenceProperties::new(Arc::new(Schema::new(fields))); + join_eq_properties + .add_equal_conditions((&Column::new("a", 0), &Column::new("x", 4))); + join_eq_properties + .add_equal_conditions((&Column::new("d", 3), &Column::new("w", 7))); + + let result = get_updated_right_ordering_equivalence_properties( + &join_type, + &[right_oeq_classes.clone()], + left_columns_len, + &join_eq_properties, + )?; + + let expected = OrderingEquivalentClass::new( + vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("y", 5)), + options, + }, + ], + vec![vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("z", 6)), + options, + }, + PhysicalSortExpr { + expr: 
Arc::new(Column::new("d", 3)), + options, + }, + ]], + ); + + assert_eq!(result[0].head(), expected.head()); + assert_eq!(result[0].others(), expected.others()); + + Ok(()) + } + + #[test] + fn test_calculate_join_output_ordering() -> Result<()> { + let options = SortOptions::default(); + let left_ordering = vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("c", 2)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("d", 3)), + options, + }, + ]; + let right_ordering = vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("z", 2)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("y", 1)), + options, + }, + ]; + let join_type = JoinType::Inner; + let on_columns = [(Column::new("b", 1), Column::new("x", 0))]; + let left_columns_len = 5; + let maintains_input_orders = [[true, false], [false, true]]; + let probe_sides = [Some(JoinSide::Left), Some(JoinSide::Right)]; + + let expected = [ + Some(vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("c", 2)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("d", 3)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("z", 7)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("y", 6)), + options, + }, + ]), + Some(vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("z", 7)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("y", 6)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("c", 2)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("d", 3)), + options, + }, + ]), + ]; + + for (i, (maintains_input_order, probe_side)) in + maintains_input_orders.iter().zip(probe_sides).enumerate() + { + assert_eq!( + calculate_join_output_ordering( + &left_ordering, 
+ &right_ordering, + join_type, + &on_columns, + left_columns_len, + maintains_input_order, + probe_side + )?, + expected[i] + ); + } + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 168ee0138df8..c61077f1a0d4 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -69,6 +69,6 @@ pub use sort_expr::{ }; pub use utils::{ expr_list_eq_any_order, expr_list_eq_strict_order, - normalize_expr_with_equivalence_properties, normalize_out_expr_with_columns_map, - reverse_order_bys, split_conjunction, + normalize_expr_with_equivalence_properties, normalize_ordering_equivalence_classes, + normalize_out_expr_with_columns_map, reverse_order_bys, split_conjunction, }; diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 8f63d94b3b34..1492d6008e50 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -20,7 +20,9 @@ use crate::equivalence::{ OrderingEquivalentClass, }; use crate::expressions::{BinaryExpr, Column, UnKnownColumn}; -use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement}; +use crate::{ + LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, +}; use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData}; use arrow::compute::{and_kleene, is_not_null, SlicesIterator}; @@ -153,6 +155,41 @@ pub fn normalize_expr_with_equivalence_properties( .unwrap_or(expr) } +/// This function returns the normalized version of `sort_expr` with respect to +/// `eq_properties`, if possible. Otherwise, it returns its first argument as is. +/// Note that this simply means returning the head [`PhysicalSortExpr`] in the +/// given equivalence set. 
+fn normalize_sort_expr_with_equivalence_properties( + mut sort_expr: PhysicalSortExpr, + eq_properties: &[EquivalentClass], +) -> PhysicalSortExpr { + sort_expr.expr = + normalize_expr_with_equivalence_properties(sort_expr.expr, eq_properties); + sort_expr +} + +/// This function returns the normalized version of every [`PhysicalSortExpr`] +/// in `sort_exprs` w.r.t. `eq_properties`, if possible. The [`PhysicalSortExpr`]s +/// for which this is impossible are returned as is. Basically, this function +/// applies [`normalize_sort_expr_with_equivalence_properties`] to multiple +/// [`PhysicalSortExpr`]s at once. +pub fn normalize_sort_exprs_with_equivalence_properties( + sort_exprs: LexOrderingRef, + eq_properties: &EquivalenceProperties, +) -> LexOrdering { + sort_exprs + .iter() + .map(|expr| { + normalize_sort_expr_with_equivalence_properties( + expr.clone(), + eq_properties.classes(), + ) + }) + .collect() +} + +/// This function returns the head [`PhysicalSortRequirement`] of the equivalence set of a [`PhysicalSortRequirement`], +/// if there is any; otherwise, it returns the same [`PhysicalSortRequirement`]. fn normalize_sort_requirement_with_equivalence_properties( mut sort_requirement: PhysicalSortRequirement, eq_properties: &[EquivalentClass], @@ -222,6 +259,34 @@ pub fn normalize_sort_exprs( collapse_vec(normalized_exprs) } +/// This function "normalizes" its argument `oeq_classes` by making sure that +/// it only refers to representative (i.e. head) entries in the given equivalence +/// properties (`eq_properties`).
+pub fn normalize_ordering_equivalence_classes( + oeq_classes: &[OrderingEquivalentClass], + eq_properties: &EquivalenceProperties, +) -> Vec { + oeq_classes + .iter() + .map(|class| { + let head = normalize_sort_exprs_with_equivalence_properties( + class.head(), + eq_properties, + ); + + let others = class + .others() + .iter() + .map(|other| { + normalize_sort_exprs_with_equivalence_properties(other, eq_properties) + }) + .collect(); + + EquivalentClass::new(head, others) + }) + .collect() +} + /// Transform `sort_reqs` vector, to standardized version using `eq_properties` and `ordering_eq_properties` /// Assume `eq_properties` states that `Column a` and `Column b` are aliases. /// Also assume `ordering_eq_properties` states that ordering `vec![d ASC]` and `vec![a ASC, c ASC]` are