diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs index 2599690bfc00..e5747dda88b7 100644 --- a/datafusion/src/physical_plan/repartition.rs +++ b/datafusion/src/physical_plan/repartition.rs @@ -21,10 +21,11 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use std::{any::Any, collections::HashMap, vec}; +use std::time::Instant; +use std::{any::Any, vec}; use crate::error::{DataFusionError, Result}; -use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; +use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric}; use arrow::record_batch::RecordBatch; use arrow::{array::Array, error::Result as ArrowResult}; use arrow::{compute::take, datatypes::SchemaRef}; @@ -35,6 +36,7 @@ use async_trait::async_trait; use futures::stream::Stream; use futures::StreamExt; +use hashbrown::HashMap; use tokio::sync::{ mpsc::{UnboundedReceiver, UnboundedSender}, Mutex, @@ -58,6 +60,12 @@ pub struct RepartitionExec { HashMap, UnboundedReceiver)>, >, >, + /// Time in nanos to execute child operator and fetch batches + fetch_time_nanos: Arc, + /// Time in nanos to perform repartitioning + repart_time_nanos: Arc, + /// Time in nanos for sending resulting batches to channels + send_time_nanos: Arc, } impl RepartitionExec { @@ -136,26 +144,46 @@ impl ExecutionPlan for RepartitionExec { for i in 0..num_input_partitions { let random_state = random.clone(); let input = self.input.clone(); + let fetch_time = self.fetch_time_nanos.clone(); + let repart_time = self.repart_time_nanos.clone(); + let send_time = self.send_time_nanos.clone(); let mut txs: HashMap<_, _> = channels .iter() .map(|(partition, (tx, _rx))| (*partition, tx.clone())) .collect(); let partitioning = self.partitioning.clone(); let _: JoinHandle> = tokio::spawn(async move { + // execute the child operator + let now = Instant::now(); let mut stream = input.execute(i).await?; + 
fetch_time.add(now.elapsed().as_nanos() as usize); + let mut counter = 0; let hashes_buf = &mut vec![]; - while let Some(result) = stream.next().await { + loop { + // fetch the next batch + let now = Instant::now(); + let result = stream.next().await; + fetch_time.add(now.elapsed().as_nanos() as usize); + + if result.is_none() { + break; + } + let result = result.unwrap(); + match &partitioning { Partitioning::RoundRobinBatch(_) => { + let now = Instant::now(); let output_partition = counter % num_output_partitions; let tx = txs.get_mut(&output_partition).unwrap(); tx.send(Some(result)).map_err(|e| { DataFusionError::Execution(e.to_string()) })?; + send_time.add(now.elapsed().as_nanos() as usize); } Partitioning::Hash(exprs, _) => { + let now = Instant::now(); let input_batch = result?; let arrays = exprs .iter() @@ -176,9 +204,11 @@ impl ExecutionPlan for RepartitionExec { [(*hash % num_output_partitions as u64) as usize] .push(index as u64) } + repart_time.add(now.elapsed().as_nanos() as usize); for (num_output_partition, partition_indices) in indices.into_iter().enumerate() { + let now = Instant::now(); let indices = partition_indices.into(); // Produce batches based on indices let columns = input_batch @@ -198,10 +228,13 @@ impl ExecutionPlan for RepartitionExec { input_batch.schema(), columns, ); + repart_time.add(now.elapsed().as_nanos() as usize); + let now = Instant::now(); let tx = txs.get_mut(&num_output_partition).unwrap(); tx.send(Some(output_batch)).map_err(|e| { DataFusionError::Execution(e.to_string()) })?; + send_time.add(now.elapsed().as_nanos() as usize); } } other => { @@ -236,6 +269,17 @@ impl ExecutionPlan for RepartitionExec { })) } + fn metrics(&self) -> HashMap { + let mut metrics = HashMap::new(); + metrics.insert("fetchTime".to_owned(), (*self.fetch_time_nanos).clone()); + metrics.insert( + "repartitionTime".to_owned(), + (*self.repart_time_nanos).clone(), + ); + metrics.insert("sendTime".to_owned(), (*self.send_time_nanos).clone()); + 
metrics + } + fn fmt_as( &self, t: DisplayFormatType, @@ -259,6 +303,9 @@ impl RepartitionExec { input, partitioning, channels: Arc::new(Mutex::new(HashMap::new())), + fetch_time_nanos: SQLMetric::time_nanos(), + repart_time_nanos: SQLMetric::time_nanos(), + send_time_nanos: SQLMetric::time_nanos(), }) } }