From 7f0949d702c357a7ee2fd82cc3ca793cea05afca Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Thu, 4 Jun 2026 09:36:10 -0400 Subject: [PATCH] Add logical range partitioning representation --- datafusion/common/src/lib.rs | 2 + datafusion/common/src/partitioning.rs | 104 +++++++ datafusion/core/src/physical_planner.rs | 32 +- datafusion/expr/src/logical_plan/display.rs | 17 ++ datafusion/expr/src/logical_plan/mod.rs | 6 +- datafusion/expr/src/logical_plan/plan.rs | 289 +++++++++++++++++- datafusion/expr/src/logical_plan/tree_node.rs | 15 + datafusion/physical-expr/src/lib.rs | 3 +- datafusion/physical-expr/src/partitioning.rs | 142 +-------- datafusion/proto/src/logical_plan/mod.rs | 3 + .../logical_plan/producer/rel/exchange_rel.rs | 6 + 11 files changed, 475 insertions(+), 144 deletions(-) create mode 100644 datafusion/common/src/partitioning.rs diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index e865c548bb554..2f6d9848b6e55 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -30,6 +30,7 @@ mod dfschema; mod functional_dependencies; mod join_type; mod param_value; +mod partitioning; mod schema_reference; mod table_reference; mod unnest; @@ -92,6 +93,7 @@ pub use join_type::{JoinConstraint, JoinSide, JoinType}; pub use nested_struct::cast_column; pub use null_equality::NullEquality; pub use param_value::ParamValues; +pub use partitioning::{SplitPoint, validate_range_split_points}; pub use scalar::{ScalarType, ScalarValue}; pub use schema_reference::SchemaReference; pub use spans::{Location, Span, Spans}; diff --git a/datafusion/common/src/partitioning.rs b/datafusion/common/src/partitioning.rs new file mode 100644 index 0000000000000..8a7212c2e3089 --- /dev/null +++ b/datafusion/common/src/partitioning.rs @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::utils::compare_rows; +use crate::{Result, ScalarValue, error::_plan_err}; +use arrow::compute::SortOptions; +use std::cmp::Ordering; +use std::fmt::{self, Display}; + +/// A boundary between adjacent range partitions. +/// +/// A split point is a tuple with one [`ScalarValue`] per partitioning +/// expression. Split points are interpreted lexicographically according to the +/// ordering of the range partitioning that owns them. +/// +/// `N` split points define `N + 1` partitions: +/// +/// ```text +/// partition 0: key < split_points[0] +/// partition 1: split_points[0] <= key < split_points[1] +/// ... +/// partition N - 1: split_points[N - 2] <= key < split_points[N - 1] +/// partition N: split_points[N - 1] <= key +/// ``` +/// +/// Values equal to split point `i` belong to partition `i + 1`, so interior +/// partitions are lower-inclusive and upper-exclusive. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] +pub struct SplitPoint { + values: Vec, +} + +impl SplitPoint { + /// Creates a new split point from its tuple values. + pub fn new(values: Vec) -> Self { + Self { values } + } + + /// Returns the tuple values for this split point. + pub fn values(&self) -> &[ScalarValue] { + &self.values + } +} + +impl Display for SplitPoint { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let values = self + .values + .iter() + .map(ToString::to_string) + .collect::>() + .join(", "); + write!(f, "({values})") + } +} + +/// Validates that split points match the ordering width and are strictly +/// ordered according to the provided sort options. +pub fn validate_range_split_points( + split_points: &[SplitPoint], + sort_options: &[SortOptions], +) -> Result<()> { + let width = sort_options.len(); + for (idx, split_point) in split_points.iter().enumerate() { + let split_point_width = split_point.values().len(); + if split_point_width != width { + return _plan_err!( + "Range partitioning split point {idx} has width {split_point_width}, but ordering has width {width}" + ); + } + } + + for (idx, split_points) in split_points.windows(2).enumerate() { + if compare_rows( + split_points[0].values(), + split_points[1].values(), + sort_options, + )? != Ordering::Less + { + return _plan_err!( + "Range partitioning split points must be strictly ordered: split point {idx} ({}) must be less than split point {} ({})", + split_points[0], + idx + 1, + split_points[1] + ); + } + } + + Ok(()) +} diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index dd741ee6ff12e..8129ad6502397 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1264,6 +1264,11 @@ impl DefaultPhysicalPlanner { .collect::>>()?; Partitioning::Hash(runtime_expr, *n) } + LogicalPartitioning::Range(_) => { + return not_impl_err!( + "Physical plan does not support Range repartitioning" + ); + } LogicalPartitioning::DistributeBy(_) => { return not_impl_err!( "Physical plan does not support DistributeBy partitioning" @@ -3245,8 +3250,8 @@ mod tests { use arrow_schema::{FieldRef, SchemaRef}; use datafusion_common::config::ConfigOptions; use datafusion_common::{ - DFSchemaRef, ScalarValue, TableReference, ToDFSchema as _, assert_batches_eq, - assert_contains, + DFSchemaRef, ScalarValue, SplitPoint, TableReference, ToDFSchema as _, + assert_batches_eq, assert_contains, }; use datafusion_execution::TaskContext; use datafusion_execution::runtime_env::RuntimeEnv; @@ -3255,8 +3260,8 @@ mod tests { use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::{ Accumulator, AggregateUDF, AggregateUDFImpl, ExprFunctionExt, LogicalPlanBuilder, - Signature, TableSource, UserDefinedLogicalNodeCore, Volatility, - WindowFunctionDefinition, col, lit, + RangePartitioning, Signature, TableSource, UserDefinedLogicalNodeCore, + Volatility, WindowFunctionDefinition, col, lit, }; use datafusion_functions_aggregate::count::{count_all, count_udaf}; use datafusion_functions_aggregate::expr_fn::sum; @@ -3300,6 +3305,25 @@ mod tests { aggregate_explain(&logical_plan).await } + #[tokio::test] + async fn logical_range_repartition_is_not_supported() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let logical_plan = scan_empty(None, &schema, None)? + .repartition(LogicalPartitioning::Range(RangePartitioning::try_new( + vec![col("a").sort(true, true)], + vec![SplitPoint::new(vec![ScalarValue::Int32(Some(10))])], + )?))? + .build()?; + + let err = plan(&logical_plan).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Physical plan does not support Range repartitioning" + ); + + Ok(()) + } + fn int64_field(name: &str, nullable: bool) -> Field { Field::new(name, DataType::Int64, nullable) } diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index 58c7feb616179..27b86a6d8cdd5 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -515,6 +515,23 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { "Partitioning Key": hash_expr }) } + Partitioning::Range(range) => { + let range_expr: Vec = + range.ordering().iter().map(|e| format!("{e}")).collect(); + let split_points: Vec = range + .split_points() + .iter() + .map(|e| format!("{e}")) + .collect(); + + json!({ + "Node Type": "Repartition", + "Partitioning Scheme": "Range", + "Partition Count": range.partition_count(), + "Partitioning Key": range_expr, + "Split Points": split_points + }) + } Partitioning::DistributeBy(expr) => { let dist_by_expr: Vec = expr.iter().map(|e| format!("{e}")).collect(); diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 5087b25178ab6..e0e51d7e470c3 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -41,9 +41,9 @@ pub use plan::{ Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, ExplainOption, Extension, FetchType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection, - RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery, - SubqueryAlias, TableScan, TableScanBuilder, ToStringifiedPlan, Union, Unnest, Values, - Window, projection_schema, + RangePartitioning, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, + Subquery, SubqueryAlias, TableScan, TableScanBuilder, ToStringifiedPlan, Union, + Unnest, Values, Window, projection_schema, }; pub use statement::{ Deallocate, Execute, Prepare, ResetVariable, SetVariable, Statement, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 1bfecd06c2228..84f8e19cb4337 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -51,6 +51,7 @@ use crate::{ }; use crate::statistics::StatisticsRequest; +use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef}; use datafusion_common::cse::{NormalizeEq, Normalizeable}; use datafusion_common::format::{ExplainAnalyzeCategories, ExplainFormat, MetricType}; @@ -61,8 +62,9 @@ use datafusion_common::tree_node::{ use datafusion_common::{ Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence, FunctionalDependencies, NullEquality, ParamValues, Result, - ScalarValue, Spans, TableReference, UnnestOptions, aggregate_functional_dependencies, - assert_eq_or_internal_err, assert_or_internal_err, internal_err, plan_err, + ScalarValue, Spans, SplitPoint, TableReference, UnnestOptions, + aggregate_functional_dependencies, assert_eq_or_internal_err, assert_or_internal_err, + internal_err, plan_err, validate_range_split_points, }; use indexmap::IndexSet; @@ -869,6 +871,32 @@ impl LogicalPlan { input: Arc::new(input), })) } + Partitioning::Range(range) => { + if expr.len() != range.ordering().len() { + return internal_err!( + "Incorrect number of expressions for Range partitioning" + ); + } + let input = self.only_input(inputs)?; + let ordering = range + .ordering() + .iter() + .zip(expr) + .map(|(sort_expr, expr)| SortExpr { + expr, + asc: sort_expr.asc, + nulls_first: sort_expr.nulls_first, + }) + .collect(); + let range = RangePartitioning::try_new( + ordering, + range.split_points().to_vec(), + )?; + Ok(LogicalPlan::Repartition(Repartition { + partitioning_scheme: Partitioning::Range(range), + input: Arc::new(input), + })) + } Partitioning::DistributeBy(_) => { let input = self.only_input(inputs)?; Ok(LogicalPlan::Repartition(Repartition { @@ -2101,6 +2129,9 @@ impl LogicalPlan { n ) } + Partitioning::Range(range) => { + write!(f, "Repartition: {range}") + } Partitioning::DistributeBy(expr) => { let dist_by_expr: Vec = expr.iter().map(|e| format!("{e}")).collect(); @@ -4397,11 +4428,16 @@ impl Debug for Subquery { } } -/// Logical partitioning schemes supported by [`LogicalPlan::Repartition`] +/// Logical partitioning schemes. /// -/// See [`Partitioning`] for more details on partitioning +/// A scheme can describe either requested repartitioning in +/// [`LogicalPlan::Repartition`] or a partitioning property declared by a source. +/// Some schemes are only valid as metadata until planner support is added. /// -/// [`Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html# +/// For physical execution partitioning, see +/// [`datafusion_physical_expr::Partitioning`]. +/// +/// [`datafusion_physical_expr::Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html# #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] pub enum Partitioning { /// Allocate batches using a round-robin algorithm and the specified number of partitions @@ -4409,10 +4445,125 @@ pub enum Partitioning { /// Allocate rows based on a hash of one of more expressions and the specified number /// of partitions. Hash(Vec, usize), + /// Partition rows by ranges. + /// See [`RangePartitioning`] for the logical contract. + Range(RangePartitioning), /// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions DistributeBy(Vec), } +impl Partitioning { + /// Return the number of partitions, if known. + pub fn partition_count(&self) -> Option { + match self { + Self::RoundRobinBatch(partition_count) | Self::Hash(_, partition_count) => { + Some(*partition_count) + } + Self::Range(range) => Some(range.partition_count()), + Self::DistributeBy(_) => None, + } + } +} + +/// Logical range partitioning. +/// +/// [`RangePartitioning`] describes an ordered logical key space with split points. +/// +/// - `ordering` defines the partitioning key and ordering using logical +/// [`SortExpr`]s. +/// - `split_points` define the boundaries between adjacent partitions. +/// +/// Comparisons use the lexicographic order defined by `ordering`, +/// including `ASC`/`DESC` and null ordering. Split points must be ordered +/// according to that ordering, and each split point must have one value per +/// ordering expression. See [`SplitPoint`] for the shared boundary contract. +/// +/// The expressions are resolved against the declaring plan's schema. This +/// constructor does not validate split point value types against the resolved +/// expression types. Like other user-specified data properties such as +/// sortedness, if a source declares range partitioning, it is responsible for +/// placing each row in the partition described by the split points. DataFusion +/// will not validate this is upheld. +/// +/// NOTE: Planning [`LogicalPlan::Repartition`] with range partitioning is not +/// currently supported. Range-aware optimizer and execution behavior will be +/// introduced incrementally. See +/// . +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] +pub struct RangePartitioning { + /// Ordered logical partitioning key. + ordering: Vec, + /// Boundaries between adjacent partitions. + split_points: Vec, +} + +impl RangePartitioning { + /// Creates logical range partitioning metadata and validates split point + /// shape and ordering. + pub fn try_new( + ordering: Vec, + split_points: Vec, + ) -> Result { + if ordering.is_empty() { + return plan_err!("Range partitioning requires non-empty ordering"); + } + + validate_range_split_points(&split_points, &logical_sort_options(&ordering))?; + + Ok(Self { + ordering, + split_points, + }) + } + + /// Return the number of partitions. + pub fn partition_count(&self) -> usize { + self.split_points.len() + 1 + } + + /// Returns the ordering that defines the range key. + pub fn ordering(&self) -> &[SortExpr] { + &self.ordering + } + + /// Returns the ordered split points between partitions. + pub fn split_points(&self) -> &[SplitPoint] { + &self.split_points + } +} + +fn logical_sort_options(ordering: &[SortExpr]) -> Vec { + ordering + .iter() + .map(|sort_expr| SortOptions { + descending: !sort_expr.asc, + nulls_first: sort_expr.nulls_first, + }) + .collect() +} + +impl Display for RangePartitioning { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let ordering = self + .ordering() + .iter() + .map(ToString::to_string) + .collect::>() + .join(", "); + let split_points = self + .split_points() + .iter() + .map(ToString::to_string) + .collect::>() + .join(", "); + write!( + f, + "Range([{ordering}], [{split_points}], {})", + self.partition_count() + ) + } +} + /// Represent the unnesting operation on a list column, such as the recursion depth and /// the output column name after unnesting /// @@ -4750,6 +4901,134 @@ mod tests { ]) } + fn i32_split_point(value: i32) -> SplitPoint { + SplitPoint::new(vec![ScalarValue::Int32(Some(value))]) + } + + fn null_i32_split_point() -> SplitPoint { + SplitPoint::new(vec![ScalarValue::Int32(None)]) + } + + #[test] + fn logical_range_partitioning_validates_shape() { + let range = RangePartitioning::try_new( + vec![col("id").sort(true, true)], + vec![i32_split_point(10), i32_split_point(20)], + ) + .unwrap(); + assert_eq!(range.partition_count(), 3); + + let range = RangePartitioning::try_new( + vec![col("id").sort(false, true)], + vec![i32_split_point(20), i32_split_point(10)], + ) + .unwrap(); + assert_eq!(range.partition_count(), 3); + + let err = RangePartitioning::try_new(vec![], vec![]).unwrap_err(); + assert!(err.to_string().contains("non-empty ordering")); + + let err = RangePartitioning::try_new( + vec![col("id").sort(true, true), col("salary").sort(true, true)], + vec![i32_split_point(10)], + ) + .unwrap_err(); + assert!( + err.to_string() + .contains("split point 0 has width 1, but ordering has width 2") + ); + + let err = RangePartitioning::try_new( + vec![col("id").sort(true, true)], + vec![i32_split_point(20), i32_split_point(10)], + ) + .unwrap_err(); + assert!( + err.to_string() + .contains("split points must be strictly ordered") + ); + + let err = RangePartitioning::try_new( + vec![col("id").sort(true, true)], + vec![i32_split_point(10), i32_split_point(10)], + ) + .unwrap_err(); + assert!( + err.to_string() + .contains("split points must be strictly ordered") + ); + + let range = RangePartitioning::try_new( + vec![col("id").sort(true, true)], + vec![null_i32_split_point(), i32_split_point(10)], + ) + .unwrap(); + assert_eq!(range.partition_count(), 3); + } + + #[test] + fn logical_partitioning_reports_known_partition_count() -> Result<()> { + let range = RangePartitioning::try_new( + vec![col("id").sort(true, true)], + vec![i32_split_point(10)], + )?; + + assert_eq!(Partitioning::RoundRobinBatch(4).partition_count(), Some(4)); + assert_eq!( + Partitioning::Hash(vec![col("id")], 8).partition_count(), + Some(8) + ); + assert_eq!(Partitioning::Range(range).partition_count(), Some(2)); + assert_eq!( + Partitioning::DistributeBy(vec![col("id")]).partition_count(), + None + ); + + Ok(()) + } + + #[test] + fn logical_range_partitioning_participates_in_expression_rewrite() -> Result<()> { + let input = + table_scan(Some("employee_csv"), &employee_schema(), None)?.build()?; + let plan = LogicalPlan::Repartition(Repartition { + input: Arc::new(input), + partitioning_scheme: Partitioning::Range(RangePartitioning::try_new( + vec![col("id").sort(true, true)], + vec![i32_split_point(10)], + )?), + }); + + let mut visited_exprs = vec![]; + plan.apply_expressions(|expr| { + visited_exprs.push(expr.to_string()); + Ok(TreeNodeRecursion::Continue) + })?; + assert_eq!(visited_exprs, vec!["id"]); + + let plan = plan + .map_expressions(|expr| { + if expr == col("id") { + Ok(Transformed::yes(col("salary"))) + } else { + Ok(Transformed::no(expr)) + } + })? + .data; + + let LogicalPlan::Repartition(Repartition { + partitioning_scheme: Partitioning::Range(range), + .. + }) = plan + else { + unreachable!("expected range repartition"); + }; + assert_eq!(range.ordering()[0].expr, col("salary")); + assert_eq!(range.partition_count(), 2); + + Ok(()) + } + fn display_plan() -> Result { let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))? .build()?; diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index e0cdec9e2c088..cba2dac24b610 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -37,6 +37,7 @@ //! * [`LogicalPlan::with_new_exprs`]: Create a new plan with different expressions //! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions +use crate::logical_plan::plan::RangePartitioning; use crate::{ Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement, Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join, Limit, @@ -427,6 +428,7 @@ impl LogicalPlan { Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => { expr.apply_elements(f) } + Partitioning::Range(range) => range.ordering().to_vec().apply_elements(f), Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue), }, LogicalPlan::Window(Window { window_expr, .. }) => { @@ -532,6 +534,19 @@ impl LogicalPlan { Partitioning::DistributeBy(expr) => expr .map_elements(f)? .update_data(Partitioning::DistributeBy), + Partitioning::Range(range) => { + let split_points = range.split_points().to_vec(); + range + .ordering() + .to_vec() + .map_elements(f)? + .map_data(|ordering| { + Ok(Partitioning::Range(RangePartitioning::try_new( + ordering, + split_points, + )?)) + })? + } Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme), } .update_data(|partitioning_scheme| { diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index c82d1c64dd0d9..b55bd70bdf185 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -58,11 +58,12 @@ pub mod execution_props { pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState}; pub use analysis::{AnalysisContext, ExprBoundaries, analyze}; +pub use datafusion_common::SplitPoint; pub use equivalence::{ AcrossPartitions, ConstExpr, EquivalenceProperties, calculate_union, }; pub use expressions::{DynamicFilterTracker, DynamicFilterTracking}; -pub use partitioning::{Distribution, Partitioning, RangePartitioning, SplitPoint}; +pub use partitioning::{Distribution, Partitioning, RangePartitioning}; pub use physical_expr::{ add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering, create_ordering, create_physical_sort_expr, create_physical_sort_exprs, diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index 616b4905b497b..fbf39e05d1ec3 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -21,10 +21,10 @@ use crate::{ EquivalenceProperties, PhysicalExpr, equivalence::ProjectionMapping, expressions::UnKnownColumn, physical_exprs_equal, }; -use datafusion_common::{Result, ScalarValue, plan_err}; +pub use datafusion_common::SplitPoint; +use datafusion_common::{Result, validate_range_split_points}; use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; -use std::cmp::Ordering; use std::fmt; use std::fmt::Display; use std::sync::Arc; @@ -156,20 +156,7 @@ impl Display for Partitioning { /// Comparisons use the lexicographic order defined by `ordering`, including /// `ASC`/`DESC` and null ordering. Split points must be strictly ordered /// according to that ordering, and each split point must have one value per -/// ordering expression. -/// -/// `N` split points define `N + 1` partitions: -/// -/// ```text -/// partition 0: key < split_points[0] -/// partition 1: split_points[0] <= key < split_points[1] -/// ... -/// partition N - 1: split_points[N - 2] <= key < split_points[N - 1] -/// partition N: split_points[N - 1] <= key -/// ``` -/// -/// Values equal to split point `i` belong to partition `i + 1`, so interior -/// partitions are lower-inclusive and upper-exclusive. +/// ordering expression. See [`SplitPoint`] for the shared boundary convention. /// /// Like other user-specified data properties such as sortedness, if a source /// declares range partitioning, it is responsible for placing each row in the @@ -217,39 +204,6 @@ pub struct RangePartitioning { split_points: Vec, } -/// A boundary between adjacent range partitions. -/// -/// A split point is a tuple with one [`ScalarValue`] per sort expression in the -/// parent [`RangePartitioning`] ordering. -#[derive(Debug, Clone, PartialEq)] -pub struct SplitPoint { - values: Vec, -} - -impl SplitPoint { - /// Creates a new split point from its tuple values. - pub fn new(values: Vec) -> Self { - Self { values } - } - - /// Returns the tuple values for this split point. - pub fn values(&self) -> &[ScalarValue] { - &self.values - } -} - -impl Display for SplitPoint { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let values = self - .values - .iter() - .map(ToString::to_string) - .collect::>() - .join(", "); - write!(f, "({values})") - } -} - impl RangePartitioning { /// Creates range partitioning metadata without validating split points. /// @@ -265,7 +219,13 @@ impl RangePartitioning { /// Creates range partitioning metadata and validates split point shape and /// ordering. pub fn try_new(ordering: LexOrdering, split_points: Vec) -> Result { - validate_range_split_points(&ordering, &split_points)?; + validate_range_split_points( + &split_points, + &ordering + .iter() + .map(|sort_expr| sort_expr.options) + .collect::>(), + )?; Ok(Self::new(ordering, split_points)) } @@ -336,86 +296,6 @@ fn format_range_split_points(split_points: &[SplitPoint]) -> String { .join(", ") } -fn validate_range_split_points( - ordering: &LexOrdering, - split_points: &[SplitPoint], -) -> Result<()> { - let width = ordering.len(); - for (idx, split_point) in split_points.iter().enumerate() { - let split_point_width = split_point.values.len(); - if split_point_width != width { - return plan_err!( - "Range partitioning split point {idx} has width {split_point_width}, but ordering has width {width}" - ); - } - } - - for (idx, split_points) in split_points.windows(2).enumerate() { - if compare_split_points(ordering, &split_points[0], &split_points[1])? - != Ordering::Less - { - return plan_err!( - "Range partitioning split points must be strictly ordered: split point {idx} ({}) must be less than split point {} ({})", - split_points[0], - idx + 1, - split_points[1] - ); - } - } - - Ok(()) -} - -fn compare_split_points( - ordering: &LexOrdering, - left: &SplitPoint, - right: &SplitPoint, -) -> Result { - for ((left_value, right_value), sort_expr) in - left.values.iter().zip(&right.values).zip(ordering.iter()) - { - let value_ordering = - compare_scalar_values_for_sort(left_value, right_value, sort_expr)?; - if value_ordering != Ordering::Equal { - return Ok(value_ordering); - } - } - - Ok(Ordering::Equal) -} - -fn compare_scalar_values_for_sort( - left: &ScalarValue, - right: &ScalarValue, - sort_expr: &PhysicalSortExpr, -) -> Result { - match (left.is_null(), right.is_null()) { - (true, true) => Ok(Ordering::Equal), - (true, false) => Ok(if sort_expr.options.nulls_first { - Ordering::Less - } else { - Ordering::Greater - }), - (false, true) => Ok(if sort_expr.options.nulls_first { - Ordering::Greater - } else { - Ordering::Less - }), - (false, false) => { - let Some(ordering) = left.partial_cmp(right) else { - return plan_err!( - "Range partitioning split point values are not comparable: {left:?} and {right:?}" - ); - }; - Ok(if sort_expr.options.descending { - ordering.reverse() - } else { - ordering - }) - } - } -} - /// Represents how a [`Partitioning`] satisfies a [`Distribution`] requirement. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PartitioningSatisfaction { @@ -652,7 +532,7 @@ mod tests { use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use datafusion_common::Result; + use datafusion_common::{Result, ScalarValue}; struct PartitioningTestFixture { schema: SchemaRef, diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 49593a6c6a56a..ed0b00c91487e 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1751,6 +1751,9 @@ impl AsLogicalPlan for LogicalPlanNode { Partitioning::RoundRobinBatch(partition_count) => { PartitionMethod::RoundRobin(*partition_count as u64) } + Partitioning::Range(_) => { + return not_impl_err!("Range repartition"); + } Partitioning::DistributeBy(_) => { return not_impl_err!("DistributeBy"); } diff --git a/datafusion/substrait/src/logical_plan/producer/rel/exchange_rel.rs b/datafusion/substrait/src/logical_plan/producer/rel/exchange_rel.rs index 50c4b3da86cbe..60f8c62b8cd7b 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/exchange_rel.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/exchange_rel.rs @@ -32,6 +32,9 @@ pub fn from_repartition( let partition_count = match repartition.partitioning_scheme { Partitioning::RoundRobinBatch(num) => num, Partitioning::Hash(_, num) => num, + Partitioning::Range(_) => { + return not_impl_err!("Substrait does not support Range repartitioning"); + } Partitioning::DistributeBy(_) => { return not_impl_err!( "Physical plan does not support DistributeBy partitioning" @@ -50,6 +53,9 @@ pub fn from_repartition( .collect::>>()?; ExchangeKind::ScatterByFields(ScatterFields { fields }) } + Partitioning::Range(_) => { + return not_impl_err!("Substrait does not support Range repartitioning"); + } Partitioning::DistributeBy(_) => { return not_impl_err!( "Physical plan does not support DistributeBy partitioning"