Do not resort inputs to UnionExec if they are already sorted (#4946)
* Do not resort inputs to Union if they are already sorted

* Remove debugging
alamb committed Jan 17, 2023
1 parent 84ba3c2 commit 279440b
Showing 2 changed files with 103 additions and 1 deletion.
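
The heart of the change is the new `maintains_input_order` override on `UnionExec` in the second file below; when it returns true, the sort-enforcement optimizer can leave an already-sorted union alone instead of adding another sort, which is exactly what the new test asserts. What follows is a minimal, hedged sketch of that check — standalone code, not DataFusion's API — where plain string keys stand in for `PhysicalSortExpr` lists and for `sort_expr_list_eq_strict_order`:

// Hedged sketch: a union maintains its inputs' order only when it is not
// partition aware, every input reports an ordering, and all orderings are
// strictly equal to the first input's.
fn maintains_input_order(partition_aware: bool, input_orderings: &[Option<&str>]) -> bool {
    let first = match input_orderings.first() {
        Some(first) => first,
        None => return false,
    };
    !partition_aware
        && first.is_some()
        && input_orderings
            .iter()
            .all(|ordering| ordering.is_some() && ordering == first)
}

fn main() {
    // Both inputs report the same ordering: the union preserves it.
    assert!(maintains_input_order(false, &[Some("a ASC"), Some("a ASC")]));
    // One input is unsorted: no guarantee, a sort is still required downstream.
    assert!(!maintains_input_order(false, &[Some("a ASC"), None]));
    // Different orderings: strict equality fails, so no guarantee either.
    assert!(!maintains_input_order(false, &[Some("a ASC"), Some("b ASC")]));
    // A partition-aware union never claims to maintain its inputs' order here.
    assert!(!maintains_input_order(true, &[Some("a ASC"), Some("a ASC")]));
}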
80 changes: 79 additions & 1 deletion datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -486,15 +486,19 @@ fn check_alignment(
#[cfg(test)]
mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::displayable;
use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::create_window_expr;
use crate::prelude::SessionContext;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::Result;
use datafusion_common::{Result, Statistics};
use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
use datafusion_physical_expr::expressions::{col, NotExpr};
use datafusion_physical_expr::PhysicalSortExpr;
@@ -813,6 +817,33 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_union_inputs_sorted() -> Result<()> {
    let schema = create_test_schema()?;

    let source1 = parquet_exec(&schema);
    let sort_exprs = vec![sort_expr("nullable_col", &schema)];
    let sort = sort_exec(sort_exprs.clone(), source1);

    let source2 = parquet_exec_sorted(&schema, sort_exprs.clone());

    let union = union_exec(vec![source2, sort]);
    let physical_plan = sort_preserving_merge_exec(sort_exprs, union);

    // One input to the union is already sorted, the other is not.
    let expected_input = vec![
        "SortPreservingMergeExec: [nullable_col@0 ASC]",
        "  UnionExec",
        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
        "    SortExec: [nullable_col@0 ASC]",
        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
    ];
    // The optimizer should not add a sort at the output of the union; the input plan should be unchanged.
    let expected_optimized = expected_input.clone();
    assert_optimized!(expected_input, expected_optimized, physical_plan);
    Ok(())
}

/// make PhysicalSortExpr with default options
fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
sort_expr_options(name, schema, SortOptions::default())
@@ -856,4 +887,51 @@
) -> Arc<dyn ExecutionPlan> {
Arc::new(FilterExec::try_new(predicate, input).unwrap())
}

/// Create an unsorted parquet exec
fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
    Arc::new(ParquetExec::new(
        FileScanConfig {
            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
            file_schema: schema.clone(),
            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
            statistics: Statistics::default(),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
            output_ordering: None,
            infinite_source: false,
        },
        None,
        None,
    ))
}

/// Create a sorted parquet exec
fn parquet_exec_sorted(
    schema: &SchemaRef,
    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Arc<ParquetExec> {
    let sort_exprs = sort_exprs.into_iter().collect();

    Arc::new(ParquetExec::new(
        FileScanConfig {
            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
            file_schema: schema.clone(),
            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
            statistics: Statistics::default(),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
            output_ordering: Some(sort_exprs),
            infinite_source: false,
        },
        None,
        None,
    ))
}

fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
    Arc::new(UnionExec::new(input))
}
}
24 changes: 24 additions & 0 deletions datafusion/core/src/physical_plan/union.rs
@@ -247,6 +247,30 @@ impl ExecutionPlan for UnionExec {
}
}

fn maintains_input_order(&self) -> bool {
    let first_input_ordering = self.inputs[0].output_ordering();
    // If the union is not partition aware and every input's ordering
    // spec is strictly equal to `first_input_ordering`, then the
    // `UnionExec` maintains the input order.
    //
    // This may be too strict when the input orderings are compatible
    // but not exactly the same; see the comments in `output_ordering`.
    !self.partition_aware
        && first_input_ordering.is_some()
        && self
            .inputs
            .iter()
            .map(|plan| plan.output_ordering())
            .all(|ordering| {
                ordering.is_some()
                    && sort_expr_list_eq_strict_order(
                        ordering.unwrap(),
                        first_input_ordering.unwrap(),
                    )
            })
}
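// Editorial note, not part of the original diff: the strictness mentioned in
// the comment above means that inputs ordered by, say, [a ASC, b ASC] and
// [a ASC] report no order guarantee at all, even though an output ordering of
// [a ASC] would in fact be preserved by the union.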

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
