ARROW-12402: [Rust] [DataFusion] Implement SQL metrics example #10049

Closed · wants to merge 1 commit
38 changes: 36 additions & 2 deletions rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -18,7 +18,7 @@
//! Defines the execution plan for the hash aggregate operation

use std::any::Any;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use ahash::RandomState;
@@ -28,7 +28,7 @@ use futures::{
};

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{Accumulator, AggregateExpr};
use crate::physical_plan::{Accumulator, AggregateExpr, MetricType, SQLMetric};
use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning, PhysicalExpr};

use arrow::{
@@ -94,6 +94,8 @@ pub struct HashAggregateExec {
/// same as input.schema() but for the final aggregate it will be the same as the input
/// to the partial aggregate
input_schema: SchemaRef,
/// Metric to track number of output rows
output_rows: Arc<Mutex<SQLMetric>>,
}

fn create_schema(
@@ -142,13 +144,19 @@ impl HashAggregateExec {

let schema = Arc::new(schema);

let output_rows = Arc::new(Mutex::new(SQLMetric::new(
"outputRows",
MetricType::Counter,
)));

Ok(HashAggregateExec {
mode,
group_expr,
aggr_expr,
input,
schema,
input_schema,
output_rows,
})
}

@@ -223,6 +231,7 @@ impl ExecutionPlan for HashAggregateExec {
group_expr,
self.aggr_expr.clone(),
input,
self.output_rows.clone(),
)))
}
}
@@ -244,6 +253,15 @@ impl ExecutionPlan for HashAggregateExec {
)),
}
}

fn metrics(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
metrics.insert(
"outputRows".to_owned(),
self.output_rows.lock().unwrap().clone(),
);
metrics
}
}

/*
@@ -277,6 +295,7 @@ pin_project! {
#[pin]
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
finished: bool,
output_rows: Arc<Mutex<SQLMetric>>,
}
}

@@ -628,6 +647,7 @@ impl GroupedHashAggregateStream {
group_expr: Vec<Arc<dyn PhysicalExpr>>,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
input: SendableRecordBatchStream,
output_rows: Arc<Mutex<SQLMetric>>,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();

@@ -648,6 +668,7 @@ impl GroupedHashAggregateStream {
schema,
output: rx,
finished: false,
output_rows,
}
}
}
@@ -667,6 +688,8 @@ impl Stream for GroupedHashAggregateStream {
return Poll::Ready(None);
}

let output_rows = self.output_rows.clone();

// is the output ready?
let this = self.project();
let output_poll = this.output.poll(cx);
@@ -680,6 +703,12 @@ impl Stream for GroupedHashAggregateStream {
Err(e) => Err(ArrowError::ExternalError(Box::new(e))), // error receiving
Ok(result) => result,
};

if let Ok(batch) = &result {
let mut output_rows = output_rows.lock().unwrap();
output_rows.add(batch.num_rows());
}

Poll::Ready(Some(result))
}
Poll::Pending => Poll::Pending,
@@ -1255,6 +1284,11 @@ mod tests {
];

assert_batches_sorted_eq!(&expected, &result);

let metrics = merged_aggregate.metrics();
let output_rows = metrics.get("outputRows").unwrap();
assert_eq!(3, output_rows.value());

Ok(())
}

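Taken together, the changes to `hash_aggregate.rs` wire an `outputRows` counter through the operator: `HashAggregateExec` creates the metric, hands a clone of the `Arc<Mutex<SQLMetric>>` to `GroupedHashAggregateStream`, the stream increments it as result batches are emitted, and `metrics()` exposes a snapshot. A minimal sketch of how a caller might read the counter back — `run_and_report` is a hypothetical helper, not part of this PR, the plan construction is elided, and the paths assume the `datafusion` crate:

```rust
use std::sync::Arc;

use futures::StreamExt;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical driver: drain one partition of the plan, then print a
// snapshot of the metrics the operator collected along the way.
async fn run_and_report(plan: Arc<dyn ExecutionPlan>) -> Result<()> {
    let mut stream = plan.execute(0).await?;
    while let Some(batch) = stream.next().await {
        batch?; // consume every batch so the operator counts its output rows
    }
    for (name, metric) in plan.metrics() {
        println!("{}: {}", name, metric.value());
    }
    Ok(())
}
```

Draining the stream first matters: the counter is only updated inside `poll_next` as batches flow through, so reading `metrics()` before execution completes yields a partial count.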
46 changes: 46 additions & 0 deletions rust/datafusion/src/physical_plan/mod.rs
@@ -33,6 +33,7 @@ use async_trait::async_trait;
use futures::stream::Stream;

use self::merge::MergeExec;
use hashbrown::HashMap;

/// Trait for types that stream [arrow::record_batch::RecordBatch]
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
@@ -46,6 +47,46 @@ pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
/// Type alias for a pinned stream of record batches that can be sent across threads.
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync>>;

/// SQL metric type
#[derive(Debug, Clone)]
pub enum MetricType {
/// Simple counter
Counter,
}

/// A SQL metric, such as a counter (e.g. number of input or output rows) or timing
/// information about a physical operator.
#[derive(Debug, Clone)]
pub struct SQLMetric {
/// Metric name
name: String,
/// Metric value
value: usize,
/// Metric type
metric_type: MetricType,
}

impl SQLMetric {
/// Create a new SQLMetric
pub fn new(name: &str, metric_type: MetricType) -> Self {
Self {
name: name.to_owned(),
value: 0,
metric_type,
}
}

/// Add to the value
pub fn add(&mut self, n: usize) {
self.value += n;
}

/// Get the current value
pub fn value(&self) -> usize {
self.value
}
}

/// Physical query planner that converts a `LogicalPlan` to an
/// `ExecutionPlan` suitable for execution.
pub trait PhysicalPlanner {
@@ -84,6 +125,11 @@ pub trait ExecutionPlan: Debug + Send + Sync {

/// Creates a stream of record batches for the given partition
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream>;

/// Return a snapshot of the metrics collected during execution
fn metrics(&self) -> HashMap<String, SQLMetric> {
HashMap::new()
}
}

/// Execute the [ExecutionPlan] and collect the results in memory
Expand Down