diff --git a/datafusion/catalog-listing/src/options.rs b/datafusion/catalog-listing/src/options.rs
index 7da8005f90ec..b3f1cbdbaf7b 100644
--- a/datafusion/catalog-listing/src/options.rs
+++ b/datafusion/catalog-listing/src/options.rs
@@ -45,6 +45,11 @@ pub struct ListingOptions {
     /// Group files to avoid that the number of partitions exceeds
     /// this limit
     pub target_partitions: usize,
+    /// Preserve the partition column value boundaries when forming file groups.
+    /// When true, files that share the same partition column values are kept in
+    /// the same execution partition, allowing downstream operators to rely on
+    /// those columns being constant per partition.
+    pub preserve_partition_values: bool,
     /// Optional pre-known sort order(s). Must be `SortExpr`s.
     ///
     /// DataFusion may take advantage of this ordering to omit sorts
@@ -77,6 +82,7 @@ impl ListingOptions {
             table_partition_cols: vec![],
             collect_stat: false,
             target_partitions: 1,
+            preserve_partition_values: false,
             file_sort_order: vec![],
         }
     }
@@ -89,6 +95,12 @@ impl ListingOptions {
     pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self {
         self = self.with_target_partitions(config.target_partitions());
         self = self.with_collect_stat(config.collect_statistics());
+        if !self.table_partition_cols.is_empty() {
+            self.preserve_partition_values = config
+                .options()
+                .execution
+                .listing_table_preserve_partition_values;
+        }
         self
     }
 
@@ -239,6 +251,14 @@ impl ListingOptions {
         self
     }
 
+    /// Control whether files that share the same partition column values should
+    /// remain in the same execution partition. Defaults to `false`; when partition
+    /// columns are declared it is populated from the `listing_table_preserve_partition_values` setting.
+    pub fn with_preserve_partition_values(mut self, preserve: bool) -> Self {
+        self.preserve_partition_values = preserve;
+        self
+    }
+
     /// Set file sort order on [`ListingOptions`] and returns self.
/// /// ``` diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 33d5c86bf88d..40882d684e57 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -47,6 +47,7 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::ExecutionPlan; use futures::{future, stream, Stream, StreamExt, TryStreamExt}; +use log::debug; use object_store::ObjectStore; use std::any::Any; use std::collections::HashMap; @@ -457,32 +458,42 @@ impl TableProvider for ListingTable { } let output_ordering = self.try_create_output_ordering(state.execution_props())?; - match state - .config_options() - .execution - .split_file_groups_by_statistics - .then(|| { - output_ordering.first().map(|output_ordering| { - FileScanConfig::split_groups_by_statistics_with_target_partitions( - &self.table_schema, - &partitioned_file_lists, - output_ordering, - self.options.target_partitions, - ) + if !self.options.preserve_partition_values { + // Split file groups by statistics to optimize sorted scans + match state + .config_options() + .execution + .split_file_groups_by_statistics + .then(|| { + output_ordering.first().map(|output_ordering| { + FileScanConfig::split_groups_by_statistics_with_target_partitions( + &self.table_schema, + &partitioned_file_lists, + output_ordering, + self.options.target_partitions, + ) + }) }) - }) - .flatten() - { - Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"), - Some(Ok(new_groups)) => { - if new_groups.len() <= self.options.target_partitions { - partitioned_file_lists = new_groups; - } else { - log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered") + .flatten() + { + Some(Err(e)) => { + log::debug!("failed to split file groups by statistics: {e}") } - } - None => {} // no ordering required - }; + Some(Ok(new_groups)) => { + if new_groups.len() <= self.options.target_partitions { + partitioned_file_lists = new_groups; + } else { + log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered") + } + } + None => {} // no ordering required + }; + } else { + debug!( + "Skipping statistics-based file splitting because preserve_partition_values is enabled. \ + File groups will respect partition boundaries instead of sort statistics." 
+ ); + } let Some(object_store_url) = self.table_paths.first().map(ListingTableUrl::object_store) @@ -508,6 +519,9 @@ impl TableProvider for ListingTable { .with_limit(limit) .with_output_ordering(output_ordering) .with_expr_adapter(self.expr_adapter_factory.clone()) + .with_preserve_partition_values( + self.options.preserve_partition_values, + ) .build(), ) .await?; @@ -653,7 +667,12 @@ impl ListingTable { let (file_group, inexact_stats) = get_files_with_limit(files, limit, self.options.collect_stat).await?; - let file_groups = file_group.split_files(self.options.target_partitions); + let target_partitions = self.options.target_partitions.max(1); + let file_groups = if self.options.preserve_partition_values { + file_group.split_by_partition_values() + } else { + file_group.split_files(target_partitions) + }; let (mut file_groups, mut stats) = compute_all_files_statistics( file_groups, self.schema(), diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 212db653f713..49f884cc8432 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -591,6 +591,14 @@ config_namespace! { /// inferred and will be represented in the table schema). pub listing_table_factory_infer_partitions: bool, default = true + /// Should a `ListingTable` created through the `ListingTableFactory` keep files that share + /// the same partition column values in the same execution partition when `PARTITIONED BY` + /// columns are declared. Defaults to false to avoid overhead on non-partitioned tables. + /// Optimization is best with moderate partition counts (10-100 partitions) and large partitions + /// (>500K rows). With many small partitions (500+), task scheduling overhead may outweigh shuffle + /// savings. + pub listing_table_preserve_partition_values: bool, default = false + /// Should DataFusion support recursive CTEs pub enable_recursive_ctes: bool, default = true diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 0ddf195d4eff..d30aa1768d41 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -271,3 +271,8 @@ name = "dataframe" [[bench]] harness = false name = "spm" + +[[bench]] +harness = false +name = "hive_partitioned" +required-features = ["parquet"] diff --git a/datafusion/core/benches/hive_partitioned.rs b/datafusion/core/benches/hive_partitioned.rs new file mode 100644 index 000000000000..41560caca3ac --- /dev/null +++ b/datafusion/core/benches/hive_partitioned.rs @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
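+
+//! Compares a `GROUP BY part_col` aggregation over a Hive-partitioned Parquet table
+//! with `listing_table_preserve_partition_values` enabled vs. disabled.
+//!
+//! Usage note (a sketch, not exercised by this benchmark): besides the
+//! `SessionConfig::set_bool` calls below, the same option can be toggled per
+//! session from SQL, e.g.
+//!
+//! ```sql
+//! SET datafusion.execution.listing_table_preserve_partition_values = true;
+//! ```
+//!
+//! The bench target is registered in `Cargo.toml` with `required-features = ["parquet"]`,
+//! so it is expected to run with something like
+//! `cargo bench --bench hive_partitioned --features parquet`.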
+ +use arrow::array::{ArrayRef, Int32Array}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use arrow::util::pretty::pretty_format_batches; +use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; +use parquet::arrow::ArrowWriter; +use parquet::file::properties::WriterProperties; +use std::fs::{self, File}; +use std::io::Write; +use std::path::Path; +use std::sync::Arc; +use tempfile::TempDir; +use tokio::runtime::Runtime; + +/// Generate partitioned data for benchmarking +/// Creates `num_partitions` directories, each with one parquet file containing `rows_per_partition`. +fn generate_partitioned_data( + base_dir: &Path, + num_partitions: usize, + rows_per_partition: usize, +) { + let schema = Arc::new(Schema::new(vec![Field::new("val", DataType::Int32, false)])); + + let props = WriterProperties::builder().build(); + + for i in 0..num_partitions { + let part_dir = base_dir.join(format!("part_col={i}")); + fs::create_dir_all(&part_dir).unwrap(); + let file_path = part_dir.join("data.parquet"); + let file = File::create(file_path).unwrap(); + + let mut writer = + ArrowWriter::try_new(file, schema.clone(), Some(props.clone())).unwrap(); + + // Generate data: just a simple sequence + let vals: Vec = (0..rows_per_partition).map(|x| x as i32).collect(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vals)) as ArrayRef], + ) + .unwrap(); + + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } +} + +/// Save execution plans to file +async fn save_plans(table_path: &str, output_file: &Path) { + let query = "SELECT part_col, count(*), sum(val), avg(val) FROM t GROUP BY part_col"; + let mut file = File::create(output_file).unwrap(); + + writeln!(file, "KeyPartitioned Aggregation Benchmark Plans\n").unwrap(); + writeln!(file, "Query: {query}\n").unwrap(); + + // Optimized plan + let config_opt = SessionConfig::new().with_target_partitions(20).set_bool( + "datafusion.execution.listing_table_preserve_partition_values", + true, + ); + let ctx_opt = SessionContext::new_with_config(config_opt); + let options = ParquetReadOptions { + table_partition_cols: vec![("part_col".to_string(), DataType::Int32)], + ..Default::default() + }; + ctx_opt + .register_parquet("t", table_path, options.clone()) + .await + .unwrap(); + + let df_opt = ctx_opt.sql(query).await.unwrap(); + let plan_opt = df_opt + .explain(false, false) + .unwrap() + .collect() + .await + .unwrap(); + writeln!(file, "=== WITH KeyPartitioned Optimization ===").unwrap(); + writeln!(file, "{}\n", pretty_format_batches(&plan_opt).unwrap()).unwrap(); + + // Unoptimized plan + let config_unopt = SessionConfig::new().with_target_partitions(20).set_bool( + "datafusion.execution.listing_table_preserve_partition_values", + false, + ); + let ctx_unopt = SessionContext::new_with_config(config_unopt); + ctx_unopt + .register_parquet("t", table_path, options) + .await + .unwrap(); + + let df_unopt = ctx_unopt.sql(query).await.unwrap(); + let plan_unopt = df_unopt + .explain(false, false) + .unwrap() + .collect() + .await + .unwrap(); + writeln!(file, "=== WITHOUT KeyPartitioned Optimization ===").unwrap(); + writeln!(file, "{}", pretty_format_batches(&plan_unopt).unwrap()).unwrap(); +} + +fn run_benchmark(c: &mut Criterion) { + // Benchmark KeyPartitioned optimization for aggregations on Hive-partitioned tables + // 20 partitions × 500K rows = 10M total rows + let partitions = 20; + 
let rows_per_partition = 500_000; + let tmp_dir = TempDir::new().unwrap(); + + generate_partitioned_data(tmp_dir.path(), partitions, rows_per_partition); + let table_path = tmp_dir.path().to_str().unwrap(); + + let rt = Runtime::new().unwrap(); + + // Save execution plans if SAVE_PLANS env var is set + if std::env::var("SAVE_PLANS").is_ok() { + let output_path = Path::new("hive_partitioned_plans.txt"); + rt.block_on(save_plans(table_path, output_path)); + println!("Execution plans saved to {}", output_path.display()); + } + + let query = "SELECT part_col, count(*), sum(val), avg(val) FROM t GROUP BY part_col"; + let mut group = c.benchmark_group("hive_partitioned_agg"); + + group.bench_function("with_key_partitioned", |b| { + b.to_async(&rt).iter(|| async { + let config = SessionConfig::new().with_target_partitions(20).set_bool( + "datafusion.execution.listing_table_preserve_partition_values", + true, + ); + let ctx = SessionContext::new_with_config(config); + + let options = ParquetReadOptions { + table_partition_cols: vec![("part_col".to_string(), DataType::Int32)], + ..Default::default() + }; + + ctx.register_parquet("t", table_path, options) + .await + .unwrap(); + + let df = ctx.sql(query).await.unwrap(); + df.collect().await.unwrap(); + }) + }); + + group.bench_function("without_key_partitioned", |b| { + b.to_async(&rt).iter(|| async { + let config = SessionConfig::new().with_target_partitions(20).set_bool( + "datafusion.execution.listing_table_preserve_partition_values", + false, + ); + let ctx = SessionContext::new_with_config(config); + + let options = ParquetReadOptions { + table_partition_cols: vec![("part_col".to_string(), DataType::Int32)], + ..Default::default() + }; + + ctx.register_parquet("t", table_path, options) + .await + .unwrap(); + + let df = ctx.sql(query).await.unwrap(); + df.collect().await.unwrap(); + }) + }); + + group.finish(); +} + +criterion_group!(benches, run_benchmark); +criterion_main!(benches); diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index f98297d0e3f7..c7ff469b3205 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -132,7 +132,13 @@ impl TableProviderFactory for ListingTableFactory { }; options = options.with_table_partition_cols(table_partition_cols); - + if !cmd.table_partition_cols.is_empty() { + let preserve = session_state + .config_options() + .execution + .listing_table_preserve_partition_values; + options = options.with_preserve_partition_values(preserve); + } options .validate_partitions(session_state, &table_path) .await?; @@ -216,6 +222,7 @@ mod tests { datasource::file_format::csv::CsvFormat, execution::context::SessionContext, }; + use arrow::datatypes::{Field, Schema}; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{Constraints, DFSchema, TableReference}; @@ -519,4 +526,77 @@ mod tests { let listing_options = listing_table.options(); assert!(listing_options.table_partition_cols.is_empty()); } + + fn partitioned_table_cmd( + location: &Path, + schema: Arc, + ) -> CreateExternalTable { + CreateExternalTable { + name: TableReference::bare("fact"), + location: location.to_str().unwrap().to_string(), + file_type: "parquet".to_string(), + schema, + table_partition_cols: vec!["bucket".to_string()], + if_not_exists: false, + or_replace: false, + temporary: false, + definition: None, + order_exprs: vec![], + unbounded: false, + options: 
HashMap::new(), + constraints: Constraints::default(), + column_defaults: HashMap::new(), + } + } + + fn value_and_partition_schema() -> Arc { + let arrow_schema = Arc::new(Schema::new(vec![ + Field::new("value", DataType::Int64, true), + Field::new("bucket", DataType::Int64, true), + ])); + Arc::new(arrow_schema.to_dfschema().unwrap()) + } + + #[tokio::test] + async fn test_preserve_partition_values_disabled_by_default() { + let dir = tempfile::tempdir().unwrap(); + let factory = ListingTableFactory::new(); + let context = SessionContext::new(); + let state = context.state(); + let cmd = partitioned_table_cmd(dir.path(), value_and_partition_schema()); + + let table_provider = factory.create(&state, &cmd).await.unwrap(); + let listing_table = table_provider + .as_any() + .downcast_ref::() + .unwrap(); + assert!(!listing_table.options().preserve_partition_values); + } + + #[tokio::test] + async fn test_preserve_partition_values_can_be_enabled() { + let dir = tempfile::tempdir().unwrap(); + let factory = ListingTableFactory::new(); + let mut config = SessionConfig::new(); + config + .options_mut() + .execution + .listing_table_preserve_partition_values = true; + let context = SessionContext::new_with_config(config); + let state = context.state(); + assert!( + state + .config_options() + .execution + .listing_table_preserve_partition_values + ); + let cmd = partitioned_table_cmd(dir.path(), value_and_partition_schema()); + + let table_provider = factory.create(&state, &cmd).await.unwrap(); + let listing_table = table_provider + .as_any() + .downcast_ref::() + .unwrap(); + assert!(listing_table.options().preserve_partition_values); + } } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 2ae5aed30df9..2d1f036d394d 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -100,6 +100,7 @@ use datafusion_physical_plan::metrics::MetricType; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::recursive_query::RecursiveQueryExec; use datafusion_physical_plan::unnest::ListUnnest; +use datafusion_physical_plan::Distribution; use async_trait::async_trait; use datafusion_physical_plan::async_func::{AsyncFuncExec, AsyncMapper}; @@ -799,9 +800,16 @@ impl DefaultPhysicalPlanner { Arc::clone(&physical_input_schema), )?); - let can_repartition = !groups.is_empty() - && session_state.config().target_partitions() > 1 - && session_state.config().repartition_aggregations(); + let grouping_input_exprs = groups.input_exprs(); + let output_partitioning = initial_aggr.properties().output_partitioning(); + + let key_partition_on_group = groups.is_single() + && !groups.expr().is_empty() + && matches!(output_partitioning, Partitioning::KeyPartitioned(..)) + && output_partitioning.satisfy( + &Distribution::HashPartitioned(grouping_input_exprs), + initial_aggr.properties().equivalence_properties(), + ); // Some aggregators may be modified during initialization for // optimization purposes. For example, a FIRST_VALUE may turn @@ -809,8 +817,27 @@ impl DefaultPhysicalPlanner { // To reflect such changes to subsequent stages, use the updated // `AggregateFunctionExpr`/`PhysicalSortExpr` objects. 
let updated_aggregates = initial_aggr.aggr_expr().to_vec(); + let has_ordered_aggregate = updated_aggregates + .iter() + .any(|agg| !agg.order_bys().is_empty()); + let requires_single_partition = + groups.expr().is_empty() || has_ordered_aggregate; + + // Determine if we can use parallel final aggregation: + // 1. KeyPartitioned on grouping keys with multiple partitions + // 2. Hash repartition enabled (only if not already KeyPartitioned) + let child_partition_count = output_partitioning.partition_count(); + let key_partition_supports_parallel = + key_partition_on_group && child_partition_count > 1; + // Only enable hash repartition if we're NOT already KeyPartitioned on the group keys + let hash_repartition_enabled = !key_partition_on_group + && !groups.is_empty() + && session_state.config().target_partitions() > 1 + && session_state.config().repartition_aggregations(); + let use_partitioned_final = !requires_single_partition + && (key_partition_supports_parallel || hash_repartition_enabled); - let next_partition_mode = if can_repartition { + let next_partition_mode = if use_partitioned_final { // construct a second aggregation with 'AggregateMode::FinalPartitioned' AggregateMode::FinalPartitioned } else { diff --git a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs index 9c76f6ab6f58..e5f5da46ff12 100644 --- a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs @@ -105,6 +105,26 @@ fn final_aggregate_exec( ) } +fn final_partitioned_aggregate_exec( + input: Arc, + group_by: PhysicalGroupBy, + aggr_expr: Vec>, +) -> Arc { + let schema = input.schema(); + let n_aggr = aggr_expr.len(); + Arc::new( + AggregateExec::try_new( + AggregateMode::FinalPartitioned, + group_by, + aggr_expr, + vec![None; n_aggr], + input, + schema, + ) + .unwrap(), + ) +} + fn repartition_exec(input: Arc) -> Arc { Arc::new(RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap()) } @@ -274,3 +294,62 @@ fn aggregations_with_limit_combined() -> datafusion_common::Result<()> { ); Ok(()) } + +#[test] +fn aggregations_final_partitioned_combined() -> datafusion_common::Result<()> { + let schema = schema(); + let aggr_expr = vec![count_expr(lit(1i8), "COUNT(1)", &schema)]; + + let plan = final_partitioned_aggregate_exec( + partial_aggregate_exec( + parquet_exec(schema), + PhysicalGroupBy::default(), + aggr_expr.clone(), + ), + PhysicalGroupBy::default(), + aggr_expr, + ); + // should combine the Partial/FinalPartitioned AggregateExecs to the SinglePartitioned AggregateExec + assert_optimized!( + plan, + @ " + AggregateExec: mode=SinglePartitioned, gby=[], aggr=[COUNT(1)] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c], file_type=parquet + " + ); + Ok(()) +} + +#[test] +fn aggregations_final_partitioned_with_group_combined() -> datafusion_common::Result<()> { + let schema = schema(); + let aggr_expr = vec![ + AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("Sum(b)") + .build() + .map(Arc::new) + .unwrap(), + ]; + let groups: Vec<(Arc, String)> = + vec![(col("c", &schema)?, "c".to_string())]; + + let partial_group_by = PhysicalGroupBy::new_single(groups); + let partial_agg = + partial_aggregate_exec(parquet_exec(schema), partial_group_by, aggr_expr.clone()); + + let groups: Vec<(Arc, String)> = + vec![(col("c", &partial_agg.schema())?, 
"c".to_string())]; + let final_group_by = PhysicalGroupBy::new_single(groups); + + let plan = final_partitioned_aggregate_exec(partial_agg, final_group_by, aggr_expr); + // should combine the Partial/FinalPartitioned AggregateExecs to the SinglePartitioned AggregateExec + assert_optimized!( + plan, + @ r" + AggregateExec: mode=SinglePartitioned, gby=[c@2 as c], aggr=[Sum(b)] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c], file_type=parquet + " + ); + Ok(()) +} diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 7034b71fd500..96b69dab6252 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -77,6 +77,10 @@ mod test { target_partition: Option, ) -> Arc { let mut session_config = SessionConfig::new().with_collect_statistics(true); + session_config + .options_mut() + .execution + .listing_table_preserve_partition_values = false; if let Some(partition) = target_partition { session_config = session_config.with_target_partitions(partition); } diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs index 998d09285cf1..93634f8881e6 100644 --- a/datafusion/datasource/src/file_groups.rs +++ b/datafusion/datasource/src/file_groups.rs @@ -18,10 +18,11 @@ //! Logic for managing groups of [`PartitionedFile`]s in DataFusion use crate::{FileRange, PartitionedFile}; -use datafusion_common::Statistics; +use datafusion_common::{ScalarValue, Statistics}; use itertools::Itertools; +use log::debug; use std::cmp::{min, Ordering}; -use std::collections::BinaryHeap; +use std::collections::{BinaryHeap, HashMap}; use std::iter::repeat_with; use std::mem; use std::ops::{Deref, DerefMut, Index, IndexMut}; @@ -436,6 +437,68 @@ impl FileGroup { self.statistics.as_mut().map(Arc::make_mut) } + /// Split this file group so that each resulting group contains files that + /// share the same partition column values. If any file is missing + /// partition values, this method falls back to returning a single group + /// containing all files. + pub fn split_by_partition_values(self) -> Vec { + if self.is_empty() { + return vec![]; + } + + let mut groups: HashMap, Vec> = HashMap::new(); + let mut has_unpartitioned = false; + + for file in self.files { + if file.partition_values.is_empty() { + debug!( + "File {:?} has no partition values, will fall back to single group", + file.path(), + ); + has_unpartitioned = true; + groups.entry(vec![]).or_default().push(file); + } else { + groups + .entry(file.partition_values.clone()) + .or_default() + .push(file); + } + } + + if has_unpartitioned && groups.len() > 1 { + debug!( + "Mixed partitioned/non-partitioned files detected, falling back to single file group" + ); + let all_files = groups.into_values().flatten().collect(); + return vec![FileGroup::new(all_files)]; + } + + let mut entries: Vec<_> = groups.into_iter().collect(); + entries.sort_by(|(left, _), (right, _)| compare_partition_keys(left, right)); + entries + .into_iter() + .map(|(_, files)| FileGroup::new(files)) + .collect() + } + + /// Returns the unique set of partition values represented by this group, + /// if and only if all files share the same values. + pub fn unique_partition_values(&self) -> Option<&[ScalarValue]> { + let first = self.files.first()?; + if first.partition_values.is_empty() { + return None; + } + + if self.files[1..] 
+ .iter() + .all(|file| file.partition_values == first.partition_values) + { + Some(&first.partition_values) + } else { + None + } + } + /// Partition the list of files into `n` groups pub fn split_files(mut self, n: usize) -> Vec { if self.is_empty() { @@ -503,6 +566,23 @@ impl Default for FileGroup { } } +fn compare_partition_keys(left: &[ScalarValue], right: &[ScalarValue]) -> Ordering { + for (l, r) in left.iter().zip(right.iter()) { + match l.partial_cmp(r) { + Some(Ordering::Less) => return Ordering::Less, + Some(Ordering::Greater) => return Ordering::Greater, + Some(Ordering::Equal) => continue, + None => { + let ord = l.to_string().cmp(&r.to_string()); + if ord != Ordering::Equal { + return ord; + } + } + } + } + left.len().cmp(&right.len()) +} + /// Tracks how a individual file will be repartitioned #[derive(Debug, Clone)] struct ToRepartition { @@ -1013,4 +1093,127 @@ mod test { assert_partitioned_files(repartitioned.clone(), repartitioned_preserving_sort); repartitioned } + + #[test] + fn test_split_by_partition_values() { + use datafusion_common::ScalarValue; + + // Helper to create file with partition values + fn pfile_with_parts( + path: &str, + size: u64, + values: Vec, + ) -> PartitionedFile { + let mut file = pfile(path, size); + file.partition_values = values; + file + } + + // Case 1: Empty group returns empty vec + let empty_group = FileGroup::new(vec![]); + assert_eq!(empty_group.split_by_partition_values().len(), 0); + + // Case 2: Single partition with multiple files + let file1 = pfile_with_parts("a", 100, vec![ScalarValue::Int32(Some(1))]); + let file2 = pfile_with_parts("b", 100, vec![ScalarValue::Int32(Some(1))]); + let file3 = pfile_with_parts("c", 100, vec![ScalarValue::Int32(Some(1))]); + let single_partition = FileGroup::new(vec![file1, file2, file3]); + let result = single_partition.split_by_partition_values(); + assert_eq!(result.len(), 1); + assert_eq!(result[0].len(), 3); + + // Case 3: Multiple partitions, verify sorting + let file_p3 = pfile_with_parts("p3", 100, vec![ScalarValue::Int32(Some(3))]); + let file_p1 = pfile_with_parts("p1", 100, vec![ScalarValue::Int32(Some(1))]); + let file_p2 = pfile_with_parts("p2", 100, vec![ScalarValue::Int32(Some(2))]); + let multi_partition = FileGroup::new(vec![file_p3, file_p1, file_p2]); + let result = multi_partition.split_by_partition_values(); + assert_eq!(result.len(), 3); + // Verify sorted order + assert_eq!( + result[0].files()[0].partition_values, + vec![ScalarValue::Int32(Some(1))] + ); + assert_eq!( + result[1].files()[0].partition_values, + vec![ScalarValue::Int32(Some(2))] + ); + assert_eq!( + result[2].files()[0].partition_values, + vec![ScalarValue::Int32(Some(3))] + ); + + // Case 4: Multi-column partition keys + let file_1a = pfile_with_parts( + "1a", + 100, + vec![ + ScalarValue::Int32(Some(1)), + ScalarValue::Utf8(Some("a".to_string())), + ], + ); + let file_1b = pfile_with_parts( + "1b", + 100, + vec![ + ScalarValue::Int32(Some(1)), + ScalarValue::Utf8(Some("b".to_string())), + ], + ); + let file_2a = pfile_with_parts( + "2a", + 100, + vec![ + ScalarValue::Int32(Some(2)), + ScalarValue::Utf8(Some("a".to_string())), + ], + ); + let multi_col = FileGroup::new(vec![file_2a, file_1a, file_1b]); + let result = multi_col.split_by_partition_values(); + assert_eq!(result.len(), 3); + // Verify lexicographic ordering + assert_eq!( + result[0].files()[0].partition_values, + vec![ + ScalarValue::Int32(Some(1)), + ScalarValue::Utf8(Some("a".to_string())) + ] + ); + assert_eq!( + 
result[1].files()[0].partition_values, + vec![ + ScalarValue::Int32(Some(1)), + ScalarValue::Utf8(Some("b".to_string())) + ] + ); + + // Case 5: Mixed partitioned/unpartitioned files - should fallback + let partitioned = pfile_with_parts("p", 100, vec![ScalarValue::Int32(Some(1))]); + let unpartitioned = pfile("u", 100); // no partition values + let mixed = FileGroup::new(vec![partitioned, unpartitioned]); + let result = mixed.split_by_partition_values(); + // Should fallback to single group containing all files + assert_eq!(result.len(), 1); + assert_eq!(result[0].len(), 2); + + // Case 6: unique_partition_values - all same + let file1 = pfile_with_parts("a", 100, vec![ScalarValue::Int32(Some(42))]); + let file2 = pfile_with_parts("b", 100, vec![ScalarValue::Int32(Some(42))]); + let group = FileGroup::new(vec![file1, file2]); + assert_eq!( + group.unique_partition_values(), + Some(&[ScalarValue::Int32(Some(42))][..]) + ); + + // Case 7: unique_partition_values - different values + let file1 = pfile_with_parts("a", 100, vec![ScalarValue::Int32(Some(1))]); + let file2 = pfile_with_parts("b", 100, vec![ScalarValue::Int32(Some(2))]); + let group = FileGroup::new(vec![file1, file2]); + assert_eq!(group.unique_partition_values(), None); + + // Case 8: unique_partition_values - no partition values + let file1 = pfile("a", 100); + let group = FileGroup::new(vec![file1]); + assert_eq!(group.unique_partition_values(), None); + } } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 4387996a2981..3a850b8178fe 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -47,7 +47,9 @@ use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{BinaryExpr, Column}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::utils::reassign_expr_columns; -use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, Partitioning}; +use datafusion_physical_expr::{ + split_conjunction, AcrossPartitions, ConstExpr, EquivalenceProperties, Partitioning, +}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -61,8 +63,9 @@ use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, }; use std::{ - any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter, - fmt::Result as FmtResult, marker::PhantomData, sync::Arc, + any::Any, borrow::Cow, collections::HashMap, collections::HashSet, fmt::Debug, + fmt::Formatter, fmt::Result as FmtResult, marker::PhantomData, sync::Arc, + sync::OnceLock, }; use datafusion_physical_expr::equivalence::project_orderings; @@ -196,6 +199,11 @@ pub struct FileScanConfig { /// would be incorrect if there are filters being applied, thus this should be accessed /// via [`FileScanConfig::statistics`]. pub(crate) statistics: Statistics, + /// Preserve partition column value boundaries when forming file groups. + pub preserve_partition_values: bool, + /// Cached result of key_partition_exprs computation to avoid repeated work + #[allow(clippy::type_complexity)] + key_partition_exprs_cache: OnceLock>>>, } /// A builder for [`FileScanConfig`]'s. 
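Taken together with the `output_partitioning()` change in the hunks below, the intent of
these new fields is that a scan built with `preserve_partition_values` advertises
`Partitioning::KeyPartitioned` over the table's partition columns, which already satisfies
a hash-distribution requirement on those columns. A minimal sketch of that contract,
mirroring the unit tests added later in this patch (the bindings `part_col`, `n_groups`,
and `eq_properties` are assumed for illustration, not part of the change):

    // A scan that is KeyPartitioned on `part_col` satisfies HashPartitioned on the
    // same key, so EnforceDistribution can skip the hash RepartitionExec before the
    // final aggregate stage.
    let scan_partitioning =
        Partitioning::KeyPartitioned(vec![Arc::clone(&part_col)], n_groups);
    let required = Distribution::HashPartitioned(vec![Arc::clone(&part_col)]);
    assert!(scan_partitioning.satisfy(&required, &eq_properties));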
@@ -265,6 +273,7 @@ pub struct FileScanConfigBuilder { new_lines_in_values: Option, batch_size: Option, expr_adapter_factory: Option>, + preserve_partition_values: bool, } impl FileScanConfigBuilder { @@ -291,6 +300,7 @@ impl FileScanConfigBuilder { constraints: None, batch_size: None, expr_adapter_factory: None, + preserve_partition_values: false, } } @@ -419,6 +429,12 @@ impl FileScanConfigBuilder { self } + /// Preserve partition boundaries when forming execution groups. + pub fn with_preserve_partition_values(mut self, preserve: bool) -> Self { + self.preserve_partition_values = preserve; + self + } + /// Build the final [`FileScanConfig`] with all the configured settings. /// /// This method takes ownership of the builder and returns the constructed `FileScanConfig`. @@ -437,6 +453,7 @@ impl FileScanConfigBuilder { new_lines_in_values, batch_size, expr_adapter_factory: expr_adapter, + preserve_partition_values, } = self; let constraints = constraints.unwrap_or_default(); @@ -470,6 +487,8 @@ impl FileScanConfigBuilder { batch_size, expr_adapter_factory: expr_adapter, statistics, + preserve_partition_values, + key_partition_exprs_cache: OnceLock::new(), } } } @@ -491,6 +510,7 @@ impl From for FileScanConfigBuilder { constraints: Some(config.constraints), batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, + preserve_partition_values: config.preserve_partition_values, } } } @@ -563,6 +583,9 @@ impl DataSource for FileScanConfig { repartition_file_min_size: usize, output_ordering: Option, ) -> Result>> { + if self.preserve_partition_values { + return Ok(None); + } let source = self.file_source.repartitioned( target_partitions, repartition_file_min_size, @@ -574,7 +597,11 @@ impl DataSource for FileScanConfig { } fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(self.file_groups.len()) + if let Some(exprs) = self.key_partition_exprs() { + Partitioning::KeyPartitioned(exprs, self.file_groups.len()) + } else { + Partitioning::UnknownPartitioning(self.file_groups.len()) + } } fn eq_properties(&self) -> EquivalenceProperties { @@ -595,6 +622,14 @@ impl DataSource for FileScanConfig { } } } + if let Some(exprs) = self.key_partition_exprs() { + let const_exprs = exprs + .into_iter() + .map(|expr| ConstExpr::new(expr, AcrossPartitions::Heterogeneous)); + if let Err(e) = eq_properties.add_constants(const_exprs) { + warn!("Failed to add partition constants: {e}"); + } + } eq_properties } @@ -731,6 +766,61 @@ impl FileScanConfig { self.file_source.table_schema().table_partition_cols() } + fn key_partition_exprs(&self) -> Option>> { + if !self.preserve_partition_values { + return None; + } + let result = self + .key_partition_exprs_cache + .get_or_init(|| self.compute_key_partition_exprs()) + .clone(); + + if result.is_some() { + debug!( + "KeyPartitioned optimization enabled for {} file groups", + self.file_groups.len() + ); + } + result + } + + fn compute_key_partition_exprs(&self) -> Option>> { + if self.file_groups.is_empty() { + return None; + } + let partition_fields = self.table_partition_cols(); + if partition_fields.is_empty() { + return None; + } + let mut seen = HashSet::with_capacity(self.file_groups.len()); + for group in &self.file_groups { + let values = group.unique_partition_values()?; + if !seen.insert(values) { + return None; + } + } + let schema = self.file_source.table_schema().table_schema(); + let mut exprs = Vec::with_capacity(partition_fields.len()); + for field in partition_fields { + match 
Column::new_with_schema(field.name(), schema) { + Ok(column) => exprs.push(Arc::new(column) as Arc), + Err(e) => { + debug!( + "Partition column '{}' not found in schema, disabling KeyPartitioned optimization: {}", + field.name(), + e + ); + return None; + } + } + } + if exprs.is_empty() { + None + } else { + Some(exprs) + } + } + fn projection_indices(&self) -> Vec { match &self.projection_exprs { Some(proj) => proj.ordered_column_indices(), diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index d6b2b1b046f7..53b7ee3f8e10 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -18,8 +18,9 @@ //! [`Partitioning`] and [`Distribution`] for `ExecutionPlans` use crate::{ - equivalence::ProjectionMapping, expressions::UnKnownColumn, physical_exprs_equal, - EquivalenceProperties, PhysicalExpr, + equivalence::ProjectionMapping, + expressions::{CastExpr, Column, TryCastExpr, UnKnownColumn}, + physical_exprs_equal, EquivalenceProperties, PhysicalExpr, }; use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; use std::fmt; @@ -117,6 +118,9 @@ pub enum Partitioning { /// Allocate rows based on a hash of one of more expressions and the specified number of /// partitions Hash(Vec>, usize), + /// Partitions that are already organized by disjoint key values for the provided expressions. + /// Rows that have the same values for these expressions are guaranteed to be in the same partition. + KeyPartitioned(Vec>, usize), /// Unknown partitioning scheme with a known number of partitions UnknownPartitioning(usize), } @@ -133,6 +137,14 @@ impl Display for Partitioning { .join(", "); write!(f, "Hash([{phy_exprs_str}], {size})") } + Partitioning::KeyPartitioned(phy_exprs, size) => { + let phy_exprs_str = phy_exprs + .iter() + .map(|e| format!("{e}")) + .collect::>() + .join(", "); + write!(f, "KeyPartitioned([{phy_exprs_str}], {size})") + } Partitioning::UnknownPartitioning(size) => { write!(f, "UnknownPartitioning({size})") } @@ -144,7 +156,10 @@ impl Partitioning { pub fn partition_count(&self) -> usize { use Partitioning::*; match self { - RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n, + RoundRobinBatch(n) + | Hash(_, n) + | KeyPartitioned(_, n) + | UnknownPartitioning(n) => *n, } } @@ -160,38 +175,43 @@ impl Partitioning { Distribution::SinglePartition if self.partition_count() == 1 => true, // When partition count is 1, hash requirement is satisfied. Distribution::HashPartitioned(_) if self.partition_count() == 1 => true, - Distribution::HashPartitioned(required_exprs) => { - match self { - // Here we do not check the partition count for hash partitioning and assumes the partition count - // and hash functions in the system are the same. In future if we plan to support storage partition-wise joins, - // then we need to have the partition count and hash functions validation. - Partitioning::Hash(partition_exprs, _) => { - let fast_match = - physical_exprs_equal(required_exprs, partition_exprs); - // If the required exprs do not match, need to leverage the eq_properties provided by the child - // and normalize both exprs based on the equivalent groups. 
- if !fast_match { - let eq_groups = eq_properties.eq_group(); - if !eq_groups.is_empty() { - let normalized_required_exprs = required_exprs - .iter() - .map(|e| eq_groups.normalize_expr(Arc::clone(e))) - .collect::>(); - let normalized_partition_exprs = partition_exprs - .iter() - .map(|e| eq_groups.normalize_expr(Arc::clone(e))) - .collect::>(); - return physical_exprs_equal( - &normalized_required_exprs, - &normalized_partition_exprs, - ); - } + Distribution::HashPartitioned(required_exprs) => match self { + // Here we do not check the partition count for hash partitioning and assumes the partition count + // and hash functions in the system are the same. In future if we plan to support storage partition-wise joins, + // then we need to have the partition count and hash functions validation. + Partitioning::Hash(partition_exprs, _) => { + let fast_match = + physical_exprs_equal(required_exprs, partition_exprs); + if !fast_match { + let eq_groups = eq_properties.eq_group(); + if !eq_groups.is_empty() { + let normalized_required_exprs = required_exprs + .iter() + .map(|e| eq_groups.normalize_expr(Arc::clone(e))) + .collect::>(); + let normalized_partition_exprs = partition_exprs + .iter() + .map(|e| eq_groups.normalize_expr(Arc::clone(e))) + .collect::>(); + return physical_exprs_equal( + &normalized_required_exprs, + &normalized_partition_exprs, + ); } - fast_match } - _ => false, + fast_match } - } + Partitioning::KeyPartitioned(partition_exprs, _) => { + partition_exprs.iter().all(|partition_expr| { + exprs_match_with_equivalence( + partition_expr, + required_exprs, + eq_properties, + ) + }) + } + _ => false, + }, _ => false, } } @@ -202,23 +222,76 @@ impl Partitioning { mapping: &ProjectionMapping, input_eq_properties: &EquivalenceProperties, ) -> Self { - if let Partitioning::Hash(exprs, part) = self { - let normalized_exprs = input_eq_properties - .project_expressions(exprs, mapping) - .zip(exprs) - .map(|(proj_expr, expr)| { - proj_expr.unwrap_or_else(|| { - Arc::new(UnKnownColumn::new(&expr.to_string())) + match self { + Partitioning::Hash(exprs, part) + | Partitioning::KeyPartitioned(exprs, part) => { + let normalized_exprs = input_eq_properties + .project_expressions(exprs, mapping) + .zip(exprs) + .map(|(proj_expr, expr)| { + proj_expr.unwrap_or_else(|| { + Arc::new(UnKnownColumn::new(&expr.to_string())) + }) }) - }) - .collect(); - Partitioning::Hash(normalized_exprs, *part) - } else { - self.clone() + .collect(); + match self { + Partitioning::Hash(..) => Partitioning::Hash(normalized_exprs, *part), + Partitioning::KeyPartitioned(..) => { + Partitioning::KeyPartitioned(normalized_exprs, *part) + } + _ => unreachable!(), + } + } + _ => self.clone(), } } } +/// Check if an expression matches any expression in a list, using equivalence properties. 
+fn exprs_match_with_equivalence(
+    expr: &Arc<dyn PhysicalExpr>,
+    candidates: &[Arc<dyn PhysicalExpr>],
+    eq_properties: &EquivalenceProperties,
+) -> bool {
+    candidates.iter().any(|candidate| {
+        if physical_exprs_equal(&[Arc::clone(candidate)], &[Arc::clone(expr)]) {
+            return true;
+        }
+
+        let eq_groups = eq_properties.eq_group();
+        if !eq_groups.is_empty() {
+            let normalized_candidate = eq_groups.normalize_expr(Arc::clone(candidate));
+            let normalized_expr = eq_groups.normalize_expr(Arc::clone(expr));
+            if physical_exprs_equal(&[normalized_candidate], &[normalized_expr]) {
+                return true;
+            }
+        }
+
+        matches!(
+            (normalize_partition_label(candidate), normalize_partition_label(expr)),
+            (Some(candidate_label), Some(expr_label)) if candidate_label == expr_label
+        )
+    })
+}
+
+fn normalize_partition_label(expr: &Arc<dyn PhysicalExpr>) -> Option<String> {
+    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+        Some(column.name().to_string())
+    } else if let Some(unknown) = expr.as_any().downcast_ref::<UnKnownColumn>() {
+        Some(strip_projection_suffix(unknown.name()))
+    } else if let Some(cast) = expr.as_any().downcast_ref::<CastExpr>() {
+        normalize_partition_label(cast.expr())
+    } else if let Some(try_cast) = expr.as_any().downcast_ref::<TryCastExpr>() {
+        normalize_partition_label(try_cast.expr())
+    } else {
+        None
+    }
+}
+
+fn strip_projection_suffix(name: &str) -> String {
+    name.split('@').next().unwrap_or(name).to_string()
+}
+
 impl PartialEq for Partitioning {
     fn eq(&self, other: &Partitioning) -> bool {
         match (self, other) {
@@ -231,6 +304,10 @@ impl PartialEq for Partitioning {
             {
                 true
             }
+            (
+                Partitioning::KeyPartitioned(exprs1, count1),
+                Partitioning::KeyPartitioned(exprs2, count2),
+            ) if physical_exprs_equal(exprs1, exprs2) && (count1 == count2) => true,
             _ => false,
         }
     }
@@ -247,6 +324,9 @@ pub enum Distribution {
     /// Requires children to be distributed in such a way that the same
     /// values of the keys end up in the same partition
     HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
+    /// Requires children to be distributed such that each key (or key range)
+    /// is guaranteed to be contained within a single partition.
+ KeyPartitioned(Vec>), } impl Distribution { @@ -260,6 +340,9 @@ impl Distribution { Distribution::HashPartitioned(expr) => { Partitioning::Hash(expr, partition_count) } + Distribution::KeyPartitioned(expr) => { + Partitioning::KeyPartitioned(expr, partition_count) + } } } } @@ -272,6 +355,9 @@ impl Display for Distribution { Distribution::HashPartitioned(exprs) => { write!(f, "HashPartitioned[{}])", format_physical_expr_list(exprs)) } + Distribution::KeyPartitioned(exprs) => { + write!(f, "KeyPartitioned[{}])", format_physical_expr_list(exprs)) + } } } } @@ -331,7 +417,7 @@ mod tests { Distribution::SinglePartition => { assert_eq!(result, (true, false, false, false, false)) } - Distribution::HashPartitioned(_) => { + Distribution::HashPartitioned(_) | Distribution::KeyPartitioned(_) => { assert_eq!(result, (true, false, false, true, false)) } } @@ -339,4 +425,153 @@ mod tests { Ok(()) } + + #[test] + fn test_key_partitioned_satisfies() { + use crate::expressions::Column; + use std::sync::Arc; + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + + let col_a = Arc::new(Column::new("a", 0)) as Arc; + let col_b = Arc::new(Column::new("b", 1)) as Arc; + let col_c = Arc::new(Column::new("c", 2)) as Arc; + + let eq_properties = EquivalenceProperties::new(schema); + + // Case 1: Exact match - [a] satisfies [a] + let partitioning = Partitioning::KeyPartitioned(vec![Arc::clone(&col_a)], 4); + let required = vec![Arc::clone(&col_a)]; + assert!(partitioning + .satisfy(&Distribution::HashPartitioned(required), &eq_properties)); + + // Case 2: Superset - [a] satisfies [a, b] (partition columns are subset of required) + let partitioning = Partitioning::KeyPartitioned(vec![Arc::clone(&col_a)], 4); + let required = vec![Arc::clone(&col_a), Arc::clone(&col_b)]; + assert!(partitioning + .satisfy(&Distribution::HashPartitioned(required), &eq_properties)); + + // Case 3: NOT satisfied - [a, b] does not satisfy [a] + let partitioning = + Partitioning::KeyPartitioned(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4); + let required = vec![Arc::clone(&col_a)]; + assert!(!partitioning + .satisfy(&Distribution::HashPartitioned(required), &eq_properties)); + + // Case 4: Multi-column exact match + let partitioning = + Partitioning::KeyPartitioned(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4); + let required = vec![Arc::clone(&col_a), Arc::clone(&col_b)]; + assert!(partitioning + .satisfy(&Distribution::HashPartitioned(required), &eq_properties)); + + // Case 5: Multi-column superset + let partitioning = + Partitioning::KeyPartitioned(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4); + let required = vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_c)]; + assert!(partitioning + .satisfy(&Distribution::HashPartitioned(required), &eq_properties)); + + // Case 6: Different columns - not satisfied + let partitioning = Partitioning::KeyPartitioned(vec![Arc::clone(&col_a)], 4); + let required = vec![Arc::clone(&col_b)]; + assert!(!partitioning + .satisfy(&Distribution::HashPartitioned(required), &eq_properties)); + } + + #[test] + fn test_key_partitioned_project() { + use crate::expressions::Column; + use std::sync::Arc; + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + + let col_a = Arc::new(Column::new("a", 0)) as Arc; + let col_b = 
Arc::new(Column::new("b", 1)) as Arc; + let col_c = Arc::new(Column::new("c", 2)) as Arc; + + let eq_properties = EquivalenceProperties::new(Arc::clone(&schema)); + + // Case 1: Identity projection - no changes + let partitioning = + Partitioning::KeyPartitioned(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4); + let mapping = ProjectionMapping::try_new( + vec![ + (Arc::clone(&col_a), "a".to_string()), + (Arc::clone(&col_b), "b".to_string()), + (Arc::clone(&col_c), "c".to_string()), + ], + &schema, + ) + .unwrap(); + + let projected = partitioning.project(&mapping, &eq_properties); + if let Partitioning::KeyPartitioned(exprs, count) = projected { + assert_eq!(count, 4); + assert_eq!(exprs.len(), 2); + } else { + panic!("Expected KeyPartitioned"); + } + + // Case 2: Projection drops a partition column + let partitioning = + Partitioning::KeyPartitioned(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4); + let mapping = ProjectionMapping::try_new( + vec![ + (Arc::clone(&col_a), "a".to_string()), + (Arc::clone(&col_c), "c".to_string()), + ], + &schema, + ) + .unwrap(); + + let projected = partitioning.project(&mapping, &eq_properties); + if let Partitioning::KeyPartitioned(exprs, count) = projected { + assert_eq!(count, 4); + assert_eq!(exprs.len(), 2); + // First expression should be projected, second becomes UnknownColumn + assert!(exprs[0].as_any().downcast_ref::().is_some()); + } else { + panic!("Expected KeyPartitioned"); + } + + // Case 3: Projection reorders columns + let output_schema = Arc::new(Schema::new(vec![ + Field::new("c", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("a", DataType::Int32, false), + ])); + + let partitioning = + Partitioning::KeyPartitioned(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4); + let col_c_new = Arc::new(Column::new("c", 0)) as Arc; + let col_b_new = Arc::new(Column::new("b", 1)) as Arc; + let col_a_new = Arc::new(Column::new("a", 2)) as Arc; + + let mapping = ProjectionMapping::try_new( + vec![ + (col_c_new, "c".to_string()), + (col_b_new, "b".to_string()), + (col_a_new, "a".to_string()), + ], + &output_schema, + ) + .unwrap(); + + let projected = partitioning.project(&mapping, &eq_properties); + if let Partitioning::KeyPartitioned(exprs, count) = projected { + assert_eq!(count, 4); + assert_eq!(exprs.len(), 2); + } else { + panic!("Expected KeyPartitioned"); + } + } } diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 4464c12ca7bf..b37cf400ed5c 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1070,6 +1070,9 @@ struct RepartitionRequirementStatus { roundrobin_beneficial_stats: bool, /// Designates whether hash partitioning is necessary. hash_necessary: bool, + /// Designates whether the input is already KeyPartitioned on the required keys, + /// satisfying the HashPartitioned requirement without repartitioning. 
+ key_partitioned_satisfies_hash: bool, } /// Calculates the `RepartitionRequirementStatus` for each children to generate @@ -1125,36 +1128,44 @@ fn get_repartition_requirement_status( Precision::Absent => true, }; let is_hash = matches!(requirement, Distribution::HashPartitioned(_)); - // Hash re-partitioning is necessary when the input has more than one - // partitions: let multi_partitions = child.output_partitioning().partition_count() > 1; + + let key_partitioned_satisfies_hash = matches!( + child.output_partitioning(), + Partitioning::KeyPartitioned(..) + ) && child + .output_partitioning() + .satisfy(&requirement, child.equivalence_properties()); + let roundrobin_sensible = roundrobin_beneficial && roundrobin_beneficial_stats; - needs_alignment |= is_hash && (multi_partitions || roundrobin_sensible); + needs_alignment |= is_hash + && (multi_partitions || roundrobin_sensible) + && !key_partitioned_satisfies_hash; repartition_status_flags.push(( is_hash, + key_partitioned_satisfies_hash, RepartitionRequirementStatus { requirement, roundrobin_beneficial, roundrobin_beneficial_stats, - hash_necessary: is_hash && multi_partitions, + hash_necessary: is_hash + && multi_partitions + && !key_partitioned_satisfies_hash, + key_partitioned_satisfies_hash, }, )); } - // Align hash necessary flags for hash partitions to generate consistent - // hash partitions at each children: + // Align hash partitioning requirements across children, skipping KeyPartitioned inputs if needs_alignment { - // When there is at least one hash requirement that is necessary or - // beneficial according to statistics, make all children require hash - // repartitioning: - for (is_hash, status) in &mut repartition_status_flags { - if *is_hash { + for (is_hash, key_satisfies, status) in &mut repartition_status_flags { + if *is_hash && !*key_satisfies { status.hash_necessary = true; } } } Ok(repartition_status_flags .into_iter() - .map(|(_, status)| status) + .map(|(_, _, status)| status) .collect()) } @@ -1244,14 +1255,14 @@ pub fn ensure_distribution( roundrobin_beneficial, roundrobin_beneficial_stats, hash_necessary, + key_partitioned_satisfies_hash, }, )| { let add_roundrobin = enable_round_robin - // Operator benefits from partitioning (e.g. filter): && roundrobin_beneficial && roundrobin_beneficial_stats - // Unless partitioning increases the partition count, it is not beneficial: - && child.plan.output_partitioning().partition_count() < target_partitions; + && child.plan.output_partitioning().partition_count() < target_partitions + && !key_partitioned_satisfies_hash; // When `repartition_file_scans` is set, attempt to increase // parallelism at the source. @@ -1274,17 +1285,18 @@ pub fn ensure_distribution( } Distribution::HashPartitioned(exprs) => { // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background - if add_roundrobin && !hash_necessary { - // Add round-robin repartitioning on top of the operator - // to increase parallelism. - child = add_roundrobin_on_top(child, target_partitions)?; - } - // When inserting hash is necessary to satisfy hash requirement, insert hash repartition. 
if hash_necessary { + // Hash repartition required to satisfy distribution requirement child = add_hash_on_top(child, exprs.to_vec(), target_partitions)?; + } else if add_roundrobin { + // Add round-robin repartitioning to increase parallelism + child = add_roundrobin_on_top(child, target_partitions)?; } } + Distribution::KeyPartitioned(_) => { + // Nothing to do: treated as satisfied upstream + } Distribution::UnspecifiedDistribution => { if add_roundrobin { // Add round-robin repartitioning on top of the operator @@ -1333,6 +1345,9 @@ pub fn ensure_distribution( // Since there is no ordering requirement, preserving ordering is pointless child = replace_order_preserving_variants(child)?; } + Distribution::KeyPartitioned(_) => { + child = replace_order_preserving_variants(child)?; + } Distribution::UnspecifiedDistribution => { // Since ordering is lost, trying to preserve ordering is pointless if !maintains || plan.as_any().is::() { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 843d975c7d76..20666de14426 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1099,6 +1099,7 @@ impl ExecutionPlan for RepartitionExec { new_properties.partitioning = match new_properties.partitioning { RoundRobinBatch(_) => RoundRobinBatch(target_partitions), Hash(hash, _) => Hash(hash, target_partitions), + KeyPartitioned(_, _) => UnknownPartitioning(target_partitions), UnknownPartitioning(_) => UnknownPartitioning(target_partitions), }; Ok(Some(Arc::new(Self { diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 34b6d5108f9e..e3996aba5e76 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -1469,6 +1469,7 @@ mod tests { let &partition_count = match properties.output_partitioning() { Partitioning::RoundRobinBatch(partitions) => partitions, Partitioning::Hash(_, partitions) => partitions, + Partitioning::KeyPartitioned(_, partitions) => partitions, Partitioning::UnknownPartitioning(partitions) => partitions, }; let source = CongestedExec { diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index f9400d14a59c..379ec03414b8 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1024,6 +1024,7 @@ message FileScanExecConf { datafusion_common.Constraints constraints = 11; optional uint64 batch_size = 12; + bool preserve_partition_values = 13; } message ParquetScanExecNode { @@ -1265,6 +1266,11 @@ message PhysicalHashRepartition { uint64 partition_count = 2; } +message PhysicalKeyPartition { + repeated PhysicalExprNode key_expr = 1; + uint64 partition_count = 2; +} + message RepartitionExecNode{ PhysicalPlanNode input = 1; // oneof partition_method { @@ -1280,6 +1286,7 @@ message Partitioning { uint64 round_robin = 1; PhysicalHashRepartition hash = 2; uint64 unknown = 3; + PhysicalKeyPartition key = 4; } } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 4cf834d0601e..1a4465a1c663 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -5949,6 +5949,9 @@ impl serde::Serialize for FileScanExecConf { if self.batch_size.is_some() { len += 1; } + if self.preserve_partition_values { + len += 1; + } let mut struct_ser = 
serializer.serialize_struct("datafusion.FileScanExecConf", len)?; if !self.file_groups.is_empty() { struct_ser.serialize_field("fileGroups", &self.file_groups)?; @@ -5982,6 +5985,9 @@ impl serde::Serialize for FileScanExecConf { #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("batchSize", ToString::to_string(&v).as_str())?; } + if self.preserve_partition_values { + struct_ser.serialize_field("preservePartitionValues", &self.preserve_partition_values)?; + } struct_ser.end() } } @@ -6007,6 +6013,8 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "constraints", "batch_size", "batchSize", + "preserve_partition_values", + "preservePartitionValues", ]; #[allow(clippy::enum_variant_names)] @@ -6021,6 +6029,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { OutputOrdering, Constraints, BatchSize, + PreservePartitionValues, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -6052,6 +6061,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "outputOrdering" | "output_ordering" => Ok(GeneratedField::OutputOrdering), "constraints" => Ok(GeneratedField::Constraints), "batchSize" | "batch_size" => Ok(GeneratedField::BatchSize), + "preservePartitionValues" | "preserve_partition_values" => Ok(GeneratedField::PreservePartitionValues), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -6081,6 +6091,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { let mut output_ordering__ = None; let mut constraints__ = None; let mut batch_size__ = None; + let mut preserve_partition_values__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::FileGroups => { @@ -6148,6 +6159,12 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } + GeneratedField::PreservePartitionValues => { + if preserve_partition_values__.is_some() { + return Err(serde::de::Error::duplicate_field("preservePartitionValues")); + } + preserve_partition_values__ = Some(map_.next_value()?); + } } } Ok(FileScanExecConf { @@ -6161,6 +6178,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { output_ordering: output_ordering__.unwrap_or_default(), constraints: constraints__, batch_size: batch_size__, + preserve_partition_values: preserve_partition_values__.unwrap_or_default(), }) } } @@ -14695,6 +14713,9 @@ impl serde::Serialize for Partitioning { #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("unknown", ToString::to_string(&v).as_str())?; } + partitioning::PartitionMethod::Key(v) => { + struct_ser.serialize_field("key", v)?; + } } } struct_ser.end() @@ -14711,6 +14732,7 @@ impl<'de> serde::Deserialize<'de> for Partitioning { "roundRobin", "hash", "unknown", + "key", ]; #[allow(clippy::enum_variant_names)] @@ -14718,6 +14740,7 @@ impl<'de> serde::Deserialize<'de> for Partitioning { RoundRobin, Hash, Unknown, + Key, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -14742,6 +14765,7 @@ impl<'de> serde::Deserialize<'de> for Partitioning { "roundRobin" | "round_robin" => Ok(GeneratedField::RoundRobin), "hash" => Ok(GeneratedField::Hash), "unknown" => Ok(GeneratedField::Unknown), + "key" => Ok(GeneratedField::Key), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -14783,6 +14807,13 @@ impl<'de> serde::Deserialize<'de> for Partitioning { } partition_method__ = 
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| partitioning::PartitionMethod::Unknown(x.0)); } + GeneratedField::Key => { + if partition_method__.is_some() { + return Err(serde::de::Error::duplicate_field("key")); + } + partition_method__ = map_.next_value::<::std::option::Option<_>>()?.map(partitioning::PartitionMethod::Key) +; + } } } Ok(Partitioning { @@ -16677,6 +16708,120 @@ impl<'de> serde::Deserialize<'de> for PhysicalIsNull { deserializer.deserialize_struct("datafusion.PhysicalIsNull", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalKeyPartition { + #[allow(deprecated)] + fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.key_expr.is_empty() { + len += 1; + } + if self.partition_count != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalKeyPartition", len)?; + if !self.key_expr.is_empty() { + struct_ser.serialize_field("keyExpr", &self.key_expr)?; + } + if self.partition_count != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("partitionCount", ToString::to_string(&self.partition_count).as_str())?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalKeyPartition { + #[allow(deprecated)] + fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "key_expr", + "keyExpr", + "partition_count", + "partitionCount", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + KeyExpr, + PartitionCount, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error> + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E> + where + E: serde::de::Error, + { + match value { + "keyExpr" | "key_expr" => Ok(GeneratedField::KeyExpr), + "partitionCount" | "partition_count" => Ok(GeneratedField::PartitionCount), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalKeyPartition; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalKeyPartition") + } + + fn visit_map<V>(self, mut map_: V) -> std::result::Result<PhysicalKeyPartition, V::Error> + where + V: serde::de::MapAccess<'de>, + { + let mut key_expr__ = None; + let mut partition_count__ = None; + while let Some(k) = map_.next_key()?
{ + match k { + GeneratedField::KeyExpr => { + if key_expr__.is_some() { + return Err(serde::de::Error::duplicate_field("keyExpr")); + } + key_expr__ = Some(map_.next_value()?); + } + GeneratedField::PartitionCount => { + if partition_count__.is_some() { + return Err(serde::de::Error::duplicate_field("partitionCount")); + } + partition_count__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(PhysicalKeyPartition { + key_expr: key_expr__.unwrap_or_default(), + partition_count: partition_count__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalKeyPartition", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalLikeExprNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 12b417627411..99ccba1bf78f 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1563,6 +1563,8 @@ pub struct FileScanExecConf { pub constraints: ::core::option::Option, #[prost(uint64, optional, tag = "12")] pub batch_size: ::core::option::Option, + #[prost(bool, tag = "13")] + pub preserve_partition_values: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParquetScanExecNode { @@ -1901,6 +1903,13 @@ pub struct PhysicalHashRepartition { pub partition_count: u64, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalKeyPartition { + #[prost(message, repeated, tag = "1")] + pub key_expr: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "2")] + pub partition_count: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct RepartitionExecNode { #[prost(message, optional, boxed, tag = "1")] pub input: ::core::option::Option<::prost::alloc::boxed::Box>, @@ -1914,7 +1923,7 @@ pub struct RepartitionExecNode { } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Partitioning { - #[prost(oneof = "partitioning::PartitionMethod", tags = "1, 2, 3")] + #[prost(oneof = "partitioning::PartitionMethod", tags = "1, 2, 3, 4")] pub partition_method: ::core::option::Option, } /// Nested message and enum types in `Partitioning`. 
@@ -1927,6 +1936,8 @@ pub mod partitioning { Hash(super::PhysicalHashRepartition), #[prost(uint64, tag = "3")] Unknown(u64), + #[prost(message, tag = "4")] + Key(super::PhysicalKeyPartition), } } #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index f1a9abe6ea7b..19ae208eaaf8 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -443,6 +443,25 @@ pub fn parse_protobuf_hash_partitioning( } } +pub fn parse_protobuf_key_partitioning( + partitioning: Option<&protobuf::PhysicalKeyPartition>, + ctx: &TaskContext, + input_schema: &Schema, + codec: &dyn PhysicalExtensionCodec, +) -> Result<Option<Partitioning>> { + match partitioning { + Some(key_part) => { + let expr = + parse_physical_exprs(&key_part.key_expr, ctx, input_schema, codec)?; + Ok(Some(Partitioning::KeyPartitioned( + expr, + key_part.partition_count.try_into().unwrap(), + ))) + } + None => Ok(None), + } +} + pub fn parse_protobuf_partitioning( partitioning: Option<&protobuf::Partitioning>, ctx: &TaskContext, @@ -464,6 +483,14 @@ pub fn parse_protobuf_partitioning( codec, ) } + Some(protobuf::partitioning::PartitionMethod::Key(key_partition)) => { + parse_protobuf_key_partitioning( + Some(key_partition), + ctx, + input_schema, + codec, + ) + } Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => { Ok(Some(Partitioning::UnknownPartitioning( *partition_count as usize, @@ -558,6 +585,7 @@ pub fn parse_protobuf_file_scan_config( .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) .with_output_ordering(output_ordering) .with_batch_size(proto.batch_size.map(|s| s as usize)) + .with_preserve_partition_values(proto.preserve_partition_values) .build(); Ok(config) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 1ae85618b92a..9cc67b610601 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -428,6 +428,17 @@ pub fn serialize_partitioning( )), } } + Partitioning::KeyPartitioned(exprs, partition_count) => { + let serialized_exprs = serialize_physical_exprs(exprs, codec)?; + protobuf::Partitioning { + partition_method: Some(protobuf::partitioning::PartitionMethod::Key( + protobuf::PhysicalKeyPartition { + key_expr: serialized_exprs, + partition_count: *partition_count as u64, + }, + )), + } + } Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning { partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown( *partition_count as u64, @@ -555,6 +566,7 @@ pub fn serialize_file_scan_config( .collect::<Vec<_>>(), constraints: Some(conf.constraints.clone().into()), batch_size: conf.batch_size.map(|s| s as u64), + preserve_partition_values: conf.preserve_partition_values, }) } diff --git a/datafusion/sqllogictest/test_files/agg_func_substitute.slt b/datafusion/sqllogictest/test_files/agg_func_substitute.slt index 088c61047ea8..4db959d37f84 100644 --- a/datafusion/sqllogictest/test_files/agg_func_substitute.slt +++ b/datafusion/sqllogictest/test_files/agg_func_substitute.slt @@ -44,13 +44,11 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, c] physical_plan 01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] -02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -04)------CoalesceBatchesExec: target_batch_size=8192 -05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true -08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true +02)--AggregateExec: mode=Final, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +03)----SortPreservingMergeExec: [a@0 ASC NULLS LAST] +04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT @@ -64,13 +62,11 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, c] physical_plan 01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] -02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -04)------CoalesceBatchesExec: target_batch_size=8192 -05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true -08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true +02)--AggregateExec: mode=Final, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +03)----SortPreservingMergeExec: [a@0 ASC NULLS LAST] +04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT a, ARRAY_AGG(c 
ORDER BY c)[1 + 100] as result @@ -83,13 +79,11 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, c] physical_plan 01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] -02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -04)------CoalesceBatchesExec: target_batch_size=8192 -05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true -08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true +02)--AggregateExec: mode=Final, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +03)----SortPreservingMergeExec: [a@0 ASC NULLS LAST] +04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true query II SELECT a, ARRAY_AGG(c ORDER BY c)[1] as result diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index a1b868b0b028..4105b856844a 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -7435,21 +7435,20 @@ logical_plan 15)------------EmptyRelation: rows=1 physical_plan 01)ProjectionExec: expr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST]@1 as last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))@2 as sum(DISTINCT Int64(1))] -02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))], ordering_mode=Sorted -03)----CoalesceBatchesExec: target_batch_size=8192 -04)------RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=5 -05)--------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))], ordering_mode=Sorted -06)----------UnionExec -07)------------ProjectionExec: expr=[1 as id, 2 as foo] -08)--------------PlaceholderRowExec -09)------------ProjectionExec: expr=[1 as id, 4 as foo] -10)--------------PlaceholderRowExec -11)------------ProjectionExec: expr=[1 as id, 5 as foo] -12)--------------PlaceholderRowExec -13)------------ProjectionExec: expr=[1 as id, 3 as foo] -14)--------------PlaceholderRowExec -15)------------ProjectionExec: expr=[1 as id, 2 as foo] -16)--------------PlaceholderRowExec 
+02)--AggregateExec: mode=Final, gby=[id@0 as id], aggr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))], ordering_mode=Sorted +03)----CoalescePartitionsExec +04)------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))], ordering_mode=Sorted +05)--------UnionExec +06)----------ProjectionExec: expr=[1 as id, 2 as foo] +07)------------PlaceholderRowExec +08)----------ProjectionExec: expr=[1 as id, 4 as foo] +09)------------PlaceholderRowExec +10)----------ProjectionExec: expr=[1 as id, 5 as foo] +11)------------PlaceholderRowExec +12)----------ProjectionExec: expr=[1 as id, 3 as foo] +13)------------PlaceholderRowExec +14)----------ProjectionExec: expr=[1 as id, 2 as foo] +15)------------PlaceholderRowExec # SortExec is removed if it is coming after one-row producing AggregateExec's having an empty group by expression query TT diff --git a/datafusion/sqllogictest/test_files/distinct_on.slt b/datafusion/sqllogictest/test_files/distinct_on.slt index b4a491619e89..0279feef8f52 100644 --- a/datafusion/sqllogictest/test_files/distinct_on.slt +++ b/datafusion/sqllogictest/test_files/distinct_on.slt @@ -95,14 +95,12 @@ logical_plan 04)------TableScan: aggregate_test_100 projection=[c1, c2, c3] physical_plan 01)ProjectionExec: expr=[first_value(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@1 as c3, first_value(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@2 as c2] -02)--SortPreservingMergeExec: [c1@0 ASC NULLS LAST] -03)----SortExec: expr=[c1@0 ASC NULLS LAST], preserve_partitioning=[true] -04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[first_value(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], first_value(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]] -05)--------CoalesceBatchesExec: target_batch_size=8192 -06)----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4 -07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[first_value(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], first_value(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]] -08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true +02)--SortExec: expr=[c1@0 ASC NULLS LAST], preserve_partitioning=[false] +03)----AggregateExec: mode=Final, gby=[c1@0 as c1], aggr=[first_value(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], first_value(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]] +04)------CoalescePartitionsExec +05)--------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[first_value(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], first_value(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]] +06)----------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1 +07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true # ON expressions are not a sub-set of the ORDER BY expressions query error SELECT DISTINCT ON expressions must match initial ORDER BY expressions diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index ddf081689e9e..ee217b45b682 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -2013,19 +2013,17 @@ logical_plan 07)--------SubqueryAlias: r 08)----------TableScan: tab0 projection=[col0, col1] physical_plan -01)SortPreservingMergeExec: [col0@0 ASC NULLS LAST] -02)--SortExec: expr=[col0@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----ProjectionExec: expr=[col0@0 as col0, last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]@3 as last_col1] -04)------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]] -05)--------CoalesceBatchesExec: target_batch_size=8192 -06)----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4 -07)------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]] -08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -09)----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1] -10)------------------CoalesceBatchesExec: target_batch_size=8192 -11)--------------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(col0@0, col0@0)] -12)----------------------DataSourceExec: partitions=1, partition_sizes=[3] -13)----------------------DataSourceExec: partitions=1, partition_sizes=[3] +01)SortExec: expr=[col0@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--ProjectionExec: expr=[col0@0 as col0, last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]@3 as last_col1] +03)----AggregateExec: mode=Final, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]] +04)------CoalescePartitionsExec +05)--------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]] +06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1] +08)--------------CoalesceBatchesExec: target_batch_size=8192 +09)----------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(col0@0, col0@0)] +10)------------------DataSourceExec: partitions=1, partition_sizes=[3] +11)------------------DataSourceExec: partitions=1, partition_sizes=[3] # Columns in the table are a,b,c,d. Source is DataSourceExec which is ordered by # a,b,c column. Column a has cardinality 2, column b has cardinality 4. 
@@ -2984,14 +2982,10 @@ logical_plan 03)----Aggregate: groupBy=[[sales_global.country]], aggr=[[first_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]] 04)------TableScan: sales_global projection=[country, ts, amount] physical_plan -01)SortPreservingMergeExec: [country@0 ASC NULLS LAST] -02)--SortExec: expr=[country@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----ProjectionExec: expr=[country@0 as country, first_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@2 as fv2] -04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[first_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]] -05)--------CoalesceBatchesExec: target_batch_size=8192 -06)----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=1 -07)------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[first_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]] -08)--------------DataSourceExec: partitions=1, partition_sizes=[1] +01)SortExec: expr=[country@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--ProjectionExec: expr=[country@0 as country, first_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@2 as fv2] +03)----AggregateExec: mode=Single, gby=[country@0 as country], aggr=[first_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]] +04)------DataSourceExec: partitions=1, partition_sizes=[1] query TRR SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1, @@ -3019,14 +3013,10 @@ logical_plan 03)----Aggregate: groupBy=[[sales_global.country]], aggr=[[first_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]] 04)------TableScan: sales_global projection=[country, ts, amount] physical_plan -01)SortPreservingMergeExec: [country@0 ASC NULLS LAST] -02)--SortExec: expr=[country@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----ProjectionExec: expr=[country@0 as country, first_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as fv2] -04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[first_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]] -05)--------CoalesceBatchesExec: target_batch_size=8192 -06)----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=1 -07)------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[first_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]] -08)--------------DataSourceExec: partitions=1, partition_sizes=[1] +01)SortExec: expr=[country@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--ProjectionExec: expr=[country@0 as country, first_value(sales_global.amount) ORDER BY [sales_global.ts ASC 
NULLS LAST]@1 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as fv2] +03)----AggregateExec: mode=Single, gby=[country@0 as country], aggr=[first_value(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]] +04)------DataSourceExec: partitions=1, partition_sizes=[1] query TRR @@ -3182,16 +3172,14 @@ logical_plan 03)----Aggregate: groupBy=[[sales_global.country]], aggr=[[array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]] 04)------TableScan: sales_global projection=[country, amount] physical_plan -01)SortPreservingMergeExec: [country@0 ASC NULLS LAST] -02)--SortExec: expr=[country@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----ProjectionExec: expr=[country@0 as country, array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as array_agg1] -04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]] -05)--------CoalesceBatchesExec: target_batch_size=4 -06)----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 -07)------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]] -08)--------------SortExec: expr=[amount@1 ASC NULLS LAST], preserve_partitioning=[true] -09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -10)------------------DataSourceExec: partitions=1, partition_sizes=[1] +01)SortExec: expr=[country@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--ProjectionExec: expr=[country@0 as country, array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as array_agg1] +03)----AggregateExec: mode=Final, gby=[country@0 as country], aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]] +04)------CoalescePartitionsExec +05)--------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]] +06)----------SortExec: expr=[amount@1 ASC NULLS LAST], preserve_partitioning=[true] +07)------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +08)--------------DataSourceExec: partitions=1, partition_sizes=[1] query T? 
SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS array_agg1 @@ -3218,16 +3206,14 @@ logical_plan 03)----Aggregate: groupBy=[[sales_global.country]], aggr=[[array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]]] 04)------TableScan: sales_global projection=[country, amount] physical_plan -01)SortPreservingMergeExec: [country@0 ASC NULLS LAST] -02)--SortExec: expr=[country@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----ProjectionExec: expr=[country@0 as country, array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] -04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]] -05)--------CoalesceBatchesExec: target_batch_size=4 -06)----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 -07)------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]] -08)--------------SortExec: expr=[amount@1 DESC], preserve_partitioning=[true] -09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -10)------------------DataSourceExec: partitions=1, partition_sizes=[1] +01)SortExec: expr=[country@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--ProjectionExec: expr=[country@0 as country, array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] +03)----AggregateExec: mode=Final, gby=[country@0 as country], aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]] +04)------CoalescePartitionsExec +05)--------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]] +06)----------SortExec: expr=[amount@1 DESC], preserve_partitioning=[true] +07)------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +08)--------------DataSourceExec: partitions=1, partition_sizes=[1] query T?RR SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts, @@ -3866,12 +3852,11 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, c, d] physical_plan 01)ProjectionExec: expr=[first_value(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS 
LAST]@1 as first_a, last_value(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c] -02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[first_value(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST], last_value(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]] -03)----CoalesceBatchesExec: target_batch_size=2 -04)------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8 -05)--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[first_value(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST], first_value(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]] -06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true -07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true +02)--AggregateExec: mode=Final, gby=[d@0 as d], aggr=[first_value(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST], last_value(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]] +03)----CoalescePartitionsExec +04)------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[first_value(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST], first_value(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]] +05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true query II rowsort SELECT FIRST_VALUE(a ORDER BY a ASC) as first_a, diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index e15163cf6ec7..aaa15316b6a1 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -223,6 +223,7 @@ datafusion.execution.enforce_batch_size_in_joins false datafusion.execution.keep_partition_by_columns false datafusion.execution.listing_table_factory_infer_partitions true datafusion.execution.listing_table_ignore_subdirectory true +datafusion.execution.listing_table_preserve_partition_values false datafusion.execution.max_buffered_batches_per_output_file 2 datafusion.execution.max_spill_file_size_bytes 134217728 datafusion.execution.meta_fetch_concurrency 32 @@ -349,6 +350,7 @@ datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches datafusion.execution.listing_table_factory_infer_partitions true Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema). datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. 
Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). +datafusion.execution.listing_table_preserve_partition_values false Should a `ListingTable` created through the `ListingTableFactory` keep files that share the same partition column values in the same execution partition when `PARTITIONED BY` columns are declared. Defaults to false to avoid overhead on non-partitioned tables. Optimization is best with moderate partition counts (10-100 partitions) and large partitions (>500K rows). With many small partitions (500+), task scheduling overhead may outweigh shuffle savings. datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption datafusion.execution.max_spill_file_size_bytes 134217728 Maximum size in bytes for individual spill files before rotating to a new file. When operators spill data to disk (e.g., RepartitionExec), they write multiple batches to the same file until this size limit is reached, then rotate to a new file. This reduces syscall overhead compared to one-file-per-batch while preventing files from growing too large. A larger value reduces file creation overhead but may hold more disk space. A smaller value creates more files but allows finer-grained space reclamation as files can be deleted once fully consumed. Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators may create spill files larger than the limit. Default: 128 MB datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index f217ba1bd5a0..cd85eee30cd6 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3454,20 +3454,18 @@ logical_plan 07)--------SubqueryAlias: r 08)----------TableScan: annotated_data projection=[a, b] physical_plan -01)SortPreservingMergeExec: [a@0 ASC] -02)--ProjectionExec: expr=[a@0 as a, last_value(r.b) ORDER BY [r.a ASC NULLS FIRST]@3 as last_col1] -03)----AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[last_value(r.b) ORDER BY [r.a ASC NULLS FIRST]], ordering_mode=PartiallySorted([0]) -04)------CoalesceBatchesExec: target_batch_size=2 -05)--------RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC -06)----------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[last_value(r.b) ORDER BY [r.a ASC NULLS FIRST]], ordering_mode=PartiallySorted([0]) +01)ProjectionExec: expr=[a@0 as a, last_value(r.b) ORDER BY [r.a ASC NULLS FIRST]@3 as last_col1] +02)--AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[last_value(r.b) ORDER BY [r.a ASC NULLS FIRST]], ordering_mode=PartiallySorted([0]) +03)----SortPreservingMergeExec: [a@0 ASC] +04)------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[last_value(r.b) ORDER BY [r.a ASC NULLS FIRST]], ordering_mode=PartiallySorted([0]) +05)--------CoalesceBatchesExec: target_batch_size=2 +06)----------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)] 07)------------CoalesceBatchesExec: target_batch_size=2 -08)--------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, 
a@0)] -09)----------------CoalesceBatchesExec: target_batch_size=2 -10)------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=1, maintains_sort_order=true -11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true -12)----------------CoalesceBatchesExec: target_batch_size=2 -13)------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=1, maintains_sort_order=true -14)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], file_type=csv, has_header=true +08)--------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=1, maintains_sort_order=true +09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true +10)------------CoalesceBatchesExec: target_batch_size=2 +11)--------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=1, maintains_sort_order=true +12)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], file_type=csv, has_header=true query TT EXPLAIN SELECT * diff --git a/datafusion/sqllogictest/test_files/partitioned_aggregation.slt b/datafusion/sqllogictest/test_files/partitioned_aggregation.slt new file mode 100644 index 000000000000..915b738cd5d3 --- /dev/null +++ b/datafusion/sqllogictest/test_files/partitioned_aggregation.slt @@ -0,0 +1,382 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
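+
+# Directory layout exercised below (created by the COPY statements that follow):
+#   test_files/scratch/partitioned_aggregation/part_col=1/0.parquet
+#   test_files/scratch/partitioned_aggregation/part_col=2/1.parquet
+#   test_files/scratch/partitioned_aggregation/part_col=3/2.parquet
+#   test_files/scratch/partitioned_aggregation/part_col=4/3.parquet
+# With datafusion.execution.listing_table_preserve_partition_values = true, each
+# part_col=N directory stays in its own file group, so a GROUP BY on part_col can
+# run per partition (AggregateExec mode=SinglePartitioned) without a hash repartition.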
+ +# This test file verifies the optimization for parallel aggregation on Hive-partitioned tables +# When grouping by partition columns, the plan should use SinglePartitioned (combining Partial+Final per partition) + +statement ok +SET datafusion.execution.target_partitions = 4; + +statement ok +SET datafusion.execution.listing_table_preserve_partition_values = true; + +# Single-column partitioned table +statement ok +CREATE TABLE single_col_staging ( + part_col INT, + val INT +) AS VALUES + (1, 10), (1, 11), (1, 12), + (2, 20), (2, 21), + (3, 30), + (4, 40); + +query I +COPY (SELECT * FROM single_col_staging WHERE part_col = 1) +TO 'test_files/scratch/partitioned_aggregation/part_col=1/0.parquet' +STORED AS PARQUET; +---- +3 + +query I +COPY (SELECT * FROM single_col_staging WHERE part_col = 2) +TO 'test_files/scratch/partitioned_aggregation/part_col=2/1.parquet' +STORED AS PARQUET; +---- +2 + +query I +COPY (SELECT * FROM single_col_staging WHERE part_col = 3) +TO 'test_files/scratch/partitioned_aggregation/part_col=3/2.parquet' +STORED AS PARQUET; +---- +1 + +query I +COPY (SELECT * FROM single_col_staging WHERE part_col = 4) +TO 'test_files/scratch/partitioned_aggregation/part_col=4/3.parquet' +STORED AS PARQUET; +---- +1 + +# Multi-column partitioned table +statement ok +CREATE TABLE multi_col_staging ( + year INT, + month INT, + day INT, + val INT +) AS VALUES + (2024, 1, 1, 100), (2024, 1, 2, 101), + (2024, 2, 1, 200), (2024, 2, 2, 201), + (2025, 1, 1, 300), (2025, 1, 2, 301), + (2025, 2, 1, 400); + +query I +COPY (SELECT * FROM multi_col_staging WHERE year = 2024 AND month = 1) +TO 'test_files/scratch/partitioned_aggregation/multi_col/year=2024/month=1/data.parquet' +STORED AS PARQUET; +---- +2 + +query I +COPY (SELECT * FROM multi_col_staging WHERE year = 2024 AND month = 2) +TO 'test_files/scratch/partitioned_aggregation/multi_col/year=2024/month=2/data.parquet' +STORED AS PARQUET; +---- +2 + +query I +COPY (SELECT * FROM multi_col_staging WHERE year = 2025 AND month = 1) +TO 'test_files/scratch/partitioned_aggregation/multi_col/year=2025/month=1/data.parquet' +STORED AS PARQUET; +---- +2 + +query I +COPY (SELECT * FROM multi_col_staging WHERE year = 2025 AND month = 2) +TO 'test_files/scratch/partitioned_aggregation/multi_col/year=2025/month=2/data.parquet' +STORED AS PARQUET; +---- +1 + +# Create external tables +statement ok +CREATE EXTERNAL TABLE single_col_table ( + part_col INT, + val INT +) +STORED AS PARQUET +LOCATION 'test_files/scratch/partitioned_aggregation/' +PARTITIONED BY (part_col); + +statement ok +CREATE EXTERNAL TABLE multi_col_table ( + year INT, + month INT, + day INT, + val INT +) +STORED AS PARQUET +LOCATION 'test_files/scratch/partitioned_aggregation/multi_col/' +PARTITIONED BY (year, month); + +# Test 1: Single-column partition - GROUP BY partition column +# Should use SinglePartitioned +query TT +EXPLAIN SELECT part_col, count(*) FROM single_col_table GROUP BY part_col ORDER BY part_col +---- +logical_plan +01)Sort: single_col_table.part_col ASC NULLS LAST +02)--Projection: single_col_table.part_col, count(Int64(1)) AS count(*) +03)----Aggregate: groupBy=[[single_col_table.part_col]], aggr=[[count(Int64(1))]] +04)------TableScan: single_col_table projection=[part_col] +physical_plan +01)SortPreservingMergeExec: [part_col@0 ASC NULLS LAST] +02)--SortExec: expr=[part_col@0 ASC NULLS LAST], preserve_partitioning=[true] +03)----ProjectionExec: expr=[part_col@0 as part_col, count(Int64(1))@1 as count(*)] +04)------AggregateExec: mode=SinglePartitioned, 
gby=[part_col@0 as part_col], aggr=[count(Int64(1))] +05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=2/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=4/3.parquet]]}, projection=[part_col], file_type=parquet + +query II +SELECT part_col, count(*) FROM single_col_table GROUP BY part_col ORDER BY part_col +---- +1 3 +2 2 +3 1 +4 1 + +# Test 2: Single-column partition - GROUP BY partition + other column (superset) +# Should use SinglePartitioned +query TT +EXPLAIN SELECT part_col, val, count(*) FROM single_col_table GROUP BY part_col, val ORDER BY part_col, val +---- +logical_plan +01)Sort: single_col_table.part_col ASC NULLS LAST, single_col_table.val ASC NULLS LAST +02)--Projection: single_col_table.part_col, single_col_table.val, count(Int64(1)) AS count(*) +03)----Aggregate: groupBy=[[single_col_table.part_col, single_col_table.val]], aggr=[[count(Int64(1))]] +04)------TableScan: single_col_table projection=[val, part_col] +physical_plan +01)SortPreservingMergeExec: [part_col@0 ASC NULLS LAST, val@1 ASC NULLS LAST] +02)--SortExec: expr=[val@1 ASC NULLS LAST], preserve_partitioning=[true] +03)----ProjectionExec: expr=[part_col@0 as part_col, val@1 as val, count(Int64(1))@2 as count(*)] +04)------AggregateExec: mode=SinglePartitioned, gby=[part_col@1 as part_col, val@0 as val], aggr=[count(Int64(1))], ordering_mode=PartiallySorted([0]) +05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=2/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=4/3.parquet]]}, projection=[val, part_col], file_type=parquet + +# Test 3: Single-column partition - GROUP BY non-partition column +# Should use FinalPartitioned with Repartition +query TT +EXPLAIN SELECT val, count(*) FROM single_col_table GROUP BY val ORDER BY val +---- +logical_plan +01)Sort: single_col_table.val ASC NULLS LAST +02)--Projection: single_col_table.val, count(Int64(1)) AS count(*) +03)----Aggregate: groupBy=[[single_col_table.val]], aggr=[[count(Int64(1))]] +04)------TableScan: single_col_table projection=[val] +physical_plan +01)SortPreservingMergeExec: [val@0 ASC NULLS LAST] +02)--SortExec: expr=[val@0 ASC NULLS LAST], preserve_partitioning=[true] +03)----ProjectionExec: expr=[val@0 as val, count(Int64(1))@1 as count(*)] +04)------AggregateExec: mode=FinalPartitioned, gby=[val@0 as val], aggr=[count(Int64(1))] +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------RepartitionExec: partitioning=Hash([val@0], 4), input_partitions=4 +07)------------AggregateExec: mode=Partial, gby=[val@0 as val], aggr=[count(Int64(1))] +08)--------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=2/1.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=4/3.parquet]]}, projection=[val], file_type=parquet
+
+# Test 4: Multi-column partition - GROUP BY all partition columns + other (superset)
+# Should use SinglePartitioned
+query TT
+EXPLAIN SELECT year, month, day, count(*) FROM multi_col_table GROUP BY year, month, day ORDER BY year, month, day
+----
+logical_plan
+01)Sort: multi_col_table.year ASC NULLS LAST, multi_col_table.month ASC NULLS LAST, multi_col_table.day ASC NULLS LAST
+02)--Projection: multi_col_table.year, multi_col_table.month, multi_col_table.day, count(Int64(1)) AS count(*)
+03)----Aggregate: groupBy=[[multi_col_table.year, multi_col_table.month, multi_col_table.day]], aggr=[[count(Int64(1))]]
+04)------TableScan: multi_col_table projection=[day, year, month]
+physical_plan
+01)SortPreservingMergeExec: [year@0 ASC NULLS LAST, month@1 ASC NULLS LAST, day@2 ASC NULLS LAST]
+02)--SortExec: expr=[year@0 ASC NULLS LAST, month@1 ASC NULLS LAST, day@2 ASC NULLS LAST], preserve_partitioning=[true]
+03)----ProjectionExec: expr=[year@0 as year, month@1 as month, day@2 as day, count(Int64(1))@3 as count(*)]
+04)------AggregateExec: mode=SinglePartitioned, gby=[year@1 as year, month@2 as month, day@0 as day], aggr=[count(Int64(1))]
+05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/multi_col/year=2024/month=1/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/multi_col/year=2024/month=2/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/multi_col/year=2025/month=1/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/multi_col/year=2025/month=2/data.parquet]]}, projection=[day, year, month], file_type=parquet
+
+query IIII
+SELECT year, month, day, count(*) FROM multi_col_table GROUP BY year, month, day ORDER BY year, month, day
+----
+2024 1 1 1
+2024 1 2 1
+2024 2 1 1
+2024 2 2 1
+2025 1 1 1
+2025 1 2 1
+2025 2 1 1
+
+# Test 5: Multi-column partition - GROUP BY only first partition column (subset)
+# Should use FinalPartitioned with Repartition
+query TT
+EXPLAIN SELECT year, count(*) FROM multi_col_table GROUP BY year
+----
+logical_plan
+01)Projection: multi_col_table.year, count(Int64(1)) AS count(*)
+02)--Aggregate: groupBy=[[multi_col_table.year]], aggr=[[count(Int64(1))]]
+03)----TableScan: multi_col_table projection=[year]
+physical_plan
+01)ProjectionExec: expr=[year@0 as year, count(Int64(1))@1 as count(*)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[year@0 as year], aggr=[count(Int64(1))]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([year@0], 4), input_partitions=4
+05)--------AggregateExec: mode=Partial, gby=[year@0 as year], aggr=[count(Int64(1))]
+06)----------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/multi_col/year=2024/month=1/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/multi_col/year=2024/month=2/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/multi_col/year=2025/month=1/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/multi_col/year=2025/month=2/data.parquet]]}, projection=[year], file_type=parquet
+
+query II
+SELECT year, count(*) FROM multi_col_table GROUP BY year ORDER BY year
+----
+2024 4
+2025 3
+
+# Test 6: GROUPING SETS should NOT use SinglePartitioned
+query TT
+EXPLAIN SELECT year, count(*) FROM multi_col_table GROUP BY GROUPING SETS ((year), (month))
+----
+logical_plan
+01)Projection: multi_col_table.year, count(Int64(1)) AS count(*)
+02)--Aggregate: groupBy=[[GROUPING SETS ((multi_col_table.year), (multi_col_table.month))]], aggr=[[count(Int64(1))]]
+03)----TableScan: multi_col_table projection=[year, month]
+physical_plan
+01)ProjectionExec: expr=[year@0 as year, count(Int64(1))@3 as count(*)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[year@0 as year, month@1 as month, __grouping_id@2 as __grouping_id], aggr=[count(Int64(1))]
+03)----AggregateExec: mode=Partial, gby=[(year@0 as year, NULL as month), (NULL as year, month@1 as month)], aggr=[count(Int64(1))]
+04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/multi_col/year=2024/month=1/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/multi_col/year=2024/month=2/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/multi_col/year=2025/month=1/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/multi_col/year=2025/month=2/data.parquet]]}, projection=[year, month], file_type=parquet
+
+# Test 7: With preserve_partition_values=false, optimization should NOT apply
+statement ok
+SET datafusion.execution.listing_table_preserve_partition_values = false;
+
+statement ok
+CREATE EXTERNAL TABLE single_col_table_disabled (
+ part_col INT,
+ val INT
+)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/partitioned_aggregation/'
+PARTITIONED BY (part_col);
+
+query TT
+EXPLAIN SELECT part_col, count(*) FROM single_col_table_disabled GROUP BY part_col ORDER BY part_col
+----
+logical_plan
+01)Sort: single_col_table_disabled.part_col ASC NULLS LAST
+02)--Projection: single_col_table_disabled.part_col, count(Int64(1)) AS count(*)
+03)----Aggregate: groupBy=[[single_col_table_disabled.part_col]], aggr=[[count(Int64(1))]]
+04)------TableScan: single_col_table_disabled projection=[part_col]
+physical_plan
+01)SortPreservingMergeExec: [part_col@0 ASC NULLS LAST]
+02)--SortExec: expr=[part_col@0 ASC NULLS LAST], preserve_partitioning=[true]
+03)----ProjectionExec: expr=[part_col@0 as part_col, count(Int64(1))@1 as count(*)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[part_col@0 as part_col], aggr=[count(Int64(1))]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------RepartitionExec: partitioning=Hash([part_col@0], 4), input_partitions=4
+07)------------AggregateExec: mode=Partial, gby=[part_col@0 as part_col], aggr=[count(Int64(1))]
+08)--------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=2/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=4/3.parquet]]}, projection=[part_col], file_type=parquet
+
+# Test 8: Verify optimization works regardless of target_partitions setting
+# With target_partitions=16 (more than the 4 partition files), SinglePartitioned should still be used
+statement ok
+SET datafusion.execution.listing_table_preserve_partition_values = true;
+
+statement ok
+SET datafusion.execution.target_partitions = 16;
+
+statement ok
+CREATE EXTERNAL TABLE single_col_16_target (
+ part_col INT,
+ val INT
+)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/partitioned_aggregation/'
+PARTITIONED BY (part_col);
+
+query TT
+EXPLAIN SELECT part_col, count(*) FROM single_col_16_target GROUP BY part_col ORDER BY part_col
+----
+logical_plan
+01)Sort: single_col_16_target.part_col ASC NULLS LAST
+02)--Projection: single_col_16_target.part_col, count(Int64(1)) AS count(*)
+03)----Aggregate: groupBy=[[single_col_16_target.part_col]], aggr=[[count(Int64(1))]]
+04)------TableScan: single_col_16_target projection=[part_col]
+physical_plan
+01)SortPreservingMergeExec: [part_col@0 ASC NULLS LAST]
+02)--SortExec: expr=[part_col@0 ASC NULLS LAST], preserve_partitioning=[true]
+03)----ProjectionExec: expr=[part_col@0 as part_col, count(Int64(1))@1 as count(*)]
+04)------AggregateExec: mode=SinglePartitioned, gby=[part_col@0 as part_col], aggr=[count(Int64(1))]
+05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=2/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=4/3.parquet]]}, projection=[part_col], file_type=parquet
+
+query II
+SELECT part_col, count(*) FROM single_col_16_target GROUP BY part_col ORDER BY part_col
+----
+1 3
+2 2
+3 1
+4 1
+
+# Test 9: Verify optimization works when target_partitions < partition count
+statement ok
+SET datafusion.execution.target_partitions = 2;
+
+statement ok
+CREATE EXTERNAL TABLE single_col_2_target (
+ part_col INT,
+ val INT
+)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/partitioned_aggregation/'
+PARTITIONED BY (part_col);
+
+query TT
+EXPLAIN SELECT part_col, count(*) FROM single_col_2_target GROUP BY part_col ORDER BY part_col
+----
+logical_plan
+01)Sort: single_col_2_target.part_col ASC NULLS LAST
+02)--Projection: single_col_2_target.part_col, count(Int64(1)) AS count(*)
+03)----Aggregate: groupBy=[[single_col_2_target.part_col]], aggr=[[count(Int64(1))]]
+04)------TableScan: single_col_2_target projection=[part_col]
+physical_plan
+01)SortPreservingMergeExec: [part_col@0 ASC NULLS LAST]
+02)--SortExec: expr=[part_col@0 ASC NULLS LAST], preserve_partitioning=[true]
+03)----ProjectionExec: expr=[part_col@0 as part_col, count(Int64(1))@1 as count(*)]
+04)------AggregateExec: mode=SinglePartitioned, gby=[part_col@0 as part_col], aggr=[count(Int64(1))]
+05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=2/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=4/3.parquet]]}, projection=[part_col], file_type=parquet
+
+query II
+SELECT part_col, count(*) FROM single_col_2_target GROUP BY part_col ORDER BY part_col
+----
+1 3
+2 2
+3 1
+4 1
+
+statement ok
+DROP TABLE single_col_16_target;
+
+statement ok
+DROP TABLE single_col_2_target;
+
+# Cleanup
+statement ok
+DROP TABLE single_col_table;
+
+statement ok
+DROP TABLE single_col_table_disabled;
+
+statement ok
+DROP TABLE multi_col_table;
+
+statement ok
+DROP TABLE single_col_staging;
+
+statement ok
+DROP TABLE multi_col_staging;
diff --git a/datafusion/sqllogictest/test_files/subquery_sort.slt b/datafusion/sqllogictest/test_files/subquery_sort.slt
index ea7addd8e36f..b9985cf8632a 100644
--- a/datafusion/sqllogictest/test_files/subquery_sort.slt
+++ b/datafusion/sqllogictest/test_files/subquery_sort.slt
@@ -146,15 +146,13 @@ logical_plan
 07)------------TableScan: sink_table projection=[c1, c2, c3, c9]
 physical_plan
 01)ProjectionExec: expr=[c1@0 as c1, c2@1 as c2]
-02)--SortPreservingMergeExec: [c1@0 ASC NULLS LAST, c3@2 DESC, c9@3 ASC NULLS LAST]
-03)----SortExec: expr=[c1@0 ASC NULLS LAST, c3@2 DESC, c9@3 ASC NULLS LAST], preserve_partitioning=[true]
-04)------ProjectionExec: expr=[first_value(sink_table.c1) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST]@1 as c1, first_value(sink_table.c2) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST]@2 as c2, first_value(sink_table.c3) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST]@3 as c3, first_value(sink_table.c9) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST]@4 as c9]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[first_value(sink_table.c1) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST], first_value(sink_table.c2) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST], first_value(sink_table.c3) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST], first_value(sink_table.c9) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST]]
-06)----------CoalesceBatchesExec: target_batch_size=8192
-07)------------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
-08)--------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[first_value(sink_table.c1) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST], first_value(sink_table.c2) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST], first_value(sink_table.c3) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST], first_value(sink_table.c9) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST]]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-10)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c9], file_type=csv, has_header=true
+02)--SortExec: expr=[c1@0 ASC NULLS LAST, c3@2 DESC, c9@3 ASC NULLS LAST], preserve_partitioning=[false]
+03)----ProjectionExec: expr=[first_value(sink_table.c1) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST]@1 as c1, first_value(sink_table.c2) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST]@2 as c2, first_value(sink_table.c3) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST]@3 as c3, first_value(sink_table.c9) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST]@4 as c9]
+04)------AggregateExec: mode=Final, gby=[c1@0 as c1], aggr=[first_value(sink_table.c1) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST], first_value(sink_table.c2) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST], first_value(sink_table.c3) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST], first_value(sink_table.c9) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST]]
+05)--------CoalescePartitionsExec
+06)----------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[first_value(sink_table.c1) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST], first_value(sink_table.c2) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST], first_value(sink_table.c3) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST], first_value(sink_table.c9) ORDER BY [sink_table.c1 ASC NULLS LAST, sink_table.c3 DESC NULLS FIRST, sink_table.c9 ASC NULLS LAST]]
+07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c9], file_type=csv, has_header=true


 query TI
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index c3eda544a1de..ead82330eba2 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -121,6 +121,7 @@ The following configuration settings are available:
 | datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption |
 | datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). |
 | datafusion.execution.listing_table_factory_infer_partitions | true | Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema). |
+| datafusion.execution.listing_table_preserve_partition_values | false | Should a `ListingTable` created through the `ListingTableFactory` keep files that share the same partition column values in the same execution partition when `PARTITIONED BY` columns are declared. Defaults to false to avoid overhead on non-partitioned tables. Optimization is best with moderate partition counts (10-100 partitions) and large partitions (>500K rows). With many small partitions (500+), task scheduling overhead may outweigh shuffle savings. |
 | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs |
 | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental |
 | datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches |
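For reviewers, a minimal sketch of how the new setting is exercised end to end, mirroring the DDL used in the tests above; the table name example_partitioned is an illustrative placeholder and the LOCATION reuses the scratch directory created by those tests:

-- Enable partition-value-preserving file grouping (same statement as in the tests)
SET datafusion.execution.listing_table_preserve_partition_values = true;

CREATE EXTERNAL TABLE example_partitioned (
  part_col INT,
  val INT
)
STORED AS PARQUET
LOCATION 'test_files/scratch/partitioned_aggregation/'
PARTITIONED BY (part_col);

-- Grouping on the partition column can then run as AggregateExec mode=SinglePartitioned,
-- avoiding the Partial/FinalPartitioned pair with a hash RepartitionExec in between.
SELECT part_col, count(*) FROM example_partitioned GROUP BY part_col;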