diff --git a/Cargo.lock b/Cargo.lock index c08be6f29ffd7..b0370a3e1bf27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2620,7 +2620,6 @@ dependencies = [ "chrono", "clap", "datafusion", - "datafusion-datasource", "datafusion-spark", "datafusion-substrait", "env_logger", diff --git a/datafusion/catalog-listing/src/options.rs b/datafusion/catalog-listing/src/options.rs index 0ab15e05abba1..4515b950909ff 100644 --- a/datafusion/catalog-listing/src/options.rs +++ b/datafusion/catalog-listing/src/options.rs @@ -22,6 +22,7 @@ use datafusion_datasource::ListingTableUrl; use datafusion_datasource::file_format::FileFormat; use datafusion_execution::config::SessionConfig; use datafusion_expr::SortExpr; +use datafusion_physical_expr::Partitioning; use futures::StreamExt; use futures::TryStreamExt; use itertools::Itertools; @@ -42,8 +43,10 @@ pub struct ListingOptions { /// This can add a lot of overhead as it will usually require files /// to be opened and at least partially parsed. pub collect_stat: bool, - /// Group files to avoid that the number of partitions exceeds - /// this limit + /// Group files to avoid that the number of partitions exceeds this limit. + /// + /// If [`Self::output_partitioning`] is set, its partition count is used + /// instead, even when it exceeds this value. pub target_partitions: usize, /// Optional pre-known sort order(s). Must be `SortExpr`s. /// @@ -61,6 +64,17 @@ pub struct ListingOptions { /// multiple equivalent orderings, the outer `Vec` will have a /// single element. pub file_sort_order: Vec>, + /// Optional declared output partitioning for this table. + /// + /// Expressions are specified against the full table schema. When set, + /// [`ListingTable`](crate::ListingTable) creates one scan file group per + /// declared output partition instead of using [`Self::target_partitions`]. + /// Empty file groups are added when needed to preserve that count. + /// + /// Files are sorted by path before grouping. DataFusion does not validate + /// that rows match the declaration, so callers must ensure file group `i` + /// contains only rows for declared output partition `i`. + pub output_partitioning: Option, } impl ListingOptions { @@ -78,6 +92,7 @@ impl ListingOptions { collect_stat: false, target_partitions: 1, file_sort_order: vec![], + output_partitioning: None, } } @@ -136,6 +151,18 @@ impl ListingOptions { self } + /// Set declared output partitioning on [`ListingOptions`] and returns self. + /// + /// See [`Self::output_partitioning`]. Empty file groups are added when + /// needed to preserve the declared partition count. + pub fn with_output_partitioning( + mut self, + output_partitioning: Option, + ) -> Self { + self.output_partitioning = output_partitioning; + self + } + /// Set `table partition columns` on [`ListingOptions`] and returns self. /// /// "partition columns," used to support [Hive Partitioning], are @@ -224,6 +251,8 @@ impl ListingOptions { /// Set number of target partitions on [`ListingOptions`] and returns self. /// + /// This controls file grouping when no explicit output partitioning is set. + /// /// ``` /// # use std::sync::Arc; /// # use datafusion_catalog_listing::ListingOptions; diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index dd3675bd2b39d..ad85edf550822 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -43,6 +43,7 @@ use datafusion_physical_expr::create_lex_ordering; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::Partitioning; use datafusion_physical_plan::empty::EmptyExec; use futures::{Stream, StreamExt, TryStreamExt, future, stream}; use object_store::ObjectStore; @@ -505,12 +506,31 @@ impl TableProvider for ListingTable { // at the same time. This is because the limit should be applied after the filters are applied. let statistic_file_limit = if filters.is_empty() { limit } else { None }; + let declared_output_partitioning = if partition_filters.is_empty() { + self.options.output_partitioning.clone() + } else { + // Partition pruning can remove files before grouping. Without a + // stable file-to-declared-partition mapping, regrouping the + // remaining files could shift them into the wrong partition index. + None + }; + let target_partitions = declared_output_partitioning + .as_ref() + .map(Partitioning::partition_count) + .unwrap_or(self.options.target_partitions); + let ListFilesResult { file_groups: mut partitioned_file_lists, statistics, grouped_by_partition: partitioned_by_file_group, } = self - .list_files_for_scan(state, &partition_filters, statistic_file_limit) + .list_files_for_scan_with_target( + state, + &partition_filters, + statistic_file_limit, + target_partitions, + declared_output_partitioning.is_some(), + ) .await?; // if no files need to be read, return an `EmptyExec` @@ -523,17 +543,19 @@ impl TableProvider for ListingTable { state.execution_props(), &partitioned_file_lists, )?; - match state - .config_options() - .execution - .split_file_groups_by_statistics + let split_file_groups_by_statistics = declared_output_partitioning.is_none() + && state + .config_options() + .execution + .split_file_groups_by_statistics; + match split_file_groups_by_statistics .then(|| { output_ordering.first().map(|output_ordering| { FileScanConfig::split_groups_by_statistics_with_target_partitions( &self.table_schema, &partitioned_file_lists, output_ordering, - self.options.target_partitions, + target_partitions, ) }) }) @@ -541,7 +563,7 @@ impl TableProvider for ListingTable { { Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"), Some(Ok(new_groups)) => { - if new_groups.len() <= self.options.target_partitions { + if new_groups.len() <= target_partitions { partitioned_file_lists = new_groups; } else { log::debug!( @@ -561,24 +583,27 @@ impl TableProvider for ListingTable { }; let file_source = self.create_file_source(); + let mut scan_config_builder = + FileScanConfigBuilder::new(object_store_url, file_source) + .with_file_groups(partitioned_file_lists) + .with_constraints(self.constraints.clone()) + .with_statistics(statistics) + .with_projection_indices(projection)? + .with_limit(limit) + .with_output_ordering(output_ordering) + .with_output_partitioning(declared_output_partitioning) + .with_expr_adapter(self.expr_adapter_factory.clone()); + if partitioned_by_file_group { + scan_config_builder = + scan_config_builder.with_partitioned_by_file_group(true); + } + let scan_config = scan_config_builder.build(); // create the execution plan let plan = self .options .format - .create_physical_plan( - state, - FileScanConfigBuilder::new(object_store_url, file_source) - .with_file_groups(partitioned_file_lists) - .with_constraints(self.constraints.clone()) - .with_statistics(statistics) - .with_projection_indices(projection)? - .with_limit(limit) - .with_output_ordering(output_ordering) - .with_expr_adapter(self.expr_adapter_factory.clone()) - .with_partitioned_by_file_group(partitioned_by_file_group) - .build(), - ) + .create_physical_plan(state, scan_config) .await?; Ok(ScanResult::new(plan)) @@ -690,12 +715,45 @@ impl ListingTable { /// Get the list of files for a scan as well as the file level statistics. /// The list is grouped to let the execution plan know how the files should /// be distributed to different threads / executors. + /// + /// If [`ListingOptions::output_partitioning`] is set, the returned file + /// groups preserve that declared partition count, including empty trailing + /// groups when needed, rather than using + /// [`ListingOptions::target_partitions`]. pub async fn list_files_for_scan<'a>( &'a self, ctx: &'a dyn Session, filters: &'a [Expr], limit: Option, ) -> datafusion_common::Result { + let declared_output_partitioning = self.options.output_partitioning.as_ref(); + let target_partitions = declared_output_partitioning + .map(Partitioning::partition_count) + .unwrap_or(self.options.target_partitions); + self.list_files_for_scan_with_target( + ctx, + filters, + limit, + target_partitions, + declared_output_partitioning.is_some(), + ) + .await + } + + async fn list_files_for_scan_with_target<'a>( + &'a self, + ctx: &'a dyn Session, + filters: &'a [Expr], + limit: Option, + target_partitions: usize, + preserve_partition_count: bool, + ) -> datafusion_common::Result { + if target_partitions == 0 { + return plan_err!( + "ListingTable requires target_partitions to be greater than zero" + ); + } + let store = if let Some(url) = self.table_paths.first() { ctx.runtime_env().object_store(url)? } else { @@ -747,27 +805,26 @@ impl ListingTable { // hash repartitioning for aggregates and joins on partition columns. let threshold = ctx.config_options().optimizer.preserve_file_partitions; - let (file_groups, grouped_by_partition) = if threshold > 0 - && !self.options.table_partition_cols.is_empty() - { - let grouped = - file_group.group_by_partition_values(self.options.target_partitions); + let (mut file_groups, grouped_by_partition) = if preserve_partition_count { + (file_group.split_files(target_partitions), false) + } else if threshold > 0 && !self.options.table_partition_cols.is_empty() { + let grouped = file_group.group_by_partition_values(target_partitions); if grouped.len() >= threshold { (grouped, true) } else { let all_files: Vec<_> = grouped.into_iter().flat_map(|g| g.into_inner()).collect(); ( - FileGroup::new(all_files).split_files(self.options.target_partitions), + FileGroup::new(all_files).split_files(target_partitions), false, ) } } else { - ( - file_group.split_files(self.options.target_partitions), - false, - ) + (file_group.split_files(target_partitions), false) }; + if preserve_partition_count && !file_groups.is_empty() { + file_groups.resize_with(target_partitions, || FileGroup::new(vec![])); + } let (file_groups, stats) = compute_all_files_statistics( file_groups, diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index d14ec1f56dce2..d95970b48f356 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -141,10 +141,12 @@ mod tests { use datafusion_expr::dml::InsertOp; use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator}; use datafusion_physical_expr::PhysicalSortExpr; - use datafusion_physical_expr::expressions::binary; + use datafusion_physical_expr::expressions::{Column, binary}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::empty::EmptyExec; - use datafusion_physical_plan::{ExecutionPlanProperties, collect}; + use datafusion_physical_plan::{ + ExecutionPlanProperties, Partitioning, RangePartitioning, SplitPoint, collect, + }; use std::collections::HashMap; use std::io::Write; use std::sync::Arc; @@ -1286,6 +1288,80 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_list_files_uses_declared_output_partitioning_count() -> Result<()> { + let files = ["bucket/key-prefix/file0", "bucket/key-prefix/file1"]; + + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_target_partitions(1) + .with_output_partitioning(Some(Partitioning::RoundRobinBatch(4))); + + let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?; + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(schema); + let table = ListingTable::try_new(config)?; + + let result = table.list_files_for_scan(&ctx.state(), &[], None).await?; + let group_sizes = result + .file_groups + .iter() + .map(|group| group.len()) + .collect::>(); + + assert_eq!(group_sizes, vec![1, 1, 0, 0]); + + Ok(()) + } + + #[tokio::test] + async fn test_partition_filter_drops_declared_output_partitioning() -> Result<()> { + let files = ["bucket/test/pid=1/file1", "bucket/test/pid=2/file2"]; + + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); + + let output_partitioning = Partitioning::Range(RangePartitioning::try_new( + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("pid", 1)), + SortOptions::default(), + )]) + .unwrap(), + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(2))])], + )?); + + let opt = ListingOptions::new(Arc::new(JsonFormat::default())) + .with_file_extension_opt(Some("")) + .with_table_partition_cols(vec![("pid".to_string(), DataType::Int32)]) + .with_output_partitioning(Some(output_partitioning.clone())); + + let table_path = ListingTableUrl::parse("test:///bucket/test/")?; + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(schema); + let table = ListingTable::try_new(config)?; + + let unfiltered = table.scan(&ctx.state(), None, &[], None).await?; + assert_eq!(unfiltered.output_partitioning(), &output_partitioning); + + let filter = Expr::eq(col("pid"), lit(2_i32)); + let filtered = table.scan(&ctx.state(), None, &[filter], None).await?; + assert!(matches!( + filtered.output_partitioning(), + Partitioning::UnknownPartitioning(1) + )); + + Ok(()) + } + #[tokio::test] async fn test_listing_table_prunes_extra_files_in_hive() -> Result<()> { let files = [ diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 3ebd588a0770f..6f317c959549f 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -40,7 +40,7 @@ use datafusion_expr::Operator; use crate::source::OpenArgs; use datafusion_physical_expr::expressions::{BinaryExpr, Column}; -use datafusion_physical_expr::projection::ProjectionExprs; +use datafusion_physical_expr::projection::{ProjectionExprs, ProjectionMapping}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; @@ -206,6 +206,13 @@ pub struct FileScanConfig { /// If the number of file partitions > target_partitions, the file partitions will be grouped /// in a round-robin fashion such that number of file partitions = target_partitions. pub partitioned_by_file_group: bool, + /// Optional declared output partitioning of this file scan. + /// + /// Expressions are in terms of the full table schema, before scan + /// projection or filtering. If the partition count does not match the + /// number of file groups, [`DataSource::output_partitioning`] falls back to + /// [`Partitioning::UnknownPartitioning`]. + pub output_partitioning: Option, } /// A builder for [`FileScanConfig`]'s. @@ -274,6 +281,7 @@ pub struct FileScanConfigBuilder { file_groups: Vec, statistics: Option, output_ordering: Vec, + output_partitioning: Option, file_compression_type: Option, batch_size: Option, expr_adapter_factory: Option>, @@ -297,6 +305,7 @@ impl FileScanConfigBuilder { file_groups: vec![], statistics: None, output_ordering: vec![], + output_partitioning: None, file_compression_type: None, limit: None, preserve_order: false, @@ -463,6 +472,17 @@ impl FileScanConfigBuilder { self } + /// Set declared output partitioning for this scan. + /// + /// See [`FileScanConfig::output_partitioning`]. + pub fn with_output_partitioning( + mut self, + output_partitioning: Option, + ) -> Self { + self.output_partitioning = output_partitioning; + self + } + /// Set the file compression type pub fn with_file_compression_type( mut self, @@ -521,6 +541,7 @@ impl FileScanConfigBuilder { file_groups, statistics, output_ordering, + output_partitioning, file_compression_type, batch_size, expr_adapter_factory: expr_adapter, @@ -536,7 +557,6 @@ impl FileScanConfigBuilder { // If there is an output ordering, we should preserve it. let preserve_order = preserve_order || !output_ordering.is_empty(); - FileScanConfig { object_store_url, file_source, @@ -550,6 +570,7 @@ impl FileScanConfigBuilder { expr_adapter_factory: expr_adapter, statistics, partitioned_by_file_group, + output_partitioning, } } } @@ -562,6 +583,7 @@ impl From for FileScanConfigBuilder { file_groups: config.file_groups, statistics: Some(config.statistics), output_ordering: config.output_ordering, + output_partitioning: config.output_partitioning, file_compression_type: Some(config.file_compression_type), limit: config.limit, preserve_order: config.preserve_order, @@ -573,6 +595,52 @@ impl From for FileScanConfigBuilder { } } +fn hash_partitioning_from_partition_fields( + schema: &Schema, + partition_cols: &Fields, + partition_count: usize, +) -> Option { + if partition_cols.is_empty() { + return None; + } + + let mut exprs: Vec> = Vec::with_capacity(partition_cols.len()); + for partition_col in partition_cols { + let name = partition_col.name(); + let idx = schema + .fields() + .iter() + .position(|field| field.name() == name)?; + exprs.push(Arc::new(Column::new(name, idx))); + } + + Some(Partitioning::Hash(exprs, partition_count)) +} + +fn project_output_partitioning( + partitioning: &Partitioning, + mapping: &ProjectionMapping, + input_schema: &SchemaRef, + partition_count: usize, +) -> Partitioning { + let input_eq_properties = EquivalenceProperties::new(Arc::clone(input_schema)); + match partitioning { + Partitioning::Hash(exprs, _) => { + let projected_exprs = input_eq_properties + .project_expressions(exprs, mapping) + .collect::>>(); + projected_exprs + .map(|exprs| Partitioning::Hash(exprs, partition_count)) + .unwrap_or_else(|| Partitioning::UnknownPartitioning(partition_count)) + } + Partitioning::Range(_) + | Partitioning::RoundRobinBatch(_) + | Partitioning::UnknownPartitioning(_) => { + partitioning.project(mapping, &input_eq_properties) + } + } +} + impl DataSource for FileScanConfig { fn open( &self, @@ -660,6 +728,10 @@ impl DataSource for FileScanConfig { display_orderings(f, &orderings)?; + if self.output_partitioning.is_some() { + write!(f, ", output_partitioning={}", self.output_partitioning())?; + } + if !self.constraints.is_empty() { write!(f, ", {}", self.constraints)?; } @@ -683,10 +755,9 @@ impl DataSource for FileScanConfig { repartition_file_min_size: usize, output_ordering: Option, ) -> Result>> { - // When files are grouped by partition values, we cannot allow byte-range - // splitting. It would mix rows from different partition values across - // file groups, breaking the Hash partitioning. - if self.partitioned_by_file_group { + // When file groups define output partitioning, repartitioning files + // would invalidate the partition-to-file-group mapping. + if self.output_partitioning.is_some() || self.partitioned_by_file_group { return Ok(None); } @@ -702,54 +773,60 @@ impl DataSource for FileScanConfig { /// Returns the output partitioning for this file scan. /// - /// When `partitioned_by_file_group` is true, this returns `Partitioning::Hash` on - /// the Hive partition columns, allowing the optimizer to skip hash repartitioning - /// for aggregates and joins on those columns. + /// When output partitioning is declared, this returns it after remapping + /// through the scan projection. Otherwise, when `partitioned_by_file_group` + /// is true, this returns `Partitioning::Hash` on the Hive partition + /// columns, allowing the optimizer to skip repartitioning for compatible + /// aggregates and joins. /// /// Tradeoffs - /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries with - /// `GROUP BY` or `ORDER BY` on partition columns. - /// - Cost: Files are grouped by partition values rather than split by byte - /// ranges, which may reduce I/O parallelism when partition sizes are uneven. - /// For simple aggregations without `ORDER BY`, this cost may outweigh the benefit. + /// - Benefit: Eliminates `RepartitionExec` for compatible queries. + /// - Cost: File groups must remain intact, so byte-range file splitting + /// and sibling work stealing are disabled. /// /// Follow-up Work - /// - Idea: Could allow byte-range splitting within partition-aware groups, + /// - Idea: Could allow byte-range splitting within each output partition, /// preserving I/O parallelism while maintaining partition semantics. fn output_partitioning(&self) -> Partitioning { - if self.partitioned_by_file_group { - let partition_cols = self.table_partition_cols(); - if !partition_cols.is_empty() { - let projected_schema = match self.projected_schema() { - Ok(schema) => schema, - Err(_) => { - debug!( - "Could not get projected schema, falling back to UnknownPartitioning." - ); - return Partitioning::UnknownPartitioning(self.file_groups.len()); - } - }; - - // Build Column expressions for partition columns based on their - // position in the projected schema - let mut exprs: Vec> = Vec::new(); - for partition_col in partition_cols { - if let Some((idx, _)) = projected_schema - .fields() - .iter() - .enumerate() - .find(|(_, f)| f.name() == partition_col.name()) - { - exprs.push(Arc::new(Column::new(partition_col.name(), idx))); - } - } + let Some(output_partitioning) = self.output_partitioning.clone().or_else(|| { + self.partitioned_by_file_group.then(|| { + hash_partitioning_from_partition_fields( + self.file_source.table_schema().table_schema(), + self.table_partition_cols(), + self.file_groups.len(), + ) + })? + }) else { + return Partitioning::UnknownPartitioning(self.file_groups.len()); + }; + if output_partitioning.partition_count() != self.file_groups.len() { + debug!( + "Declared output partitioning has {} partitions, but file scan has {} file groups. Falling back to UnknownPartitioning.", + output_partitioning.partition_count(), + self.file_groups.len() + ); + return Partitioning::UnknownPartitioning(self.file_groups.len()); + } - if exprs.len() == partition_cols.len() { - return Partitioning::Hash(exprs, self.file_groups.len()); + if let Some(projection) = self.file_source.projection() { + let schema = self.file_source.table_schema().table_schema(); + return match projection.projection_mapping(schema) { + Ok(mapping) => project_output_partitioning( + &output_partitioning, + &mapping, + schema, + self.file_groups.len(), + ), + Err(e) => { + debug!( + "Could not project output partitioning, falling back to UnknownPartitioning: {e}" + ); + Partitioning::UnknownPartitioning(self.file_groups.len()) } - } + }; } - Partitioning::UnknownPartitioning(self.file_groups.len()) + + output_partitioning } /// Computes the effective equivalence properties of this file scan, taking @@ -1043,7 +1120,10 @@ impl DataSource for FileScanConfig { /// when file order must be preserved or the file groups define the output /// partitioning needed for the rest of the plan fn create_sibling_state(&self) -> Option> { - if self.preserve_order || self.partitioned_by_file_group { + if self.preserve_order + || self.output_partitioning.is_some() + || self.partitioned_by_file_group + { return None; } @@ -2457,6 +2537,58 @@ mod tests { assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_))); } + #[test] + fn test_declared_output_partitioning_projects_with_scan() { + let file_schema = aggr_test_schema(); + let output_partitioning = + Partitioning::Hash(vec![Arc::new(Column::new("c2", 1))], 4); + + let mut config = config_for_projection( + Arc::clone(&file_schema), + Some(vec![1, 2]), + Statistics::new_unknown(&file_schema), + vec![], + ); + config.file_groups = vec![ + FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f4.parquet".to_string(), 1024)]), + ]; + config.output_partitioning = Some(output_partitioning); + + match config.output_partitioning() { + Partitioning::Hash(exprs, num_partitions) => { + assert_eq!(num_partitions, 4); + assert_eq!(exprs.len(), 1); + let column = exprs[0].downcast_ref::().unwrap(); + assert_eq!(column.name(), "c2"); + assert_eq!(column.index(), 0); + } + _ => panic!("Expected Hash partitioning"), + } + + let mut config = config_for_projection( + Arc::clone(&file_schema), + Some(vec![2]), + Statistics::new_unknown(&file_schema), + vec![], + ); + config.file_groups = vec![ + FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]), + FileGroup::new(vec![PartitionedFile::new("f4.parquet".to_string(), 1024)]), + ]; + config.output_partitioning = + Some(Partitioning::Hash(vec![Arc::new(Column::new("c2", 1))], 4)); + + assert!(matches!( + config.output_partitioning(), + Partitioning::UnknownPartitioning(4) + )); + } + #[test] fn test_output_partitioning_no_partition_columns() { let file_schema = aggr_test_schema(); diff --git a/datafusion/proto-models/proto/datafusion.proto b/datafusion/proto-models/proto/datafusion.proto index ebae6c1abb970..5fec29d500bf1 100644 --- a/datafusion/proto-models/proto/datafusion.proto +++ b/datafusion/proto-models/proto/datafusion.proto @@ -1130,6 +1130,7 @@ message FileScanExecConf { optional ProjectionExprs projection_exprs = 13; optional bool partitioned_by_file_group = 14; + optional Partitioning output_partitioning = 15; } message ParquetScanExecNode { diff --git a/datafusion/proto-models/src/generated/pbjson.rs b/datafusion/proto-models/src/generated/pbjson.rs index 6e1901b1e4571..fb770e8775732 100644 --- a/datafusion/proto-models/src/generated/pbjson.rs +++ b/datafusion/proto-models/src/generated/pbjson.rs @@ -6907,6 +6907,9 @@ impl serde::Serialize for FileScanExecConf { if self.partitioned_by_file_group.is_some() { len += 1; } + if self.output_partitioning.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FileScanExecConf", len)?; if !self.file_groups.is_empty() { struct_ser.serialize_field("fileGroups", &self.file_groups)?; @@ -6946,6 +6949,9 @@ impl serde::Serialize for FileScanExecConf { if let Some(v) = self.partitioned_by_file_group.as_ref() { struct_ser.serialize_field("partitionedByFileGroup", v)?; } + if let Some(v) = self.output_partitioning.as_ref() { + struct_ser.serialize_field("outputPartitioning", v)?; + } struct_ser.end() } } @@ -6975,6 +6981,8 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "projectionExprs", "partitioned_by_file_group", "partitionedByFileGroup", + "output_partitioning", + "outputPartitioning", ]; #[allow(clippy::enum_variant_names)] @@ -6991,6 +6999,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { BatchSize, ProjectionExprs, PartitionedByFileGroup, + OutputPartitioning, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -7024,6 +7033,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "batchSize" | "batch_size" => Ok(GeneratedField::BatchSize), "projectionExprs" | "projection_exprs" => Ok(GeneratedField::ProjectionExprs), "partitionedByFileGroup" | "partitioned_by_file_group" => Ok(GeneratedField::PartitionedByFileGroup), + "outputPartitioning" | "output_partitioning" => Ok(GeneratedField::OutputPartitioning), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -7055,6 +7065,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { let mut batch_size__ = None; let mut projection_exprs__ = None; let mut partitioned_by_file_group__ = None; + let mut output_partitioning__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::FileGroups => { @@ -7134,6 +7145,12 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { } partitioned_by_file_group__ = map_.next_value()?; } + GeneratedField::OutputPartitioning => { + if output_partitioning__.is_some() { + return Err(serde::de::Error::duplicate_field("outputPartitioning")); + } + output_partitioning__ = map_.next_value()?; + } } } Ok(FileScanExecConf { @@ -7149,6 +7166,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { batch_size: batch_size__, projection_exprs: projection_exprs__, partitioned_by_file_group: partitioned_by_file_group__, + output_partitioning: output_partitioning__, }) } } diff --git a/datafusion/proto-models/src/generated/prost.rs b/datafusion/proto-models/src/generated/prost.rs index d2b25695cb1f4..0eb38029226ec 100644 --- a/datafusion/proto-models/src/generated/prost.rs +++ b/datafusion/proto-models/src/generated/prost.rs @@ -1693,6 +1693,8 @@ pub struct FileScanExecConf { pub projection_exprs: ::core::option::Option, #[prost(bool, optional, tag = "14")] pub partitioned_by_file_group: ::core::option::Option, + #[prost(message, optional, tag = "15")] + pub output_partitioning: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParquetScanExecNode { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 75311e244073f..c945a4c623c2a 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -653,6 +653,12 @@ pub fn parse_protobuf_file_scan_config( )?; output_ordering.extend(LexOrdering::new(sort_exprs)); } + let output_partitioning = parse_protobuf_partitioning( + proto.output_partitioning.as_ref(), + ctx, + &schema, + proto_converter, + )?; // Parse projection expressions if present and apply to file source let file_source = if let Some(proto_projection_exprs) = &proto.projection_exprs { @@ -681,15 +687,18 @@ pub fn parse_protobuf_file_scan_config( file_source }; - let config = FileScanConfigBuilder::new(object_store_url, file_source) + let mut config_builder = FileScanConfigBuilder::new(object_store_url, file_source) .with_file_groups(file_groups) .with_constraints(constraints) .with_statistics(statistics) .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) .with_output_ordering(output_ordering) - .with_batch_size(proto.batch_size.map(|s| s as usize)) - .with_partitioned_by_file_group(proto.partitioned_by_file_group.unwrap_or(false)) - .build(); + .with_output_partitioning(output_partitioning) + .with_batch_size(proto.batch_size.map(|s| s as usize)); + if proto.partitioned_by_file_group.unwrap_or(false) { + config_builder = config_builder.with_partitioned_by_file_group(true); + } + let config = config_builder.build(); Ok(config) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index cb7580269bc6e..a30b3b863bce0 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -603,6 +603,11 @@ pub fn serialize_file_scan_config( serialize_physical_sort_exprs(order.to_vec(), codec, proto_converter)?; output_orderings.push(ordering) } + let output_partitioning = conf + .output_partitioning + .as_ref() + .map(|partitioning| serialize_partitioning(partitioning, codec, proto_converter)) + .transpose()?; // Fields must be added to the schema so that they can persist in the protobuf, // and then they are to be removed from the schema in `parse_protobuf_file_scan_config` @@ -663,6 +668,7 @@ pub fn serialize_file_scan_config( batch_size: conf.batch_size.map(|s| s as u64), projection_exprs, partitioned_by_file_group: Some(conf.partitioned_by_file_group), + output_partitioning, }) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index bd996eb692f71..b268ee34d664d 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -4087,22 +4087,27 @@ fn test_custom_node_with_dynamic_filter_dedup_roundtrip() -> Result<()> { } #[test] -fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> { +fn roundtrip_parquet_exec_output_partitioning() -> Result<()> { use datafusion::datasource::physical_plan::FileScanConfig; let file_schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema))); + let output_partitioning = + Partitioning::Hash(vec![Arc::new(Column::new("col", 0))], 1); let scan_config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( "/path/to/file.parquet".to_string(), 1024, )])]) - .with_partitioned_by_file_group(true) + .with_output_partitioning(Some(output_partitioning.clone())) .build(); - assert!(scan_config.partitioned_by_file_group); + assert_eq!( + scan_config.output_partitioning, + Some(output_partitioning.clone()) + ); let exec_plan: Arc = DataSourceExec::from_data_source(scan_config); @@ -4129,7 +4134,72 @@ fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> { .downcast_ref::() .expect("Expected FileScanConfig"); - assert!(file_scan_config.partitioned_by_file_group); + assert_eq!( + file_scan_config.output_partitioning, + Some(output_partitioning) + ); + + Ok(()) +} + +#[test] +fn roundtrip_parquet_exec_range_output_partitioning() -> Result<()> { + use datafusion::datasource::physical_plan::FileScanConfig; + + let file_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, false)])); + let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema))); + let output_partitioning = Partitioning::Range(RangePartitioning::new( + LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new( + "col", 0, + )))]) + .unwrap(), + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(10))])], + )); + let scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(vec![ + FileGroup::new(vec![PartitionedFile::new( + "/path/to/file-1.parquet".to_string(), + 1024, + )]), + FileGroup::new(vec![PartitionedFile::new( + "/path/to/file-2.parquet".to_string(), + 1024, + )]), + ]) + .with_output_partitioning(Some(output_partitioning.clone())) + .build(); + + let exec_plan: Arc = DataSourceExec::from_data_source(scan_config); + + let ctx = SessionContext::new(); + let codec = DefaultPhysicalExtensionCodec {}; + let proto_converter = DefaultPhysicalProtoConverter {}; + let bytes = physical_plan_to_bytes_with_proto_converter( + Arc::clone(&exec_plan), + &codec, + &proto_converter, + )?; + let result_plan = physical_plan_from_bytes_with_proto_converter( + bytes.as_ref(), + ctx.task_ctx().as_ref(), + &codec, + &proto_converter, + )?; + + let data_source_exec = result_plan + .downcast_ref::() + .expect("Expected DataSourceExec"); + let file_scan_config = data_source_exec + .data_source() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + assert_eq!( + file_scan_config.output_partitioning, + Some(output_partitioning) + ); Ok(()) } diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index a642fbe22a6e3..e2ffe1415a1fb 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -47,7 +47,6 @@ bytes = { workspace = true, optional = true } chrono = { workspace = true, optional = true } clap = { version = "4.5.60", features = ["derive", "env"] } datafusion = { workspace = true, default-features = true, features = ["avro"] } -datafusion-datasource = { workspace = true } datafusion-spark = { workspace = true, features = ["core"] } datafusion-substrait = { workspace = true, default-features = true, optional = true } futures = { workspace = true } diff --git a/datafusion/sqllogictest/src/test_context/range_partitioning.rs b/datafusion/sqllogictest/src/test_context/range_partitioning.rs index 88e49708baf60..55e4f09663d90 100644 --- a/datafusion/sqllogictest/src/test_context/range_partitioning.rs +++ b/datafusion/sqllogictest/src/test_context/range_partitioning.rs @@ -15,236 +15,102 @@ // specific language governing permissions and limitations // under the License. -use std::fmt; +use std::fs::{create_dir_all, remove_dir_all, write}; +use std::path::Path; use std::sync::Arc; -use arrow::array::Int32Array; use arrow::compute::SortOptions; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; -use async_trait::async_trait; -use datafusion::catalog::Session; -use datafusion::common::{Result, ScalarValue, project_schema}; -use datafusion::datasource::source::{DataSource, DataSourceExec}; -use datafusion::datasource::{TableProvider, TableType}; -use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::Expr; -use datafusion::physical_expr::EquivalenceProperties; -use datafusion::physical_expr::expressions::col as physical_col; -use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; -use datafusion::physical_plan::execution_plan::SchedulingType; -use datafusion::physical_plan::projection::ProjectionExprs; -use datafusion::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, RangePartitioning, - SendableRecordBatchStream, SplitPoint, Statistics, +use arrow::datatypes::{DataType, Field, Schema}; +use datafusion::common::ScalarValue; +use datafusion::datasource::file_format::csv::CsvFormat; +use datafusion::datasource::listing::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; +use datafusion::physical_expr::expressions::Column; +use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion::physical_plan::{Partitioning, RangePartitioning, SplitPoint}; use datafusion::prelude::SessionContext; -use datafusion_datasource::memory::MemorySourceConfig; // ============================================================================== // Range Partitioned Table (sqllogictest-only) // ============================================================================== -/// Simple range-partitioned table for testing before declaring such tables is -/// supported via SQL. -#[derive(Debug)] -struct RangePartitionedTable { - schema: SchemaRef, - partitions: Vec>, - range_column_index: usize, - split_points: Vec, -} - -#[async_trait] -impl TableProvider for RangePartitionedTable { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - state: &dyn Session, - projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, - ) -> Result> { - let projected_schema = project_schema(&self.schema, projection)?; - let mut source = MemorySourceConfig::try_new( - &self.partitions, - Arc::clone(&self.schema), - projection.cloned(), - )?; - source = source.with_show_sizes(state.config_options().explain.show_sizes); - - let output_partitioning = - self.output_partitioning(projection, &projected_schema)?; - let source = RangePartitionedSource { - inner: source, - output_partitioning, - }; - - Ok(DataSourceExec::from_data_source(source)) - } -} - -impl RangePartitionedTable { - fn output_partitioning( - &self, - projection: Option<&Vec>, - projected_schema: &SchemaRef, - ) -> Result { - let Some(projected_range_index) = - projected_index(self.range_column_index, projection) - else { - return Ok(Partitioning::UnknownPartitioning(self.partitions.len())); - }; - - let range_column = projected_schema.field(projected_range_index).name(); - let ordering = LexOrdering::new(vec![PhysicalSortExpr::new( - physical_col(range_column, projected_schema)?, - SortOptions::default(), - )]) - .expect("range ordering should not be empty"); - - Ok(Partitioning::Range(RangePartitioning::try_new( - ordering, - self.split_points.clone(), - )?)) - } -} - -fn projected_index( - column_index: usize, - projection: Option<&Vec>, -) -> Option { - projection - .map(|projection| projection.iter().position(|idx| *idx == column_index)) - .unwrap_or(Some(column_index)) -} - -#[derive(Clone, Debug)] -struct RangePartitionedSource { - inner: MemorySourceConfig, - output_partitioning: Partitioning, -} - -impl DataSource for RangePartitionedSource { - fn open( - &self, - partition: usize, - context: Arc, - ) -> Result { - self.inner.open(partition, context) - } - - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - self.inner.fmt_as(t, f)?; - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, ", output_partitioning={}", self.output_partitioning) - } - DisplayFormatType::TreeRender => Ok(()), - } - } - - fn output_partitioning(&self) -> Partitioning { - self.output_partitioning.clone() - } - - fn eq_properties(&self) -> EquivalenceProperties { - self.inner.eq_properties() - } - - fn scheduling_type(&self) -> SchedulingType { - self.inner.scheduling_type() - } - - fn partition_statistics(&self, partition: Option) -> Result> { - self.inner.partition_statistics(partition) - } - - fn with_fetch(&self, limit: Option) -> Option> { - Some(Arc::new(Self { - inner: self.inner.clone().with_limit(limit), - output_partitioning: self.output_partitioning.clone(), - })) - } - - fn fetch(&self) -> Option { - self.inner.fetch() - } - - fn try_swapping_with_projection( - &self, - _projection: &ProjectionExprs, - ) -> Result>> { - // Range partitioning metadata is projection-sensitive. This fixture - // computes it in TableProvider::scan, so do not rewrite later - // ProjectionExec nodes into the source. - Ok(None) - } -} - +/// Registers a simple range-partitioned listing table for testing before +/// declaring such tables is supported via SQL. pub(super) fn register_range_partitioned_table(ctx: &SessionContext) { let schema = Arc::new(Schema::new(vec![ Field::new("range_key", DataType::Int32, false), Field::new("non_range_key", DataType::Int32, false), Field::new("value", DataType::Int32, false), ])); - let partitions = vec![ - vec![range_partition_batch(&schema, &[1, 5], &[1, 2], &[10, 50])], - vec![range_partition_batch( - &schema, - &[10, 15], - &[1, 2], - &[100, 150], - )], - vec![range_partition_batch( - &schema, - &[20, 25], - &[1, 2], - &[200, 250], - )], - vec![range_partition_batch( - &schema, - &[30, 35], - &[1, 2], - &[300, 350], - )], - ]; - let split_points = vec![ - SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), - ]; - let table = RangePartitionedTable { + let ordering = LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("range_key", 0)), + SortOptions::default(), + )]) + .expect("range ordering should not be empty"); + let output_partitioning = Partitioning::Range( + RangePartitioning::try_new( + ordering, + vec![ + SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), + ], + ) + .expect("range partitioning should be valid"), + ); + + register_csv_listing_table( + ctx, + "range_partitioned", + Path::new(env!("CARGO_MANIFEST_DIR")) + .join("test_files/scratch_range_partitioning/range_partitioned"), schema, - partitions, - range_column_index: 0, - split_points, - }; - - ctx.register_table("range_partitioned", Arc::new(table)) - .expect("range partitioned table registration should succeed"); + [ + "1,1,10\n5,2,50\n", + "10,1,100\n15,2,150\n", + "20,1,200\n25,2,250\n", + "30,1,300\n35,2,350\n", + ], + Some(output_partitioning), + ); } -fn range_partition_batch( - schema: &SchemaRef, - range_key: &[i32], - non_range_key: &[i32], - value: &[i32], -) -> RecordBatch { - RecordBatch::try_new( - Arc::clone(schema), - vec![ - Arc::new(Int32Array::from(range_key.to_vec())), - Arc::new(Int32Array::from(non_range_key.to_vec())), - Arc::new(Int32Array::from(value.to_vec())), - ], - ) - .expect("range partition batch should be valid") +fn register_csv_listing_table( + ctx: &SessionContext, + name: &str, + table_dir: impl AsRef, + schema: Arc, + partitions: impl IntoIterator, + output_partitioning: Option, +) { + let table_dir = table_dir.as_ref(); + if table_dir.exists() { + remove_dir_all(table_dir).expect("test table dir should be removable"); + } + create_dir_all(table_dir).expect("test table dir should be created"); + for (idx, rows) in partitions.into_iter().enumerate() { + write(table_dir.join(format!("part-{idx}.csv")), rows) + .expect("test table csv partition should be written"); + } + + let table_path = format!( + "{}/", + table_dir + .to_str() + .expect("test table path should be valid utf8") + ); + let table_url = + ListingTableUrl::parse(&table_path).expect("test table url should parse"); + let options = + ListingOptions::new(Arc::new(CsvFormat::default().with_has_header(false))) + .with_output_partitioning(output_partitioning); + let config = ListingTableConfig::new(table_url) + .with_listing_options(options) + .with_schema(schema); + let table = + ListingTable::try_new(config).expect("test listing table should be valid"); + + ctx.register_table(name, Arc::new(table)) + .expect("test listing table registration should succeed"); } diff --git a/datafusion/sqllogictest/test_files/range_partitioning.slt b/datafusion/sqllogictest/test_files/range_partitioning.slt index a61f17a039eb8..2b7a2cfdf4083 100644 --- a/datafusion/sqllogictest/test_files/range_partitioning.slt +++ b/datafusion/sqllogictest/test_files/range_partitioning.slt @@ -16,7 +16,7 @@ # under the License. # The sqllogictest harness registers range_partitioned(range_key, non_range_key, value) -# as an in-memory source with four physical source partitions: +# as a CSV ListingTable with four declared range-partitioned file groups: # # partition 0: range_key in [..., 10), rows (1, 1, 10), (5, 2, 50) # partition 1: range_key in [10, 20), rows (10, 1, 100), (15, 2, 150) @@ -40,7 +40,7 @@ physical_plan 01)AggregateExec: mode=FinalPartitioned, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)] 02)--RepartitionExec: partitioning=Hash([range_key@0], 4), input_partitions=4 03)----AggregateExec: mode=Partial, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)] -04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) +04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false query II SELECT range_key, SUM(value) FROM range_partitioned GROUP BY range_key ORDER BY range_key; @@ -69,7 +69,7 @@ physical_plan 01)AggregateExec: mode=FinalPartitioned, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)] 02)--RepartitionExec: partitioning=Hash([non_range_key@0], 4), input_partitions=4 03)----AggregateExec: mode=Partial, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)] -04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=UnknownPartitioning(4) +04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[non_range_key, value], output_partitioning=UnknownPartitioning(4), file_type=csv, has_header=false query II SELECT non_range_key, SUM(value) FROM range_partitioned GROUP BY non_range_key ORDER BY non_range_key; @@ -104,8 +104,8 @@ SELECT range_key, value FROM range_partitioned; ---- physical_plan 01)UnionExec -02)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) -03)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) +02)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false +03)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-0.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-2.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch_range_partitioning/range_partitioned/part-3.csv]]}, projection=[range_key, value], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4), file_type=csv, has_header=false query II SELECT range_key, value FROM range_partitioned