From ada1564b931bcaafe444508dcd695ffd4649f2ba Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Thu, 28 May 2026 15:00:53 -0400 Subject: [PATCH 1/3] add range table and initial slt test --- Cargo.lock | 1 + datafusion/physical-expr/src/partitioning.rs | 4 + datafusion/sqllogictest/Cargo.toml | 1 + datafusion/sqllogictest/src/test_context.rs | 227 +++++++++++++++++- .../test_files/range_partitioning.slt | 134 +++++++++++ 5 files changed, 365 insertions(+), 2 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/range_partitioning.slt diff --git a/Cargo.lock b/Cargo.lock index b0370a3e1bf27..c08be6f29ffd7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2620,6 +2620,7 @@ dependencies = [ "chrono", "clap", "datafusion", + "datafusion-datasource", "datafusion-spark", "datafusion-substrait", "env_logger", diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index bb46b8a95703d..f58ce12d04b83 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -171,6 +171,10 @@ impl Display for Partitioning { /// Values equal to split point `i` belong to partition `i + 1`, so interior /// partitions are lower-inclusive and upper-exclusive. /// +/// If a source declares range partitioning, it is responsible for placing each +/// row in the partition described by the split points, DataFusion will not validate this is +/// upheld. +/// /// For a single range key: /// /// ```text diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index e2ffe1415a1fb..a642fbe22a6e3 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -47,6 +47,7 @@ bytes = { workspace = true, optional = true } chrono = { workspace = true, optional = true } clap = { version = "4.5.60", features = ["derive", "env"] } datafusion = { workspace = true, default-features = true, features = ["avro"] } +datafusion-datasource = { workspace = true } datafusion-spark = { workspace = true, features = ["core"] } datafusion-substrait = { workspace = true, default-features = true, optional = true } futures = { workspace = true } diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 0edde71b939f4..031eb56b9aec7 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -16,6 +16,7 @@ // under the License. use std::collections::HashMap; +use std::fmt; use std::fs::File; use std::io::Write; use std::path::Path; @@ -28,6 +29,7 @@ use arrow::array::{ TimestampNanosecondArray, UInt32Array, UnionArray, }; use arrow::buffer::ScalarBuffer; +use arrow::compute::SortOptions; use arrow::datatypes::{ DataType, Field, FieldRef, Fields, Schema, SchemaRef, TimeUnit, UInt32Type, UnionFields, @@ -36,7 +38,9 @@ use arrow::record_batch::RecordBatch; use datafusion::catalog::{ CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider, Session, }; -use datafusion::common::{DataFusionError, Result, not_impl_err}; +use datafusion::common::{DataFusionError, Result, ScalarValue, not_impl_err}; +use datafusion::datasource::source::{DataSource, DataSourceExec}; +use datafusion::execution::context::TaskContext; use datafusion::functions::math::abs; use datafusion::logical_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl}; use datafusion::logical_expr::planner::TypePlanner; @@ -44,12 +48,21 @@ use datafusion::logical_expr::{ ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility, create_udf, }; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_expr::expressions::col as physical_col; +use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion::physical_plan::execution_plan::SchedulingType; +use datafusion::physical_plan::projection::ProjectionExprs; +use datafusion::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, RangePartitioning, + SendableRecordBatchStream, SplitPoint, Statistics, project_schema, +}; use datafusion::prelude::*; use datafusion::{ datasource::{MemTable, TableProvider, TableType}, prelude::{CsvReadOptions, SessionContext}, }; +use datafusion_datasource::memory::MemorySourceConfig; use datafusion_spark::SessionStateBuilderSpark; use crate::is_spark_path; @@ -167,6 +180,10 @@ impl TestContext { info!("Registering table with many types"); register_table_with_many_types(test_ctx.session_ctx()).await; } + "range_partitioning.slt" => { + info!("Registering range partitioned table"); + register_range_partitioned_table(test_ctx.session_ctx()); + } "metadata.slt" | "arrow_field.slt" => { info!("Registering metadata table tables"); register_metadata_tables(test_ctx.session_ctx()).await; @@ -286,6 +303,212 @@ fn register_strict_schema_provider(ctx: &SessionContext) { ); } +// ============================================================================== +// Range Partitioned Table (sqllogictest-only) +// ============================================================================== + +#[derive(Debug)] +struct RangePartitionedTable { + schema: SchemaRef, + partitions: Vec>, + range_column_index: usize, + split_points: Vec, +} + +#[async_trait] +impl TableProvider for RangePartitionedTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let projected_schema = project_schema(&self.schema, projection)?; + let mut source = MemorySourceConfig::try_new( + &self.partitions, + Arc::clone(&self.schema), + projection.cloned(), + )?; + source = source.with_show_sizes(state.config_options().explain.show_sizes); + + let output_partitioning = + self.output_partitioning(projection, &projected_schema)?; + let source = RangePartitionedSource { + inner: source, + output_partitioning, + }; + + Ok(DataSourceExec::from_data_source(source)) + } +} + +impl RangePartitionedTable { + fn output_partitioning( + &self, + projection: Option<&Vec>, + projected_schema: &SchemaRef, + ) -> Result { + let Some(projected_range_index) = + projected_index(self.range_column_index, projection) + else { + return Ok(Partitioning::UnknownPartitioning(self.partitions.len())); + }; + + let range_column = projected_schema.field(projected_range_index).name(); + let ordering = LexOrdering::new(vec![PhysicalSortExpr::new( + physical_col(range_column, projected_schema)?, + SortOptions::default(), + )]) + .expect("range ordering should not be empty"); + + Ok(Partitioning::Range(RangePartitioning::try_new( + ordering, + self.split_points.clone(), + )?)) + } +} + +fn projected_index( + column_index: usize, + projection: Option<&Vec>, +) -> Option { + projection + .map(|projection| projection.iter().position(|idx| *idx == column_index)) + .unwrap_or(Some(column_index)) +} + +#[derive(Clone, Debug)] +struct RangePartitionedSource { + inner: MemorySourceConfig, + output_partitioning: Partitioning, +} + +impl DataSource for RangePartitionedSource { + fn open( + &self, + partition: usize, + context: Arc, + ) -> Result { + self.inner.open(partition, context) + } + + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + self.inner.fmt_as(t, f)?; + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, ", output_partitioning={}", self.output_partitioning) + } + DisplayFormatType::TreeRender => Ok(()), + } + } + + fn output_partitioning(&self) -> Partitioning { + self.output_partitioning.clone() + } + + fn eq_properties(&self) -> EquivalenceProperties { + self.inner.eq_properties() + } + + fn scheduling_type(&self) -> SchedulingType { + self.inner.scheduling_type() + } + + fn partition_statistics(&self, partition: Option) -> Result> { + self.inner.partition_statistics(partition) + } + + fn with_fetch(&self, limit: Option) -> Option> { + Some(Arc::new(Self { + inner: self.inner.clone().with_limit(limit), + output_partitioning: self.output_partitioning.clone(), + })) + } + + fn fetch(&self) -> Option { + self.inner.fetch() + } + + fn try_swapping_with_projection( + &self, + _projection: &ProjectionExprs, + ) -> Result>> { + // Range partitioning metadata is projection-sensitive. This fixture + // computes it in TableProvider::scan, so do not rewrite later + // ProjectionExec nodes into the source. + Ok(None) + } +} + +fn register_range_partitioned_table(ctx: &SessionContext) { + let schema = Arc::new(Schema::new(vec![ + Field::new("range_key", DataType::Int32, false), + Field::new("non_range_key", DataType::Int32, false), + Field::new("value", DataType::Int32, false), + ])); + let partitions = vec![ + vec![range_partition_batch(&schema, &[1, 5], &[1, 2], &[10, 50])], + vec![range_partition_batch( + &schema, + &[10, 15], + &[1, 2], + &[100, 150], + )], + vec![range_partition_batch( + &schema, + &[20, 25], + &[1, 2], + &[200, 250], + )], + vec![range_partition_batch( + &schema, + &[30, 35], + &[1, 2], + &[300, 350], + )], + ]; + let split_points = vec![ + SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), + ]; + let table = RangePartitionedTable { + schema, + partitions, + range_column_index: 0, + split_points, + }; + + ctx.register_table("range_partitioned", Arc::new(table)) + .expect("range partitioned table registration should succeed"); +} + +fn range_partition_batch( + schema: &SchemaRef, + range_key: &[i32], + non_range_key: &[i32], + value: &[i32], +) -> RecordBatch { + RecordBatch::try_new( + Arc::clone(schema), + vec![ + Arc::new(Int32Array::from(range_key.to_vec())), + Arc::new(Int32Array::from(non_range_key.to_vec())), + Arc::new(Int32Array::from(value.to_vec())), + ], + ) + .expect("range partition batch should be valid") +} + #[cfg(feature = "avro")] pub async fn register_avro_tables(ctx: &mut TestContext) { use datafusion::prelude::AvroReadOptions; diff --git a/datafusion/sqllogictest/test_files/range_partitioning.slt b/datafusion/sqllogictest/test_files/range_partitioning.slt new file mode 100644 index 0000000000000..f92701d291f17 --- /dev/null +++ b/datafusion/sqllogictest/test_files/range_partitioning.slt @@ -0,0 +1,134 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# The sqllogictest harness registers range_partitioned(range_key, non_range_key, value) +# as an in-memory source with four physical source partitions: +# +# partition 0: range_key in [-inf, 10), rows (1, 1, 10), (5, 2, 50) +# partition 1: range_key in [10, 20), rows (10, 1, 100), (15, 2, 150) +# partition 2: range_key in [20, 30), rows (20, 1, 200), (25, 2, 250) +# partition 3: range_key in [30, +inf), rows (30, 1, 300), (35, 2, 350) + +statement ok +set datafusion.explain.physical_plan_only = true; + +########## +# TEST 1: Aggregate on Range Partition Column +# Scanning range_key preserves source Range partitioning metadata. +# Planning still inserts Hash repartitioning today; later optimizer PRs can +# use this baseline to show when the repartition is removed. +########## + +query TT +EXPLAIN SELECT range_key, SUM(value) FROM range_partitioned GROUP BY range_key; +---- +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)] +02)--RepartitionExec: partitioning=Hash([range_key@0], 4), input_partitions=4 +03)----AggregateExec: mode=Partial, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)] +04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) + +query II +SELECT range_key, SUM(value) FROM range_partitioned GROUP BY range_key ORDER BY range_key; +---- +1 10 +5 50 +10 100 +15 150 +20 200 +25 250 +30 300 +35 350 + + +########## +# TEST 2: Aggregate on Non-Range Column +# Projecting away range_key means the scan output no longer contains the +# expression needed to describe range partitioning, so it reports +# UnknownPartitioning with the same partition count. +########## + +query TT +EXPLAIN SELECT non_range_key, SUM(value) FROM range_partitioned GROUP BY non_range_key; +---- +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)] +02)--RepartitionExec: partitioning=Hash([non_range_key@0], 4), input_partitions=4 +03)----AggregateExec: mode=Partial, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)] +04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=UnknownPartitioning(4) + +query II +SELECT non_range_key, SUM(value) FROM range_partitioned GROUP BY non_range_key ORDER BY non_range_key; +---- +1 610 +2 800 + + +########## +# TEST 3: Join on Range Partition Column +# Both inputs expose Range partitioning on range_key. Join planning currently +# reaches the unsupported Range output-partitioning path; later optimizer PRs +# can replace this baseline with a successful plan and result test. +########## + +query error This feature is not implemented: Join output partitioning with range partitioning is not implemented +SELECT l.range_key, l.value, r.value +FROM range_partitioned l +JOIN range_partitioned r ON l.range_key = r.range_key; + +########## +# TEST 4: Union of Range Partitioned Inputs +# Each input exposes Range partitioning on range_key. This records current +# UNION ALL behavior before later PRs decide whether compatible range inputs can +# preserve Range partitioning across the union. +########## + +query TT +EXPLAIN SELECT range_key, value FROM range_partitioned +UNION ALL +SELECT range_key, value FROM range_partitioned; +---- +physical_plan +01)UnionExec +02)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) +03)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4) + +query II +SELECT range_key, value FROM range_partitioned +UNION ALL +SELECT range_key, value FROM range_partitioned +ORDER BY range_key, value; +---- +1 10 +1 10 +5 50 +5 50 +10 100 +10 100 +15 150 +15 150 +20 200 +20 200 +25 250 +25 250 +30 300 +30 300 +35 350 +35 350 + +statement ok +reset datafusion.explain.physical_plan_only; From 91ab2ab213f53c40626caaccd762726bd1baefbb Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Fri, 29 May 2026 07:24:18 -0400 Subject: [PATCH 2/3] Clarify range partitioning SLT fixture comments --- datafusion/physical-expr/src/partitioning.rs | 5 +++-- datafusion/sqllogictest/src/test_context.rs | 2 ++ datafusion/sqllogictest/test_files/range_partitioning.slt | 4 ++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index f58ce12d04b83..616b4905b497b 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -171,8 +171,9 @@ impl Display for Partitioning { /// Values equal to split point `i` belong to partition `i + 1`, so interior /// partitions are lower-inclusive and upper-exclusive. /// -/// If a source declares range partitioning, it is responsible for placing each -/// row in the partition described by the split points, DataFusion will not validate this is +/// Like other user-specified data properties such as sortedness, if a source +/// declares range partitioning, it is responsible for placing each row in the +/// partition described by the split points. DataFusion will not validate this is /// upheld. /// /// For a single range key: diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 031eb56b9aec7..87e04d1d92e5a 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -307,6 +307,8 @@ fn register_strict_schema_provider(ctx: &SessionContext) { // Range Partitioned Table (sqllogictest-only) // ============================================================================== +/// Simple range-partitioned table for testing before declaring such tables is +/// supported via SQL. #[derive(Debug)] struct RangePartitionedTable { schema: SchemaRef, diff --git a/datafusion/sqllogictest/test_files/range_partitioning.slt b/datafusion/sqllogictest/test_files/range_partitioning.slt index f92701d291f17..a61f17a039eb8 100644 --- a/datafusion/sqllogictest/test_files/range_partitioning.slt +++ b/datafusion/sqllogictest/test_files/range_partitioning.slt @@ -18,10 +18,10 @@ # The sqllogictest harness registers range_partitioned(range_key, non_range_key, value) # as an in-memory source with four physical source partitions: # -# partition 0: range_key in [-inf, 10), rows (1, 1, 10), (5, 2, 50) +# partition 0: range_key in [..., 10), rows (1, 1, 10), (5, 2, 50) # partition 1: range_key in [10, 20), rows (10, 1, 100), (15, 2, 150) # partition 2: range_key in [20, 30), rows (20, 1, 200), (25, 2, 250) -# partition 3: range_key in [30, +inf), rows (30, 1, 300), (35, 2, 350) +# partition 3: range_key in [30, ...), rows (30, 1, 300), (35, 2, 350) statement ok set datafusion.explain.physical_plan_only = true; From fd632b266a9cc483f9f6d2a27f37fa196779eb8d Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Fri, 29 May 2026 08:29:19 -0400 Subject: [PATCH 3/3] Move range partitioning SLT fixture to module --- datafusion/sqllogictest/src/test_context.rs | 229 +--------------- .../src/test_context/range_partitioning.rs | 250 ++++++++++++++++++ 2 files changed, 256 insertions(+), 223 deletions(-) create mode 100644 datafusion/sqllogictest/src/test_context/range_partitioning.rs diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 87e04d1d92e5a..a83db2bfb947f 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -16,7 +16,6 @@ // under the License. use std::collections::HashMap; -use std::fmt; use std::fs::File; use std::io::Write; use std::path::Path; @@ -29,7 +28,6 @@ use arrow::array::{ TimestampNanosecondArray, UInt32Array, UnionArray, }; use arrow::buffer::ScalarBuffer; -use arrow::compute::SortOptions; use arrow::datatypes::{ DataType, Field, FieldRef, Fields, Schema, SchemaRef, TimeUnit, UInt32Type, UnionFields, @@ -38,9 +36,7 @@ use arrow::record_batch::RecordBatch; use datafusion::catalog::{ CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider, Session, }; -use datafusion::common::{DataFusionError, Result, ScalarValue, not_impl_err}; -use datafusion::datasource::source::{DataSource, DataSourceExec}; -use datafusion::execution::context::TaskContext; +use datafusion::common::{DataFusionError, Result, not_impl_err}; use datafusion::functions::math::abs; use datafusion::logical_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl}; use datafusion::logical_expr::planner::TypePlanner; @@ -48,24 +44,17 @@ use datafusion::logical_expr::{ ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility, create_udf, }; -use datafusion::physical_expr::EquivalenceProperties; -use datafusion::physical_expr::expressions::col as physical_col; -use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; -use datafusion::physical_plan::execution_plan::SchedulingType; -use datafusion::physical_plan::projection::ProjectionExprs; -use datafusion::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, RangePartitioning, - SendableRecordBatchStream, SplitPoint, Statistics, project_schema, -}; +use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::*; use datafusion::{ datasource::{MemTable, TableProvider, TableType}, prelude::{CsvReadOptions, SessionContext}, }; -use datafusion_datasource::memory::MemorySourceConfig; use datafusion_spark::SessionStateBuilderSpark; use crate::is_spark_path; +use range_partitioning::register_range_partitioned_table; + use async_trait::async_trait; use datafusion::common::cast::as_float64_array; use datafusion::execution::SessionStateBuilder; @@ -74,6 +63,8 @@ use log::info; use sqlparser::ast; use tempfile::TempDir; +mod range_partitioning; + /// Context for running tests pub struct TestContext { /// Context for running queries @@ -303,214 +294,6 @@ fn register_strict_schema_provider(ctx: &SessionContext) { ); } -// ============================================================================== -// Range Partitioned Table (sqllogictest-only) -// ============================================================================== - -/// Simple range-partitioned table for testing before declaring such tables is -/// supported via SQL. -#[derive(Debug)] -struct RangePartitionedTable { - schema: SchemaRef, - partitions: Vec>, - range_column_index: usize, - split_points: Vec, -} - -#[async_trait] -impl TableProvider for RangePartitionedTable { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - state: &dyn Session, - projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, - ) -> Result> { - let projected_schema = project_schema(&self.schema, projection)?; - let mut source = MemorySourceConfig::try_new( - &self.partitions, - Arc::clone(&self.schema), - projection.cloned(), - )?; - source = source.with_show_sizes(state.config_options().explain.show_sizes); - - let output_partitioning = - self.output_partitioning(projection, &projected_schema)?; - let source = RangePartitionedSource { - inner: source, - output_partitioning, - }; - - Ok(DataSourceExec::from_data_source(source)) - } -} - -impl RangePartitionedTable { - fn output_partitioning( - &self, - projection: Option<&Vec>, - projected_schema: &SchemaRef, - ) -> Result { - let Some(projected_range_index) = - projected_index(self.range_column_index, projection) - else { - return Ok(Partitioning::UnknownPartitioning(self.partitions.len())); - }; - - let range_column = projected_schema.field(projected_range_index).name(); - let ordering = LexOrdering::new(vec![PhysicalSortExpr::new( - physical_col(range_column, projected_schema)?, - SortOptions::default(), - )]) - .expect("range ordering should not be empty"); - - Ok(Partitioning::Range(RangePartitioning::try_new( - ordering, - self.split_points.clone(), - )?)) - } -} - -fn projected_index( - column_index: usize, - projection: Option<&Vec>, -) -> Option { - projection - .map(|projection| projection.iter().position(|idx| *idx == column_index)) - .unwrap_or(Some(column_index)) -} - -#[derive(Clone, Debug)] -struct RangePartitionedSource { - inner: MemorySourceConfig, - output_partitioning: Partitioning, -} - -impl DataSource for RangePartitionedSource { - fn open( - &self, - partition: usize, - context: Arc, - ) -> Result { - self.inner.open(partition, context) - } - - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - self.inner.fmt_as(t, f)?; - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, ", output_partitioning={}", self.output_partitioning) - } - DisplayFormatType::TreeRender => Ok(()), - } - } - - fn output_partitioning(&self) -> Partitioning { - self.output_partitioning.clone() - } - - fn eq_properties(&self) -> EquivalenceProperties { - self.inner.eq_properties() - } - - fn scheduling_type(&self) -> SchedulingType { - self.inner.scheduling_type() - } - - fn partition_statistics(&self, partition: Option) -> Result> { - self.inner.partition_statistics(partition) - } - - fn with_fetch(&self, limit: Option) -> Option> { - Some(Arc::new(Self { - inner: self.inner.clone().with_limit(limit), - output_partitioning: self.output_partitioning.clone(), - })) - } - - fn fetch(&self) -> Option { - self.inner.fetch() - } - - fn try_swapping_with_projection( - &self, - _projection: &ProjectionExprs, - ) -> Result>> { - // Range partitioning metadata is projection-sensitive. This fixture - // computes it in TableProvider::scan, so do not rewrite later - // ProjectionExec nodes into the source. - Ok(None) - } -} - -fn register_range_partitioned_table(ctx: &SessionContext) { - let schema = Arc::new(Schema::new(vec![ - Field::new("range_key", DataType::Int32, false), - Field::new("non_range_key", DataType::Int32, false), - Field::new("value", DataType::Int32, false), - ])); - let partitions = vec![ - vec![range_partition_batch(&schema, &[1, 5], &[1, 2], &[10, 50])], - vec![range_partition_batch( - &schema, - &[10, 15], - &[1, 2], - &[100, 150], - )], - vec![range_partition_batch( - &schema, - &[20, 25], - &[1, 2], - &[200, 250], - )], - vec![range_partition_batch( - &schema, - &[30, 35], - &[1, 2], - &[300, 350], - )], - ]; - let split_points = vec![ - SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), - SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), - ]; - let table = RangePartitionedTable { - schema, - partitions, - range_column_index: 0, - split_points, - }; - - ctx.register_table("range_partitioned", Arc::new(table)) - .expect("range partitioned table registration should succeed"); -} - -fn range_partition_batch( - schema: &SchemaRef, - range_key: &[i32], - non_range_key: &[i32], - value: &[i32], -) -> RecordBatch { - RecordBatch::try_new( - Arc::clone(schema), - vec![ - Arc::new(Int32Array::from(range_key.to_vec())), - Arc::new(Int32Array::from(non_range_key.to_vec())), - Arc::new(Int32Array::from(value.to_vec())), - ], - ) - .expect("range partition batch should be valid") -} - #[cfg(feature = "avro")] pub async fn register_avro_tables(ctx: &mut TestContext) { use datafusion::prelude::AvroReadOptions; diff --git a/datafusion/sqllogictest/src/test_context/range_partitioning.rs b/datafusion/sqllogictest/src/test_context/range_partitioning.rs new file mode 100644 index 0000000000000..88e49708baf60 --- /dev/null +++ b/datafusion/sqllogictest/src/test_context/range_partitioning.rs @@ -0,0 +1,250 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt; +use std::sync::Arc; + +use arrow::array::Int32Array; +use arrow::compute::SortOptions; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use datafusion::catalog::Session; +use datafusion::common::{Result, ScalarValue, project_schema}; +use datafusion::datasource::source::{DataSource, DataSourceExec}; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::execution::context::TaskContext; +use datafusion::logical_expr::Expr; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_expr::expressions::col as physical_col; +use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion::physical_plan::execution_plan::SchedulingType; +use datafusion::physical_plan::projection::ProjectionExprs; +use datafusion::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, RangePartitioning, + SendableRecordBatchStream, SplitPoint, Statistics, +}; +use datafusion::prelude::SessionContext; +use datafusion_datasource::memory::MemorySourceConfig; + +// ============================================================================== +// Range Partitioned Table (sqllogictest-only) +// ============================================================================== + +/// Simple range-partitioned table for testing before declaring such tables is +/// supported via SQL. +#[derive(Debug)] +struct RangePartitionedTable { + schema: SchemaRef, + partitions: Vec>, + range_column_index: usize, + split_points: Vec, +} + +#[async_trait] +impl TableProvider for RangePartitionedTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let projected_schema = project_schema(&self.schema, projection)?; + let mut source = MemorySourceConfig::try_new( + &self.partitions, + Arc::clone(&self.schema), + projection.cloned(), + )?; + source = source.with_show_sizes(state.config_options().explain.show_sizes); + + let output_partitioning = + self.output_partitioning(projection, &projected_schema)?; + let source = RangePartitionedSource { + inner: source, + output_partitioning, + }; + + Ok(DataSourceExec::from_data_source(source)) + } +} + +impl RangePartitionedTable { + fn output_partitioning( + &self, + projection: Option<&Vec>, + projected_schema: &SchemaRef, + ) -> Result { + let Some(projected_range_index) = + projected_index(self.range_column_index, projection) + else { + return Ok(Partitioning::UnknownPartitioning(self.partitions.len())); + }; + + let range_column = projected_schema.field(projected_range_index).name(); + let ordering = LexOrdering::new(vec![PhysicalSortExpr::new( + physical_col(range_column, projected_schema)?, + SortOptions::default(), + )]) + .expect("range ordering should not be empty"); + + Ok(Partitioning::Range(RangePartitioning::try_new( + ordering, + self.split_points.clone(), + )?)) + } +} + +fn projected_index( + column_index: usize, + projection: Option<&Vec>, +) -> Option { + projection + .map(|projection| projection.iter().position(|idx| *idx == column_index)) + .unwrap_or(Some(column_index)) +} + +#[derive(Clone, Debug)] +struct RangePartitionedSource { + inner: MemorySourceConfig, + output_partitioning: Partitioning, +} + +impl DataSource for RangePartitionedSource { + fn open( + &self, + partition: usize, + context: Arc, + ) -> Result { + self.inner.open(partition, context) + } + + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + self.inner.fmt_as(t, f)?; + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, ", output_partitioning={}", self.output_partitioning) + } + DisplayFormatType::TreeRender => Ok(()), + } + } + + fn output_partitioning(&self) -> Partitioning { + self.output_partitioning.clone() + } + + fn eq_properties(&self) -> EquivalenceProperties { + self.inner.eq_properties() + } + + fn scheduling_type(&self) -> SchedulingType { + self.inner.scheduling_type() + } + + fn partition_statistics(&self, partition: Option) -> Result> { + self.inner.partition_statistics(partition) + } + + fn with_fetch(&self, limit: Option) -> Option> { + Some(Arc::new(Self { + inner: self.inner.clone().with_limit(limit), + output_partitioning: self.output_partitioning.clone(), + })) + } + + fn fetch(&self) -> Option { + self.inner.fetch() + } + + fn try_swapping_with_projection( + &self, + _projection: &ProjectionExprs, + ) -> Result>> { + // Range partitioning metadata is projection-sensitive. This fixture + // computes it in TableProvider::scan, so do not rewrite later + // ProjectionExec nodes into the source. + Ok(None) + } +} + +pub(super) fn register_range_partitioned_table(ctx: &SessionContext) { + let schema = Arc::new(Schema::new(vec![ + Field::new("range_key", DataType::Int32, false), + Field::new("non_range_key", DataType::Int32, false), + Field::new("value", DataType::Int32, false), + ])); + let partitions = vec![ + vec![range_partition_batch(&schema, &[1, 5], &[1, 2], &[10, 50])], + vec![range_partition_batch( + &schema, + &[10, 15], + &[1, 2], + &[100, 150], + )], + vec![range_partition_batch( + &schema, + &[20, 25], + &[1, 2], + &[200, 250], + )], + vec![range_partition_batch( + &schema, + &[30, 35], + &[1, 2], + &[300, 350], + )], + ]; + let split_points = vec![ + SplitPoint::new(vec![ScalarValue::Int32(Some(10))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(20))]), + SplitPoint::new(vec![ScalarValue::Int32(Some(30))]), + ]; + let table = RangePartitionedTable { + schema, + partitions, + range_column_index: 0, + split_points, + }; + + ctx.register_table("range_partitioned", Arc::new(table)) + .expect("range partitioned table registration should succeed"); +} + +fn range_partition_batch( + schema: &SchemaRef, + range_key: &[i32], + non_range_key: &[i32], + value: &[i32], +) -> RecordBatch { + RecordBatch::try_new( + Arc::clone(schema), + vec![ + Arc::new(Int32Array::from(range_key.to_vec())), + Arc::new(Int32Array::from(non_range_key.to_vec())), + Arc::new(Int32Array::from(value.to_vec())), + ], + ) + .expect("range partition batch should be valid") +}