diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 490e02875c42..aec3ef4591f1 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -107,6 +107,11 @@ impl SQLMetric { self.value.fetch_add(n, Ordering::Relaxed); } + /// Add elapsed nanoseconds since `start` to self + pub fn add_elapsed(&self, start: std::time::Instant) { + self.add(start.elapsed().as_nanos() as usize) + } + /// Get the current value pub fn value(&self) -> usize { self.value.load(Ordering::Relaxed) diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs index 37d98c7d118b..f0422814b101 100644 --- a/datafusion/src/physical_plan/repartition.rs +++ b/datafusion/src/physical_plan/repartition.rs @@ -38,7 +38,7 @@ use futures::stream::Stream; use futures::StreamExt; use hashbrown::HashMap; use tokio::sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, + mpsc::{self, UnboundedReceiver, UnboundedSender}, Mutex, }; use tokio::task::JoinHandle; @@ -60,12 +60,40 @@ pub struct RepartitionExec { HashMap, UnboundedReceiver)>, >, >, + + /// Execution metrics + metrics: RepartitionMetrics, +} + +#[derive(Debug, Clone)] +struct RepartitionMetrics { /// Time in nanos to execute child operator and fetch batches - fetch_time_nanos: Arc, + fetch_nanos: Arc, /// Time in nanos to perform repartitioning - repart_time_nanos: Arc, + repart_nanos: Arc, /// Time in nanos for sending resulting batches to channels - send_time_nanos: Arc, + send_nanos: Arc, +} + +impl RepartitionMetrics { + fn new() -> Self { + Self { + fetch_nanos: SQLMetric::time_nanos(), + repart_nanos: SQLMetric::time_nanos(), + send_nanos: SQLMetric::time_nanos(), + } + } + /// Convert into the external metrics form + fn to_hashmap(&self) -> HashMap { + let mut metrics = HashMap::new(); + metrics.insert("fetchTime".to_owned(), self.fetch_nanos.as_ref().clone()); + metrics.insert( + "repartitionTime".to_owned(), + 
self.repart_nanos.as_ref().clone(), + ); + metrics.insert("sendTime".to_owned(), self.send_nanos.as_ref().clone()); + metrics + } } impl RepartitionExec { @@ -132,9 +160,8 @@ impl ExecutionPlan for RepartitionExec { // being read yet. This may cause high memory usage if the next operator is // reading output partitions in order rather than concurrently. One workaround // for this would be to add spill-to-disk capabilities. - let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::< - Option>, - >(); + let (sender, receiver) = + mpsc::unbounded_channel::>>(); channels.insert(partition, (sender, receiver)); } // Use fixed random state @@ -142,122 +169,24 @@ impl ExecutionPlan for RepartitionExec { // launch one async task per *input* partition for i in 0..num_input_partitions { - let random_state = random.clone(); - let input = self.input.clone(); - let fetch_time = self.fetch_time_nanos.clone(); - let repart_time = self.repart_time_nanos.clone(); - let send_time = self.send_time_nanos.clone(); let txs: HashMap<_, _> = channels .iter() .map(|(partition, (tx, _rx))| (*partition, tx.clone())) .collect(); - let partitioning = self.partitioning.clone(); - let mut txs_captured = txs.clone(); - let input_task: JoinHandle> = tokio::spawn(async move { - // execute the child operator - let now = Instant::now(); - let mut stream = input.execute(i).await?; - fetch_time.add(now.elapsed().as_nanos() as usize); - let mut counter = 0; - let hashes_buf = &mut vec![]; - - loop { - // fetch the next batch - let now = Instant::now(); - let result = stream.next().await; - fetch_time.add(now.elapsed().as_nanos() as usize); - - if result.is_none() { - break; - } - let result: ArrowResult = result.unwrap(); - - match &partitioning { - Partitioning::RoundRobinBatch(_) => { - let now = Instant::now(); - let output_partition = counter % num_output_partitions; - let tx = txs_captured.get_mut(&output_partition).unwrap(); - tx.send(Some(result)).map_err(|e| { - 
DataFusionError::Execution(e.to_string()) - })?; - send_time.add(now.elapsed().as_nanos() as usize); - } - Partitioning::Hash(exprs, _) => { - let now = Instant::now(); - let input_batch = result?; - let arrays = exprs - .iter() - .map(|expr| { - Ok(expr - .evaluate(&input_batch)? - .into_array(input_batch.num_rows())) - }) - .collect::>>()?; - hashes_buf.clear(); - hashes_buf.resize(arrays[0].len(), 0); - // Hash arrays and compute buckets based on number of partitions - let hashes = - create_hashes(&arrays, &random_state, hashes_buf)?; - let mut indices = vec![vec![]; num_output_partitions]; - for (index, hash) in hashes.iter().enumerate() { - indices - [(*hash % num_output_partitions as u64) as usize] - .push(index as u64) - } - repart_time.add(now.elapsed().as_nanos() as usize); - for (num_output_partition, partition_indices) in - indices.into_iter().enumerate() - { - let now = Instant::now(); - let indices = partition_indices.into(); - // Produce batches based on indices - let columns = input_batch - .columns() - .iter() - .map(|c| { - take(c.as_ref(), &indices, None).map_err( - |e| { - DataFusionError::Execution( - e.to_string(), - ) - }, - ) - }) - .collect::>>>()?; - let output_batch = RecordBatch::try_new( - input_batch.schema(), - columns, - ); - repart_time.add(now.elapsed().as_nanos() as usize); - let now = Instant::now(); - let tx = txs_captured - .get_mut(&num_output_partition) - .unwrap(); - tx.send(Some(output_batch)).map_err(|e| { - DataFusionError::Execution(e.to_string()) - })?; - send_time.add(now.elapsed().as_nanos() as usize); - } - } - other => { - // this should be unreachable as long as the validation logic - // in the constructor is kept up-to-date - return Err(DataFusionError::NotImplemented(format!( - "Unsupported repartitioning scheme {:?}", - other - ))); - } - } - counter += 1; - } - - Ok(()) - }); + let input_task: JoinHandle> = + tokio::spawn(Self::pull_from_input( + random.clone(), + self.input.clone(), + i, + txs.clone(), + 
self.partitioning.clone(), + self.metrics.clone(), + )); // In a separate task, wait for each input to be done - // (and pass along any errors) - tokio::spawn(async move { Self::wait_for_task(input_task, txs).await }); + // (and pass along any errors, including panic!s) + tokio::spawn(Self::wait_for_task(input_task, txs)); } } @@ -272,14 +201,7 @@ impl ExecutionPlan for RepartitionExec { } fn metrics(&self) -> HashMap { - let mut metrics = HashMap::new(); - metrics.insert("fetchTime".to_owned(), (*self.fetch_time_nanos).clone()); - metrics.insert( - "repartitionTime".to_owned(), - (*self.repart_time_nanos).clone(), - ); - metrics.insert("sendTime".to_owned(), (*self.send_time_nanos).clone()); - metrics + self.metrics.to_hashmap() } fn fmt_as( @@ -305,12 +227,115 @@ impl RepartitionExec { input, partitioning, channels: Arc::new(Mutex::new(HashMap::new())), - fetch_time_nanos: SQLMetric::time_nanos(), - repart_time_nanos: SQLMetric::time_nanos(), - send_time_nanos: SQLMetric::time_nanos(), + metrics: RepartitionMetrics::new(), }) } + /// Pulls data from the specified input plan, feeding it to the + /// output partitions based on the desired partitioning + /// + /// i is the input partition index + /// + /// txs hold the output sending channels for each output partition + async fn pull_from_input( + random_state: ahash::RandomState, + input: Arc, + i: usize, + mut txs: HashMap>>>, + partitioning: Partitioning, + metrics: RepartitionMetrics, + ) -> Result<()> { + let num_output_partitions = txs.len(); + + // execute the child operator + let now = Instant::now(); + let mut stream = input.execute(i).await?; + metrics.fetch_nanos.add_elapsed(now); + + let mut counter = 0; + let hashes_buf = &mut vec![]; + + loop { + // fetch the next batch + let now = Instant::now(); + let result = stream.next().await; + metrics.fetch_nanos.add_elapsed(now); + + if result.is_none() { + break; + } + let result: ArrowResult = result.unwrap(); + + match &partitioning { + 
Partitioning::RoundRobinBatch(_) => { + let now = Instant::now(); + let output_partition = counter % num_output_partitions; + let tx = txs.get_mut(&output_partition).unwrap(); + tx.send(Some(result)) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + metrics.send_nanos.add_elapsed(now); + } + Partitioning::Hash(exprs, _) => { + let now = Instant::now(); + let input_batch = result?; + let arrays = exprs + .iter() + .map(|expr| { + Ok(expr + .evaluate(&input_batch)? + .into_array(input_batch.num_rows())) + }) + .collect::>>()?; + hashes_buf.clear(); + hashes_buf.resize(arrays[0].len(), 0); + // Hash arrays and compute buckets based on number of partitions + let hashes = create_hashes(&arrays, &random_state, hashes_buf)?; + let mut indices = vec![vec![]; num_output_partitions]; + for (index, hash) in hashes.iter().enumerate() { + indices[(*hash % num_output_partitions as u64) as usize] + .push(index as u64) + } + metrics.repart_nanos.add_elapsed(now); + for (num_output_partition, partition_indices) in + indices.into_iter().enumerate() + { + let now = Instant::now(); + let indices = partition_indices.into(); + // Produce batches based on indices + let columns = input_batch + .columns() + .iter() + .map(|c| { + take(c.as_ref(), &indices, None).map_err(|e| { + DataFusionError::Execution(e.to_string()) + }) + }) + .collect::>>>()?; + let output_batch = + RecordBatch::try_new(input_batch.schema(), columns); + metrics.repart_nanos.add_elapsed(now); + let now = Instant::now(); + let tx = txs.get_mut(&num_output_partition).unwrap(); + tx.send(Some(output_batch)) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + metrics.send_nanos.add_elapsed(now); + } + } + other => { + // this should be unreachable as long as the validation logic + // in the constructor is kept up-to-date + return Err(DataFusionError::NotImplemented(format!( + "Unsupported repartitioning scheme {:?}", + other + ))); + } + } + counter += 1; + } + + Ok(()) + } + /// Waits for 
`input_task` which is consuming one of the inputs to /// complete. Upon each successful completion, sends a `None` to /// each of the output tx channels to signal one of the inputs is