Skip to content

Commit

Permalink
Use atomics for SQLMetric implementation, remove unused names
Browse files Browse the repository at this point in the history
  • Loading branch information
returnString committed Apr 21, 2021
1 parent c365a4f commit 0a4d560
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 45 deletions.
16 changes: 6 additions & 10 deletions datafusion/src/physical_plan/hash_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Defines the execution plan for the hash aggregate operation

use std::any::Any;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::task::{Context, Poll};

use ahash::RandomState;
Expand Down Expand Up @@ -95,7 +95,7 @@ pub struct HashAggregateExec {
/// to the partial aggregate
input_schema: SchemaRef,
/// Metric to track number of output rows
output_rows: Arc<Mutex<SQLMetric>>,
output_rows: Arc<SQLMetric>,
}

fn create_schema(
Expand Down Expand Up @@ -144,7 +144,7 @@ impl HashAggregateExec {

let schema = Arc::new(schema);

let output_rows = SQLMetric::counter("outputRows");
let output_rows = SQLMetric::counter();

Ok(HashAggregateExec {
mode,
Expand Down Expand Up @@ -253,10 +253,7 @@ impl ExecutionPlan for HashAggregateExec {

fn metrics(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
metrics.insert(
"outputRows".to_owned(),
self.output_rows.lock().unwrap().clone(),
);
metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
metrics
}
}
Expand Down Expand Up @@ -292,7 +289,7 @@ pin_project! {
#[pin]
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
finished: bool,
output_rows: Arc<Mutex<SQLMetric>>,
output_rows: Arc<SQLMetric>,
}
}

Expand Down Expand Up @@ -644,7 +641,7 @@ impl GroupedHashAggregateStream {
group_expr: Vec<Arc<dyn PhysicalExpr>>,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
input: SendableRecordBatchStream,
output_rows: Arc<Mutex<SQLMetric>>,
output_rows: Arc<SQLMetric>,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();

Expand Down Expand Up @@ -702,7 +699,6 @@ impl Stream for GroupedHashAggregateStream {
};

if let Ok(batch) = &result {
let mut output_rows = output_rows.lock().unwrap();
output_rows.add(batch.num_rows())
}

Expand Down
40 changes: 25 additions & 15 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
//! Traits for physical query plan, supporting parallel execution for partitioned relations.

use std::fmt::{Debug, Display};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{any::Any, pin::Pin};

use crate::execution::context::ExecutionContextState;
Expand Down Expand Up @@ -58,44 +59,53 @@ pub enum MetricType {

/// SQL metric such as counter (number of input or output rows) or timing information about
/// a physical operator.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct SQLMetric {
/// Metric name
name: String,
/// Metric value
value: usize,
value: AtomicUsize,
/// Metric type
metric_type: MetricType,
}

impl Clone for SQLMetric {
fn clone(&self) -> Self {
Self {
value: AtomicUsize::new(self.value.load(Ordering::Relaxed)),
metric_type: self.metric_type.clone(),
}
}
}

impl SQLMetric {
// relaxed ordering for operations on `value` poses no issues
// we're purely using atomic ops with no associated memory ops

/// Create a new metric for tracking a counter
pub fn counter(name: &str) -> Arc<Mutex<SQLMetric>> {
Arc::new(Mutex::new(SQLMetric::new(name, MetricType::Counter)))
pub fn counter() -> Arc<SQLMetric> {
Arc::new(SQLMetric::new(MetricType::Counter))
}

/// Create a new metric for tracking time in nanoseconds
pub fn time_nanos(name: &str) -> Arc<Mutex<SQLMetric>> {
Arc::new(Mutex::new(SQLMetric::new(name, MetricType::TimeNanos)))
pub fn time_nanos() -> Arc<SQLMetric> {
Arc::new(SQLMetric::new(MetricType::TimeNanos))
}

/// Create a new SQLMetric
pub fn new(name: &str, metric_type: MetricType) -> Self {
pub fn new(metric_type: MetricType) -> Self {
Self {
name: name.to_owned(),
value: 0,
value: AtomicUsize::new(0),
metric_type,
}
}

/// Add to the value
pub fn add(&mut self, n: usize) {
self.value += n;
pub fn add(&self, n: usize) {
self.value.fetch_add(n, Ordering::Relaxed);
}

/// Get the current value
pub fn value(&self) -> usize {
self.value
self.value.load(Ordering::Relaxed)
}
}

Expand Down
32 changes: 12 additions & 20 deletions datafusion/src/physical_plan/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use std::any::Any;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;

Expand Down Expand Up @@ -52,9 +52,9 @@ pub struct SortExec {
/// Sort expressions
expr: Vec<PhysicalSortExpr>,
/// Output rows
output_rows: Arc<Mutex<SQLMetric>>,
output_rows: Arc<SQLMetric>,
/// Time to sort batches
sort_time_nanos: Arc<Mutex<SQLMetric>>,
sort_time_nanos: Arc<SQLMetric>,
}

impl SortExec {
Expand All @@ -66,8 +66,8 @@ impl SortExec {
Ok(Self {
expr,
input,
output_rows: SQLMetric::counter("outputRows"),
sort_time_nanos: SQLMetric::time_nanos("sortTime"),
output_rows: SQLMetric::counter(),
sort_time_nanos: SQLMetric::time_nanos(),
})
}

Expand Down Expand Up @@ -147,14 +147,8 @@ impl ExecutionPlan for SortExec {

fn metrics(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
metrics.insert(
"outputRows".to_owned(),
self.output_rows.lock().unwrap().clone(),
);
metrics.insert(
"sortTime".to_owned(),
self.sort_time_nanos.lock().unwrap().clone(),
);
metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
metrics.insert("sortTime".to_owned(), (*self.sort_time_nanos).clone());
metrics
}
}
Expand Down Expand Up @@ -224,16 +218,16 @@ pin_project! {
output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
finished: bool,
schema: SchemaRef,
output_rows: Arc<Mutex<SQLMetric>>,
output_rows: Arc<SQLMetric>,
}
}

impl SortStream {
fn new(
input: SendableRecordBatchStream,
expr: Vec<PhysicalSortExpr>,
output_rows: Arc<Mutex<SQLMetric>>,
sort_time: Arc<Mutex<SQLMetric>>,
output_rows: Arc<SQLMetric>,
sort_time: Arc<SQLMetric>,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();

Expand All @@ -246,7 +240,6 @@ impl SortStream {
.and_then(move |batches| {
let now = Instant::now();
let result = sort_batches(&batches, &schema, &expr);
let mut sort_time = sort_time.lock().unwrap();
sort_time.add(now.elapsed().as_nanos() as usize);
result
});
Expand Down Expand Up @@ -288,7 +281,6 @@ impl Stream for SortStream {
};

if let Some(Ok(batch)) = &result {
let mut output_rows = output_rows.lock().unwrap();
output_rows.add(batch.num_rows());
}

Expand Down Expand Up @@ -431,8 +423,8 @@ mod tests {
assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());

let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
assert!(sort_exec.metrics().get("sortTime").unwrap().value > 0);
assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value, 8);
assert!(sort_exec.metrics().get("sortTime").unwrap().value() > 0);
assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value(), 8);
assert_eq!(result.len(), 1);

let columns = result[0].columns();
Expand Down

0 comments on commit 0a4d560

Please sign in to comment.