diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs index 1a4cb17ea39a7..b78e8bca55085 100644 --- a/rust/datafusion/src/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs @@ -18,7 +18,7 @@ //! Defines the execution plan for the hash aggregate operation use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use ahash::RandomState; @@ -28,7 +28,7 @@ use futures::{ }; use crate::error::{DataFusionError, Result}; -use crate::physical_plan::{Accumulator, AggregateExpr}; +use crate::physical_plan::{Accumulator, AggregateExpr, MetricType, SQLMetric}; use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning, PhysicalExpr}; use arrow::{ @@ -94,6 +94,8 @@ pub struct HashAggregateExec { /// same as input.schema() but for the final aggregate it will be the same as the input /// to the partial aggregate input_schema: SchemaRef, + /// Metric to track number of output rows + output_rows: Arc>, } fn create_schema( @@ -142,6 +144,11 @@ impl HashAggregateExec { let schema = Arc::new(schema); + let output_rows = Arc::new(Mutex::new(SQLMetric::new( + "outputRows", + MetricType::Counter, + ))); + Ok(HashAggregateExec { mode, group_expr, @@ -149,6 +156,7 @@ impl HashAggregateExec { input, schema, input_schema, + output_rows, }) } @@ -223,6 +231,7 @@ impl ExecutionPlan for HashAggregateExec { group_expr, self.aggr_expr.clone(), input, + self.output_rows.clone(), ))) } } @@ -244,6 +253,15 @@ impl ExecutionPlan for HashAggregateExec { )), } } + + fn metrics(&self) -> HashMap { + let mut metrics = HashMap::new(); + metrics.insert( + "outputRows".to_owned(), + self.output_rows.lock().unwrap().clone(), + ); + metrics + } } /* @@ -277,6 +295,7 @@ pin_project! { #[pin] output: futures::channel::oneshot::Receiver>, finished: bool, + output_rows: Arc>, } } @@ -628,6 +647,7 @@ impl GroupedHashAggregateStream { group_expr: Vec>, aggr_expr: Vec>, input: SendableRecordBatchStream, + output_rows: Arc>, ) -> Self { let (tx, rx) = futures::channel::oneshot::channel(); @@ -648,6 +668,7 @@ impl GroupedHashAggregateStream { schema, output: rx, finished: false, + output_rows, } } } @@ -667,6 +688,8 @@ impl Stream for GroupedHashAggregateStream { return Poll::Ready(None); } + let output_rows = self.output_rows.clone(); + // is the output ready? let this = self.project(); let output_poll = this.output.poll(cx); @@ -680,6 +703,12 @@ impl Stream for GroupedHashAggregateStream { Err(e) => Err(ArrowError::ExternalError(Box::new(e))), // error receiving Ok(result) => result, }; + + if let Ok(batch) = &result { + let mut output_rows = output_rows.lock().unwrap(); + output_rows.add(batch.num_rows()) + } + Poll::Ready(Some(result)) } Poll::Pending => Poll::Pending, @@ -1255,6 +1284,11 @@ mod tests { ]; assert_batches_sorted_eq!(&expected, &result); + + let metrics = merged_aggregate.metrics(); + let output_rows = metrics.get("outputRows").unwrap(); + assert_eq!(3, output_rows.value()); + Ok(()) } diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index d529e98f75da5..054d585e8e375 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -33,6 +33,7 @@ use async_trait::async_trait; use futures::stream::Stream; use self::merge::MergeExec; +use hashbrown::HashMap; /// Trait for types that stream [arrow::record_batch::RecordBatch] pub trait RecordBatchStream: Stream> { @@ -46,6 +47,46 @@ pub trait RecordBatchStream: Stream> { /// Trait for a stream of record batches. pub type SendableRecordBatchStream = Pin>; +/// SQL metric type +#[derive(Debug, Clone)] +pub enum MetricType { + /// Simple counter + Counter, +} + +/// SQL metric such as counter (number of input or output rows) or timing information about +/// a physical operator. +#[derive(Debug, Clone)] +pub struct SQLMetric { + /// Metric name + name: String, + /// Metric value + value: usize, + /// Metric type + metric_type: MetricType, +} + +impl SQLMetric { + /// Create a new SQLMetric + pub fn new(name: &str, metric_type: MetricType) -> Self { + Self { + name: name.to_owned(), + value: 0, + metric_type, + } + } + + /// Add to the value + pub fn add(&mut self, n: usize) { + self.value += n; + } + + /// Get the current value + pub fn value(&self) -> usize { + self.value + } +} + /// Physical query planner that converts a `LogicalPlan` to an /// `ExecutionPlan` suitable for execution. pub trait PhysicalPlanner { @@ -84,6 +125,11 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// creates an iterator async fn execute(&self, partition: usize) -> Result; + + /// Return a snapshot of the metrics collected during execution + fn metrics(&self) -> HashMap { + HashMap::new() + } } /// Execute the [ExecutionPlan] and collect the results in memory