diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index ce56ca4f7dfd..7de8afe264e1 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -28,7 +28,7 @@ use datafusion_common::Result; use datafusion_expr::EmitTo; -pub(crate) mod multi_group_by; +pub mod multi_group_by; mod row; mod single_group_by; @@ -84,7 +84,7 @@ mod null_builder; /// Each distinct group in a hash aggregation is identified by a unique group id /// (usize) which is assigned by instances of this trait. Group ids are /// continuous without gaps, starting from 0. -pub(crate) trait GroupValues: Send { +pub trait GroupValues: Send { /// Calculates the group id for each input row of `cols`, assigning new /// group ids as necessary. /// diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index ac96a98edfe1..c303f6bca1a5 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -23,7 +23,7 @@ mod primitive; use std::mem::{self, size_of}; -use crate::aggregates::group_values::multi_group_by::{ +pub use crate::aggregates::group_values::multi_group_by::{ bytes::ByteGroupValueBuilder, bytes_view::ByteViewGroupValueBuilder, primitive::PrimitiveGroupValueBuilder, }; diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 6ed0a0f5d901..b5ad5873713f 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -51,7 +51,7 @@ use datafusion_physical_expr::{ use itertools::Itertools; use tracing_futures::Instrument; -pub(crate) mod group_values; +pub mod group_values; mod no_grouping; pub mod order; mod row_hash; @@ -213,7 +213,7 @@ impl PhysicalGroupBy { } /// The number of expressions in the output schema. - fn num_output_exprs(&self) -> usize { + pub fn num_output_exprs(&self) -> usize { let mut num_exprs = self.expr.len(); if !self.is_single() { num_exprs += 1 @@ -242,7 +242,7 @@ impl PhysicalGroupBy { } /// Returns the number expression as grouping keys. - fn num_group_exprs(&self) -> usize { + pub fn num_group_exprs(&self) -> usize { if self.is_single() { self.expr.len() } else { @@ -285,7 +285,7 @@ impl PhysicalGroupBy { /// /// This might be different from the `group_fields` that might contain internal expressions that /// should not be part of the output schema. - fn output_fields(&self, input_schema: &Schema) -> Result> { + pub fn output_fields(&self, input_schema: &Schema) -> Result> { let mut fields = self.group_fields(input_schema)?; fields.truncate(self.num_output_exprs()); Ok(fields) @@ -339,9 +339,15 @@ enum StreamType { impl From for SendableRecordBatchStream { fn from(stream: StreamType) -> Self { match stream { - StreamType::AggregateStream(stream) => Box::pin(stream.instrument(tracing::trace_span!("AggregateStream"))), - StreamType::GroupedHash(stream) => Box::pin(stream.instrument(tracing::trace_span!("GroupedHashAggregateStream"))), - StreamType::GroupedPriorityQueue(stream) => Box::pin(stream.instrument(tracing::trace_span!("GroupedTopKAggregateStream"))), + StreamType::AggregateStream(stream) => { + Box::pin(stream.instrument(tracing::trace_span!("AggregateStream"))) + } + StreamType::GroupedHash(stream) => Box::pin( + stream.instrument(tracing::trace_span!("GroupedHashAggregateStream")), + ), + StreamType::GroupedPriorityQueue(stream) => Box::pin( + stream.instrument(tracing::trace_span!("GroupedTopKAggregateStream")), + ), } } }