Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move PartitionSearchMode into datafusion_physical_plan, rename to InputOrderMode #8364

Merged
merged 6 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::windows::{
get_best_fitting_window, BoundedWindowAggExec, WindowAggExec,
};
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
use crate::physical_plan::{
with_new_children_if_necessary, Distribution, ExecutionPlan, InputOrderMode,
};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};

use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::windows::PartitionSearchMode;
use itertools::izip;

/// This rule inspects [`SortExec`]'s in the given physical plan and removes the
Expand Down Expand Up @@ -611,7 +612,7 @@ fn analyze_window_sort_removal(
window_expr.to_vec(),
window_child,
partitionby_exprs.to_vec(),
PartitionSearchMode::Sorted,
InputOrderMode::Sorted,
)?) as _
} else {
Arc::new(WindowAggExec::try_new(
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::create_window_expr;
use crate::physical_plan::{ExecutionPlan, Partitioning};
use crate::physical_plan::{ExecutionPlan, InputOrderMode, Partitioning};
use crate::prelude::{CsvReadOptions, SessionContext};

use arrow_schema::{Schema, SchemaRef, SortOptions};
Expand All @@ -44,7 +44,6 @@ use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_plan::windows::PartitionSearchMode;

use async_trait::async_trait;

Expand Down Expand Up @@ -240,7 +239,7 @@ pub fn bounded_window_exec(
.unwrap()],
input.clone(),
vec![],
PartitionSearchMode::Sorted,
InputOrderMode::Sorted,
)
.unwrap(),
)
Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::unnest::UnnestExec;
use crate::physical_plan::values::ValuesExec;
use crate::physical_plan::windows::{
BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
};
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{
aggregates, displayable, udaf, windows, AggregateExpr, ExecutionPlan, Partitioning,
PhysicalExpr, WindowExpr,
aggregates, displayable, udaf, windows, AggregateExpr, ExecutionPlan, InputOrderMode,
Partitioning, PhysicalExpr, WindowExpr,
};

use arrow::compute::SortOptions;
Expand Down Expand Up @@ -761,7 +759,7 @@ impl DefaultPhysicalPlanner {
window_expr,
input_exec,
physical_partition_keys,
PartitionSearchMode::Sorted,
InputOrderMode::Sorted,
)?)
} else {
Arc::new(WindowAggExec::try_new(
Expand Down
12 changes: 5 additions & 7 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use arrow::util::pretty::pretty_format_batches;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::windows::{
create_window_expr, BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
create_window_expr, BoundedWindowAggExec, WindowAggExec,
};
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::physical_plan::{collect, ExecutionPlan, InputOrderMode};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::type_coercion::aggregates::coerce_types;
Expand All @@ -43,9 +43,7 @@ use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

use datafusion_physical_plan::windows::PartitionSearchMode::{
Linear, PartiallySorted, Sorted,
};
use datafusion_physical_plan::InputOrderMode::{Linear, PartiallySorted, Sorted};

#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn window_bounded_window_random_comparison() -> Result<()> {
Expand Down Expand Up @@ -385,9 +383,9 @@ async fn run_window_test(
random_seed: u64,
partition_by_columns: Vec<&str>,
orderby_columns: Vec<&str>,
search_mode: PartitionSearchMode,
search_mode: InputOrderMode,
) -> Result<()> {
let is_linear = !matches!(search_mode, PartitionSearchMode::Sorted);
let is_linear = !matches!(search_mode, InputOrderMode::Sorted);
let mut rng = StdRng::seed_from_u64(random_seed);
let schema = input1[0].schema();
let session_config = SessionConfig::new().with_batch_size(50);
Expand Down
30 changes: 15 additions & 15 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@ use crate::aggregates::{
};

use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
get_ordered_partition_by_indices, get_window_mode, PartitionSearchMode,
};
use crate::windows::{get_ordered_partition_by_indices, get_window_mode};
use crate::{
DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, Partitioning,
SendableRecordBatchStream, Statistics,
};

Expand Down Expand Up @@ -304,7 +302,9 @@ pub struct AggregateExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
required_input_ordering: Option<LexRequirement>,
partition_search_mode: PartitionSearchMode,
/// Describes how the input is ordered relative to the group by columns
input_order_mode: InputOrderMode,
/// Describe how the output is ordered
output_ordering: Option<LexOrdering>,
}

Expand Down Expand Up @@ -409,15 +409,15 @@ fn get_aggregate_search_mode(
aggr_expr: &mut [Arc<dyn AggregateExpr>],
order_by_expr: &mut [Option<LexOrdering>],
ordering_req: &mut Vec<PhysicalSortExpr>,
) -> PartitionSearchMode {
) -> InputOrderMode {
let groupby_exprs = group_by
.expr
.iter()
.map(|(item, _)| item.clone())
.collect::<Vec<_>>();
let mut partition_search_mode = PartitionSearchMode::Linear;
let mut input_order_mode = InputOrderMode::Linear;
if !group_by.is_single() || groupby_exprs.is_empty() {
return partition_search_mode;
return input_order_mode;
}

if let Some((should_reverse, mode)) =
Expand All @@ -439,9 +439,9 @@ fn get_aggregate_search_mode(
);
*ordering_req = reverse_order_bys(ordering_req);
}
partition_search_mode = mode;
input_order_mode = mode;
}
partition_search_mode
input_order_mode
}

/// Check whether group by expression contains all of the expression inside `requirement`
Expand Down Expand Up @@ -515,7 +515,7 @@ impl AggregateExec {
&input.equivalence_properties(),
)?;
let mut ordering_req = requirement.unwrap_or(vec![]);
let partition_search_mode = get_aggregate_search_mode(
let input_order_mode = get_aggregate_search_mode(
&group_by,
&input,
&mut aggr_expr,
Expand Down Expand Up @@ -567,7 +567,7 @@ impl AggregateExec {
metrics: ExecutionPlanMetricsSet::new(),
required_input_ordering,
limit: None,
partition_search_mode,
input_order_mode,
output_ordering,
})
}
Expand Down Expand Up @@ -767,8 +767,8 @@ impl DisplayAs for AggregateExec {
write!(f, ", lim=[{limit}]")?;
}

if self.partition_search_mode != PartitionSearchMode::Linear {
write!(f, ", ordering_mode={:?}", self.partition_search_mode)?;
if self.input_order_mode != InputOrderMode::Linear {
write!(f, ", ordering_mode={:?}", self.input_order_mode)?;
}
}
}
Expand Down Expand Up @@ -819,7 +819,7 @@ impl ExecutionPlan for AggregateExec {
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
if children[0] {
if self.partition_search_mode == PartitionSearchMode::Linear {
if self.input_order_mode == InputOrderMode::Linear {
// Cannot run without breaking pipeline.
plan_err!(
"Aggregate Error: `GROUP BY` clauses with columns without ordering and GROUPING SETS are not supported for unbounded inputs."
Expand Down
12 changes: 5 additions & 7 deletions datafusion/physical-plan/src/aggregates/order/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use datafusion_physical_expr::{EmitTo, PhysicalSortExpr};
mod full;
mod partial;

use crate::windows::PartitionSearchMode;
use crate::InputOrderMode;
pub(crate) use full::GroupOrderingFull;
pub(crate) use partial::GroupOrderingPartial;

Expand All @@ -42,18 +42,16 @@ impl GroupOrdering {
/// Create a `GroupOrdering` for the the specified ordering
pub fn try_new(
input_schema: &Schema,
mode: &PartitionSearchMode,
mode: &InputOrderMode,
ordering: &[PhysicalSortExpr],
) -> Result<Self> {
match mode {
PartitionSearchMode::Linear => Ok(GroupOrdering::None),
PartitionSearchMode::PartiallySorted(order_indices) => {
InputOrderMode::Linear => Ok(GroupOrdering::None),
InputOrderMode::PartiallySorted(order_indices) => {
GroupOrderingPartial::try_new(input_schema, order_indices, ordering)
.map(GroupOrdering::Partial)
}
PartitionSearchMode::Sorted => {
Ok(GroupOrdering::Full(GroupOrderingFull::new()))
}
InputOrderMode::Sorted => Ok(GroupOrdering::Full(GroupOrderingFull::new())),
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ impl GroupedHashAggregateStream {
.find_longest_permutation(&agg_group_by.output_exprs());
let group_ordering = GroupOrdering::try_new(
&group_schema,
&agg.partition_search_mode,
&agg.input_order_mode,
ordering.as_slice(),
)?;

Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub mod joins;
pub mod limit;
pub mod memory;
pub mod metrics;
mod ordering;
pub mod projection;
pub mod repartition;
pub mod sorts;
Expand All @@ -72,6 +73,7 @@ pub mod windows;

pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
pub use crate::metrics::Metric;
pub use crate::ordering::InputOrderMode;
pub use crate::topk::TopK;
pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};

Expand Down
51 changes: 51 additions & 0 deletions datafusion/physical-plan/src/ordering.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Specifies how the input to an aggregation or window operator is ordered
/// relative to their `GROUP BY` or `PARTITION BY` expressions.
///
/// For example, if the existing ordering is `[a ASC, b ASC, c ASC]`
///
/// ## Window Functions
/// - A `PARTITION BY b` clause can use `Linear` mode.
/// - A `PARTITION BY a, c` or a `PARTITION BY c, a` can use
/// `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively.
/// (The vector stores the index of `a` in the respective PARTITION BY expression.)
/// - A `PARTITION BY a, b` or a `PARTITION BY b, a` can use `Sorted` mode.
///
/// ## Aggregations
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought making the GROUP BY correspondence made this example clearer, even though there is non trivial redundancy

/// - A `GROUP BY b` clause can use `Linear` mode.
/// - A `GROUP BY a, c` or a `GROUP BY BY c, a` can use
/// `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively.
/// (The vector stores the index of `a` in the respective PARTITION BY expression.)
/// - A `GROUP BY a, b` or a `GROUP BY b, a` can use `Sorted` mode.
///
/// Note these are the same examples as above, but with `GROUP BY` instead of
/// `PARTITION BY` to make the examples easier to read.
#[derive(Debug, Clone, PartialEq)]
pub enum InputOrderMode {
/// There is no partial permutation of the expressions satisfying the
/// existing ordering.
Linear,
/// There is a partial permutation of the expressions satisfying the
/// existing ordering. Indices describing the longest partial permutation
/// are stored in the vector.
PartiallySorted(Vec<usize>),
/// There is a (full) permutation of the expressions satisfying the
/// existing ordering.
Sorted,
}
Loading