Skip to content

Commit

Permalink
Resolve contradictory requirements by conversion of ordering sensitiv…
Browse files Browse the repository at this point in the history
…e aggregators (#6482)

* Naive test pass

i

* Add new tests and simplifications

* move tests to the .slt file

* update requirement

* update tests

* Add support for partiallyOrdered aggregation sensitive.

* Resolve linter errors

* update comments

* minor changes

* retract changes in generated

* update proto files

* Simplifications

* Make types consistent in schema, and data

* Update todos

* Convert API to vector

* Convert get_finest to handle Vector inputs

* simplifications, update comment

* initial commit, add test

* Add support for FIRST Aggregate function.

* Add support for last aggregate

* Update cargo.lock

* Remove distinct, and limit from First and last aggregate.

* Add reverse for First and Last Aggregator

* Update cargo lock

* Minor code simplifications

* Update comment

* Update documents

* Fix projection pushdown bug

* fix projection push down failure bug

* combine first_agg and last_agg parsers

* Update documentation

* Update subproject

* initial commit

* Add test code

* initial version

* simplify prints

* minor changes

* sqllogictests pass

* All tests pass

* update proto function names

* Minor changes

* do not consider ordering requirement in ordering insensitive aggregators

* Reject aggregate order by for window functions.

* initial commit

* Add new tests

* Add reverse for sort removal

* simplifications

* simplifications

* Bug fix, do not consider reverse requirement if it is not possible

* Fix cargo lock file

* Change reverse_order_by function place

* Move required input ordering calculation logic to its own function

* Add new tests

* simplifications

* Update comment

* Rename aggregator first and last

* minor change

* Comment improvements

* Remove count from First,Last accumulators

* Remove clone

* Simplifications

* Simplifications, update comment

* Improve comments

* Move LexOrdering requirement to common place

* Update comment, refactor implementation

* bug fix:

* Use naive requirement when reverse requirement is not helpful by convention.

* Update test

* Update comments

* Change function place

* Move get_finer to utls

* change name of last, first impl,

* Fix error message

* change display of first and last

---------

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
Co-authored-by: berkaysynnada <berkay.sahin@synnada.ai>
  • Loading branch information
3 people authored Jun 3, 2023
1 parent 9d22054 commit 5ddcbc4
Show file tree
Hide file tree
Showing 14 changed files with 531 additions and 126 deletions.
5 changes: 2 additions & 3 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1460,14 +1460,13 @@ impl SessionState {
// The EnforceDistribution rule is for adding essential repartition to satisfy the required
// distribution. Please make sure that the whole plan tree is determined before this rule.
Arc::new(EnforceDistribution::new()),
// The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
Arc::new(CombinePartialFinalAggregate::new()),
// The EnforceSorting rule is for adding essential local sorting to satisfy the required
// ordering. Please make sure that the whole plan tree is determined before this rule.
// Note that one should always run this rule after running the EnforceDistribution rule
// as the latter may break local sorting requirements.
Arc::new(EnforceSorting::new()),
// The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution
// and EnforceSorting rules
Arc::new(CombinePartialFinalAggregate::new()),
// The CoalesceBatches rule will not influence the distribution and ordering of the
// whole plan tree. Therefore, to avoid influencing other rules, it should run last.
Arc::new(CoalesceBatches::new()),
Expand Down
338 changes: 242 additions & 96 deletions datafusion/core/src/physical_plan/aggregates/mod.rs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions datafusion/core/tests/sqllogictests/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ statement error Error during planning: The percentile sample points count for Ap
SELECT approx_percentile_cont(c3, 0.95, 111.1) FROM aggregate_test_100

# csv_query_array_agg_unsupported
statement error This feature is not implemented: ORDER BY not supported in ARRAY_AGG: c1
statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions
SELECT array_agg(c13 ORDER BY c1) FROM aggregate_test_100

statement error This feature is not implemented: LIMIT not supported in ARRAY_AGG: 1
Expand Down Expand Up @@ -1169,7 +1169,7 @@ select c2, sum(c3) sum_c3, avg(c3) avg_c3, max(c3) max_c3, min(c3) min_c3, count
5 -194 -13.857142857143 118 -101 14

# csv_query_array_agg_unsupported
statement error This feature is not implemented: ORDER BY not supported in ARRAY_AGG: c1
statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions
SELECT array_agg(c13 ORDER BY c1) FROM aggregate_test_100;

# csv_query_array_cube_agg_with_overflow
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sqllogictests/test_files/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ physical_plan after PipelineFixer SAME TEXT AS ABOVE
physical_plan after repartition SAME TEXT AS ABOVE
physical_plan after global_sort_selection SAME TEXT AS ABOVE
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
physical_plan after EnforceSorting SAME TEXT AS ABOVE
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
physical_plan after EnforceSorting SAME TEXT AS ABOVE
physical_plan after coalesce_batches SAME TEXT AS ABOVE
physical_plan after PipelineChecker SAME TEXT AS ABOVE
physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
205 changes: 205 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2384,3 +2384,208 @@ SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount DESC) AS
FRA [200.0, 50.0] 250
GRC [80.0, 30.0] 110
TUR [100.0, 75.0] 175

# test_reverse_aggregate_expr
# Some of the Aggregators can be reversed, by this way we can still run aggregators without re-ordering
# that have contradictory requirements at first glance.
query TT
EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
LAST_VALUE(amount ORDER BY amount DESC) AS fv2
FROM sales_global
GROUP BY country
----
logical_plan
Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2
--Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]]]
----TableScan: sales_global projection=[country, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
----SortExec: expr=[amount@1 DESC]
------MemoryExec: partitions=1, partition_sizes=[1]

query T?RR
SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
LAST_VALUE(amount ORDER BY amount DESC) AS fv2
FROM sales_global
GROUP BY country
----
FRA [200.0, 50.0] 50 50
TUR [100.0, 75.0] 75 75
GRC [80.0, 30.0] 30 30

# test_reverse_aggregate_expr2
# Some of the Aggregators can be reversed, by this way we can still run aggregators without re-ordering
# that have contradictory requirements at first glance.
query TT
EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,
FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
LAST_VALUE(amount ORDER BY amount DESC) AS fv2
FROM sales_global
GROUP BY country
----
logical_plan
Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2
--Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]]]
----TableScan: sales_global projection=[country, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
----SortExec: expr=[amount@1 ASC NULLS LAST]
------MemoryExec: partitions=1, partition_sizes=[1]

query T?RR
SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,
FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
LAST_VALUE(amount ORDER BY amount DESC) AS fv2
FROM sales_global
GROUP BY country
----
GRC [30.0, 80.0] 30 30
FRA [50.0, 200.0] 50 50
TUR [75.0, 100.0] 75 75

# test_reverse_aggregate_expr3
# Some of the Aggregators can be reversed, by this way we can still run aggregators without re-ordering
# that have contradictory requirements at first glance. This algorithm shouldn't depend
# on the order of the aggregation expressions.
query TT
EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
LAST_VALUE(amount ORDER BY amount DESC) AS fv2,
ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
FROM sales_global
GROUP BY country
----
logical_plan
Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
----TableScan: sales_global projection=[country, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@3 as amounts]
--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)]
----SortExec: expr=[amount@1 ASC NULLS LAST]
------MemoryExec: partitions=1, partition_sizes=[1]

query TRR?
SELECT country, FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
LAST_VALUE(amount ORDER BY amount DESC) AS fv2,
ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
FROM sales_global
GROUP BY country
----
GRC 30 30 [30.0, 80.0]
FRA 50 50 [50.0, 200.0]
TUR 75 75 [75.0, 100.0]

# test_reverse_aggregate_expr4
# Ordering requirement by the ordering insensitive aggregators shouldn't have effect on
# final plan. Hence seemingly conflicting requirements by SUM and ARRAY_AGG shouldn't raise error.
query TT
EXPLAIN SELECT country, SUM(amount ORDER BY ts DESC) AS sum1,
ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
FROM sales_global
GROUP BY country
----
logical_plan
Projection: sales_global.country, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
--Aggregate: groupBy=[[sales_global.country]], aggr=[[SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
----TableScan: sales_global projection=[country, ts, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as sum1, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as amounts]
--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[SUM(sales_global.amount), ARRAY_AGG(sales_global.amount)]
----SortExec: expr=[amount@2 ASC NULLS LAST]
------MemoryExec: partitions=1, partition_sizes=[1]

query TR?
SELECT country, SUM(amount ORDER BY ts DESC) AS sum1,
ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
FROM sales_global
GROUP BY country
----
GRC 110 [30.0, 80.0]
FRA 250 [50.0, 200.0]
TUR 175 [75.0, 100.0]

# test_reverse_aggregate_expr5
# If all of the ordering sensitive aggregation functions are reversible
# we should be able to reverse requirements, if this helps to remove a SortExec.
# Hence in query below, FIRST_VALUE, and LAST_VALUE should be reversed to calculate its result according to `ts ASC` ordering.
# Please note that after `ts ASC` ordering because of inner query. There is no SortExec in the final plan.
query TT
EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
LAST_VALUE(amount ORDER BY ts DESC) as lv1,
SUM(amount ORDER BY ts DESC) as sum1
FROM (SELECT *
FROM sales_global
ORDER BY ts ASC)
GROUP BY country
----
logical_plan
Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1
--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
----Sort: sales_global.ts ASC NULLS LAST
------TableScan: sales_global projection=[country, ts, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), SUM(sales_global.amount)]
----SortExec: expr=[ts@1 ASC NULLS LAST]
------MemoryExec: partitions=1, partition_sizes=[1]

query TRRR
SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
LAST_VALUE(amount ORDER BY ts DESC) as lv1,
SUM(amount ORDER BY ts DESC) as sum1
FROM (SELECT *
FROM sales_global
ORDER BY ts ASC)
GROUP BY country
----
GRC 80 30 110
FRA 200 50 250
TUR 100 75 175

# If existing ordering doesn't satisfy requirement, we should do calculations
# on naive requirement (by convention, otherwise the final plan will be unintuitive),
# even if reverse ordering is possible.
# hence, below query should add `SortExec(ts DESC)` to the final plan.
query TT
EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
LAST_VALUE(amount ORDER BY ts DESC) as lv1,
SUM(amount ORDER BY ts DESC) as sum1
FROM sales_global
GROUP BY country
----
logical_plan
Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1
--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
----TableScan: sales_global projection=[country, ts, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)]
----SortExec: expr=[ts@1 DESC]
------MemoryExec: partitions=1, partition_sizes=[1]

query TRRR
SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
LAST_VALUE(amount ORDER BY ts DESC) as lv1,
SUM(amount ORDER BY ts DESC) as sum1
FROM sales_global
GROUP BY country
----
TUR 100 75 175
GRC 80 30 110
FRA 200 50 250

# Run order-sensitive aggregators in multiple partitions
statement ok
set datafusion.execution.target_partitions = 2;

# Currently, we do not support running order-sensitive aggregators in multiple partitions.
statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions
SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
LAST_VALUE(amount ORDER BY amount DESC) AS fv2
FROM sales_global
GROUP BY country
14 changes: 12 additions & 2 deletions datafusion/physical-expr/src/aggregate/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,14 @@ impl AggregateExpr for FirstValue {
}

fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
let name = if self.name.starts_with("FIRST") {
format!("LAST{}", &self.name[5..])
} else {
format!("LAST_VALUE({})", self.expr)
};
Some(Arc::new(LastValue::new(
self.expr.clone(),
self.name.clone(),
name,
self.data_type.clone(),
)))
}
Expand Down Expand Up @@ -214,9 +219,14 @@ impl AggregateExpr for LastValue {
}

fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
let name = if self.name.starts_with("LAST") {
format!("FIRST{}", &self.name[4..])
} else {
format!("FIRST_VALUE({})", self.expr)
};
Some(Arc::new(FirstValue::new(
self.expr.clone(),
self.name.clone(),
name,
self.data_type.clone(),
)))
}
Expand Down
11 changes: 11 additions & 0 deletions datafusion/physical-expr/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::aggregate::row_accumulator::RowAccumulator;
use crate::expressions::{ArrayAgg, FirstValue, LastValue};
use crate::PhysicalExpr;
use arrow::datatypes::Field;
use datafusion_common::{DataFusionError, Result};
Expand Down Expand Up @@ -130,3 +131,13 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
)))
}
}

/// Checks whether the given aggregate expression is order-sensitive.
/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
/// However, a `FirstValue` depends on the input ordering (if the order changes,
/// the first value in the list would change).
pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
aggr_expr.as_any().is::<FirstValue>()
|| aggr_expr.as_any().is::<LastValue>()
|| aggr_expr.as_any().is::<ArrayAgg>()
}
6 changes: 4 additions & 2 deletions datafusion/physical-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ pub use equivalence::{
pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef};
pub use planner::create_physical_expr;
pub use scalar_function::ScalarFunctionExpr;
pub use sort_expr::{LexOrdering, PhysicalSortExpr, PhysicalSortRequirement};
pub use sort_expr::{
LexOrdering, LexOrderingReq, PhysicalSortExpr, PhysicalSortRequirement,
};
pub use utils::{
expr_list_eq_any_order, expr_list_eq_strict_order,
normalize_expr_with_equivalence_properties, normalize_out_expr_with_columns_map,
split_conjunction,
reverse_order_bys, split_conjunction,
};
5 changes: 4 additions & 1 deletion datafusion/physical-expr/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,5 +214,8 @@ fn to_str(options: &SortOptions) -> &str {
}
}

/// `LexOrdering` is a type alias for lexicographical ordering definition `Vec<PhysicalSortExpr>`
///`LexOrdering` is a type alias for lexicographical ordering definition`Vec<PhysicalSortExpr>`
pub type LexOrdering = Vec<PhysicalSortExpr>;

///`LexOrderingReq` is a type alias for lexicographical ordering requirement definition`Vec<PhysicalSortRequirement>`
pub type LexOrderingReq = Vec<PhysicalSortRequirement>;
38 changes: 38 additions & 0 deletions datafusion/physical-expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,44 @@ pub fn reassign_predicate_columns(
})
}

/// Reverses the ORDER BY expression, which is useful during equivalent window
/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns into
/// 'ORDER BY a DESC, NULLS FIRST'.
pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr> {
order_bys
.iter()
.map(|e| PhysicalSortExpr {
expr: e.expr.clone(),
options: !e.options,
})
.collect()
}

/// Find the finer requirement among `req1` and `req2`
/// If `None`, this means that `req1` and `req2` are not compatible
/// e.g there is no requirement that satisfies both
pub fn get_finer_ordering<
'a,
F: Fn() -> EquivalenceProperties,
F2: Fn() -> OrderingEquivalenceProperties,
>(
req1: &'a [PhysicalSortExpr],
req2: &'a [PhysicalSortExpr],
eq_properties: F,
ordering_eq_properties: F2,
) -> Option<&'a [PhysicalSortExpr]> {
if ordering_satisfy_concrete(req1, req2, &eq_properties, &ordering_eq_properties) {
// Finer requirement is `provided`, since it satisfies the other:
return Some(req1);
}
if ordering_satisfy_concrete(req2, req1, &eq_properties, &ordering_eq_properties) {
// Finer requirement is `req`, since it satisfies the other:
return Some(req2);
}
// Neither `provided` nor `req` satisfies one another, they are incompatible.
None
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit 5ddcbc4

Please sign in to comment.