From 1c77ec0512e5671fa4adb2096f6c5ae8be832383 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 10 Jun 2021 10:30:58 -0400
Subject: [PATCH 1/3] Cleanup RepartitionExec code

---
 datafusion/src/physical_plan/repartition.rs | 238 ++++++++++----------
 1 file changed, 124 insertions(+), 114 deletions(-)

diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index 37d98c7d118b..452f091b2dc8 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -38,7 +38,7 @@ use futures::stream::Stream;
 use futures::StreamExt;
 use hashbrown::HashMap;
 use tokio::sync::{
-    mpsc::{UnboundedReceiver, UnboundedSender},
+    mpsc::{self, UnboundedReceiver, UnboundedSender},
     Mutex,
 };
 use tokio::task::JoinHandle;
@@ -132,9 +132,8 @@ impl ExecutionPlan for RepartitionExec {
             // being read yet. This may cause high memory usage if the next operator is
             // reading output partitions in order rather than concurrently. One workaround
             // for this would be to add spill-to-disk capabilities.
-            let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<
-                Option<ArrowResult<RecordBatch>>,
-            >();
+            let (sender, receiver) =
+                mpsc::unbounded_channel::<Option<ArrowResult<RecordBatch>>>();
             channels.insert(partition, (sender, receiver));
         }
         // Use fixed random state
@@ -142,122 +141,25 @@ impl ExecutionPlan for RepartitionExec {

         // launch one async task per *input* partition
         for i in 0..num_input_partitions {
-            let random_state = random.clone();
-            let input = self.input.clone();
-            let fetch_time = self.fetch_time_nanos.clone();
-            let repart_time = self.repart_time_nanos.clone();
-            let send_time = self.send_time_nanos.clone();
             let txs: HashMap<_, _> = channels
                 .iter()
                 .map(|(partition, (tx, _rx))| (*partition, tx.clone()))
                 .collect();
-            let partitioning = self.partitioning.clone();
-            let mut txs_captured = txs.clone();
-            let input_task: JoinHandle<Result<()>> = tokio::spawn(async move {
-                // execute the child operator
-                let now = Instant::now();
-                let mut stream = input.execute(i).await?;
-                fetch_time.add(now.elapsed().as_nanos() as usize);
-
-                let mut counter = 0;
-                let hashes_buf = &mut vec![];
-
-                loop {
-                    // fetch the next batch
-                    let now = Instant::now();
-                    let result = stream.next().await;
-                    fetch_time.add(now.elapsed().as_nanos() as usize);
-
-                    if result.is_none() {
-                        break;
-                    }
-                    let result: ArrowResult<RecordBatch> = result.unwrap();
-
-                    match &partitioning {
-                        Partitioning::RoundRobinBatch(_) => {
-                            let now = Instant::now();
-                            let output_partition = counter % num_output_partitions;
-                            let tx = txs_captured.get_mut(&output_partition).unwrap();
-                            tx.send(Some(result)).map_err(|e| {
-                                DataFusionError::Execution(e.to_string())
-                            })?;
-                            send_time.add(now.elapsed().as_nanos() as usize);
-                        }
-                        Partitioning::Hash(exprs, _) => {
-                            let now = Instant::now();
-                            let input_batch = result?;
-                            let arrays = exprs
-                                .iter()
-                                .map(|expr| {
-                                    Ok(expr
-                                        .evaluate(&input_batch)?
-                                        .into_array(input_batch.num_rows()))
-                                })
-                                .collect::<Result<Vec<_>>>()?;
-                            hashes_buf.clear();
-                            hashes_buf.resize(arrays[0].len(), 0);
-                            // Hash arrays and compute buckets based on number of partitions
-                            let hashes =
-                                create_hashes(&arrays, &random_state, hashes_buf)?;
-                            let mut indices = vec![vec![]; num_output_partitions];
-                            for (index, hash) in hashes.iter().enumerate() {
-                                indices
-                                    [(*hash % num_output_partitions as u64) as usize]
-                                    .push(index as u64)
-                            }
-                            repart_time.add(now.elapsed().as_nanos() as usize);
-                            for (num_output_partition, partition_indices) in
-                                indices.into_iter().enumerate()
-                            {
-                                let now = Instant::now();
-                                let indices = partition_indices.into();
-                                // Produce batches based on indices
-                                let columns = input_batch
-                                    .columns()
-                                    .iter()
-                                    .map(|c| {
-                                        take(c.as_ref(), &indices, None).map_err(
-                                            |e| {
-                                                DataFusionError::Execution(
-                                                    e.to_string(),
-                                                )
-                                            },
-                                        )
-                                    })
-                                    .collect::<ArrowResult<Vec<ArrayRef>>>()?;
-                                let output_batch = RecordBatch::try_new(
-                                    input_batch.schema(),
-                                    columns,
-                                );
-                                repart_time.add(now.elapsed().as_nanos() as usize);
-                                let now = Instant::now();
-                                let tx = txs_captured
-                                    .get_mut(&num_output_partition)
-                                    .unwrap();
-                                tx.send(Some(output_batch)).map_err(|e| {
-                                    DataFusionError::Execution(e.to_string())
-                                })?;
-                                send_time.add(now.elapsed().as_nanos() as usize);
-                            }
-                        }
-                        other => {
-                            // this should be unreachable as long as the validation logic
-                            // in the constructor is kept up-to-date
-                            return Err(DataFusionError::NotImplemented(format!(
-                                "Unsupported repartitioning scheme {:?}",
-                                other
-                            )));
-                        }
-                    }
-                    counter += 1;
-                }
-
-                Ok(())
-            });
+            let input_task: JoinHandle<Result<()>> =
+                tokio::spawn(Self::pull_from_input(
+                    random.clone(),
+                    self.input.clone(),
+                    i,
+                    txs.clone(),
+                    self.partitioning.clone(),
+                    self.fetch_time_nanos.clone(),
+                    self.repart_time_nanos.clone(),
+                    self.send_time_nanos.clone(),
+                ));

             // In a separate task, wait for each input to be done
-            // (and pass along any errors)
-            tokio::spawn(async move { Self::wait_for_task(input_task, txs).await });
+            // (and pass along any errors, including panic!s)
+            tokio::spawn(Self::wait_for_task(input_task, txs));
         }
     }
@@ -311,6 +213,114 @@ impl RepartitionExec {
         })
     }

+    /// Pulls data from the specified input plan, feeding it to the
+    /// output partitions based on the desired partitioning
+    ///
+    /// i is the input partition index
+    ///
+    /// txs hold the output sending channels for each output partition
+    async fn pull_from_input(
+        random_state: ahash::RandomState,
+        input: Arc<dyn ExecutionPlan>,
+        i: usize,
+        mut txs: HashMap<usize, UnboundedSender<Option<ArrowResult<RecordBatch>>>>,
+        partitioning: Partitioning,
+        // TODO move these to a single Metrics object
+        fetch_time: Arc<SQLMetric>,
+        repart_time: Arc<SQLMetric>,
+        send_time: Arc<SQLMetric>,
+    ) -> Result<()> {
+        let num_output_partitions = txs.len();
+
+        // execute the child operator
+        let now = Instant::now();
+        let mut stream = input.execute(i).await?;
+        fetch_time.add(now.elapsed().as_nanos() as usize);
+
+        let mut counter = 0;
+        let hashes_buf = &mut vec![];
+
+        loop {
+            // fetch the next batch
+            let now = Instant::now();
+            let result = stream.next().await;
+            fetch_time.add(now.elapsed().as_nanos() as usize);
+
+            if result.is_none() {
+                break;
+            }
+            let result: ArrowResult<RecordBatch> = result.unwrap();
+
+            match &partitioning {
+                Partitioning::RoundRobinBatch(_) => {
+                    let now = Instant::now();
+                    let output_partition = counter % num_output_partitions;
+                    let tx = txs.get_mut(&output_partition).unwrap();
+                    tx.send(Some(result))
+                        .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+                    send_time.add(now.elapsed().as_nanos() as usize);
+                }
+                Partitioning::Hash(exprs, _) => {
+                    let now = Instant::now();
+                    let input_batch = result?;
+                    let arrays = exprs
+                        .iter()
+                        .map(|expr| {
+                            Ok(expr
+                                .evaluate(&input_batch)?
+                                .into_array(input_batch.num_rows()))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    hashes_buf.clear();
+                    hashes_buf.resize(arrays[0].len(), 0);
+                    // Hash arrays and compute buckets based on number of partitions
+                    let hashes = create_hashes(&arrays, &random_state, hashes_buf)?;
+                    let mut indices = vec![vec![]; num_output_partitions];
+                    for (index, hash) in hashes.iter().enumerate() {
+                        indices[(*hash % num_output_partitions as u64) as usize]
+                            .push(index as u64)
+                    }
+                    repart_time.add(now.elapsed().as_nanos() as usize);
+                    for (num_output_partition, partition_indices) in
+                        indices.into_iter().enumerate()
+                    {
+                        let now = Instant::now();
+                        let indices = partition_indices.into();
+                        // Produce batches based on indices
+                        let columns = input_batch
+                            .columns()
+                            .iter()
+                            .map(|c| {
+                                take(c.as_ref(), &indices, None).map_err(|e| {
+                                    DataFusionError::Execution(e.to_string())
+                                })
+                            })
+                            .collect::<ArrowResult<Vec<ArrayRef>>>()?;
+                        let output_batch =
+                            RecordBatch::try_new(input_batch.schema(), columns);
+                        repart_time.add(now.elapsed().as_nanos() as usize);
+                        let now = Instant::now();
+                        let tx = txs.get_mut(&num_output_partition).unwrap();
+                        tx.send(Some(output_batch))
+                            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+                        send_time.add(now.elapsed().as_nanos() as usize);
+                    }
+                }
+                other => {
+                    // this should be unreachable as long as the validation logic
+                    // in the constructor is kept up-to-date
+                    return Err(DataFusionError::NotImplemented(format!(
+                        "Unsupported repartitioning scheme {:?}",
+                        other
+                    )));
+                }
+            }
+            counter += 1;
+        }
+
+        Ok(())
+    }
+
     /// Waits for `input_task` which is consuming one of the inputs to
     /// complete. Upon each successful completion, sends a `None` to
     /// each of the output tx channels to signal one of the inputs is

From 3d63b0d107192f37d969e3e8cec4abddc7fb626a Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 10 Jun 2021 10:44:48 -0400
Subject: [PATCH 2/3] cleanup metric handling

---
 datafusion/src/physical_plan/repartition.rs | 69 +++++++++++++--------
 1 file changed, 42 insertions(+), 27 deletions(-)

diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index 452f091b2dc8..998e835ef3b4 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -60,12 +60,40 @@ pub struct RepartitionExec {
             HashMap<usize, (UnboundedSender<Option<ArrowResult<RecordBatch>>>, UnboundedReceiver<Option<ArrowResult<RecordBatch>>>)>,
         >,
     >,
+
+    /// Execution metrics
+    metrics: RepartitionMetrics,
+}
+
+#[derive(Debug, Clone)]
+struct RepartitionMetrics {
     /// Time in nanos to execute child operator and fetch batches
-    fetch_time_nanos: Arc<SQLMetric>,
+    fetch_nanos: Arc<SQLMetric>,
     /// Time in nanos to perform repartitioning
-    repart_time_nanos: Arc<SQLMetric>,
+    repart_nanos: Arc<SQLMetric>,
     /// Time in nanos for sending resulting batches to channels
-    send_time_nanos: Arc<SQLMetric>,
+    send_nanos: Arc<SQLMetric>,
+}
+
+impl RepartitionMetrics {
+    fn new() -> Self {
+        Self {
+            fetch_nanos: SQLMetric::time_nanos(),
+            repart_nanos: SQLMetric::time_nanos(),
+            send_nanos: SQLMetric::time_nanos(),
+        }
+    }
+    /// Convert into the external metrics form
+    fn to_hashmap(&self) -> HashMap<String, SQLMetric> {
+        let mut metrics = HashMap::new();
+        metrics.insert("fetchTime".to_owned(), self.fetch_nanos.as_ref().clone());
+        metrics.insert(
+            "repartitionTime".to_owned(),
+            self.repart_nanos.as_ref().clone(),
+        );
+        metrics.insert("sendTime".to_owned(), self.send_nanos.as_ref().clone());
+        metrics
+    }
 }

 impl RepartitionExec {
@@ -145,6 +173,7 @@ impl ExecutionPlan for RepartitionExec {
                 .iter()
                 .map(|(partition, (tx, _rx))| (*partition, tx.clone()))
                 .collect();
+
             let input_task: JoinHandle<Result<()>> =
                 tokio::spawn(Self::pull_from_input(
                     random.clone(),
                     self.input.clone(),
                     i,
                     txs.clone(),
                     self.partitioning.clone(),
-                    self.fetch_time_nanos.clone(),
-                    self.repart_time_nanos.clone(),
-                    self.send_time_nanos.clone(),
+                    self.metrics.clone(),
                 ));

             // In a separate task, wait for each input to be done
@@ -174,14 +201,7 @@ impl ExecutionPlan for RepartitionExec {
     }

     fn metrics(&self) -> HashMap<String, SQLMetric> {
-        let mut metrics = HashMap::new();
-        metrics.insert("fetchTime".to_owned(), (*self.fetch_time_nanos).clone());
-        metrics.insert(
-            "repartitionTime".to_owned(),
-            (*self.repart_time_nanos).clone(),
-        );
-        metrics.insert("sendTime".to_owned(), (*self.send_time_nanos).clone());
-        metrics
+        self.metrics.to_hashmap()
     }

     fn fmt_as(
@@ -207,9 +227,7 @@ impl RepartitionExec {
             input,
             partitioning,
             channels: Arc::new(Mutex::new(HashMap::new())),
-            fetch_time_nanos: SQLMetric::time_nanos(),
-            repart_time_nanos: SQLMetric::time_nanos(),
-            send_time_nanos: SQLMetric::time_nanos(),
+            metrics: RepartitionMetrics::new(),
         })
     }
@@ -225,17 +243,14 @@ impl RepartitionExec {
         i: usize,
         mut txs: HashMap<usize, UnboundedSender<Option<ArrowResult<RecordBatch>>>>,
         partitioning: Partitioning,
-        // TODO move these to a single Metrics object
-        fetch_time: Arc<SQLMetric>,
-        repart_time: Arc<SQLMetric>,
-        send_time: Arc<SQLMetric>,
+        metrics: RepartitionMetrics,
     ) -> Result<()> {
         let num_output_partitions = txs.len();

         // execute the child operator
         let now = Instant::now();
         let mut stream = input.execute(i).await?;
-        fetch_time.add(now.elapsed().as_nanos() as usize);
+        metrics.fetch_nanos.add(now.elapsed().as_nanos() as usize);

         let mut counter = 0;
         let hashes_buf = &mut vec![];
@@ -244,7 +259,7 @@ impl RepartitionExec {
             // fetch the next batch
             let now = Instant::now();
             let result = stream.next().await;
-            fetch_time.add(now.elapsed().as_nanos() as usize);
+            metrics.fetch_nanos.add(now.elapsed().as_nanos() as usize);

             if result.is_none() {
                 break;
@@ -258,7 +273,7 @@ impl RepartitionExec {
                     let tx = txs.get_mut(&output_partition).unwrap();
                     tx.send(Some(result))
                         .map_err(|e| DataFusionError::Execution(e.to_string()))?;
-                    send_time.add(now.elapsed().as_nanos() as usize);
+                    metrics.send_nanos.add(now.elapsed().as_nanos() as usize);
                 }
                 Partitioning::Hash(exprs, _) => {
                     let now = Instant::now();
@@ -280,7 +295,7 @@ impl RepartitionExec {
                         indices[(*hash % num_output_partitions as u64) as usize]
                             .push(index as u64)
                     }
-                    repart_time.add(now.elapsed().as_nanos() as usize);
+                    metrics.repart_nanos.add(now.elapsed().as_nanos() as usize);
                     for (num_output_partition, partition_indices) in
                         indices.into_iter().enumerate()
                     {
@@ -298,12 +313,12 @@ impl RepartitionExec {
                             .collect::<ArrowResult<Vec<ArrayRef>>>()?;
                         let output_batch =
                             RecordBatch::try_new(input_batch.schema(), columns);
-                        repart_time.add(now.elapsed().as_nanos() as usize);
+                        metrics.repart_nanos.add(now.elapsed().as_nanos() as usize);
                         let now = Instant::now();
                         let tx = txs.get_mut(&num_output_partition).unwrap();
                         tx.send(Some(output_batch))
                             .map_err(|e| DataFusionError::Execution(e.to_string()))?;
-                        send_time.add(now.elapsed().as_nanos() as usize);
+                        metrics.send_nanos.add(now.elapsed().as_nanos() as usize);
                     }
                 }
                 other => {

From 98d2853ecdd9a1bb86ec5f64ab00a52e1bad74e3 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 10 Jun 2021 10:47:19 -0400
Subject: [PATCH 3/3] Add elapsed_nanos

---
 datafusion/src/physical_plan/mod.rs         |  5 +++++
 datafusion/src/physical_plan/repartition.rs | 12 ++++++------
 2 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 490e02875c42..aec3ef4591f1 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -107,6 +107,11 @@ impl SQLMetric {
         self.value.fetch_add(n, Ordering::Relaxed);
     }

+    /// Add elapsed nanoseconds since `start` to self
+    pub fn add_elapsed(&self, start: std::time::Instant) {
+        self.add(start.elapsed().as_nanos() as usize)
+    }
+
     /// Get the current value
     pub fn value(&self) -> usize {
         self.value.load(Ordering::Relaxed)
diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index 998e835ef3b4..f0422814b101 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -250,7 +250,7 @@ impl RepartitionExec {
         // execute the child operator
         let now = Instant::now();
         let mut stream = input.execute(i).await?;
-        metrics.fetch_nanos.add(now.elapsed().as_nanos() as usize);
+        metrics.fetch_nanos.add_elapsed(now);

         let mut counter = 0;
         let hashes_buf = &mut vec![];
@@ -259,7 +259,7 @@ impl RepartitionExec {
             // fetch the next batch
             let now = Instant::now();
             let result = stream.next().await;
-            metrics.fetch_nanos.add(now.elapsed().as_nanos() as usize);
+            metrics.fetch_nanos.add_elapsed(now);

             if result.is_none() {
                 break;
@@ -273,7 +273,7 @@ impl RepartitionExec {
                     let tx = txs.get_mut(&output_partition).unwrap();
                     tx.send(Some(result))
                         .map_err(|e| DataFusionError::Execution(e.to_string()))?;
-                    metrics.send_nanos.add(now.elapsed().as_nanos() as usize);
+                    metrics.send_nanos.add_elapsed(now);
                 }
                 Partitioning::Hash(exprs, _) => {
                     let now = Instant::now();
@@ -295,7 +295,7 @@ impl RepartitionExec {
                         indices[(*hash % num_output_partitions as u64) as usize]
                             .push(index as u64)
                     }
-                    metrics.repart_nanos.add(now.elapsed().as_nanos() as usize);
+                    metrics.repart_nanos.add_elapsed(now);
                     for (num_output_partition, partition_indices) in
                         indices.into_iter().enumerate()
                     {
@@ -313,12 +313,12 @@ impl RepartitionExec {
                             .collect::<ArrowResult<Vec<ArrayRef>>>()?;
                         let output_batch =
                             RecordBatch::try_new(input_batch.schema(), columns);
-                        metrics.repart_nanos.add(now.elapsed().as_nanos() as usize);
+                        metrics.repart_nanos.add_elapsed(now);
                         let now = Instant::now();
                         let tx = txs.get_mut(&num_output_partition).unwrap();
                         tx.send(Some(output_batch))
                             .map_err(|e| DataFusionError::Execution(e.to_string()))?;
-                        metrics.send_nanos.add(now.elapsed().as_nanos() as usize);
+                        metrics.send_nanos.add_elapsed(now);
                     }
                 }
                 other => {
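---

Editor's note: for readers skimming the series, below is a minimal, self-contained sketch of the timing pattern the three patches converge on. The `SQLMetric` here is a simplified stand-in (just the atomic counter plus the new `add_elapsed` helper), not DataFusion's full metric type, and `main` is a hypothetical call site.

// Simplified stand-in for SQLMetric, showing only the pieces exercised
// by these patches: an atomic counter plus the add_elapsed helper.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

#[derive(Debug, Default)]
struct SQLMetric {
    value: AtomicUsize,
}

impl SQLMetric {
    /// Add `n` to the metric
    fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Add elapsed nanoseconds since `start` to self
    fn add_elapsed(&self, start: Instant) {
        self.add(start.elapsed().as_nanos() as usize)
    }

    /// Get the current value
    fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}

// Hypothetical call site: time a piece of work and record it, the same
// shape as the fetch/repart/send timing in pull_from_input.
fn main() {
    let fetch_nanos = SQLMetric::default();
    let now = Instant::now();
    std::thread::sleep(std::time::Duration::from_millis(1)); // stand-in for real work
    fetch_nanos.add_elapsed(now);
    println!("fetch took {} ns", fetch_nanos.value());
}

Each call site thus shrinks from `metric.add(now.elapsed().as_nanos() as usize)` to `metric.add_elapsed(now)`, which is the whole of patch 3.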