From fda9122e71e63050b1c59438289c22e1cf79509c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 28 May 2021 14:46:20 -0400 Subject: [PATCH] Return errors properly from RepartitionExec --- datafusion/src/physical_plan/repartition.rs | 205 ++++++++++++++++++-- datafusion/src/test/exec.rs | 183 ++++++++++++++++- 2 files changed, 372 insertions(+), 16 deletions(-) diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs index e5747dda88b7..37d98c7d118b 100644 --- a/datafusion/src/physical_plan/repartition.rs +++ b/datafusion/src/physical_plan/repartition.rs @@ -147,12 +147,13 @@ impl ExecutionPlan for RepartitionExec { let fetch_time = self.fetch_time_nanos.clone(); let repart_time = self.repart_time_nanos.clone(); let send_time = self.send_time_nanos.clone(); - let mut txs: HashMap<_, _> = channels + let txs: HashMap<_, _> = channels .iter() .map(|(partition, (tx, _rx))| (*partition, tx.clone())) .collect(); let partitioning = self.partitioning.clone(); - let _: JoinHandle> = tokio::spawn(async move { + let mut txs_captured = txs.clone(); + let input_task: JoinHandle> = tokio::spawn(async move { // execute the child operator let now = Instant::now(); let mut stream = input.execute(i).await?; @@ -170,13 +171,13 @@ impl ExecutionPlan for RepartitionExec { if result.is_none() { break; } - let result = result.unwrap(); + let result: ArrowResult = result.unwrap(); match &partitioning { Partitioning::RoundRobinBatch(_) => { let now = Instant::now(); let output_partition = counter % num_output_partitions; - let tx = txs.get_mut(&output_partition).unwrap(); + let tx = txs_captured.get_mut(&output_partition).unwrap(); tx.send(Some(result)).map_err(|e| { DataFusionError::Execution(e.to_string()) })?; @@ -230,7 +231,9 @@ impl ExecutionPlan for RepartitionExec { ); repart_time.add(now.elapsed().as_nanos() as usize); let now = Instant::now(); - let tx = txs.get_mut(&num_output_partition).unwrap(); + let tx = txs_captured + .get_mut(&num_output_partition) + .unwrap(); tx.send(Some(output_batch)).map_err(|e| { DataFusionError::Execution(e.to_string()) })?; @@ -249,13 +252,12 @@ impl ExecutionPlan for RepartitionExec { counter += 1; } - // notify each output partition that this input partition has no more data - for (_, tx) in txs { - tx.send(None) - .map_err(|e| DataFusionError::Execution(e.to_string()))?; - } Ok(()) }); + + // In a separate task, wait for each input to be done + // (and pass along any errors) + tokio::spawn(async move { Self::wait_for_task(input_task, txs).await }); } } @@ -308,6 +310,45 @@ impl RepartitionExec { send_time_nanos: SQLMetric::time_nanos(), }) } + + /// Waits for `input_task` which is consuming one of the inputs to + /// complete. Upon each successful completion, sends a `None` to + /// each of the output tx channels to signal one of the inputs is + /// complete. Upon error, propagates the errors to all output tx + /// channels. + async fn wait_for_task( + input_task: JoinHandle>, + txs: HashMap>>>, + ) { + // wait for completion, and propagate error + // note we ignore errors on send (.ok) as that means the receiver has already shutdown. + match input_task.await { + // Error in joining task + Err(e) => { + for (_, tx) in txs { + let err = DataFusionError::Execution(format!("Join Error: {}", e)); + let err = Err(err.into_arrow_external_error()); + tx.send(Some(err)).ok(); + } + } + // Error from running input task + Ok(Err(e)) => { + for (_, tx) in txs { + // wrap it because need to send error to all output partitions + let err = DataFusionError::Execution(e.to_string()); + let err = Err(err.into_arrow_external_error()); + tx.send(Some(err)).ok(); + } + } + // Input task completed successfully + Ok(Ok(())) => { + // notify each output partition that this input partition has no more data + for (_, tx) in txs { + tx.send(None).ok(); + } + } + } + } } struct RepartitionStream { @@ -356,10 +397,17 @@ impl RecordBatchStream for RepartitionStream { #[cfg(test)] mod tests { use super::*; - use crate::physical_plan::memory::MemoryExec; - use arrow::array::UInt32Array; + use crate::{ + assert_batches_sorted_eq, + physical_plan::memory::MemoryExec, + test::exec::{ErrorExec, MockExec}, + }; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; + use arrow::{ + array::{ArrayRef, StringArray, UInt32Array}, + error::ArrowError, + }; #[tokio::test] async fn one_to_many_round_robin() -> Result<()> { @@ -517,4 +565,137 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn unsupported_partitioning() { + // have to send at least one batch through to provoke error + let batch = RecordBatch::try_from_iter(vec![( + "my_awesome_field", + Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef, + )]) + .unwrap(); + + let schema = batch.schema(); + let input = MockExec::new(vec![Ok(batch)], schema); + // This generates an error (partitioning type not supported) + // but only after the plan is executed. The error should be + // returned and no results produced + let partitioning = Partitioning::UnknownPartitioning(1); + let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap(); + let output_stream = exec.execute(0).await.unwrap(); + + // Expect that an error is returned + let result_string = crate::physical_plan::common::collect(output_stream) + .await + .unwrap_err() + .to_string(); + assert!( + result_string + .contains("Unsupported repartitioning scheme UnknownPartitioning(1)"), + "actual: {}", + result_string + ); + } + + #[tokio::test] + async fn error_for_input_exec() { + // This generates an error on a call to execute. The error + // should be returned and no results produced. + + let input = ErrorExec::new(); + let partitioning = Partitioning::RoundRobinBatch(1); + let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap(); + + // Note: this should pass (the stream can be created) but the + // error when the input is executed should get passed back + let output_stream = exec.execute(0).await.unwrap(); + + // Expect that an error is returned + let result_string = crate::physical_plan::common::collect(output_stream) + .await + .unwrap_err() + .to_string(); + assert!( + result_string.contains("ErrorExec, unsurprisingly, errored in partition 0"), + "actual: {}", + result_string + ); + } + + #[tokio::test] + async fn repartition_with_error_in_stream() { + let batch = RecordBatch::try_from_iter(vec![( + "my_awesome_field", + Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef, + )]) + .unwrap(); + + // input stream returns one good batch and then one error. The + // error should be returned. + let err = Err(ArrowError::ComputeError("bad data error".to_string())); + + let schema = batch.schema(); + let input = MockExec::new(vec![Ok(batch), err], schema); + let partitioning = Partitioning::RoundRobinBatch(1); + let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap(); + + // Note: this should pass (the stream can be created) but the + // error when the input is executed should get passed back + let output_stream = exec.execute(0).await.unwrap(); + + // Expect that an error is returned + let result_string = crate::physical_plan::common::collect(output_stream) + .await + .unwrap_err() + .to_string(); + assert!( + result_string.contains("bad data error"), + "actual: {}", + result_string + ); + } + + #[tokio::test] + async fn repartition_with_delayed_stream() { + let batch1 = RecordBatch::try_from_iter(vec![( + "my_awesome_field", + Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef, + )]) + .unwrap(); + + let batch2 = RecordBatch::try_from_iter(vec![( + "my_awesome_field", + Arc::new(StringArray::from(vec!["frob", "baz"])) as ArrayRef, + )]) + .unwrap(); + + // The mock exec doesn't return immediately (instead it + // requires the input to wait at least once) + let schema = batch1.schema(); + let expected_batches = vec![batch1.clone(), batch2.clone()]; + let input = MockExec::new(vec![Ok(batch1), Ok(batch2)], schema); + let partitioning = Partitioning::RoundRobinBatch(1); + + let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap(); + + let expected = vec![ + "+------------------+", + "| my_awesome_field |", + "+------------------+", + "| foo |", + "| bar |", + "| frob |", + "| baz |", + "+------------------+", + ]; + + assert_batches_sorted_eq!(&expected, &expected_batches); + + let output_stream = exec.execute(0).await.unwrap(); + let batches = crate::physical_plan::common::collect(output_stream) + .await + .unwrap(); + + assert_batches_sorted_eq!(&expected, &batches); + } } diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs index 04cd29530c01..bcd94dd6d639 100644 --- a/datafusion/src/test/exec.rs +++ b/datafusion/src/test/exec.rs @@ -17,14 +17,25 @@ //! Simple iterator over batches for use in testing -use std::task::{Context, Poll}; +use async_trait::async_trait; +use std::{ + any::Any, + sync::Arc, + task::{Context, Poll}, +}; use arrow::{ - datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch, + datatypes::{DataType, Field, Schema, SchemaRef}, + error::{ArrowError, Result as ArrowResult}, + record_batch::RecordBatch, }; -use futures::Stream; +use futures::{Stream, StreamExt}; +use tokio_stream::wrappers::ReceiverStream; -use crate::physical_plan::RecordBatchStream; +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::{ + ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, +}; /// Index into the data that has been returned so far #[derive(Debug, Default, Clone)] @@ -100,3 +111,167 @@ impl RecordBatchStream for TestStream { self.data[0].schema() } } + +/// A Mock ExecutionPlan that can be used for writing tests of other ExecutionPlans +/// +#[derive(Debug)] +pub struct MockExec { + /// the results to send back + data: Vec>, + schema: SchemaRef, +} + +impl MockExec { + /// Create a new exec with a single partition that returns the + /// record batches in this Exec. Note the batches are not produced + /// immediately (the caller has to actually yield and another task + /// must run) to ensure any poll loops are correct. + pub fn new(data: Vec>, schema: SchemaRef) -> Self { + Self { data, schema } + } +} + +#[async_trait] +impl ExecutionPlan for MockExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn children(&self) -> Vec> { + unimplemented!() + } + + fn with_new_children( + &self, + _children: Vec>, + ) -> Result> { + unimplemented!() + } + + /// Returns a stream which yields data + async fn execute(&self, partition: usize) -> Result { + assert_eq!(partition, 0); + + let schema = self.schema(); + + // Result doesn't implement clone, so do it ourself + let data: Vec<_> = self + .data + .iter() + .map(|r| match r { + Ok(batch) => Ok(batch.clone()), + Err(e) => Err(clone_error(e)), + }) + .collect(); + + let (tx, rx) = tokio::sync::mpsc::channel(2); + + // task simply sends data in order but in a separate + // thread (to ensure the batches are not available without the + // DelayedStream yielding). + tokio::task::spawn(async move { + for batch in data { + println!("Sending batch via delayed stream"); + if let Err(e) = tx.send(batch).await { + println!("ERROR batch via delayed stream: {}", e); + } + } + }); + + // returned stream simply reads off the rx stream + let stream = DelayedStream { + schema, + inner: ReceiverStream::new(rx), + }; + Ok(Box::pin(stream)) + } +} + +fn clone_error(e: &ArrowError) -> ArrowError { + use ArrowError::*; + match e { + ComputeError(msg) => ComputeError(msg.to_string()), + _ => unimplemented!(), + } +} + +#[derive(Debug)] +pub struct DelayedStream { + schema: SchemaRef, + inner: ReceiverStream>, +} + +impl Stream for DelayedStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.inner.poll_next_unpin(cx) + } +} + +impl RecordBatchStream for DelayedStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +/// A mock execution plan that errors on a call to execute +#[derive(Debug)] +pub struct ErrorExec { + schema: SchemaRef, +} +impl ErrorExec { + pub fn new() -> Self { + let schema = Arc::new(Schema::new(vec![Field::new( + "dummy", + DataType::Int64, + true, + )])); + Self { schema } + } +} + +#[async_trait] +impl ExecutionPlan for ErrorExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn children(&self) -> Vec> { + unimplemented!() + } + + fn with_new_children( + &self, + _children: Vec>, + ) -> Result> { + unimplemented!() + } + + /// Returns a stream which yields data + async fn execute(&self, partition: usize) -> Result { + Err(DataFusionError::Internal(format!( + "ErrorExec, unsurprisingly, errored in partition {}", + partition + ))) + } +}