diff --git a/datafusion/physical-plan/src/aggregates/group_values/metrics.rs b/datafusion/physical-plan/src/aggregates/group_values/metrics.rs new file mode 100644 index 000000000000..c4e29ea71060 --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/group_values/metrics.rs @@ -0,0 +1,214 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metrics for the various group-by implementations. + +use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time}; + +pub(crate) struct GroupByMetrics { + /// Time spent calculating the group IDs from the evaluated grouping columns. + pub(crate) time_calculating_group_ids: Time, + /// Time spent evaluating the inputs to the aggregate functions. + pub(crate) aggregate_arguments_time: Time, + /// Time spent evaluating the aggregate expressions themselves + /// (e.g. summing all elements and counting number of elements for `avg` aggregate). + pub(crate) aggregation_time: Time, + /// Time spent emitting the final results and constructing the record batch + /// which includes finalizing the grouping expressions + /// (e.g. emit from the hash table in case of hash aggregation) and the accumulators + pub(crate) emitting_time: Time, +} + +impl GroupByMetrics { + pub(crate) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + Self { + time_calculating_group_ids: MetricBuilder::new(metrics) + .subset_time("time_calculating_group_ids", partition), + aggregate_arguments_time: MetricBuilder::new(metrics) + .subset_time("aggregate_arguments_time", partition), + aggregation_time: MetricBuilder::new(metrics) + .subset_time("aggregation_time", partition), + emitting_time: MetricBuilder::new(metrics) + .subset_time("emitting_time", partition), + } + } +} + +#[cfg(test)] +mod tests { + use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; + use crate::metrics::MetricsSet; + use crate::test::TestMemoryExec; + use crate::{collect, ExecutionPlan}; + use arrow::array::{Float64Array, UInt32Array}; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use datafusion_common::Result; + use datafusion_execution::TaskContext; + use datafusion_functions_aggregate::count::count_udaf; + use datafusion_functions_aggregate::sum::sum_udaf; + use datafusion_physical_expr::aggregate::AggregateExprBuilder; + use datafusion_physical_expr::expressions::col; + use std::sync::Arc; + + /// Helper function to verify all three GroupBy metrics exist and have non-zero values + fn assert_groupby_metrics(metrics: &MetricsSet) { + let agg_arguments_time = metrics.sum_by_name("aggregate_arguments_time"); + assert!(agg_arguments_time.is_some()); + assert!(agg_arguments_time.unwrap().as_usize() > 0); + + let aggregation_time = metrics.sum_by_name("aggregation_time"); + assert!(aggregation_time.is_some()); + assert!(aggregation_time.unwrap().as_usize() > 0); + + let emitting_time = metrics.sum_by_name("emitting_time"); + assert!(emitting_time.is_some()); + assert!(emitting_time.unwrap().as_usize() > 0); + } + + #[tokio::test] + async fn test_groupby_metrics_partial_mode() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::UInt32, false), + Field::new("b", DataType::Float64, false), + ])); + + // Create multiple batches to ensure metrics accumulate + let batches = (0..5) + .map(|i| { + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(UInt32Array::from(vec![1, 2, 3, 4])), + Arc::new(Float64Array::from(vec![ + i as f64, + (i + 1) as f64, + (i + 2) as f64, + (i + 3) as f64, + ])), + ], + ) + .unwrap() + }) + .collect::>(); + + let input = TestMemoryExec::try_new_exec(&[batches], Arc::clone(&schema), None)?; + + let group_by = + PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]); + + let aggregates = vec![ + Arc::new( + AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("SUM(b)") + .build()?, + ), + Arc::new( + AggregateExprBuilder::new(count_udaf(), vec![col("b", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("COUNT(b)") + .build()?, + ), + ]; + + let aggregate_exec = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by, + aggregates, + vec![None, None], + input, + schema, + )?); + + let task_ctx = Arc::new(TaskContext::default()); + let _result = + collect(Arc::clone(&aggregate_exec) as _, Arc::clone(&task_ctx)).await?; + + let metrics = aggregate_exec.metrics().unwrap(); + assert_groupby_metrics(&metrics); + + Ok(()) + } + + #[tokio::test] + async fn test_groupby_metrics_final_mode() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::UInt32, false), + Field::new("b", DataType::Float64, false), + ])); + + let batches = (0..3) + .map(|i| { + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(UInt32Array::from(vec![1, 2, 3])), + Arc::new(Float64Array::from(vec![ + i as f64, + (i + 1) as f64, + (i + 2) as f64, + ])), + ], + ) + .unwrap() + }) + .collect::>(); + + let partial_input = + TestMemoryExec::try_new_exec(&[batches], Arc::clone(&schema), None)?; + + let group_by = + PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]); + + let aggregates = vec![Arc::new( + AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("SUM(b)") + .build()?, + )]; + + // Create partial aggregate + let partial_aggregate = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by.clone(), + aggregates.clone(), + vec![None], + partial_input, + Arc::clone(&schema), + )?); + + // Create final aggregate + let final_aggregate = Arc::new(AggregateExec::try_new( + AggregateMode::Final, + group_by.as_final(), + aggregates, + vec![None], + partial_aggregate, + schema, + )?); + + let task_ctx = Arc::new(TaskContext::default()); + let _result = + collect(Arc::clone(&final_aggregate) as _, Arc::clone(&task_ctx)).await?; + + let metrics = final_aggregate.metrics().unwrap(); + assert_groupby_metrics(&metrics); + + Ok(()) + } +} diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index 316fbe11ae31..5f2a2faa1112 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -46,8 +46,11 @@ use crate::aggregates::{ order::GroupOrdering, }; +mod metrics; mod null_builder; +pub(crate) use metrics::GroupByMetrics; + /// Stores the group values during hash aggregation. /// /// # Background diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 6132a8b0add5..98c8cb235ca4 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -23,7 +23,7 @@ use std::vec; use super::order::GroupOrdering; use super::AggregateExec; -use crate::aggregates::group_values::{new_group_values, GroupValues}; +use crate::aggregates::group_values::{new_group_values, GroupByMetrics, GroupValues}; use crate::aggregates::order::GroupOrderingFull; use crate::aggregates::{ create_schema, evaluate_group_by, evaluate_many, evaluate_optional, AggregateMode, @@ -49,6 +49,7 @@ use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{GroupsAccumulatorAdapter, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_common::instant::Instant; use futures::ready; use futures::stream::{Stream, StreamExt}; use log::debug; @@ -430,6 +431,9 @@ pub(crate) struct GroupedHashAggregateStream { /// Execution metrics baseline_metrics: BaselineMetrics, + + /// Aggregation-specific metrics + group_by_metrics: GroupByMetrics, } impl GroupedHashAggregateStream { @@ -447,6 +451,7 @@ impl GroupedHashAggregateStream { let batch_size = context.session_config().batch_size(); let input = agg.input.execute(partition, Arc::clone(&context))?; let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); + let group_by_metrics = GroupByMetrics::new(&agg.metrics, partition); let timer = baseline_metrics.elapsed_compute().timer(); @@ -609,6 +614,7 @@ impl GroupedHashAggregateStream { current_group_indices: Default::default(), exec_state, baseline_metrics, + group_by_metrics, batch_size, group_ordering, input_done: false, @@ -830,12 +836,25 @@ impl GroupedHashAggregateStream { evaluate_group_by(&self.group_by, &batch)? }; + // Only create the timer if there are actual aggregate arguments to evaluate + let timer = match ( + self.spill_state.is_stream_merging, + self.spill_state.merging_aggregate_arguments.is_empty(), + self.aggregate_arguments.is_empty(), + ) { + (true, false, _) | (false, _, false) => { + Some(self.group_by_metrics.aggregate_arguments_time.timer()) + } + _ => None, + }; + // Evaluate the aggregation expressions. let input_values = if self.spill_state.is_stream_merging { evaluate_many(&self.spill_state.merging_aggregate_arguments, &batch)? } else { evaluate_many(&self.aggregate_arguments, &batch)? }; + drop(timer); // Evaluate the filter expressions, if any, against the inputs let filter_values = if self.spill_state.is_stream_merging { @@ -846,6 +865,8 @@ impl GroupedHashAggregateStream { }; for group_values in &group_by_values { + let groups_start_time = Instant::now(); + // calculate the group indices for each input row let starting_num_groups = self.group_values.len(); self.group_values @@ -862,6 +883,12 @@ impl GroupedHashAggregateStream { )?; } + // Use this instant for both measurements to save a syscall + let agg_start_time = Instant::now(); + self.group_by_metrics + .time_calculating_group_ids + .add_duration(agg_start_time - groups_start_time); + // Gather the inputs to call the actual accumulator let t = self .accumulators @@ -897,6 +924,9 @@ impl GroupedHashAggregateStream { acc.merge_batch(values, group_indices, None, total_num_groups)?; } } + self.group_by_metrics + .aggregation_time + .add_elapsed(agg_start_time); } } @@ -941,6 +971,7 @@ impl GroupedHashAggregateStream { return Ok(None); } + let timer = self.group_by_metrics.emitting_time.timer(); let mut output = self.group_values.emit(emit_to)?; if let EmitTo::First(n) = emit_to { self.group_ordering.remove_groups(n); @@ -961,12 +992,14 @@ impl GroupedHashAggregateStream { | AggregateMode::SinglePartitioned => output.push(acc.evaluate(emit_to)?), } } + drop(timer); // emit reduces the memory usage. Ignore Err from update_memory_reservation. Even if it is // over the target memory size after emission, we can emit again rather than returning Err. let _ = self.update_memory_reservation(); let batch = RecordBatch::try_new(schema, output)?; debug_assert!(batch.num_rows() > 0); + Ok(Some(batch)) } diff --git a/datafusion/physical-plan/src/aggregates/topk_stream.rs b/datafusion/physical-plan/src/aggregates/topk_stream.rs index 9aaadfd52b96..eb1b7543cbfd 100644 --- a/datafusion/physical-plan/src/aggregates/topk_stream.rs +++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs @@ -17,11 +17,13 @@ //! A memory-conscious aggregation implementation that limits group buckets to a fixed number +use crate::aggregates::group_values::GroupByMetrics; use crate::aggregates::topk::priority_map::PriorityMap; use crate::aggregates::{ aggregate_expressions, evaluate_group_by, evaluate_many, AggregateExec, PhysicalGroupBy, }; +use crate::metrics::BaselineMetrics; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::{Array, ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; @@ -42,6 +44,8 @@ pub struct GroupedTopKAggregateStream { started: bool, schema: SchemaRef, input: SendableRecordBatchStream, + baseline_metrics: BaselineMetrics, + group_by_metrics: GroupByMetrics, aggregate_arguments: Vec>>, group_by: PhysicalGroupBy, priority_map: PriorityMap, @@ -57,6 +61,8 @@ impl GroupedTopKAggregateStream { let agg_schema = Arc::clone(&aggr.schema); let group_by = aggr.group_by.clone(); let input = aggr.input.execute(partition, Arc::clone(&context))?; + let baseline_metrics = BaselineMetrics::new(&aggr.metrics, partition); + let group_by_metrics = GroupByMetrics::new(&aggr.metrics, partition); let aggregate_arguments = aggregate_expressions(&aggr.aggr_expr, &aggr.mode, group_by.expr.len())?; let (val_field, desc) = aggr @@ -75,6 +81,8 @@ impl GroupedTopKAggregateStream { row_count: 0, schema: agg_schema, input, + baseline_metrics, + group_by_metrics, aggregate_arguments, group_by, priority_map, @@ -90,6 +98,8 @@ impl RecordBatchStream for GroupedTopKAggregateStream { impl GroupedTopKAggregateStream { fn intern(&mut self, ids: ArrayRef, vals: ArrayRef) -> Result<()> { + let _timer = self.group_by_metrics.time_calculating_group_ids.timer(); + let len = ids.len(); self.priority_map.set_batch(ids, Arc::clone(&vals)); @@ -111,7 +121,10 @@ impl Stream for GroupedTopKAggregateStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); + let emitting_time = self.group_by_metrics.emitting_time.clone(); while let Poll::Ready(res) = self.input.poll_next_unpin(cx) { + let _timer = elapsed_compute.timer(); match res { // got a batch, convert to rows and append to our TreeMap Some(Ok(batch)) => { @@ -140,10 +153,15 @@ impl Stream for GroupedTopKAggregateStream { "Exactly 1 group value required" ); let group_by_values = Arc::clone(&group_by_values[0][0]); - let input_values = evaluate_many( - &self.aggregate_arguments, - batches.first().unwrap(), - )?; + let input_values = { + let _timer = (!self.aggregate_arguments.is_empty()).then(|| { + self.group_by_metrics.aggregate_arguments_time.timer() + }); + evaluate_many( + &self.aggregate_arguments, + batches.first().unwrap(), + )? + }; assert_eq!(input_values.len(), 1, "Exactly 1 input required"); assert_eq!(input_values[0].len(), 1, "Exactly 1 input required"); let input_values = Arc::clone(&input_values[0][0]); @@ -157,8 +175,11 @@ impl Stream for GroupedTopKAggregateStream { trace!("partition {} emit None", self.partition); return Poll::Ready(None); } - let cols = self.priority_map.emit()?; - let batch = RecordBatch::try_new(Arc::clone(&self.schema), cols)?; + let batch = { + let _timer = emitting_time.timer(); + let cols = self.priority_map.emit()?; + RecordBatch::try_new(Arc::clone(&self.schema), cols)? + }; trace!( "partition {} emit batch with {} rows", self.partition,