Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions rust/datafusion/src/physical_plan/hash_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use futures::{
};

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{Accumulator, AggregateExpr, MetricType, SQLMetric};
use crate::physical_plan::{Accumulator, AggregateExpr, SQLMetric};
use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning, PhysicalExpr};

use arrow::{
Expand Down Expand Up @@ -144,10 +144,7 @@ impl HashAggregateExec {

let schema = Arc::new(schema);

let output_rows = Arc::new(Mutex::new(SQLMetric::new(
"outputRows",
MetricType::Counter,
)));
let output_rows = SQLMetric::counter("outputRows");

Ok(HashAggregateExec {
mode,
Expand Down
14 changes: 13 additions & 1 deletion rust/datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Traits for physical query plan, supporting parallel execution for partitioned relations.

use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::{any::Any, pin::Pin};

use crate::execution::context::ExecutionContextState;
Expand Down Expand Up @@ -52,6 +52,8 @@ pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync
pub enum MetricType {
/// Simple counter
Counter,
/// Wall clock time in nanoseconds
TimeNanos,
}

/// SQL metric such as counter (number of input or output rows) or timing information about
Expand All @@ -67,6 +69,16 @@ pub struct SQLMetric {
}

impl SQLMetric {
/// Create a new metric for tracking a counter
pub fn counter(name: &str) -> Arc<Mutex<SQLMetric>> {
Arc::new(Mutex::new(SQLMetric::new(name, MetricType::Counter)))
}

/// Create a new metric for tracking time in nanoseconds
pub fn time_nanos(name: &str) -> Arc<Mutex<SQLMetric>> {
Arc::new(Mutex::new(SQLMetric::new(name, MetricType::TimeNanos)))
}

/// Create a new SQLMetric
pub fn new(name: &str, metric_type: MetricType) -> Self {
Self {
Expand Down
71 changes: 62 additions & 9 deletions rust/datafusion/src/physical_plan/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Instant;

use async_trait::async_trait;
use futures::stream::Stream;
use futures::Future;
use hashbrown::HashMap;

use pin_project_lite::pin_project;

Expand All @@ -37,9 +40,9 @@ use arrow::{array::ArrayRef, error::ArrowError};
use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::{common, Distribution, ExecutionPlan, Partitioning};

use async_trait::async_trait;
use crate::physical_plan::{
common, Distribution, ExecutionPlan, Partitioning, SQLMetric,
};

/// Sort execution plan
#[derive(Debug)]
Expand All @@ -48,6 +51,10 @@ pub struct SortExec {
input: Arc<dyn ExecutionPlan>,
/// Sort expressions
expr: Vec<PhysicalSortExpr>,
/// Output rows
output_rows: Arc<Mutex<SQLMetric>>,
/// Time to sort batches
sort_time_nanos: Arc<Mutex<SQLMetric>>,
}

impl SortExec {
Expand All @@ -56,7 +63,12 @@ impl SortExec {
expr: Vec<PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
Ok(Self { expr, input })
Ok(Self {
expr,
input,
output_rows: SQLMetric::counter("outputRows"),
sort_time_nanos: SQLMetric::time_nanos("sortTime"),
Comment on lines +69 to +70
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given Rust's focus on compile time type checking, what would you think about using typed counters rather than String keys?

So make the code look something like:

Suggested change
output_rows: SQLMetric::counter("outputRows"),
sort_time_nanos: SQLMetric::time_nanos("sortTime"),
output_rows: SQLMetric<OutputRows>::new(),
sort_time_nanos: SQLMetric<SortTime>::new(),

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this imply adding an enum for the metrics? This might limit extensibility for users that want to add custom metrics.

})
}

/// Input schema
Expand Down Expand Up @@ -125,7 +137,25 @@ impl ExecutionPlan for SortExec {
}
let input = self.input.execute(0).await?;

Ok(Box::pin(SortStream::new(input, self.expr.clone())))
Ok(Box::pin(SortStream::new(
input,
self.expr.clone(),
self.output_rows.clone(),
self.sort_time_nanos.clone(),
)))
}

fn metrics(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
metrics.insert(
"outputRows".to_owned(),
self.output_rows.lock().unwrap().clone(),
);
metrics.insert(
"sortTime".to_owned(),
self.sort_time_nanos.lock().unwrap().clone(),
);
metrics
}
}

Expand Down Expand Up @@ -194,11 +224,17 @@ pin_project! {
output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
finished: bool,
schema: SchemaRef,
output_rows: Arc<Mutex<SQLMetric>>,
}
}

impl SortStream {
fn new(input: SendableRecordBatchStream, expr: Vec<PhysicalSortExpr>) -> Self {
fn new(
input: SendableRecordBatchStream,
expr: Vec<PhysicalSortExpr>,
output_rows: Arc<Mutex<SQLMetric>>,
sort_time: Arc<Mutex<SQLMetric>>,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();

let schema = input.schema();
Expand All @@ -207,7 +243,13 @@ impl SortStream {
let sorted_batch = common::collect(input)
.await
.map_err(DataFusionError::into_arrow_external_error)
.and_then(move |batches| sort_batches(&batches, &schema, &expr));
.and_then(move |batches| {
let now = Instant::now();
let result = sort_batches(&batches, &schema, &expr);
let mut sort_time = sort_time.lock().unwrap();
sort_time.add(now.elapsed().as_nanos() as usize);
result
});

tx.send(sorted_batch)
});
Expand All @@ -216,6 +258,7 @@ impl SortStream {
output: rx,
finished: false,
schema,
output_rows,
}
}
}
Expand All @@ -224,6 +267,8 @@ impl Stream for SortStream {
type Item = ArrowResult<RecordBatch>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let output_rows = self.output_rows.clone();

if self.finished {
return Poll::Ready(None);
}
Expand All @@ -241,6 +286,12 @@ impl Stream for SortStream {
Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
Ok(result) => result.transpose(),
};

if let Some(Ok(batch)) = &result {
let mut output_rows = output_rows.lock().unwrap();
output_rows.add(batch.num_rows());
}

Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
Expand Down Expand Up @@ -379,7 +430,9 @@ mod tests {
assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());

let result: Vec<RecordBatch> = collect(sort_exec).await?;
let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
assert!(sort_exec.metrics().get("sortTime").unwrap().value > 0);
assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value, 8);
assert_eq!(result.len(), 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also assert that the time counter was greater than zero?


let columns = result[0].columns();
Expand Down