From 0a4d560d801f252e5f3c0384368bf98bd52d571b Mon Sep 17 00:00:00 2001 From: Ruan Pearce-Authers Date: Wed, 21 Apr 2021 17:51:01 +0100 Subject: [PATCH] Use atomics for SQLMetric implementation, remove unused names --- .../src/physical_plan/hash_aggregate.rs | 16 +++----- datafusion/src/physical_plan/mod.rs | 40 ++++++++++++------- datafusion/src/physical_plan/sort.rs | 32 ++++++--------- 3 files changed, 43 insertions(+), 45 deletions(-) diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index 234265022ef7..fd20b5c65ef2 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -18,7 +18,7 @@ //! Defines the execution plan for the hash aggregate operation use std::any::Any; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{Context, Poll}; use ahash::RandomState; @@ -95,7 +95,7 @@ pub struct HashAggregateExec { /// to the partial aggregate input_schema: SchemaRef, /// Metric to track number of output rows - output_rows: Arc>, + output_rows: Arc, } fn create_schema( @@ -144,7 +144,7 @@ impl HashAggregateExec { let schema = Arc::new(schema); - let output_rows = SQLMetric::counter("outputRows"); + let output_rows = SQLMetric::counter(); Ok(HashAggregateExec { mode, @@ -253,10 +253,7 @@ impl ExecutionPlan for HashAggregateExec { fn metrics(&self) -> HashMap { let mut metrics = HashMap::new(); - metrics.insert( - "outputRows".to_owned(), - self.output_rows.lock().unwrap().clone(), - ); + metrics.insert("outputRows".to_owned(), (*self.output_rows).clone()); metrics } } @@ -292,7 +289,7 @@ pin_project! { #[pin] output: futures::channel::oneshot::Receiver>, finished: bool, - output_rows: Arc>, + output_rows: Arc, } } @@ -644,7 +641,7 @@ impl GroupedHashAggregateStream { group_expr: Vec>, aggr_expr: Vec>, input: SendableRecordBatchStream, - output_rows: Arc>, + output_rows: Arc, ) -> Self { let (tx, rx) = futures::channel::oneshot::channel(); @@ -702,7 +699,6 @@ impl Stream for GroupedHashAggregateStream { }; if let Ok(batch) = &result { - let mut output_rows = output_rows.lock().unwrap(); output_rows.add(batch.num_rows()) } diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 5036dcb921bb..80dfe6e473b6 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -18,7 +18,8 @@ //! Traits for physical query plan, supporting parallel execution for partitioned relations. use std::fmt::{Debug, Display}; -use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::{any::Any, pin::Pin}; use crate::execution::context::ExecutionContextState; @@ -58,44 +59,53 @@ pub enum MetricType { /// SQL metric such as counter (number of input or output rows) or timing information about /// a physical operator. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct SQLMetric { - /// Metric name - name: String, /// Metric value - value: usize, + value: AtomicUsize, /// Metric type metric_type: MetricType, } +impl Clone for SQLMetric { + fn clone(&self) -> Self { + Self { + value: AtomicUsize::new(self.value.load(Ordering::Relaxed)), + metric_type: self.metric_type.clone(), + } + } +} + impl SQLMetric { + // relaxed ordering for operations on `value` poses no issues + // we're purely using atomic ops with no associated memory ops + /// Create a new metric for tracking a counter - pub fn counter(name: &str) -> Arc> { - Arc::new(Mutex::new(SQLMetric::new(name, MetricType::Counter))) + pub fn counter() -> Arc { + Arc::new(SQLMetric::new(MetricType::Counter)) } /// Create a new metric for tracking time in nanoseconds - pub fn time_nanos(name: &str) -> Arc> { - Arc::new(Mutex::new(SQLMetric::new(name, MetricType::TimeNanos))) + pub fn time_nanos() -> Arc { + Arc::new(SQLMetric::new(MetricType::TimeNanos)) } /// Create a new SQLMetric - pub fn new(name: &str, metric_type: MetricType) -> Self { + pub fn new(metric_type: MetricType) -> Self { Self { - name: name.to_owned(), - value: 0, + value: AtomicUsize::new(0), metric_type, } } /// Add to the value - pub fn add(&mut self, n: usize) { - self.value += n; + pub fn add(&self, n: usize) { + self.value.fetch_add(n, Ordering::Relaxed); } /// Get the current value pub fn value(&self) -> usize { - self.value + self.value.load(Ordering::Relaxed) } } diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs index 26855b354db0..010e4068638b 100644 --- a/datafusion/src/physical_plan/sort.rs +++ b/datafusion/src/physical_plan/sort.rs @@ -19,7 +19,7 @@ use std::any::Any; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Instant; @@ -52,9 +52,9 @@ pub struct SortExec { /// Sort expressions expr: Vec, /// Output rows - output_rows: Arc>, + output_rows: Arc, /// Time to sort batches - sort_time_nanos: Arc>, + sort_time_nanos: Arc, } impl SortExec { @@ -66,8 +66,8 @@ impl SortExec { Ok(Self { expr, input, - output_rows: SQLMetric::counter("outputRows"), - sort_time_nanos: SQLMetric::time_nanos("sortTime"), + output_rows: SQLMetric::counter(), + sort_time_nanos: SQLMetric::time_nanos(), }) } @@ -147,14 +147,8 @@ impl ExecutionPlan for SortExec { fn metrics(&self) -> HashMap { let mut metrics = HashMap::new(); - metrics.insert( - "outputRows".to_owned(), - self.output_rows.lock().unwrap().clone(), - ); - metrics.insert( - "sortTime".to_owned(), - self.sort_time_nanos.lock().unwrap().clone(), - ); + metrics.insert("outputRows".to_owned(), (*self.output_rows).clone()); + metrics.insert("sortTime".to_owned(), (*self.sort_time_nanos).clone()); metrics } } @@ -224,7 +218,7 @@ pin_project! { output: futures::channel::oneshot::Receiver>>, finished: bool, schema: SchemaRef, - output_rows: Arc>, + output_rows: Arc, } } @@ -232,8 +226,8 @@ impl SortStream { fn new( input: SendableRecordBatchStream, expr: Vec, - output_rows: Arc>, - sort_time: Arc>, + output_rows: Arc, + sort_time: Arc, ) -> Self { let (tx, rx) = futures::channel::oneshot::channel(); @@ -246,7 +240,6 @@ impl SortStream { .and_then(move |batches| { let now = Instant::now(); let result = sort_batches(&batches, &schema, &expr); - let mut sort_time = sort_time.lock().unwrap(); sort_time.add(now.elapsed().as_nanos() as usize); result }); @@ -288,7 +281,6 @@ impl Stream for SortStream { }; if let Some(Ok(batch)) = &result { - let mut output_rows = output_rows.lock().unwrap(); output_rows.add(batch.num_rows()); } @@ -431,8 +423,8 @@ mod tests { assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type()); let result: Vec = collect(sort_exec.clone()).await?; - assert!(sort_exec.metrics().get("sortTime").unwrap().value > 0); - assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value, 8); + assert!(sort_exec.metrics().get("sortTime").unwrap().value() > 0); + assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value(), 8); assert_eq!(result.len(), 1); let columns = result[0].columns();