From 31d58556dbf7bc9868f7694371dbdfa8be0b8105 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Thu, 10 Aug 2023 11:33:55 +0300 Subject: [PATCH 01/15] projection exec is updated, get_ordering method is added to physical expr's --- .../core/src/datasource/physical_plan/mod.rs | 4 +- .../core/src/physical_plan/joins/utils.rs | 92 +++++-- .../core/src/physical_plan/projection.rs | 249 ++++++++++++++++-- .../tests/sqllogictests/test_files/order.slt | 35 +++ datafusion/physical-expr/src/equivalence.rs | 46 +--- .../physical-expr/src/expressions/binary.rs | 101 ++++++- .../physical-expr/src/expressions/cast.rs | 11 +- .../physical-expr/src/expressions/literal.rs | 6 +- .../physical-expr/src/expressions/negative.rs | 11 +- datafusion/physical-expr/src/lib.rs | 2 +- datafusion/physical-expr/src/physical_expr.rs | 39 +++ datafusion/physical-expr/src/utils.rs | 128 ++++++++- 12 files changed, 642 insertions(+), 82 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index a9ca6fc90a6b..457fc3a38ed8 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -959,7 +959,9 @@ fn get_projected_output_ordering( // since rest of the orderings are violated break; } - all_orderings.push(new_ordering); + if !new_ordering.is_empty() { + all_orderings.push(new_ordering); + } } all_orderings } diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index abba191f047b..c9acf46655df 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -49,7 +49,9 @@ use datafusion_physical_expr::{ OrderingEquivalentClass, PhysicalExpr, PhysicalSortExpr, }; -use datafusion_physical_expr::utils::normalize_sort_exprs; +use datafusion_physical_expr::utils::{ + normalize_sort_expr_with_equivalence_properties, normalize_sort_exprs, +}; use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; @@ -190,18 +192,9 @@ pub fn calculate_join_output_ordering( // In the case below, right ordering should be offseted with the left // side length, since we append the right table to the left table. JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { - let updated_on_columns = on_columns - .iter() - .map(|(left, right)| { - ( - left.clone(), - Column::new(right.name(), right.index() + left_columns_len), - ) - }) - .collect::>(); let updated_right_ordering = add_offset_to_lex_ordering(right_ordering, left_columns_len)?; - (updated_right_ordering, updated_on_columns) + (updated_right_ordering, on_columns.to_vec()) } _ => (right_ordering.to_vec(), on_columns.to_vec()), }; @@ -227,9 +220,14 @@ pub fn calculate_join_output_ordering( (false, true) => { // Special case, we can prefix ordering of left side with the ordering of right side. if join_type == JoinType::Inner && probe_side == Some(JoinSide::Right) { + replace_on_columns_of_right_ordering( + &on_columns, + &mut right_ordering, + left_columns_len, + ); merge_vectors(&right_ordering, left_ordering) } else { - right_ordering + right_ordering.to_vec() } } // Doesn't maintain ordering, output ordering is None. @@ -318,24 +316,32 @@ pub fn cross_join_equivalence_properties( } /// Update right table ordering equivalences so that they point to valid indices -/// at the output of the join schema. To do so, we increment column indices by left table size +/// at the output of the join schema, and also they are normalized with equivalence +/// columns. To do so, we increment column indices by left table size /// when join schema consist of combination of left and right schema (Inner, Left, Full, Right joins). +/// Then, we normalize the sort expressions of ordering equvalences one by one. fn get_updated_right_ordering_equivalence_properties( join_type: &JoinType, right_oeq_classes: &[OrderingEquivalentClass], left_columns_len: usize, + join_eq_properties: &EquivalenceProperties, ) -> Result> { - match join_type { + let updated_indices = match join_type { // In these modes, indices of the right schema should be offset by // the left table size. JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { add_offset_to_ordering_equivalence_classes( right_oeq_classes, left_columns_len, - ) + )? } - _ => Ok(right_oeq_classes.to_vec()), - } + _ => right_oeq_classes.to_vec(), + }; + + Ok(normalize_ordering_equivalence_classes( + &updated_indices, + join_eq_properties, + )) } /// Merge left and right sort expressions, checking for duplicates. @@ -356,18 +362,19 @@ fn prefix_ordering_equivalence_with_existing_ordering( oeq_classes: &[OrderingEquivalentClass], eq_classes: &[EquivalentClass], ) -> Vec { + let existing_ordering = normalize_sort_exprs(existing_ordering, eq_classes, &[]); oeq_classes .iter() .map(|oeq_class| { let normalized_head = normalize_sort_exprs(oeq_class.head(), eq_classes, &[]); - let updated_head = merge_vectors(existing_ordering, &normalized_head); + let updated_head = merge_vectors(&existing_ordering, &normalized_head); let updated_others = oeq_class .others() .iter() .map(|ordering| { let normalized_ordering = normalize_sort_exprs(ordering, eq_classes, &[]); - merge_vectors(existing_ordering, &normalized_ordering) + merge_vectors(&existing_ordering, &normalized_ordering) }) .collect(); OrderingEquivalentClass::new(updated_head, updated_others) @@ -411,6 +418,7 @@ pub fn combine_join_ordering_equivalence_properties( join_type, right_oeq_properties.classes(), left_columns_len, + &join_eq_properties, )?; let left_output_ordering = left.output_ordering().unwrap_or(&[]); // Right side ordering equivalence properties should be prepended with @@ -435,6 +443,7 @@ pub fn combine_join_ordering_equivalence_properties( join_type, right_oeq_properties.classes(), left_columns_len, + &join_eq_properties, )?; new_properties.extend(right_oeq_classes); // In this special case, left side ordering can be prefixed with right side ordering. @@ -442,8 +451,10 @@ pub fn combine_join_ordering_equivalence_properties( && left.output_ordering().is_some() && *join_type == JoinType::Inner { - let left_oeq_classes = right_oeq_properties.classes(); + let left_oeq_classes = left_oeq_properties.classes(); let right_output_ordering = right.output_ordering().unwrap_or(&[]); + let right_output_ordering = + add_offset_to_lex_ordering(right_output_ordering, left_columns_len)?; // Left side ordering equivalence properties should be prepended with // those of the right side while constructing output ordering equivalence // properties since stream side is the right side. @@ -454,7 +465,7 @@ pub fn combine_join_ordering_equivalence_properties( // to the ordering equivalences of the join. let updated_left_oeq_classes = prefix_ordering_equivalence_with_existing_ordering( - right_output_ordering, + &right_output_ordering, left_oeq_classes, join_eq_properties.classes(), ); @@ -524,6 +535,45 @@ pub(crate) fn add_offset_to_ordering_equivalence_classes( .collect() } +fn normalize_ordering_equivalence_classes( + updated_indices: &[OrderingEquivalentClass], + join_eq_properties: &EquivalenceProperties, +) -> Vec { + updated_indices + .iter() + .map(|class| { + let head = class + .head() + .iter() + .map(|expr| { + normalize_sort_expr_with_equivalence_properties( + expr.clone(), + join_eq_properties.classes(), + ) + }) + .collect::>(); + + let others = class + .others() + .iter() + .map(|other| { + other + .iter() + .map(|expr| { + normalize_sort_expr_with_equivalence_properties( + expr.clone(), + join_eq_properties.classes(), + ) + }) + .collect() + }) + .collect(); + + EquivalentClass::new(head, others) + }) + .collect() +} + impl Display for JoinSide { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index 5c4b66114328..bbed39d74cd9 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -32,8 +32,11 @@ use crate::physical_plan::{ }; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; +use arrow_schema::SortOptions; +use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::Result; use datafusion_execution::TaskContext; +use datafusion_physical_expr::utils::get_indices_of_matching_sort_exprs_with_order_eq; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -41,11 +44,11 @@ use super::expressions::{Column, PhysicalSortExpr}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream, Statistics}; -use datafusion_physical_expr::equivalence::update_ordering_equivalence_with_cast; -use datafusion_physical_expr::expressions::CastExpr; +use datafusion_physical_expr::expressions::UnKnownColumn; use datafusion_physical_expr::{ normalize_out_expr_with_columns_map, project_equivalence_properties, - project_ordering_equivalence_properties, OrderingEquivalenceProperties, + project_ordering_equivalence_properties, ExtendedSortOptions, + OrderingEquivalenceProperties, }; /// Execution plan for a projection @@ -64,6 +67,9 @@ pub struct ProjectionExec { columns_map: HashMap>, /// Execution metrics metrics: ExecutionPlanMetricsSet, + /// Expressions' orderings calculated by output ordering and equivalence classes of input plan. + /// The projected expressions are mapped by their indices to this vector. + orderings: Vec>, } impl ProjectionExec { @@ -114,6 +120,8 @@ impl ProjectionExec { }; } + let orderings = find_orderings_of_exprs(&expr, &input)?; + // Output Ordering need to respect the alias let child_output_ordering = input.output_ordering(); let output_ordering = match child_output_ordering { @@ -136,13 +144,17 @@ impl ProjectionExec { None => None, }; + let output_ordering = + validate_output_ordering(output_ordering, &orderings, &expr); + Ok(Self { expr, schema, - input: input.clone(), + input, output_ordering, columns_map, metrics: ExecutionPlanMetricsSet::new(), + orderings, }) } @@ -251,17 +263,7 @@ impl ExecutionPlan for ProjectionExec { return new_properties; } - let mut input_oeq = self.input().ordering_equivalence_properties(); - // Stores cast expression and its `Column` version in the output: - let mut cast_exprs: Vec<(CastExpr, Column)> = vec![]; - for (idx, (expr, name)) in self.expr.iter().enumerate() { - if let Some(cast_expr) = expr.as_any().downcast_ref::() { - let target_col = Column::new(name, idx); - cast_exprs.push((cast_expr.clone(), target_col)); - } - } - - update_ordering_equivalence_with_cast(&cast_exprs, &mut input_oeq); + let input_oeq = self.input().ordering_equivalence_properties(); project_ordering_equivalence_properties( input_oeq, @@ -269,6 +271,30 @@ impl ExecutionPlan for ProjectionExec { &mut new_properties, ); + if let Some(output_ordering) = + self.output_ordering.as_ref().and_then(|o| o.get(0)) + { + for order in self.orderings.iter().flatten() { + if order.eq(output_ordering) { + continue; + } + + let add_new_oeq = + if let Some(first_class) = new_properties.classes().get(0) { + !first_class.others().iter().any(|v| &v[0] == order) + } else { + true + }; + + if add_new_oeq { + new_properties.add_equal_conditions(( + &vec![output_ordering.clone()], + &vec![order.clone()], + )); + } + } + } + new_properties } @@ -318,6 +344,199 @@ impl ExecutionPlan for ProjectionExec { } } +fn find_orderings_of_exprs( + expr: &[(Arc, String)], + input: &Arc, +) -> Result>> { + let mut orderings: Vec> = vec![]; + if let Some(input_output_ordering) = input.output_ordering().unwrap_or(&[]).get(0) { + for (index, (expression, name)) in expr.iter().enumerate() { + let initial_expr = ExprOrdering::new(expression.clone()); + let transformed = initial_expr.transform_up(&|expr| { + update_ordering( + expr, + input_output_ordering, + || input.equivalence_properties().clone(), + || input.ordering_equivalence_properties().clone(), + ) + })?; + if let Some(ExtendedSortOptions::Ordered(sort_options)) = transformed.state { + orderings.push(Some(PhysicalSortExpr { + expr: Arc::new(Column::new(name, index)), + options: sort_options, + })); + } else { + orderings.push(None); + } + } + } + Ok(orderings) +} + +// If the output ordering corresponds to an UnKnownColumn, it means that the column +// having the input output ordering is not found in any of the projected expressions. +// In that case, we set the new output ordering to be the column constructed by +// the expression residing at the leftmost side of the expressions that have an ordering. +fn validate_output_ordering( + output_ordering: Option>, + orderings: &[Option], + expr: &[(Arc, String)], +) -> Option> { + output_ordering.and_then(|ordering| { + if ordering + .get(0)? + .expr + .clone() + .as_any() + .downcast_ref::() + .is_some() + { + orderings + .iter() + .position(|o| o.is_some()) + .map(|index| { + [PhysicalSortExpr { + expr: Arc::new(Column::new(&expr[index].1, index)), + options: orderings[index].clone().unwrap().options, + }] + .to_vec() + }) + .or(None) + } else { + Some(ordering) + } + }) +} + +/// Each expression in a PhysicalExpr is stated with [`ExprOrdering`] struct. +/// Parent expression's [`ExprOrdering`] is set according to [`ExprOrdering`] of its children. +#[derive(Debug)] +struct ExprOrdering { + expr: Arc, + state: Option, + children_states: Option>, +} + +impl ExprOrdering { + fn new(expr: Arc) -> Self { + Self { + expr, + state: None, + children_states: None, + } + } + + fn children(&self) -> Vec { + self.expr + .children() + .into_iter() + .map(|e| ExprOrdering::new(e)) + .collect() + } + + pub fn new_with_children( + children_states: Vec, + parent_expr: Arc, + ) -> Self { + Self { + expr: parent_expr, + state: None, + children_states: Some(children_states), + } + } +} + +/// Calculates the a [`PhysicalExpr`] node's [`ExprOrdering`] from its children. +fn update_ordering< + F: Fn() -> EquivalenceProperties, + F2: Fn() -> OrderingEquivalenceProperties, +>( + mut node: ExprOrdering, + sort_expr: &PhysicalSortExpr, + equal_properties: F, + ordering_equal_properties: F2, +) -> Result> { + // if we can directly match a sort expr with the current node, we can set its state and return early. + // TODO: If there is a PhysicalExpr other than Column at the node (let's say a+b), and there is an + // ordering equivalence of it (let's say c+d), we actually can find it at this step. + if sort_expr.expr.eq(&node.expr) { + node.state = Some(ExtendedSortOptions::Ordered(sort_expr.options)); + return Ok(Transformed::Yes(node)); + } + + // intermediate node calculation: + if let Some(children) = &node.children_states { + let children_sort_options = children.iter().collect::>(); + let parent_sort_options = node.expr.get_ordering(&children_sort_options); + + node.state = Some(parent_sort_options); + + Ok(Transformed::Yes(node)) + } + // leaf node: (only Column and Literal do not have a child) + else { + // column leaf: + if let Some(column) = node.expr.as_any().downcast_ref::() { + node.state = get_indices_of_matching_sort_exprs_with_order_eq( + &[sort_expr.clone()], + &[column.clone()], + equal_properties, + ordering_equal_properties, + ) + .map(|(sort_options, _)| { + ExtendedSortOptions::Ordered(SortOptions { + descending: sort_options[0].descending, + nulls_first: sort_options[0].nulls_first, + }) + }); + return Ok(Transformed::Yes(node)); + } + // last opiton, literal leaf: + node.state = Some(node.expr.get_ordering(&[])); + Ok(Transformed::Yes(node)) + } +} + +impl TreeNode for ExprOrdering { + fn apply_children(&self, op: &mut F) -> Result + where + F: FnMut(&Self) -> Result, + { + let children = self.children(); + for child in children { + match op(&child)? { + VisitRecursion::Continue => {} + VisitRecursion::Skip => return Ok(VisitRecursion::Continue), + VisitRecursion::Stop => return Ok(VisitRecursion::Stop), + } + } + + Ok(VisitRecursion::Continue) + } + + fn map_children(self, transform: F) -> Result + where + F: FnMut(Self) -> Result, + { + let children = self.children(); + if children.is_empty() { + Ok(self) + } else { + let children_nodes = children + .into_iter() + .map(transform) + .collect::>>()?; + Ok(ExprOrdering::new_with_children( + children_nodes + .iter() + .map(|c| c.state.unwrap_or(ExtendedSortOptions::Unordered)) + .collect(), + self.expr, + )) + } + } +} + /// If e is a direct column reference, returns the field level /// metadata for that field, if any. Otherwise returns None fn get_field_metadata( diff --git a/datafusion/core/tests/sqllogictests/test_files/order.slt b/datafusion/core/tests/sqllogictests/test_files/order.slt index 92faff623c1e..6a7f8ebf60ec 100644 --- a/datafusion/core/tests/sqllogictests/test_files/order.slt +++ b/datafusion/core/tests/sqllogictests/test_files/order.slt @@ -410,3 +410,38 @@ SELECT DISTINCT time as "first_seen" FROM t ORDER BY 1; ## Cleanup statement ok drop table t; + +# Create a table having 3 columns which are ordering equivalent by the source. In the next step, +# we will expect to observe the removed sort exec by propagating the orders across projection. +statement ok +CREATE EXTERNAL TABLE multiple_ordered_table ( + a0 INTEGER, + a INTEGER, + b INTEGER, + c INTEGER, + d INTEGER +) +STORED AS CSV +WITH HEADER ROW +WITH ORDER (a ASC) +WITH ORDER (b ASC) +WITH ORDER (c ASC) +LOCATION 'tests/data/window_2.csv'; + +query TT +EXPLAIN SELECT (b+a+c) AS result +FROM multiple_ordered_table +ORDER BY result; +---- +logical_plan +Sort: result ASC NULLS LAST +--Projection: multiple_ordered_table.b + multiple_ordered_table.a + multiple_ordered_table.c AS result +----TableScan: multiple_ordered_table projection=[a, b, c] +physical_plan +SortPreservingMergeExec: [result@0 ASC NULLS LAST] +--ProjectionExec: expr=[b@1 + a@0 + c@2 as result] +----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true + +statement ok +drop table multiple_ordered_table; \ No newline at end of file diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index d08b2e2c4add..30dbecfdc27e 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::expressions::{CastExpr, Column}; +use crate::expressions::Column; use crate::{ normalize_expr_with_equivalence_properties, LexOrdering, PhysicalExpr, PhysicalSortExpr, @@ -414,7 +414,11 @@ pub fn project_equivalence_properties( class.remove(&column); } } - eq_classes.retain(|props| props.len() > 1); + + eq_classes.retain(|props| { + props.len() > 1 + && !(props.len() == 2 && props.head.eq(props.others().iter().next().unwrap())) + }); output_eq.extend(eq_classes); } @@ -446,7 +450,7 @@ pub fn project_ordering_equivalence_properties( class.update_with_aliases(&oeq_alias_map, fields); } - // Prune columns that no longer is in the schema from from the OrderingEquivalenceProperties. + // Prune columns that no longer is in the schema from the OrderingEquivalenceProperties. for class in eq_classes.iter_mut() { let sort_exprs_to_remove = class .iter() @@ -471,42 +475,6 @@ pub fn project_ordering_equivalence_properties( output_eq.extend(eq_classes); } -/// Update `ordering` if it contains cast expression with target column -/// after projection, if there is no cast expression among `ordering` expressions, -/// returns `None`. -fn update_with_cast_exprs( - cast_exprs: &[(CastExpr, Column)], - mut ordering: LexOrdering, -) -> Option { - let mut is_changed = false; - for sort_expr in ordering.iter_mut() { - for (cast_expr, target_col) in cast_exprs.iter() { - if sort_expr.expr.eq(cast_expr.expr()) { - sort_expr.expr = Arc::new(target_col.clone()) as _; - is_changed = true; - } - } - } - is_changed.then_some(ordering) -} - -/// Update cast expressions inside ordering equivalence -/// properties with its target column after projection -pub fn update_ordering_equivalence_with_cast( - cast_exprs: &[(CastExpr, Column)], - input_oeq: &mut OrderingEquivalenceProperties, -) { - for cls in input_oeq.classes.iter_mut() { - for ordering in - std::iter::once(cls.head().clone()).chain(cls.others().clone().into_iter()) - { - if let Some(updated_ordering) = update_with_cast_exprs(cast_exprs, ordering) { - cls.insert(updated_ordering); - } - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 34633f6e1dc3..1ceff1e64cec 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -55,6 +55,7 @@ use arrow::datatypes::*; use arrow::record_batch::RecordBatch; use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn}; +use arrow_schema::SortOptions; use kernels::{ bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar, bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, bitwise_shift_right_dyn, @@ -82,7 +83,7 @@ use crate::array_expressions::{ use crate::expressions::cast_column; use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison}; use crate::intervals::{apply_operator, Interval}; -use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::{down_cast_any_ref, ExtendedSortOptions}; use crate::PhysicalExpr; use datafusion_common::cast::as_boolean_array; @@ -802,6 +803,104 @@ impl PhysicalExpr for BinaryExpr { let mut s = state; self.hash(&mut s); } + + /// [`BinaryExpr`] has its own rules for each operator. + /// TODO: There may me rules specific to some data types (such as division and multiplication on unsigned integers) + fn get_ordering(&self, children: &[&ExtendedSortOptions]) -> ExtendedSortOptions { + let (left_child, right_child) = + if let (Some(&left), Some(&right)) = (children.get(0), children.get(1)) { + (left, right) + } else { + return ExtendedSortOptions::Unordered; + }; + match self.op() { + Operator::Plus => left_child.add(right_child), + Operator::Minus => left_child.sub(right_child), + Operator::Gt | Operator::GtEq => left_child.gt_or_gteq(right_child), + Operator::Lt | Operator::LtEq => right_child.gt_or_gteq(left_child), + Operator::And => left_child.and(right_child), + _ => ExtendedSortOptions::Unordered, + } + } +} + +impl ExtendedSortOptions { + fn add(&self, rhs: &Self) -> Self { + match (self, rhs) { + (Self::Singleton, _) => *rhs, + (_, Self::Singleton) => *self, + ( + Self::Ordered(SortOptions { + descending: left_descending, + nulls_first: left_nulls_first, + }), + Self::Ordered(SortOptions { + descending: right_descending, + nulls_first: right_nulls_first, + }), + ) if left_descending == right_descending => Self::Ordered(SortOptions { + descending: *left_descending, + nulls_first: *left_nulls_first || *right_nulls_first, + }), + _ => Self::Unordered, + } + } + + fn sub(&self, rhs: &Self) -> Self { + match (self, rhs) { + (Self::Singleton, Self::Singleton) => Self::Singleton, + (Self::Singleton, Self::Ordered(rhs_opts)) => Self::Ordered(SortOptions { + descending: !rhs_opts.descending, + nulls_first: rhs_opts.nulls_first, + }), + (_, Self::Singleton) => *self, + (Self::Ordered(lhs_opts), Self::Ordered(rhs_opts)) + if lhs_opts.descending != rhs_opts.descending => + { + Self::Ordered(SortOptions { + descending: lhs_opts.descending, + nulls_first: lhs_opts.nulls_first || rhs_opts.nulls_first, + }) + } + _ => Self::Unordered, + } + } + + fn gt_or_gteq(&self, rhs: &Self) -> Self { + match (self, rhs) { + (Self::Singleton, Self::Ordered(rhs_opts)) => Self::Ordered(SortOptions { + descending: !rhs_opts.descending, + nulls_first: rhs_opts.nulls_first, + }), + (_, Self::Singleton) => *self, + (Self::Ordered(lhs_opts), Self::Ordered(rhs_opts)) + if lhs_opts.descending != rhs_opts.descending => + { + *self + } + _ => Self::Unordered, + } + } + + fn and(&self, rhs: &Self) -> Self { + match (self, rhs) { + (Self::Ordered(lhs_opts), Self::Ordered(rhs_opts)) + if lhs_opts.descending == rhs_opts.descending => + { + Self::Ordered(SortOptions { + descending: lhs_opts.descending, + nulls_first: lhs_opts.nulls_first || rhs_opts.nulls_first, + }) + } + (Self::Ordered(opt), Self::Singleton) + | (Self::Singleton, Self::Ordered(opt)) => Self::Ordered(SortOptions { + descending: opt.descending, + nulls_first: opt.nulls_first, + }), + (Self::Singleton, Self::Singleton) => Self::Singleton, + _ => Self::Unordered, + } + } } impl PartialEq for BinaryExpr { diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index b6c3536a1e96..c110adef9102 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -21,7 +21,7 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::intervals::Interval; -use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::{down_cast_any_ref, ExtendedSortOptions}; use crate::PhysicalExpr; use arrow::compute; use arrow::compute::{kernels, CastOptions}; @@ -141,6 +141,15 @@ impl PhysicalExpr for CastExpr { // Add `self.cast_options` when hash is available // https://github.com/apache/arrow-rs/pull/4395 } + + /// [`CastExpr`]'s are preserving the ordering of its child. + fn get_ordering(&self, children: &[&ExtendedSortOptions]) -> ExtendedSortOptions { + if let Some(&&child) = children.first() { + child + } else { + ExtendedSortOptions::Unordered + } + } } impl PartialEq for CastExpr { diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 8e8671612378..4859fe7a0b7d 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -21,7 +21,7 @@ use std::any::Any; use std::hash::{Hash, Hasher}; use std::sync::Arc; -use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::{down_cast_any_ref, ExtendedSortOptions}; use crate::PhysicalExpr; use arrow::{ @@ -88,6 +88,10 @@ impl PhysicalExpr for Literal { let mut s = state; self.hash(&mut s); } + + fn get_ordering(&self, _children: &[&ExtendedSortOptions]) -> ExtendedSortOptions { + ExtendedSortOptions::Singleton + } } impl PartialEq for Literal { diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index 7f1bd43fec70..34c4d42ed213 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -32,7 +32,7 @@ use arrow::{ record_batch::RecordBatch, }; -use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::{down_cast_any_ref, ExtendedSortOptions}; use crate::PhysicalExpr; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{ @@ -134,6 +134,15 @@ impl PhysicalExpr for NegativeExpr { let mut s = state; self.hash(&mut s); } + + /// The ordering of a [`NegativeExpr`] is simply the reverse of its child. + fn get_ordering(&self, children: &[&ExtendedSortOptions]) -> ExtendedSortOptions { + if let Some(&&child) = children.first() { + -child + } else { + ExtendedSortOptions::Unordered + } + } } impl PartialEq for NegativeExpr { diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index fc29060487fc..7a0a93ddf275 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -58,7 +58,7 @@ pub use equivalence::{ EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties, OrderingEquivalentClass, }; -pub use physical_expr::{PhysicalExpr, PhysicalExprRef}; +pub use physical_expr::{ExtendedSortOptions, PhysicalExpr, PhysicalExprRef}; pub use planner::create_physical_expr; pub use scalar_function::ScalarFunctionExpr; pub use sort_expr::{ diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index e9fb66b6345d..754b21c86015 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -22,6 +22,7 @@ use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; +use arrow_schema::SortOptions; use datafusion_common::utils::DataPtr; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::ColumnarValue; @@ -29,6 +30,7 @@ use datafusion_expr::ColumnarValue; use std::any::Any; use std::fmt::{Debug, Display}; use std::hash::{Hash, Hasher}; +use std::ops::Neg; use std::sync::Arc; /// Expression that can be evaluated against a RecordBatch @@ -126,6 +128,11 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq { /// Note: [`PhysicalExpr`] is not constrained by [`Hash`] /// directly because it must remain object safe. fn dyn_hash(&self, _state: &mut dyn Hasher); + + /// Providing children's [`ExtendedSortOptions`], returns the [`ExtendedSortOptions`] of a [`PhysicalExpr`]. + fn get_ordering(&self, _children: &[&ExtendedSortOptions]) -> ExtendedSortOptions { + ExtendedSortOptions::Unordered + } } impl Hash for dyn PhysicalExpr { @@ -134,6 +141,38 @@ impl Hash for dyn PhysicalExpr { } } +/// To propagate [`SortOptions`] across the [`PhysicalExpr`], using the [`Option`] +/// structure is insufficient. There must be a differentiation between unordered columns +/// and literal values since literals do not break the ordering when they are used as a child +/// of a binary expression, if the other child has some ordering. On the other hand, unordered +/// columns cannot maintain the ordering when they take part in such operations. +#[derive(PartialEq, Debug, Clone, Copy)] +pub enum ExtendedSortOptions { + // For an ordered data, we use ordinary [`SortOptions`] + Ordered(SortOptions), + // Unordered data are represented as Unordered + Unordered, + // Singleton is used for single-valued literal numbers + Singleton, +} + +impl Neg for ExtendedSortOptions { + type Output = Self; + + fn neg(self) -> Self::Output { + match self { + ExtendedSortOptions::Ordered(SortOptions { + descending, + nulls_first, + }) => ExtendedSortOptions::Ordered(SortOptions { + descending: !descending, + nulls_first, + }), + _ => self, + } + } +} + /// Shared [`PhysicalExpr`]. pub type PhysicalExprRef = Arc; diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 8f63d94b3b34..49a77702347d 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -25,9 +25,11 @@ use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement}; use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData}; use arrow::compute::{and_kleene, is_not_null, SlicesIterator}; use arrow::datatypes::SchemaRef; +use arrow_schema::SortOptions; use datafusion_common::tree_node::{ Transformed, TreeNode, TreeNodeRewriter, VisitRecursion, }; +use datafusion_common::utils::longest_consecutive_prefix; use datafusion_common::Result; use datafusion_expr::Operator; @@ -153,6 +155,19 @@ pub fn normalize_expr_with_equivalence_properties( .unwrap_or(expr) } +/// This function returns the head [`PhysicalSortExpr`] of equivalence set of a [`PhysicalSortExpr`], +/// if there is any, otherwise; returns the same [`PhysicalSortExpr`]. +pub fn normalize_sort_expr_with_equivalence_properties( + mut sort_requirement: PhysicalSortExpr, + eq_properties: &[EquivalentClass], +) -> PhysicalSortExpr { + sort_requirement.expr = + normalize_expr_with_equivalence_properties(sort_requirement.expr, eq_properties); + sort_requirement +} + +/// This function returns the head [`PhysicalSortRequirement`] of equivalence set of a [`PhysicalSortRequirement`], +/// if there is any, otherwise; returns the same [`PhysicalSortRequirement`]. fn normalize_sort_requirement_with_equivalence_properties( mut sort_requirement: PhysicalSortRequirement, eq_properties: &[EquivalentClass], @@ -500,7 +515,7 @@ pub fn get_indices_of_matching_exprs< /// This function finds the indices of `targets` within `items` using strict /// equality. -fn get_indices_of_exprs_strict>>( +pub fn get_indices_of_exprs_strict>>( targets: impl IntoIterator, items: &[Arc], ) -> Vec { @@ -762,6 +777,117 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result { Ok(make_array(data)) } +/// Return indices of each item in `required_exprs` inside `provided_exprs`. +/// All of the indices should be found inside `provided_exprs`. +/// Also found indices should be a permutation of range consecutive range from 0 to n. +/// Such as \[2,1,0\] is valid (\[0,1,2\] is consecutive). However, \[3,1,0\] is not +/// valid (\[0,1,3\] is not consecutive). +fn get_lexicographical_match_indices( + required_exprs: &[Arc], + provided_exprs: &[Arc], +) -> Option> { + let indices_of_equality = get_indices_of_exprs_strict(required_exprs, provided_exprs); + let mut ordered_indices = indices_of_equality.clone(); + ordered_indices.sort(); + let n_match = indices_of_equality.len(); + let first_n = longest_consecutive_prefix(ordered_indices); + // If we found all the expressions, return early: + if n_match == required_exprs.len() && first_n == n_match && n_match > 0 { + return Some(indices_of_equality); + } + None +} + +/// This function attempts to find a full match between required and provided +/// sorts, returning the indices and sort options of the matches found. +/// +/// First, it normalizes the sort requirements and then checks for matches. +/// If no full match is found, it then checks against ordering equivalence properties. +/// If still no full match is found, it returns `None`. +/// required_columns columns of lexicographical ordering. +pub fn get_indices_of_matching_sort_exprs_with_order_eq< + F: Fn() -> EquivalenceProperties, + F2: Fn() -> OrderingEquivalenceProperties, +>( + provided_sorts: &[PhysicalSortExpr], + required_columns: &[Column], + equal_properties: F, + ordering_equal_properties: F2, +) -> Option<(Vec, Vec)> { + // Transform the required columns into a vector of Arc: + let required_exprs = required_columns + .iter() + .map(|required_column| Arc::new(required_column.clone()) as _) + .collect::>>(); + + // Create a vector of `PhysicalSortRequirement`s from the required expressions: + let sort_requirement_on_requirements = required_exprs + .iter() + .map(|required_expr| PhysicalSortRequirement { + expr: required_expr.clone(), + options: None, + }) + .collect::>(); + + let order_eq_properties = ordering_equal_properties(); + let eq_properties = equal_properties(); + + let normalized_required = normalize_sort_requirements( + &sort_requirement_on_requirements, + eq_properties.classes(), + &[], + ); + let normalized_provided_requirements = normalize_sort_requirements( + &PhysicalSortRequirement::from_sort_exprs(provided_sorts.iter()), + eq_properties.classes(), + &[], + ); + + let provided_sorts = normalized_provided_requirements + .iter() + .map(|req| req.expr.clone()) + .collect::>(); + + let normalized_required_expr = normalized_required + .iter() + .map(|req| req.expr.clone()) + .collect::>(); + + if let Some(indices_of_equality) = + get_lexicographical_match_indices(&normalized_required_expr, &provided_sorts) + { + return Some(( + indices_of_equality + .iter() + .filter_map(|index| normalized_provided_requirements[*index].options) + .collect(), + indices_of_equality, + )); + } + + // We did not find all the expressions, consult ordering equivalence properties: + for class in order_eq_properties.classes() { + let head = class.head(); + for ordering in class.others().iter().chain(std::iter::once(head)) { + let order_eq_class_exprs = convert_to_expr(ordering); + if let Some(indices_of_equality) = get_lexicographical_match_indices( + &normalized_required_expr, + &order_eq_class_exprs, + ) { + return Some(( + indices_of_equality + .iter() + .map(|index| ordering[*index].options) + .collect(), + indices_of_equality, + )); + } + } + } + // If no match found, return `None`: + None +} + #[cfg(test)] mod tests { use super::*; From 3b7198f291808ec0eebc3bf028f1db03aec5e8b6 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Thu, 10 Aug 2023 13:15:15 +0300 Subject: [PATCH 02/15] fix after merge --- datafusion/physical-expr/src/equivalence.rs | 21 +++++++++++++++++++ .../physical-expr/src/expressions/binary.rs | 2 +- datafusion/physical-expr/src/lib.rs | 2 +- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 30dbecfdc27e..fb0ec3cefe63 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -475,6 +475,27 @@ pub fn project_ordering_equivalence_properties( output_eq.extend(eq_classes); } +/// Retrieves the ordering equivalence properties for a given schema and output ordering. +pub fn ordering_equivalence_properties_helper( + schema: SchemaRef, + eq_orderings: &[LexOrdering], +) -> OrderingEquivalenceProperties { + let mut oep = OrderingEquivalenceProperties::new(schema); + let first_ordering = if let Some(first) = eq_orderings.first() { + first + } else { + // Return an empty OrderingEquivalenceProperties: + return oep; + }; + // First entry among eq_orderings is the head, skip it: + for ordering in eq_orderings.iter().skip(1) { + if !ordering.is_empty() { + oep.add_equal_conditions((first_ordering, ordering)) + } + } + oep +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 9ff88bb16035..22b50fb9d005 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -48,8 +48,8 @@ use arrow::record_batch::RecordBatch; use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn}; -use arrow_schema::SortOptions; use arrow::compute::kernels::concat_elements::concat_elements_utf8; +use arrow_schema::SortOptions; use kernels::{ bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar, diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 8b5e8eed9e31..63b29423a6ab 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -60,8 +60,8 @@ pub use equivalence::{ OrderingEquivalenceProperties, OrderingEquivalentClass, }; -pub use physical_expr::{ExtendedSortOptions, PhysicalExpr, PhysicalExprRef}; pub use partitioning::{Distribution, Partitioning}; +pub use physical_expr::{ExtendedSortOptions, PhysicalExpr, PhysicalExprRef}; pub use planner::create_physical_expr; pub use scalar_function::ScalarFunctionExpr; pub use sort_expr::{ From 6032cef819d82cf61dabcb1f249fc9ba36046bc7 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 11 Aug 2023 15:04:21 +0300 Subject: [PATCH 03/15] simplifications --- .../core/src/physical_plan/joins/utils.rs | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index c9acf46655df..b44f9a9e5710 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -188,15 +188,13 @@ pub fn calculate_join_output_ordering( assert_eq!(maintains_input_order.len(), 2); let left_maintains = maintains_input_order[0]; let right_maintains = maintains_input_order[1]; - let (mut right_ordering, on_columns) = match join_type { + let mut right_ordering = match join_type { // In the case below, right ordering should be offseted with the left // side length, since we append the right table to the left table. JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { - let updated_right_ordering = - add_offset_to_lex_ordering(right_ordering, left_columns_len)?; - (updated_right_ordering, on_columns.to_vec()) + add_offset_to_lex_ordering(right_ordering, left_columns_len)? } - _ => (right_ordering.to_vec(), on_columns.to_vec()), + _ => right_ordering.to_vec(), }; let output_ordering = match (left_maintains, right_maintains) { (true, true) => { @@ -208,7 +206,7 @@ pub fn calculate_join_output_ordering( // Special case, we can prefix ordering of right side with the ordering of left side. if join_type == JoinType::Inner && probe_side == Some(JoinSide::Left) { replace_on_columns_of_right_ordering( - &on_columns, + on_columns, &mut right_ordering, left_columns_len, ); @@ -221,7 +219,7 @@ pub fn calculate_join_output_ordering( // Special case, we can prefix ordering of left side with the ordering of right side. if join_type == JoinType::Inner && probe_side == Some(JoinSide::Right) { replace_on_columns_of_right_ordering( - &on_columns, + on_columns, &mut right_ordering, left_columns_len, ); @@ -319,14 +317,19 @@ pub fn cross_join_equivalence_properties( /// at the output of the join schema, and also they are normalized with equivalence /// columns. To do so, we increment column indices by left table size /// when join schema consist of combination of left and right schema (Inner, Left, Full, Right joins). -/// Then, we normalize the sort expressions of ordering equvalences one by one. +/// Then, we normalize the sort expressions of ordering equivalences one by one. +/// We make sure that, each expression in the ordering equivalence is either +/// - head of the one of the equivalent classes +/// - or doesn't have an equivalent column +/// by this way, once we normalize an expression according to equivalence properties +/// then it can be safely used for ordering equivalence normalization. fn get_updated_right_ordering_equivalence_properties( join_type: &JoinType, right_oeq_classes: &[OrderingEquivalentClass], left_columns_len: usize, join_eq_properties: &EquivalenceProperties, ) -> Result> { - let updated_indices = match join_type { + let updated_oeqs = match join_type { // In these modes, indices of the right schema should be offset by // the left table size. JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { @@ -339,7 +342,7 @@ fn get_updated_right_ordering_equivalence_properties( }; Ok(normalize_ordering_equivalence_classes( - &updated_indices, + &updated_oeqs, join_eq_properties, )) } From d58b8de44d6cd7e2504056e9a6ed823250594e36 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 14 Aug 2023 13:35:55 +0300 Subject: [PATCH 04/15] Refactor, normalization code --- .../core/src/physical_plan/joins/utils.rs | 29 ++++++------------- datafusion/physical-expr/src/utils.rs | 24 +++++++++++++-- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index b44f9a9e5710..9ced42d58527 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -50,7 +50,7 @@ use datafusion_physical_expr::{ }; use datafusion_physical_expr::utils::{ - normalize_sort_expr_with_equivalence_properties, normalize_sort_exprs, + normalize_sort_exprs, normalize_sort_exprs_with_equivalence_properties, }; use futures::future::{BoxFuture, Shared}; @@ -545,30 +545,19 @@ fn normalize_ordering_equivalence_classes( updated_indices .iter() .map(|class| { - let head = class - .head() - .iter() - .map(|expr| { - normalize_sort_expr_with_equivalence_properties( - expr.clone(), - join_eq_properties.classes(), - ) - }) - .collect::>(); + let head = normalize_sort_exprs_with_equivalence_properties( + class.head(), + join_eq_properties, + ); let others = class .others() .iter() .map(|other| { - other - .iter() - .map(|expr| { - normalize_sort_expr_with_equivalence_properties( - expr.clone(), - join_eq_properties.classes(), - ) - }) - .collect() + normalize_sort_exprs_with_equivalence_properties( + other, + join_eq_properties, + ) }) .collect(); diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 49a77702347d..adb5b3c12d5f 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -20,7 +20,9 @@ use crate::equivalence::{ OrderingEquivalentClass, }; use crate::expressions::{BinaryExpr, Column, UnKnownColumn}; -use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement}; +use crate::{ + LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, +}; use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData}; use arrow::compute::{and_kleene, is_not_null, SlicesIterator}; @@ -157,7 +159,7 @@ pub fn normalize_expr_with_equivalence_properties( /// This function returns the head [`PhysicalSortExpr`] of equivalence set of a [`PhysicalSortExpr`], /// if there is any, otherwise; returns the same [`PhysicalSortExpr`]. -pub fn normalize_sort_expr_with_equivalence_properties( +fn normalize_sort_expr_with_equivalence_properties( mut sort_requirement: PhysicalSortExpr, eq_properties: &[EquivalentClass], ) -> PhysicalSortExpr { @@ -166,6 +168,24 @@ pub fn normalize_sort_expr_with_equivalence_properties( sort_requirement } +/// This function returns the head [`PhysicalSortExpr`] of equivalence set of a [`PhysicalSortExpr`], +/// for each `PhysicalSortExpr` inside `sort_exprs` +/// if there is any, otherwise; returns the same [`PhysicalSortExpr`]. +pub fn normalize_sort_exprs_with_equivalence_properties( + sort_exprs: LexOrderingRef, + eq_properties: &EquivalenceProperties, +) -> LexOrdering { + sort_exprs + .iter() + .map(|expr| { + normalize_sort_expr_with_equivalence_properties( + expr.clone(), + eq_properties.classes(), + ) + }) + .collect() +} + /// This function returns the head [`PhysicalSortRequirement`] of equivalence set of a [`PhysicalSortRequirement`], /// if there is any, otherwise; returns the same [`PhysicalSortRequirement`]. fn normalize_sort_requirement_with_equivalence_properties( From f341250ce913b6cc089a671133f6be1bc887f550 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 14 Aug 2023 13:48:05 +0300 Subject: [PATCH 05/15] Simplifications --- .../core/src/physical_plan/joins/utils.rs | 30 +------------------ datafusion/physical-expr/src/utils.rs | 26 ++++++++++++++++ 2 files changed, 27 insertions(+), 29 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index 9ced42d58527..247d4cf35c87 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -50,7 +50,7 @@ use datafusion_physical_expr::{ }; use datafusion_physical_expr::utils::{ - normalize_sort_exprs, normalize_sort_exprs_with_equivalence_properties, + normalize_ordering_equivalence_classes, normalize_sort_exprs, }; use futures::future::{BoxFuture, Shared}; @@ -538,34 +538,6 @@ pub(crate) fn add_offset_to_ordering_equivalence_classes( .collect() } -fn normalize_ordering_equivalence_classes( - updated_indices: &[OrderingEquivalentClass], - join_eq_properties: &EquivalenceProperties, -) -> Vec { - updated_indices - .iter() - .map(|class| { - let head = normalize_sort_exprs_with_equivalence_properties( - class.head(), - join_eq_properties, - ); - - let others = class - .others() - .iter() - .map(|other| { - normalize_sort_exprs_with_equivalence_properties( - other, - join_eq_properties, - ) - }) - .collect(); - - EquivalentClass::new(head, others) - }) - .collect() -} - impl Display for JoinSide { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index adb5b3c12d5f..5bf85e0d69dd 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -256,6 +256,32 @@ pub fn normalize_sort_exprs( let normalized_exprs = PhysicalSortRequirement::to_sort_exprs(normalized_exprs); collapse_vec(normalized_exprs) } +/// This function makes sure that `oeq_classes` expressions, that are inside +/// `eq_properties` is head of the `eq_properties` (if it is in the others replace with head). +pub fn normalize_ordering_equivalence_classes( + oeq_classes: &[OrderingEquivalentClass], + eq_properties: &EquivalenceProperties, +) -> Vec { + oeq_classes + .iter() + .map(|class| { + let head = normalize_sort_exprs_with_equivalence_properties( + class.head(), + eq_properties, + ); + + let others = class + .others() + .iter() + .map(|other| { + normalize_sort_exprs_with_equivalence_properties(other, eq_properties) + }) + .collect(); + + EquivalentClass::new(head, others) + }) + .collect() +} /// Transform `sort_reqs` vector, to standardized version using `eq_properties` and `ordering_eq_properties` /// Assume `eq_properties` states that `Column a` and `Column b` are aliases. From 07ee9513d75bc3edf7626ffc83432824372b54dd Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Tue, 15 Aug 2023 10:37:15 +0300 Subject: [PATCH 06/15] ScalarFunctionExpr's have a maintain order flag, each func labeled as their preservation of order --- .../tests/sqllogictests/test_files/order.slt | 104 +++++++++++++++++- datafusion/physical-expr/src/functions.rs | 47 ++++++++ .../physical-expr/src/scalar_function.rs | 19 ++++ datafusion/physical-expr/src/udf.rs | 1 + .../proto/src/physical_plan/from_proto.rs | 2 + datafusion/proto/src/physical_plan/mod.rs | 2 + 6 files changed, 174 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/sqllogictests/test_files/order.slt b/datafusion/core/tests/sqllogictests/test_files/order.slt index 6a7f8ebf60ec..219586d84e6d 100644 --- a/datafusion/core/tests/sqllogictests/test_files/order.slt +++ b/datafusion/core/tests/sqllogictests/test_files/order.slt @@ -444,4 +444,106 @@ SortPreservingMergeExec: [result@0 ASC NULLS LAST] ------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true statement ok -drop table multiple_ordered_table; \ No newline at end of file +drop table multiple_ordered_table; + +# Create tables having some ordered columns. In the next step, we will expect to observe that scalar +# functions, such as mathematical functions like atan(), ceil(), sqrt(), or date_time functions +# like date_bin() and date_trunc(), will maintain the order of its argument columns. +statement ok +CREATE EXTERNAL TABLE csv_with_timestamps ( + name VARCHAR, + ts TIMESTAMP +) +STORED AS CSV +WITH ORDER (ts ASC NULLS LAST) +LOCATION 'tests/data/timestamps.csv'; + +query TT +EXPLAIN SELECT DATE_BIN(INTERVAL '15 minutes', ts, TIMESTAMP '2022-08-03 14:40:00Z') as db15 +FROM csv_with_timestamps +ORDER BY db15; +---- +logical_plan +Sort: db15 ASC NULLS LAST +--Projection: date_bin(IntervalMonthDayNano("900000000000"), csv_with_timestamps.ts, TimestampNanosecond(1659537600000000000, None)) AS db15 +----TableScan: csv_with_timestamps projection=[ts] +physical_plan +SortPreservingMergeExec: [db15@0 ASC NULLS LAST] +--ProjectionExec: expr=[date_bin(900000000000, ts@0, 1659537600000000000) as db15] +----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 ASC NULLS LAST], has_header=false + +query TT +EXPLAIN SELECT DATE_TRUNC('DAY', ts) as dt_day +FROM csv_with_timestamps +ORDER BY dt_day; +---- +logical_plan +Sort: dt_day ASC NULLS LAST +--Projection: date_trunc(Utf8("DAY"), csv_with_timestamps.ts) AS dt_day +----TableScan: csv_with_timestamps projection=[ts] +physical_plan +SortPreservingMergeExec: [dt_day@0 ASC NULLS LAST] +--ProjectionExec: expr=[date_trunc(DAY, ts@0) as dt_day] +----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 ASC NULLS LAST], has_header=false + +statement ok +drop table csv_with_timestamps; + +statement ok +drop table aggregate_test_100; + +statement ok +CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT, + c5 INT, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 BIGINT UNSIGNED NOT NULL, + c10 VARCHAR NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL +) +STORED AS CSV +WITH HEADER ROW +WITH ORDER(c11) +LOCATION '../../testing/data/csv/aggregate_test_100.csv' + +query TT +EXPLAIN SELECT ATAN(c11) as atan_c11 +FROM aggregate_test_100 +ORDER BY atan_c11; +---- +logical_plan +Sort: atan_c11 ASC NULLS LAST +--Projection: atan(aggregate_test_100.c11) AS atan_c11 +----TableScan: aggregate_test_100 projection=[c11] +physical_plan +SortPreservingMergeExec: [atan_c11@0 ASC NULLS LAST] +--ProjectionExec: expr=[atan(c11@0) as atan_c11] +----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11], output_ordering=[c11@0 ASC NULLS LAST], has_header=true + +query TT +EXPLAIN SELECT CEIL(c11) as ceil_c11 +FROM aggregate_test_100 +ORDER BY ceil_c11; +---- +logical_plan +Sort: ceil_c11 ASC NULLS LAST +--Projection: ceil(aggregate_test_100.c11) AS ceil_c11 +----TableScan: aggregate_test_100 projection=[c11] +physical_plan +SortPreservingMergeExec: [ceil_c11@0 ASC NULLS LAST] +--ProjectionExec: expr=[ceil(c11@0) as ceil_c11] +----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11], output_ordering=[c11@0 ASC NULLS LAST], has_header=true + +statement ok +drop table aggregate_test_100; \ No newline at end of file diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index d1a5119ee8a3..798e18fb1fec 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -173,11 +173,14 @@ pub fn create_physical_expr( _ => create_physical_fun(fun, execution_props)?, }; + let maintains_order = get_physical_fun_order(fun); + Ok(Arc::new(ScalarFunctionExpr::new( &format!("{fun}"), fun_expr, input_phy_exprs.to_vec(), &data_type, + maintains_order, ))) } @@ -894,6 +897,50 @@ pub fn create_physical_fun( }) } +/// This function determines the preservation of order for a scalar function +/// according to its arguments. It returns an Option<(usize, bool)> where +/// the tuple contains information about the index of the function's arguments +/// which is order-preserved and whether the order is maintained or reversed. +pub fn get_physical_fun_order(fun: &BuiltinScalarFunction) -> Option<(usize, bool)> { + // math_expressions and datetime_expressions are considered only for the initial implementation of this feature. + if matches!( + fun, + BuiltinScalarFunction::Atan + | BuiltinScalarFunction::Acosh + | BuiltinScalarFunction::Asinh + | BuiltinScalarFunction::Atanh + | BuiltinScalarFunction::Ceil + | BuiltinScalarFunction::Degrees + | BuiltinScalarFunction::Exp + | BuiltinScalarFunction::Factorial + | BuiltinScalarFunction::Floor + | BuiltinScalarFunction::Ln + | BuiltinScalarFunction::Log10 + | BuiltinScalarFunction::Log2 + | BuiltinScalarFunction::Radians + | BuiltinScalarFunction::Round + | BuiltinScalarFunction::Signum + | BuiltinScalarFunction::Sinh + | BuiltinScalarFunction::Sqrt + | BuiltinScalarFunction::Cbrt + | BuiltinScalarFunction::Tanh + | BuiltinScalarFunction::Trunc + | BuiltinScalarFunction::Pi + | BuiltinScalarFunction::Log + ) { + Some((0, true)) + } else if matches!( + fun, + BuiltinScalarFunction::Log + | BuiltinScalarFunction::DateTrunc + | BuiltinScalarFunction::DateBin + ) { + Some((1, true)) + } else { + None + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index df1e459efbb0..c9f992532851 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -31,6 +31,7 @@ use crate::physical_expr::down_cast_any_ref; use crate::utils::expr_list_eq_strict_order; +use crate::ExtendedSortOptions; use crate::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; @@ -51,6 +52,9 @@ pub struct ScalarFunctionExpr { name: String, args: Vec>, return_type: DataType, + // This field is used to store which argument of the scalar function maintains + // the order information. `true` means order is maintained, `false` means order is reversed. + maintains_order: Option<(usize, bool)>, } impl Debug for ScalarFunctionExpr { @@ -71,12 +75,14 @@ impl ScalarFunctionExpr { fun: ScalarFunctionImplementation, args: Vec>, return_type: &DataType, + maintains_order: Option<(usize, bool)>, ) -> Self { Self { fun, name: name.to_owned(), args, return_type: return_type.clone(), + maintains_order, } } @@ -157,6 +163,7 @@ impl PhysicalExpr for ScalarFunctionExpr { self.fun.clone(), children, self.return_type(), + self.maintains_order, ))) } @@ -167,6 +174,18 @@ impl PhysicalExpr for ScalarFunctionExpr { self.return_type.hash(&mut s); // Add `self.fun` when hash is available } + + fn get_ordering(&self, children: &[&ExtendedSortOptions]) -> ExtendedSortOptions { + if let Some((index, opt)) = self.maintains_order { + return if opt { + *children[index] + } else { + -*children[index] + }; + } + + ExtendedSortOptions::Unordered + } } impl PartialEq for ScalarFunctionExpr { diff --git a/datafusion/physical-expr/src/udf.rs b/datafusion/physical-expr/src/udf.rs index 5aca1df8a800..af1e77cbf566 100644 --- a/datafusion/physical-expr/src/udf.rs +++ b/datafusion/physical-expr/src/udf.rs @@ -39,5 +39,6 @@ pub fn create_physical_expr( fun.fun.clone(), input_phy_exprs.to_vec(), (fun.return_type)(&input_exprs_types)?.as_ref(), + None, ))) } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index d9553f9e769e..e06b4f3e0cee 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -257,6 +257,7 @@ pub fn parse_physical_expr( fun_expr, args, &convert_required!(e.return_type)?, + None, )) } ExprType::ScalarUdf(e) => { @@ -273,6 +274,7 @@ pub fn parse_physical_expr( scalar_fun, args, &convert_required!(e.return_type)?, + None, )) } ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new( diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 7fe193ffbb4b..c8e240ff9e97 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1825,6 +1825,7 @@ mod roundtrip_tests { fun_expr, vec![col("a", &schema)?], &DataType::Int64, + None, ); let project = @@ -1858,6 +1859,7 @@ mod roundtrip_tests { scalar_fn, vec![col("a", &schema)?], &DataType::Int64, + None, ); let project = From f4bfe522c519956d46b50c3aa3e29c96ec1ae92c Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 17 Aug 2023 14:13:19 +0300 Subject: [PATCH 07/15] Simplifications --- .../core/src/physical_plan/projection.rs | 7 ++- .../physical-expr/src/expressions/binary.rs | 9 +--- .../physical-expr/src/expressions/cast.rs | 8 +--- .../physical-expr/src/expressions/literal.rs | 2 +- .../physical-expr/src/expressions/negative.rs | 8 +--- datafusion/physical-expr/src/functions.rs | 43 ++++++++++++++++--- datafusion/physical-expr/src/physical_expr.rs | 2 +- .../physical-expr/src/scalar_function.rs | 27 +++++------- 8 files changed, 58 insertions(+), 48 deletions(-) diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index a1044ccd8df1..d1799dc5d7a5 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -465,9 +465,8 @@ fn update_ordering< } // intermediate node calculation: - if let Some(children) = &node.children_states { - let children_sort_options = children.iter().collect::>(); - let parent_sort_options = node.expr.get_ordering(&children_sort_options); + if let Some(children_sort_options) = &node.children_states { + let parent_sort_options = node.expr.get_ordering(children_sort_options); node.state = Some(parent_sort_options); @@ -491,7 +490,7 @@ fn update_ordering< }); return Ok(Transformed::Yes(node)); } - // last opiton, literal leaf: + // last option, literal leaf: node.state = Some(node.expr.get_ordering(&[])); Ok(Transformed::Yes(node)) } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 22b50fb9d005..c6d8cadeeed2 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -698,13 +698,8 @@ impl PhysicalExpr for BinaryExpr { /// [`BinaryExpr`] has its own rules for each operator. /// TODO: There may me rules specific to some data types (such as division and multiplication on unsigned integers) - fn get_ordering(&self, children: &[&ExtendedSortOptions]) -> ExtendedSortOptions { - let (left_child, right_child) = - if let (Some(&left), Some(&right)) = (children.get(0), children.get(1)) { - (left, right) - } else { - return ExtendedSortOptions::Unordered; - }; + fn get_ordering(&self, children: &[ExtendedSortOptions]) -> ExtendedSortOptions { + let (left_child, right_child) = (&children[0], &children[1]); match self.op() { Operator::Plus => left_child.add(right_child), Operator::Minus => left_child.sub(right_child), diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 7ceaa0e97f6b..95a73b34cfb8 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -141,12 +141,8 @@ impl PhysicalExpr for CastExpr { } /// [`CastExpr`]'s are preserving the ordering of its child. - fn get_ordering(&self, children: &[&ExtendedSortOptions]) -> ExtendedSortOptions { - if let Some(&&child) = children.first() { - child - } else { - ExtendedSortOptions::Unordered - } + fn get_ordering(&self, children: &[ExtendedSortOptions]) -> ExtendedSortOptions { + children[0] } } diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 4859fe7a0b7d..49f70d0dc2f1 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -89,7 +89,7 @@ impl PhysicalExpr for Literal { self.hash(&mut s); } - fn get_ordering(&self, _children: &[&ExtendedSortOptions]) -> ExtendedSortOptions { + fn get_ordering(&self, _children: &[ExtendedSortOptions]) -> ExtendedSortOptions { ExtendedSortOptions::Singleton } } diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index 65a0d8378400..a1a228790691 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -104,12 +104,8 @@ impl PhysicalExpr for NegativeExpr { } /// The ordering of a [`NegativeExpr`] is simply the reverse of its child. - fn get_ordering(&self, children: &[&ExtendedSortOptions]) -> ExtendedSortOptions { - if let Some(&&child) = children.first() { - -child - } else { - ExtendedSortOptions::Unordered - } + fn get_ordering(&self, children: &[ExtendedSortOptions]) -> ExtendedSortOptions { + -children[0] } } diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 798e18fb1fec..b249879c355a 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -34,8 +34,8 @@ use crate::execution_props::ExecutionProps; use crate::{ array_expressions, conditional_expressions, datetime_expressions, expressions::{cast_column, nullif_func}, - math_expressions, string_expressions, struct_expressions, PhysicalExpr, - ScalarFunctionExpr, + math_expressions, string_expressions, struct_expressions, ExtendedSortOptions, + PhysicalExpr, ScalarFunctionExpr, }; use arrow::{ array::ArrayRef, @@ -173,14 +173,14 @@ pub fn create_physical_expr( _ => create_physical_fun(fun, execution_props)?, }; - let maintains_order = get_physical_fun_order(fun); + let monotonicity = get_func_monotonicity(fun); Ok(Arc::new(ScalarFunctionExpr::new( &format!("{fun}"), fun_expr, input_phy_exprs.to_vec(), &data_type, - maintains_order, + monotonicity, ))) } @@ -897,11 +897,34 @@ pub fn create_physical_fun( }) } +/// Stores monotonicity of the ScalarFunction +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct FuncMonotonicity { + // `true` => monotonically increasing + // `false` => monotonically decreasing function (e.g reverses the ordering of its argument) + is_ascending: bool, + // Index of the variable argument + idx: usize, +} + +impl FuncMonotonicity { + pub fn out_ordering( + &self, + arg_orderings: &[ExtendedSortOptions], + ) -> ExtendedSortOptions { + if self.is_ascending { + arg_orderings[self.idx] + } else { + -arg_orderings[self.idx] + } + } +} + /// This function determines the preservation of order for a scalar function /// according to its arguments. It returns an Option<(usize, bool)> where /// the tuple contains information about the index of the function's arguments /// which is order-preserved and whether the order is maintained or reversed. -pub fn get_physical_fun_order(fun: &BuiltinScalarFunction) -> Option<(usize, bool)> { +pub fn get_func_monotonicity(fun: &BuiltinScalarFunction) -> Option { // math_expressions and datetime_expressions are considered only for the initial implementation of this feature. if matches!( fun, @@ -928,14 +951,20 @@ pub fn get_physical_fun_order(fun: &BuiltinScalarFunction) -> Option<(usize, boo | BuiltinScalarFunction::Pi | BuiltinScalarFunction::Log ) { - Some((0, true)) + Some(FuncMonotonicity { + is_ascending: true, + idx: 0, + }) } else if matches!( fun, BuiltinScalarFunction::Log | BuiltinScalarFunction::DateTrunc | BuiltinScalarFunction::DateBin ) { - Some((1, true)) + Some(FuncMonotonicity { + is_ascending: true, + idx: 1, + }) } else { None } diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 754b21c86015..bf326a909a3c 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -130,7 +130,7 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq { fn dyn_hash(&self, _state: &mut dyn Hasher); /// Providing children's [`ExtendedSortOptions`], returns the [`ExtendedSortOptions`] of a [`PhysicalExpr`]. - fn get_ordering(&self, _children: &[&ExtendedSortOptions]) -> ExtendedSortOptions { + fn get_ordering(&self, _children: &[ExtendedSortOptions]) -> ExtendedSortOptions { ExtendedSortOptions::Unordered } } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index c9f992532851..fe3ab7d084c8 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -29,6 +29,7 @@ //! This module also has a set of coercion rules to improve user experience: if an argument i32 is passed //! to a function that supports f64, it is coerced to f64. +use crate::functions::FuncMonotonicity; use crate::physical_expr::down_cast_any_ref; use crate::utils::expr_list_eq_strict_order; use crate::ExtendedSortOptions; @@ -52,9 +53,8 @@ pub struct ScalarFunctionExpr { name: String, args: Vec>, return_type: DataType, - // This field is used to store which argument of the scalar function maintains - // the order information. `true` means order is maintained, `false` means order is reversed. - maintains_order: Option<(usize, bool)>, + // Keeps monotonicity information of the function + monotonicity: Option, } impl Debug for ScalarFunctionExpr { @@ -75,14 +75,14 @@ impl ScalarFunctionExpr { fun: ScalarFunctionImplementation, args: Vec>, return_type: &DataType, - maintains_order: Option<(usize, bool)>, + monotonicity: Option, ) -> Self { Self { fun, name: name.to_owned(), args, return_type: return_type.clone(), - maintains_order, + monotonicity, } } @@ -163,7 +163,7 @@ impl PhysicalExpr for ScalarFunctionExpr { self.fun.clone(), children, self.return_type(), - self.maintains_order, + self.monotonicity.clone(), ))) } @@ -175,16 +175,11 @@ impl PhysicalExpr for ScalarFunctionExpr { // Add `self.fun` when hash is available } - fn get_ordering(&self, children: &[&ExtendedSortOptions]) -> ExtendedSortOptions { - if let Some((index, opt)) = self.maintains_order { - return if opt { - *children[index] - } else { - -*children[index] - }; - } - - ExtendedSortOptions::Unordered + fn get_ordering(&self, children: &[ExtendedSortOptions]) -> ExtendedSortOptions { + self.monotonicity + .as_ref() + .map(|monotonicity| monotonicity.out_ordering(children)) + .unwrap_or(ExtendedSortOptions::Unordered) } } From 621e88f4057b127ef67a4d94b2b2b53f40fb38d1 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Thu, 24 Aug 2023 15:25:22 +0300 Subject: [PATCH 08/15] fix after merge --- .../core/src/physical_plan/projection.rs | 4 --- datafusion/physical-expr/src/functions.rs | 10 +++--- datafusion/physical-expr/src/lib.rs | 2 +- datafusion/physical-expr/src/physical_expr.rs | 34 ------------------- .../physical-expr/src/scalar_function.rs | 6 ++-- 5 files changed, 8 insertions(+), 48 deletions(-) diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index 61f5a545ad13..f7f8b0f4524d 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -36,8 +36,6 @@ use crate::physical_plan::{ use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; -use arrow_schema::SortOptions; -use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::Result; use datafusion_execution::TaskContext; use datafusion_physical_expr::expressions::{Literal, UnKnownColumn}; @@ -120,8 +118,6 @@ impl ProjectionExec { }; } - let orderings = find_orderings_of_exprs(&expr, &input)?; - // Output Ordering need to respect the alias let child_output_ordering = input.output_ordering(); let output_ordering = match child_output_ordering { diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index c97ca1acb8b0..2723164d1e3d 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -31,11 +31,12 @@ //! argument is automatically is coerced to f64. use crate::execution_props::ExecutionProps; +use crate::sort_properties::SortProperties; use crate::{ array_expressions, conditional_expressions, datetime_expressions, expressions::{cast_column, nullif_func}, - math_expressions, string_expressions, struct_expressions, ExtendedSortOptions, - PhysicalExpr, ScalarFunctionExpr, + math_expressions, string_expressions, struct_expressions, PhysicalExpr, + ScalarFunctionExpr, }; use arrow::{ array::ArrayRef, @@ -893,10 +894,7 @@ pub struct FuncMonotonicity { } impl FuncMonotonicity { - pub fn out_ordering( - &self, - arg_orderings: &[ExtendedSortOptions], - ) -> ExtendedSortOptions { + pub fn out_ordering(&self, arg_orderings: &[SortProperties]) -> SortProperties { if self.is_ascending { arg_orderings[self.idx] } else { diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index a5f9692c4e7c..a8e49bfbd6d1 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -61,7 +61,7 @@ pub use equivalence::{ }; pub use partitioning::{Distribution, Partitioning}; -pub use physical_expr::{ExtendedSortOptions, PhysicalExpr, PhysicalExprRef}; +pub use physical_expr::{PhysicalExpr, PhysicalExprRef}; pub use planner::create_physical_expr; pub use scalar_function::ScalarFunctionExpr; pub use sort_expr::{ diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 43d1ce685df2..ce3b7b6cf4fe 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -23,7 +23,6 @@ use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use arrow_schema::SortOptions; use datafusion_common::utils::DataPtr; use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; use datafusion_expr::ColumnarValue; @@ -31,7 +30,6 @@ use datafusion_expr::ColumnarValue; use std::any::Any; use std::fmt::{Debug, Display}; use std::hash::{Hash, Hasher}; -use std::ops::Neg; use std::sync::Arc; /// Expression that can be evaluated against a RecordBatch @@ -146,38 +144,6 @@ impl Hash for dyn PhysicalExpr { } } -/// To propagate [`SortOptions`] across the [`PhysicalExpr`], using the [`Option`] -/// structure is insufficient. There must be a differentiation between unordered columns -/// and literal values since literals do not break the ordering when they are used as a child -/// of a binary expression, if the other child has some ordering. On the other hand, unordered -/// columns cannot maintain the ordering when they take part in such operations. -#[derive(PartialEq, Debug, Clone, Copy)] -pub enum ExtendedSortOptions { - // For an ordered data, we use ordinary [`SortOptions`] - Ordered(SortOptions), - // Unordered data are represented as Unordered - Unordered, - // Singleton is used for single-valued literal numbers - Singleton, -} - -impl Neg for ExtendedSortOptions { - type Output = Self; - - fn neg(self) -> Self::Output { - match self { - ExtendedSortOptions::Ordered(SortOptions { - descending, - nulls_first, - }) => ExtendedSortOptions::Ordered(SortOptions { - descending: !descending, - nulls_first, - }), - _ => self, - } - } -} - /// Shared [`PhysicalExpr`]. pub type PhysicalExprRef = Arc; diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index fe3ab7d084c8..f1f245b16629 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -31,8 +31,8 @@ use crate::functions::FuncMonotonicity; use crate::physical_expr::down_cast_any_ref; +use crate::sort_properties::SortProperties; use crate::utils::expr_list_eq_strict_order; -use crate::ExtendedSortOptions; use crate::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; @@ -175,11 +175,11 @@ impl PhysicalExpr for ScalarFunctionExpr { // Add `self.fun` when hash is available } - fn get_ordering(&self, children: &[ExtendedSortOptions]) -> ExtendedSortOptions { + fn get_ordering(&self, children: &[SortProperties]) -> SortProperties { self.monotonicity .as_ref() .map(|monotonicity| monotonicity.out_ordering(children)) - .unwrap_or(ExtendedSortOptions::Unordered) + .unwrap_or(SortProperties::Unordered) } } From 0fa1dfe50c26d03ec6b93a4b98a1895574bf9c73 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Thu, 24 Aug 2023 17:35:13 +0300 Subject: [PATCH 09/15] fix the test file location --- datafusion/sqllogictest/test_files/order.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 8a2dad7f530f..11aa321778d1 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -456,7 +456,7 @@ CREATE EXTERNAL TABLE csv_with_timestamps ( ) STORED AS CSV WITH ORDER (ts ASC NULLS LAST) -LOCATION 'tests/data/timestamps.csv'; +LOCATION '../core/tests/data/timestamps.csv'; query TT EXPLAIN SELECT DATE_BIN(INTERVAL '15 minutes', ts, TIMESTAMP '2022-08-03 14:40:00Z') as db15 From 2188bc342d1e6c90546d916bb1bb597b3d66d020 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Fri, 25 Aug 2023 15:45:13 +0300 Subject: [PATCH 10/15] the format of the monotonicity for scalar functions changed --- datafusion/physical-expr/src/functions.rs | 96 +++++++++++++------ .../physical-expr/src/scalar_function.rs | 8 +- datafusion/sqllogictest/test_files/order.slt | 31 ++++++ 3 files changed, 102 insertions(+), 33 deletions(-) diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 2723164d1e3d..b504433cec94 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -48,6 +48,7 @@ use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::{ BuiltinScalarFunction, ColumnarValue, ScalarFunctionImplementation, }; +use std::ops::Neg; use std::sync::Arc; /// Create a physical (function) expression. @@ -883,32 +884,72 @@ pub fn create_physical_fun( }) } -/// Stores monotonicity of the ScalarFunction -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct FuncMonotonicity { - // `true` => monotonically increasing - // `false` => monotonically decreasing function (e.g reverses the ordering of its argument) - is_ascending: bool, - // Index of the variable argument - idx: usize, +/// Monotonicity of the ScalarFunction with respect to its arguments. +/// Each element of this vector corresponds to an argument and indicates whether +/// the function's behavior is monotonic, non-monotonic or unknown for that argument. +/// - None signifies unknown monotonicity or non-monotonicity. +/// - Some(true) indicates a consistent monotonic increase or decrease in tandem. +/// - Some(false) indicates that while one argument increases, the other decreases (and vice versa). +pub type FuncMonotonicity = Vec>; + +/// Determines the [`BuiltinScalarFunction`]'s monotonicity for the given arguments +/// and function's behaviour depending on its arguments. +pub fn out_ordering( + func: &FuncMonotonicity, + arg_orderings: &[SortProperties], +) -> SortProperties { + func.iter().zip(arg_orderings).fold( + SortProperties::Singleton, + |prev_sort_prop, (item, arg)| { + let current_sort = func_order_in_one_dimension(item, arg); + + match (prev_sort_prop, current_sort) { + (_, SortProperties::Unordered) => SortProperties::Unordered, + (SortProperties::Singleton, SortProperties::Ordered(_)) => current_sort, + (SortProperties::Ordered(prev), SortProperties::Ordered(current)) + if prev.descending != current.descending => + { + SortProperties::Unordered + } + _ => prev_sort_prop, + } + }, + ) } -impl FuncMonotonicity { - pub fn out_ordering(&self, arg_orderings: &[SortProperties]) -> SortProperties { - if self.is_ascending { - arg_orderings[self.idx] - } else { - -arg_orderings[self.idx] +/// Provided that how the [`BuiltinScalarFunction`] is effected by the argument and the argument's `SortProperties`, +/// the function decides how the [`BuiltinScalarFunction`] behaves for that argument. +fn func_order_in_one_dimension( + func_monotonicity: &Option, + arg: &SortProperties, +) -> SortProperties { + if *arg == SortProperties::Singleton { + SortProperties::Singleton + } else { + match func_monotonicity { + None => SortProperties::Unordered, + Some(false) => { + if let SortProperties::Ordered(_) = arg { + arg.neg() + } else { + SortProperties::Unordered + } + } + Some(true) => { + if let SortProperties::Ordered(_) = arg { + *arg + } else { + SortProperties::Unordered + } + } } } } -/// This function determines the preservation of order for a scalar function -/// according to its arguments. It returns an Option<(usize, bool)> where -/// the tuple contains information about the index of the function's arguments -/// which is order-preserved and whether the order is maintained or reversed. +/// This function determines the preservation of order for a scalar function according to its arguments. +/// The list can be extended, math_expressions and datetime_expressions are considered only for +/// the initial implementation of this feature. pub fn get_func_monotonicity(fun: &BuiltinScalarFunction) -> Option { - // math_expressions and datetime_expressions are considered only for the initial implementation of this feature. if matches!( fun, BuiltinScalarFunction::Atan @@ -932,22 +973,15 @@ pub fn get_func_monotonicity(fun: &BuiltinScalarFunction) -> Option>, return_type: DataType, - // Keeps monotonicity information of the function + // Keeps monotonicity information of the function. + // FuncMonotonicity vector is one to one mapped to `args`, + // and it specifies the effect of an increase or decrease in + // the corresponding `arg` to the function value. monotonicity: Option, } @@ -178,7 +182,7 @@ impl PhysicalExpr for ScalarFunctionExpr { fn get_ordering(&self, children: &[SortProperties]) -> SortProperties { self.monotonicity .as_ref() - .map(|monotonicity| monotonicity.out_ordering(children)) + .map(|monotonicity| out_ordering(monotonicity, children)) .unwrap_or(SortProperties::Unordered) } } diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 11aa321778d1..8148f1c4c7c9 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -513,6 +513,7 @@ CREATE EXTERNAL TABLE aggregate_test_100 ( STORED AS CSV WITH HEADER ROW WITH ORDER(c11) +WITH ORDER(c12 DESC) LOCATION '../../testing/data/csv/aggregate_test_100.csv' query TT @@ -545,5 +546,35 @@ SortPreservingMergeExec: [ceil_c11@0 ASC NULLS LAST] ----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11], output_ordering=[c11@0 ASC NULLS LAST], has_header=true +query TT + EXPLAIN SELECT LOG(c11, c12) as log_c11_base_c12 + FROM aggregate_test_100 + ORDER BY log_c11_base_c12; +---- +logical_plan +Sort: log_c11_base_c12 ASC NULLS LAST +--Projection: log(CAST(aggregate_test_100.c11 AS Float64), aggregate_test_100.c12) AS log_c11_base_c12 +----TableScan: aggregate_test_100 projection=[c11, c12] +physical_plan +SortPreservingMergeExec: [log_c11_base_c12@0 ASC NULLS LAST] +--ProjectionExec: expr=[log(CAST(c11@0 AS Float64), c12@1) as log_c11_base_c12] +----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_ordering=[c11@0 ASC NULLS LAST], has_header=true + +query TT +EXPLAIN SELECT LOG(c12, c11) as log_c12_base_c11 +FROM aggregate_test_100 +ORDER BY log_c12_base_c11 DESC; +---- +logical_plan +Sort: log_c12_base_c11 DESC NULLS FIRST +--Projection: log(aggregate_test_100.c12, CAST(aggregate_test_100.c11 AS Float64)) AS log_c12_base_c11 +----TableScan: aggregate_test_100 projection=[c11, c12] +physical_plan +SortPreservingMergeExec: [log_c12_base_c11@0 DESC] +--ProjectionExec: expr=[log(c12@1, CAST(c11@0 AS Float64)) as log_c12_base_c11] +----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_ordering=[c11@0 ASC NULLS LAST], has_header=true + statement ok drop table aggregate_test_100; From f4ccf6b3c31557947c6480e69198221f6b686a23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Fri, 25 Aug 2023 17:46:05 +0300 Subject: [PATCH 11/15] Update datafusion/physical-expr/src/functions.rs Co-authored-by: Mehmet Ozan Kabak --- datafusion/physical-expr/src/functions.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index b504433cec94..c6d56faddbda 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -884,12 +884,12 @@ pub fn create_physical_fun( }) } -/// Monotonicity of the ScalarFunction with respect to its arguments. +/// Monotonicity of the `ScalarFunctionExpr` with respect to its arguments. /// Each element of this vector corresponds to an argument and indicates whether -/// the function's behavior is monotonic, non-monotonic or unknown for that argument. -/// - None signifies unknown monotonicity or non-monotonicity. -/// - Some(true) indicates a consistent monotonic increase or decrease in tandem. -/// - Some(false) indicates that while one argument increases, the other decreases (and vice versa). +/// the function's behavior is monotonic, or non-monotonic/unknown for that argument, namely: +/// - `None` signifies unknown monotonicity or non-monotonicity. +/// - `Some(true)` indicates that the function is monotonically increasing w.r.t. the argument in question. +/// - Some(false) indicates that the function is monotonically decreasing w.r.t. the argument in question. pub type FuncMonotonicity = Vec>; /// Determines the [`BuiltinScalarFunction`]'s monotonicity for the given arguments From b8b6abd6663e8613bc1ce0ae42a96aa5181ee4bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Fri, 25 Aug 2023 17:46:16 +0300 Subject: [PATCH 12/15] Update datafusion/physical-expr/src/functions.rs Co-authored-by: Mehmet Ozan Kabak --- datafusion/physical-expr/src/functions.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index c6d56faddbda..089c58f08da5 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -892,8 +892,8 @@ pub fn create_physical_fun( /// - Some(false) indicates that the function is monotonically decreasing w.r.t. the argument in question. pub type FuncMonotonicity = Vec>; -/// Determines the [`BuiltinScalarFunction`]'s monotonicity for the given arguments -/// and function's behaviour depending on its arguments. +/// Determines a [`ScalarFunctionExpr`]'s monotonicity for the given arguments +/// and the function's behavior depending on its arguments. pub fn out_ordering( func: &FuncMonotonicity, arg_orderings: &[SortProperties], From 68d02349d13e12da2518825d7a81262878c1f935 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Fri, 25 Aug 2023 17:46:25 +0300 Subject: [PATCH 13/15] Update datafusion/physical-expr/src/functions.rs Co-authored-by: Mehmet Ozan Kabak --- datafusion/physical-expr/src/functions.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 089c58f08da5..a0341a84172d 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -917,8 +917,7 @@ pub fn out_ordering( ) } -/// Provided that how the [`BuiltinScalarFunction`] is effected by the argument and the argument's `SortProperties`, -/// the function decides how the [`BuiltinScalarFunction`] behaves for that argument. +/// This function decides the monotonicity property of a [`ScalarFunctionExpr`] for a single argument (i.e. across a single dimension), given that argument's sort properties. fn func_order_in_one_dimension( func_monotonicity: &Option, arg: &SortProperties, From ea34b49ad2f4d001af9625c0538d15aa51447728 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Fri, 25 Aug 2023 17:46:33 +0300 Subject: [PATCH 14/15] Update datafusion/physical-expr/src/functions.rs Co-authored-by: Mehmet Ozan Kabak --- datafusion/physical-expr/src/functions.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index a0341a84172d..814763f1b1c3 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -945,9 +945,9 @@ fn func_order_in_one_dimension( } } -/// This function determines the preservation of order for a scalar function according to its arguments. -/// The list can be extended, math_expressions and datetime_expressions are considered only for -/// the initial implementation of this feature. +/// This function specifies monotonicity behaviors for built-in scalar functions. +/// The list can be extended, only mathematical and datetime functions are +/// considered for the initial implementation of this feature. pub fn get_func_monotonicity(fun: &BuiltinScalarFunction) -> Option { if matches!( fun, From cc4807b3eef01a215bcb50ae3fe704a88747a1fd Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Fri, 25 Aug 2023 17:49:12 +0300 Subject: [PATCH 15/15] renaming --- datafusion/physical-expr/src/functions.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index b504433cec94..033c89a34454 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -900,10 +900,10 @@ pub fn out_ordering( ) -> SortProperties { func.iter().zip(arg_orderings).fold( SortProperties::Singleton, - |prev_sort_prop, (item, arg)| { + |prev_sort, (item, arg)| { let current_sort = func_order_in_one_dimension(item, arg); - match (prev_sort_prop, current_sort) { + match (prev_sort, current_sort) { (_, SortProperties::Unordered) => SortProperties::Unordered, (SortProperties::Singleton, SortProperties::Ordered(_)) => current_sort, (SortProperties::Ordered(prev), SortProperties::Ordered(current)) @@ -911,7 +911,7 @@ pub fn out_ordering( { SortProperties::Unordered } - _ => prev_sort_prop, + _ => prev_sort, } }, )