From ce7fc922d372a37d2020d24da34f3cf1d383c528 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Thu, 15 Aug 2024 14:01:57 -0700 Subject: [PATCH 1/4] Minor: make some physical-plan properties public --- datafusion/expr/src/logical_plan/builder.rs | 2 +- .../physical-plan/src/aggregates/mod.rs | 20 +++++++++---------- .../src/aggregates/order/full.rs | 2 +- .../physical-plan/src/aggregates/order/mod.rs | 8 ++++---- .../src/aggregates/order/partial.rs | 2 +- datafusion/physical-plan/src/filter.rs | 2 +- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2e53a682854c..f9769560b251 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1288,7 +1288,7 @@ pub fn build_join_schema( /// /// This allows MySQL style selects like /// `SELECT col FROM t WHERE pk = 5` if col is unique -fn add_group_by_exprs_from_dependencies( +pub fn add_group_by_exprs_from_dependencies( mut group_expr: Vec, schema: &DFSchemaRef, ) -> Result> { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 4d39eff42b5f..96cff44a2ccd 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -48,9 +48,9 @@ use datafusion_physical_expr::{ use itertools::Itertools; -mod group_values; +pub mod group_values; mod no_grouping; -mod order; +pub mod order; mod row_hash; mod topk; mod topk_stream; @@ -128,14 +128,14 @@ impl AggregateMode { #[derive(Clone, Debug, Default)] pub struct PhysicalGroupBy { /// Distinct (Physical Expr, Alias) in the grouping set - expr: Vec<(Arc, String)>, + pub expr: Vec<(Arc, String)>, /// Corresponding NULL expressions for expr - null_expr: Vec<(Arc, String)>, + pub null_expr: Vec<(Arc, String)>, /// Null mask for each group in this grouping set. Each group is /// composed of either one of the group expressions in expr or a null /// expression in null_expr. If `groups[i][j]` is true, then the the /// j-th expression in the i-th group is NULL, otherwise it is `expr[j]`. - groups: Vec>, + pub groups: Vec>, } impl PhysicalGroupBy { @@ -925,7 +925,7 @@ pub fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { /// /// A `LexRequirement` instance, which is the requirement that satisfies all the /// aggregate requirements. Returns an error in case of conflicting requirements. -fn get_finer_aggregate_exprs_requirement( +pub fn get_finer_aggregate_exprs_requirement( aggr_exprs: &mut [Arc], group_by: &PhysicalGroupBy, eq_properties: &EquivalenceProperties, @@ -998,7 +998,7 @@ fn get_finer_aggregate_exprs_requirement( /// The expressions are different depending on `mode`: /// * Partial: AggregateExpr::expressions /// * Final: columns of `AggregateExpr::state_fields()` -fn aggregate_expressions( +pub fn aggregate_expressions( aggr_expr: &[Arc], mode: &AggregateMode, col_idx_base: usize, @@ -1051,9 +1051,9 @@ fn merge_expressions( }) } -pub(crate) type AccumulatorItem = Box; +pub type AccumulatorItem = Box; -fn create_accumulators( +pub fn create_accumulators( aggr_expr: &[Arc], ) -> Result> { aggr_expr @@ -1064,7 +1064,7 @@ fn create_accumulators( /// returns a vector of ArrayRefs, where each entry corresponds to either the /// final value (mode = Final, FinalPartitioned and Single) or states (mode = Partial) -fn finalize_aggregation( +pub fn finalize_aggregation( accumulators: &mut [AccumulatorItem], mode: &AggregateMode, ) -> Result> { diff --git a/datafusion/physical-plan/src/aggregates/order/full.rs b/datafusion/physical-plan/src/aggregates/order/full.rs index c15538e8ab8e..28378c323488 100644 --- a/datafusion/physical-plan/src/aggregates/order/full.rs +++ b/datafusion/physical-plan/src/aggregates/order/full.rs @@ -54,7 +54,7 @@ use datafusion_expr::EmitTo; /// `0..12` can be emitted. Note that `13` can not yet be emitted as /// there may be more values in the next batch with the same group_id. #[derive(Debug)] -pub(crate) struct GroupOrderingFull { +pub struct GroupOrderingFull { state: State, } diff --git a/datafusion/physical-plan/src/aggregates/order/mod.rs b/datafusion/physical-plan/src/aggregates/order/mod.rs index 1d94d56df138..483150ee61af 100644 --- a/datafusion/physical-plan/src/aggregates/order/mod.rs +++ b/datafusion/physical-plan/src/aggregates/order/mod.rs @@ -25,12 +25,12 @@ mod full; mod partial; use crate::InputOrderMode; -pub(crate) use full::GroupOrderingFull; -pub(crate) use partial::GroupOrderingPartial; +pub use full::GroupOrderingFull; +pub use partial::GroupOrderingPartial; /// Ordering information for each group in the hash table #[derive(Debug)] -pub(crate) enum GroupOrdering { +pub enum GroupOrdering { /// Groups are not ordered None, /// Groups are ordered by some pre-set of the group keys @@ -117,7 +117,7 @@ impl GroupOrdering { } /// Return the size of memory used by the ordering state, in bytes - pub(crate) fn size(&self) -> usize { + pub fn size(&self) -> usize { std::mem::size_of::() + match self { GroupOrdering::None => 0, diff --git a/datafusion/physical-plan/src/aggregates/order/partial.rs b/datafusion/physical-plan/src/aggregates/order/partial.rs index f8fd86ff8b50..73a157f3aa96 100644 --- a/datafusion/physical-plan/src/aggregates/order/partial.rs +++ b/datafusion/physical-plan/src/aggregates/order/partial.rs @@ -60,7 +60,7 @@ use std::sync::Arc; /// order) recent group index ///``` #[derive(Debug)] -pub(crate) struct GroupOrderingPartial { +pub struct GroupOrderingPartial { /// State machine state: State, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 568987b14798..6aba3d817710 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -347,7 +347,7 @@ struct FilterExecStream { baseline_metrics: BaselineMetrics, } -pub(crate) fn batch_filter( +pub fn batch_filter( batch: &RecordBatch, predicate: &Arc, ) -> Result { From 099efcd6a59284270137dcfd75e1bec95e038555 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Thu, 15 Aug 2024 15:41:14 -0700 Subject: [PATCH 2/4] add Default for GroupOrderingFull --- datafusion/physical-plan/src/aggregates/order/full.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/order/full.rs b/datafusion/physical-plan/src/aggregates/order/full.rs index 28378c323488..e86d7677479a 100644 --- a/datafusion/physical-plan/src/aggregates/order/full.rs +++ b/datafusion/physical-plan/src/aggregates/order/full.rs @@ -142,3 +142,9 @@ impl GroupOrderingFull { std::mem::size_of::() } } + +impl Default for GroupOrderingFull { + fn default() -> Self { + Self::new() + } +} From 36028dcbd9a4872ac6bf83f4b44f8b8cfb78299c Mon Sep 17 00:00:00 2001 From: Matt Green Date: Fri, 16 Aug 2024 13:44:06 -0700 Subject: [PATCH 3/4] make groups and null_expr private again --- datafusion/physical-plan/src/aggregates/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 96cff44a2ccd..c18206cbc74d 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -130,12 +130,12 @@ pub struct PhysicalGroupBy { /// Distinct (Physical Expr, Alias) in the grouping set pub expr: Vec<(Arc, String)>, /// Corresponding NULL expressions for expr - pub null_expr: Vec<(Arc, String)>, + null_expr: Vec<(Arc, String)>, /// Null mask for each group in this grouping set. Each group is /// composed of either one of the group expressions in expr or a null /// expression in null_expr. If `groups[i][j]` is true, then the the /// j-th expression in the i-th group is NULL, otherwise it is `expr[j]`. - pub groups: Vec>, + groups: Vec>, } impl PhysicalGroupBy { From c1d20905d6a32438f02f2bc1e5ab79f4fdcc0d28 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Fri, 16 Aug 2024 14:51:05 -0700 Subject: [PATCH 4/4] remove pub label --- datafusion/physical-plan/src/aggregates/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index c18206cbc74d..89d4c452cca6 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -128,7 +128,7 @@ impl AggregateMode { #[derive(Clone, Debug, Default)] pub struct PhysicalGroupBy { /// Distinct (Physical Expr, Alias) in the grouping set - pub expr: Vec<(Arc, String)>, + expr: Vec<(Arc, String)>, /// Corresponding NULL expressions for expr null_expr: Vec<(Arc, String)>, /// Null mask for each group in this grouping set. Each group is