From 40d2873c8baeb426804e1c8bc8ae7520476eae82 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 3 Sep 2022 06:27:14 -0400 Subject: [PATCH 1/2] [MINOR] Add debug logging to plan teardown --- datafusion/core/src/physical_plan/common.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index dd194263d6bc..22d45b58ff69 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -21,7 +21,7 @@ use super::{RecordBatchStream, SendableRecordBatchStream}; use crate::error::{DataFusionError, Result}; use crate::execution::context::TaskContext; use crate::physical_plan::metrics::MemTrackingMetrics; -use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics}; +use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics}; use arrow::compute::concat; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::ArrowError; @@ -29,6 +29,7 @@ use arrow::error::Result as ArrowResult; use arrow::ipc::writer::FileWriter; use arrow::record_batch::RecordBatch; use futures::{Future, Stream, StreamExt, TryStreamExt}; +use log::debug; use pin_project_lite::pin_project; use std::fs; use std::fs::{metadata, File}; @@ -185,6 +186,10 @@ pub(crate) fn spawn_execution( // there is no place to send the error. let arrow_error = ArrowError::ExternalError(Box::new(e)); output.send(Err(arrow_error)).await.ok(); + debug!( + "Stopping execution: error executing input: {}", + displayable(input.as_ref()).one_line() + ); return; } Ok(stream) => stream, @@ -194,6 +199,10 @@ pub(crate) fn spawn_execution( // If send fails, plan being torn down, // there is no place to send the error. if let Err(_) = output.send(item).await { + debug!( + "Stopping execution: output is gone, plan cancelling: {}", + displayable(input.as_ref()).one_line() + ); return; } } From 0f168d95c907f09e635c66d45bd5338ec735c4f2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 3 Sep 2022 06:38:02 -0400 Subject: [PATCH 2/2] clippy --- datafusion/core/src/physical_plan/common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index 22d45b58ff69..79535497d4ed 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -198,7 +198,7 @@ pub(crate) fn spawn_execution( while let Some(item) = stream.next().await { // If send fails, plan being torn down, // there is no place to send the error. - if let Err(_) = output.send(item).await { + if output.send(item).await.is_err() { debug!( "Stopping execution: output is gone, plan cancelling: {}", displayable(input.as_ref()).one_line()