diff --git a/Cargo.lock b/Cargo.lock index b0370a3e1bf27..c08be6f29ffd7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2620,6 +2620,7 @@ dependencies = [ "chrono", "clap", "datafusion", + "datafusion-datasource", "datafusion-spark", "datafusion-substrait", "env_logger", diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index bb46b8a95703d..616b4905b497b 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -171,6 +171,11 @@ impl Display for Partitioning { /// Values equal to split point `i` belong to partition `i + 1`, so interior /// partitions are lower-inclusive and upper-exclusive. /// +/// Like other user-specified data properties such as sortedness, if a source +/// declares range partitioning, it is responsible for placing each row in the +/// partition described by the split points. DataFusion will not validate this is +/// upheld. +/// /// For a single range key: /// /// ```text diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index e2ffe1415a1fb..a642fbe22a6e3 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -47,6 +47,7 @@ bytes = { workspace = true, optional = true } chrono = { workspace = true, optional = true } clap = { version = "4.5.60", features = ["derive", "env"] } datafusion = { workspace = true, default-features = true, features = ["avro"] } +datafusion-datasource = { workspace = true } datafusion-spark = { workspace = true, features = ["core"] } datafusion-substrait = { workspace = true, default-features = true, optional = true } futures = { workspace = true } diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 0edde71b939f4..a83db2bfb947f 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -53,6 +53,8 @@ use datafusion::{ use datafusion_spark::SessionStateBuilderSpark; use crate::is_spark_path; +use range_partitioning::register_range_partitioned_table; + use async_trait::async_trait; use datafusion::common::cast::as_float64_array; use datafusion::execution::SessionStateBuilder; @@ -61,6 +63,8 @@ use log::info; use sqlparser::ast; use tempfile::TempDir; +mod range_partitioning; + /// Context for running tests pub struct TestContext { /// Context for running queries @@ -167,6 +171,10 @@ impl TestContext { info!("Registering table with many types"); register_table_with_many_types(test_ctx.session_ctx()).await; } + "range_partitioning.slt" => { + info!("Registering range partitioned table"); + register_range_partitioned_table(test_ctx.session_ctx()); + } "metadata.slt" | "arrow_field.slt" => { info!("Registering metadata table tables"); register_metadata_tables(test_ctx.session_ctx()).await; diff --git a/datafusion/sqllogictest/src/test_context/range_partitioning.rs b/datafusion/sqllogictest/src/test_context/range_partitioning.rs new file mode 100644 index 0000000000000..88e49708baf60 --- /dev/null +++ b/datafusion/sqllogictest/src/test_context/range_partitioning.rs @@ -0,0 +1,250 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt; +use std::sync::Arc; + +use arrow::array::Int32Array; +use arrow::compute::SortOptions; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use datafusion::catalog::Session; +use datafusion::common::{Result, ScalarValue, project_schema}; +use datafusion::datasource::source::{DataSource, DataSourceExec}; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::execution::context::TaskContext; +use datafusion::logical_expr::Expr; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_expr::expressions::col as physical_col; +use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion::physical_plan::execution_plan::SchedulingType; +use datafusion::physical_plan::projection::ProjectionExprs; +use datafusion::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, RangePartitioning, + SendableRecordBatchStream, SplitPoint, Statistics, +}; +use datafusion::prelude::SessionContext; +use datafusion_datasource::memory::MemorySourceConfig; + +// ============================================================================== +// Range Partitioned Table (sqllogictest-only) +// ============================================================================== + +/// Simple range-partitioned table for testing before declaring such tables is +/// supported via SQL. +#[derive(Debug)] +struct RangePartitionedTable { + schema: SchemaRef, + partitions: Vec>, + range_column_index: usize, + split_points: Vec, +} + +#[async_trait] +impl TableProvider for RangePartitionedTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let projected_schema = project_schema(&self.schema, projection)?; + let mut source = MemorySourceConfig::try_new( + &self.partitions, + Arc::clone(&self.schema), + projection.cloned(), + )?; + source = source.with_show_sizes(state.config_options().explain.show_sizes); + + let output_partitioning = + self.output_partitioning(projection, &projected_schema)?; + let source = RangePartitionedSource { + inner: source, + output_partitioning, + }; + + Ok(DataSourceExec::from_data_source(source)) + } +} + +impl RangePartitionedTable { + fn output_partitioning( + &self, + projection: Option<&Vec>, + projected_schema: &SchemaRef, + ) -> Result { + let Some(projected_range_index) = + projected_index(self.range_column_index, projection) + else { + return Ok(Partitioning::UnknownPartitioning(self.partitions.len())); + }; + + let range_column = projected_schema.field(projected_range_index).name(); + let ordering = LexOrdering::new(vec![PhysicalSortExpr::new( + physical_col(range_column, projected_schema)?, + SortOptions::default(), + )]) + .expect("range ordering should not be empty"); + + Ok(Partitioning::Range(RangePartitioning::try_new( + ordering, + self.split_points.clone(), + )?)) + } +} + +fn projected_index( + column_index: usize, + projection: Option<&Vec>, +) -> Option { + projection + .map(|projection| projection.iter().position(|idx| *idx == column_index)) + .unwrap_or(Some(column_index)) +} + +#[derive(Clone, Debug)] +struct RangePartitionedSource { + inner: MemorySourceConfig, + output_partitioning: Partitioning, +} + +impl DataSource for RangePartitionedSource { + fn open( + &self, + partition: usize, + context: Arc, + ) -> Result { + self.inner.open(partition, context) + } + + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + self.inner.fmt_as(t, f)?; + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, ", output_partitioning={}", self.output_partitioning) + } + DisplayFormatType::TreeRender => Ok(()), + } + } + + fn output_partitioning(&self) -> Partitioning { + self.output_partitioning.clone() + } + + fn eq_properties(&self) -> EquivalenceProperties { + self.inner.eq_properties() + } + + fn scheduling_type(&self) -> SchedulingType { + self.inner.scheduling_type() + } + + fn partition_statistics(&self, partition: Option) -> Result> { + self.inner.partition_statistics(partition) + } + + fn with_fetch(&self, limit: Option) -> Option> { + Some(Arc::new(Self { + inner: self.inner.clone().with_limit(limit), + output_partitioning: self.output_partitioning.clone(), + })) + } + + fn fetch(&self) -> Option { + self.inner.fetch() + } + + fn try_swapping_with_projection( + &self, + _projection: &ProjectionExprs, + ) -> Result>> { + // Range partitioning metadata is projection-sensitive. This fixture + // computes it in TableProvider::scan, so do not rewrite later + // ProjectionExec nodes into the source. + Ok(None) + } +} + +pub(super) fn register_range_partitioned_table(ctx: &SessionContext) { + let schema = Arc::new(Schema::new(vec![ + Field::new("range_key", DataType::Int32, false), + Field::new("non_range_key", DataType::Int32, false), + Field::new("value", DataType::Int32, false), + ])); + let partitions = vec![ + vec![range_partition_batch(&schema, &[1, 5], &[1, 2], &[10, 50])], + vec![range_partition_batch( + &schema, + &[10, 15], + &[1, 2], + &[100, 150], + )], + vec![range_partition_batch( + &schema, + &[20, 25], + &[1, 2], + &[200, 250], + )], + vec![range_partition_batch( + &schema, + &[30, 35], + &[1, 2], + &[300, 350], + )], + ]; + let split_points = vec![ + SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), + ]; + let table = RangePartitionedTable { + schema, + partitions, + range_column_index: 0, + split_points, + }; + + ctx.register_table("range_partitioned", Arc::new(table)) + .expect("range partitioned table registration should succeed"); +} + +fn range_partition_batch( + schema: &SchemaRef, + range_key: &[i32], + non_range_key: &[i32], + value: &[i32], +) -> RecordBatch { + RecordBatch::try_new( + Arc::clone(schema), + vec![ + Arc::new(Int32Array::from(range_key.to_vec())), + Arc::new(Int32Array::from(non_range_key.to_vec())), + Arc::new(Int32Array::from(value.to_vec())), + ], + ) + .expect("range partition batch should be valid") +} diff --git a/datafusion/sqllogictest/test_files/range_partitioning.slt b/datafusion/sqllogictest/test_files/range_partitioning.slt new file mode 100644 index 0000000000000..a61f17a039eb8 --- /dev/null +++ b/datafusion/sqllogictest/test_files/range_partitioning.slt @@ -0,0 +1,134 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# The sqllogictest harness registers range_partitioned(range_key, non_range_key, value) +# as an in-memory source with four physical source partitions: +# +# partition 0: range_key in [..., 10), rows (1, 1, 10), (5, 2, 50) +# partition 1: range_key in [10, 20), rows (10, 1, 100), (15, 2, 150) +# partition 2: range_key in [20, 30), rows (20, 1, 200), (25, 2, 250) +# partition 3: range_key in [30, ...), rows (30, 1, 300), (35, 2, 350) + +statement ok +set datafusion.explain.physical_plan_only = true; + +########## +# TEST 1: Aggregate on Range Partition Column +# Scanning range_key preserves source Range partitioning metadata. +# Planning still inserts Hash repartitioning today; later optimizer PRs can +# use this baseline to show when the repartition is removed. +########## + +query TT +EXPLAIN SELECT range_key, SUM(value) FROM range_partitioned GROUP BY range_key; +---- +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)] +02)--RepartitionExec: partitioning=Hash([range_key@0], 4), input_partitions=4 +03)----AggregateExec: mode=Partial, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)] +04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) + +query II +SELECT range_key, SUM(value) FROM range_partitioned GROUP BY range_key ORDER BY range_key; +---- +1 10 +5 50 +10 100 +15 150 +20 200 +25 250 +30 300 +35 350 + + +########## +# TEST 2: Aggregate on Non-Range Column +# Projecting away range_key means the scan output no longer contains the +# expression needed to describe range partitioning, so it reports +# UnknownPartitioning with the same partition count. +########## + +query TT +EXPLAIN SELECT non_range_key, SUM(value) FROM range_partitioned GROUP BY non_range_key; +---- +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)] +02)--RepartitionExec: partitioning=Hash([non_range_key@0], 4), input_partitions=4 +03)----AggregateExec: mode=Partial, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)] +04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=UnknownPartitioning(4) + +query II +SELECT non_range_key, SUM(value) FROM range_partitioned GROUP BY non_range_key ORDER BY non_range_key; +---- +1 610 +2 800 + + +########## +# TEST 3: Join on Range Partition Column +# Both inputs expose Range partitioning on range_key. Join planning currently +# reaches the unsupported Range output-partitioning path; later optimizer PRs +# can replace this baseline with a successful plan and result test. +########## + +query error This feature is not implemented: Join output partitioning with range partitioning is not implemented +SELECT l.range_key, l.value, r.value +FROM range_partitioned l +JOIN range_partitioned r ON l.range_key = r.range_key; + +########## +# TEST 4: Union of Range Partitioned Inputs +# Each input exposes Range partitioning on range_key. This records current +# UNION ALL behavior before later PRs decide whether compatible range inputs can +# preserve Range partitioning across the union. +########## + +query TT +EXPLAIN SELECT range_key, value FROM range_partitioned +UNION ALL +SELECT range_key, value FROM range_partitioned; +---- +physical_plan +01)UnionExec +02)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) +03)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) + +query II +SELECT range_key, value FROM range_partitioned +UNION ALL +SELECT range_key, value FROM range_partitioned +ORDER BY range_key, value; +---- +1 10 +1 10 +5 50 +5 50 +10 100 +10 100 +15 150 +15 150 +20 200 +20 200 +25 250 +25 250 +30 300 +30 300 +35 350 +35 350 + +statement ok +reset datafusion.explain.physical_plan_only;