diff --git a/datafusion/src/logical_plan/display.rs b/datafusion/src/logical_plan/display.rs index 76749b547a8f..f285534fdf1b 100644 --- a/datafusion/src/logical_plan/display.rs +++ b/datafusion/src/logical_plan/display.rs @@ -29,7 +29,8 @@ pub struct IndentVisitor<'a, 'b> { f: &'a mut fmt::Formatter<'b>, /// If true, includes summarized schema information with_schema: bool, - indent: u32, + /// The current indent + indent: usize, } impl<'a, 'b> IndentVisitor<'a, 'b> { @@ -42,13 +43,6 @@ impl<'a, 'b> IndentVisitor<'a, 'b> { indent: 0, } } - - fn write_indent(&mut self) -> fmt::Result { - for _ in 0..self.indent { - write!(self.f, " ")?; - } - Ok(()) - } } impl<'a, 'b> PlanVisitor for IndentVisitor<'a, 'b> { @@ -58,8 +52,7 @@ impl<'a, 'b> PlanVisitor for IndentVisitor<'a, 'b> { if self.indent > 0 { writeln!(self.f)?; } - self.write_indent()?; - + write!(self.f, "{:indent$}", "", indent = self.indent * 2)?; write!(self.f, "{}", plan.display())?; if self.with_schema { write!( diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 13509d13eb15..8b9aac9ea73b 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -356,13 +356,15 @@ pub enum Partitioning { /// after all children have been visited. //// /// To use, define a struct that implements this trait and then invoke -/// "LogicalPlan::accept". +/// [`LogicalPlan::accept`]. /// /// For example, for a logical plan like: /// +/// ```text /// Projection: #id /// Filter: #state Eq Utf8(\"CO\")\ /// CsvScan: employee.csv projection=Some([0, 3])"; +/// ``` /// /// The sequence of visit operations would be: /// ```text diff --git a/datafusion/src/physical_plan/coalesce_batches.rs b/datafusion/src/physical_plan/coalesce_batches.rs index b91e0b672eb5..e25412d9d6b8 100644 --- a/datafusion/src/physical_plan/coalesce_batches.rs +++ b/datafusion/src/physical_plan/coalesce_batches.rs @@ -25,7 +25,8 @@ use std::task::{Context, Poll}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ - ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, + DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, }; use arrow::compute::kernels::concat::concat; @@ -114,6 +115,22 @@ impl ExecutionPlan for CoalesceBatchesExec { is_closed: false, })) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!( + f, + "CoalesceBatchesExec: target_batch_size={}", + self.target_batch_size + ) + } + } + } } struct CoalesceBatchesStream { diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index 4372352d6ecf..f6f5da4cf8db 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -21,7 +21,6 @@ use futures::{lock::Mutex, StreamExt}; use std::{any::Any, sync::Arc, task::Poll}; -use crate::physical_plan::memory::MemoryStream; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; @@ -36,8 +35,10 @@ use crate::{ use async_trait::async_trait; use std::time::Instant; -use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream}; -use crate::physical_plan::coalesce_batches::concat_batches; +use super::{ + coalesce_batches::concat_batches, memory::MemoryStream, DisplayFormatType, + ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, +}; use log::debug; /// Data of the left side @@ -192,6 +193,18 @@ impl ExecutionPlan for CrossJoinExec { join_time: 0, })) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "CrossJoinExec") + } + } + } } /// A stream that issues [RecordBatch]es as they arrive from the right of the join. diff --git a/datafusion/src/physical_plan/csv.rs b/datafusion/src/physical_plan/csv.rs index 9ab817799954..96b24cc33201 100644 --- a/datafusion/src/physical_plan/csv.rs +++ b/datafusion/src/physical_plan/csv.rs @@ -18,8 +18,7 @@ //! Execution plan for reading CSV files use crate::error::{DataFusionError, Result}; -use crate::physical_plan::ExecutionPlan; -use crate::physical_plan::{common, Partitioning}; +use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan, Partitioning}; use arrow::csv; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::Result as ArrowResult; @@ -135,6 +134,19 @@ impl std::fmt::Debug for Source { } } +impl std::fmt::Display for Source { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Source::PartitionedFiles { path, filenames } => { + write!(f, "Path({}: [{}])", path, filenames.join(",")) + } + Source::Reader(_) => { + write!(f, "Reader(...)") + } + } + } +} + impl Clone for Source { fn clone(&self) -> Self { match self { @@ -405,6 +417,22 @@ impl ExecutionPlan for CsvExec { } } } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!( + f, + "CsvExec: source={}, has_header={}", + self.source, self.has_header + ) + } + } + } } /// Iterator over batches diff --git a/datafusion/src/physical_plan/display.rs b/datafusion/src/physical_plan/display.rs new file mode 100644 index 000000000000..bfc3cd951d21 --- /dev/null +++ b/datafusion/src/physical_plan/display.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Implementation of physical plan display. See +//! [`crate::physical_plan::displayable`] for examples of how to +//! format + +use std::fmt; + +use super::{accept, ExecutionPlan, ExecutionPlanVisitor}; + +/// Options for controlling how each [`ExecutionPlan`] should format itself +#[derive(Debug, Clone, Copy)] +pub enum DisplayFormatType { + /// Default, compact format. Example: `FilterExec: c12 < 10.0` + Default, +} + +/// Wraps an `ExecutionPlan` with various ways to display this plan +pub struct DisplayableExecutionPlan<'a> { + inner: &'a dyn ExecutionPlan, +} + +impl<'a> DisplayableExecutionPlan<'a> { + /// Create a wrapper around an [`'ExecutionPlan'] which can be + /// pretty printed in a variety of ways + pub fn new(inner: &'a dyn ExecutionPlan) -> Self { + Self { inner } + } + + /// Return a `format`able structure that produces a single line + /// per node. + /// + /// ```text + /// ProjectionExec: expr=[a] + /// CoalesceBatchesExec: target_batch_size=4096 + /// FilterExec: a < 5 + /// RepartitionExec: partitioning=RoundRobinBatch(16) + /// CsvExec: source=...", + /// ``` + pub fn indent(&self) -> impl fmt::Display + 'a { + struct Wrapper<'a>(&'a dyn ExecutionPlan); + impl<'a> fmt::Display for Wrapper<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let t = DisplayFormatType::Default; + let mut visitor = IndentVisitor { t, f, indent: 0 }; + accept(self.0, &mut visitor) + } + } + Wrapper(self.inner) + } +} + +/// Formats plans with a single line per node. +struct IndentVisitor<'a, 'b> { + /// How to format each node + t: DisplayFormatType, + /// Write to this formatter + f: &'a mut fmt::Formatter<'b>, + ///with_schema: bool, + indent: usize, +} + +impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> { + type Error = fmt::Error; + fn pre_visit( + &mut self, + plan: &dyn ExecutionPlan, + ) -> std::result::Result { + write!(self.f, "{:indent$}", "", indent = self.indent * 2)?; + plan.fmt_as(self.t, self.f)?; + writeln!(self.f)?; + self.indent += 1; + Ok(true) + } +} diff --git a/datafusion/src/physical_plan/distinct_expressions.rs b/datafusion/src/physical_plan/distinct_expressions.rs index 927f16fe3d21..f3513c2950e4 100644 --- a/datafusion/src/physical_plan/distinct_expressions.rs +++ b/datafusion/src/physical_plan/distinct_expressions.rs @@ -120,6 +120,10 @@ impl AggregateExpr for DistinctCount { count_data_type: self.data_type.clone(), })) } + + fn name(&self) -> &str { + &self.name + } } #[derive(Debug)] diff --git a/datafusion/src/physical_plan/empty.rs b/datafusion/src/physical_plan/empty.rs index 3011b289507f..391a695f4501 100644 --- a/datafusion/src/physical_plan/empty.rs +++ b/datafusion/src/physical_plan/empty.rs @@ -21,8 +21,9 @@ use std::any::Any; use std::sync::Arc; use crate::error::{DataFusionError, Result}; -use crate::physical_plan::memory::MemoryStream; -use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning}; +use crate::physical_plan::{ + memory::MemoryStream, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, +}; use arrow::array::NullArray; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; @@ -120,6 +121,18 @@ impl ExecutionPlan for EmptyExec { None, )?)) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "EmptyExec: produce_one_row={}", self.produce_one_row) + } + } + } } #[cfg(test)] diff --git a/datafusion/src/physical_plan/explain.rs b/datafusion/src/physical_plan/explain.rs index 26d2c94dc80a..3c5ef1af3236 100644 --- a/datafusion/src/physical_plan/explain.rs +++ b/datafusion/src/physical_plan/explain.rs @@ -20,15 +20,14 @@ use std::any::Any; use std::sync::Arc; -use crate::error::{DataFusionError, Result}; use crate::{ + error::{DataFusionError, Result}, logical_plan::StringifiedPlan, - physical_plan::{common::SizedRecordBatchStream, ExecutionPlan}, + physical_plan::Partitioning, + physical_plan::{common::SizedRecordBatchStream, DisplayFormatType, ExecutionPlan}, }; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; -use crate::physical_plan::Partitioning; - use super::SendableRecordBatchStream; use async_trait::async_trait; @@ -122,4 +121,16 @@ impl ExecutionPlan for ExplainExec { vec![Arc::new(record_batch)], ))) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "ExplainExec") + } + } + } } diff --git a/datafusion/src/physical_plan/expressions/average.rs b/datafusion/src/physical_plan/expressions/average.rs index 38644129dcd0..6a6332042188 100644 --- a/datafusion/src/physical_plan/expressions/average.rs +++ b/datafusion/src/physical_plan/expressions/average.rs @@ -109,6 +109,10 @@ impl AggregateExpr for Avg { fn expressions(&self) -> Vec> { vec![self.expr.clone()] } + + fn name(&self) -> &str { + &self.name + } } /// An accumulator to compute the average diff --git a/datafusion/src/physical_plan/expressions/count.rs b/datafusion/src/physical_plan/expressions/count.rs index 22459813b7e5..4a3fbe4fa7d3 100644 --- a/datafusion/src/physical_plan/expressions/count.rs +++ b/datafusion/src/physical_plan/expressions/count.rs @@ -83,6 +83,10 @@ impl AggregateExpr for Count { fn create_accumulator(&self) -> Result> { Ok(Box::new(CountAccumulator::new())) } + + fn name(&self) -> &str { + &self.name + } } #[derive(Debug)] diff --git a/datafusion/src/physical_plan/expressions/min_max.rs b/datafusion/src/physical_plan/expressions/min_max.rs index 5ed14610ada3..ea917d30d940 100644 --- a/datafusion/src/physical_plan/expressions/min_max.rs +++ b/datafusion/src/physical_plan/expressions/min_max.rs @@ -88,6 +88,10 @@ impl AggregateExpr for Max { fn create_accumulator(&self) -> Result> { Ok(Box::new(MaxAccumulator::try_new(&self.data_type)?)) } + + fn name(&self) -> &str { + &self.name + } } // Statically-typed version of min/max(array) -> ScalarValue for string types. @@ -387,6 +391,10 @@ impl AggregateExpr for Min { fn create_accumulator(&self) -> Result> { Ok(Box::new(MinAccumulator::try_new(&self.data_type)?)) } + + fn name(&self) -> &str { + &self.name + } } #[derive(Debug)] diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs index 6e252205955d..4d57c39bb31c 100644 --- a/datafusion/src/physical_plan/expressions/mod.rs +++ b/datafusion/src/physical_plan/expressions/mod.rs @@ -74,6 +74,19 @@ pub struct PhysicalSortExpr { pub options: SortOptions, } +impl std::fmt::Display for PhysicalSortExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let opts_string = match (self.options.descending, self.options.nulls_first) { + (true, true) => "DESC", + (true, false) => "DESC NULLS LAST", + (false, true) => "ASC", + (false, false) => "ASC NULLS LAST", + }; + + write!(f, "{} {}", self.expr, opts_string) + } +} + impl PhysicalSortExpr { /// evaluate the sort expression into SortColumn that can be passed into arrow sort kernel pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result { diff --git a/datafusion/src/physical_plan/expressions/sum.rs b/datafusion/src/physical_plan/expressions/sum.rs index 6f50894003da..7bbbf99fa659 100644 --- a/datafusion/src/physical_plan/expressions/sum.rs +++ b/datafusion/src/physical_plan/expressions/sum.rs @@ -104,6 +104,10 @@ impl AggregateExpr for Sum { fn create_accumulator(&self) -> Result> { Ok(Box::new(SumAccumulator::try_new(&self.data_type)?)) } + + fn name(&self) -> &str { + &self.name + } } #[derive(Debug)] diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs index 61af78db8ed2..bc2b17aa4f47 100644 --- a/datafusion/src/physical_plan/filter.rs +++ b/datafusion/src/physical_plan/filter.rs @@ -25,7 +25,9 @@ use std::task::{Context, Poll}; use super::{RecordBatchStream, SendableRecordBatchStream}; use crate::error::{DataFusionError, Result}; -use crate::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr}; +use crate::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, +}; use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, SchemaRef}; @@ -119,6 +121,18 @@ impl ExecutionPlan for FilterExec { input: self.input.execute(partition).await?, })) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "FilterExec: {}", self.predicate) + } + } + } } /// The FilterExec streams wraps the input iterator and applies the predicate expression to diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index fad4fa585034..3059e2f746ce 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -28,8 +28,10 @@ use futures::{ }; use crate::error::{DataFusionError, Result}; -use crate::physical_plan::{Accumulator, AggregateExpr, SQLMetric}; -use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning, PhysicalExpr}; +use crate::physical_plan::{ + Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan, + Partitioning, PhysicalExpr, SQLMetric, +}; use arrow::{ array::{Array, UInt32Builder}, @@ -257,6 +259,39 @@ impl ExecutionPlan for HashAggregateExec { metrics.insert("outputRows".to_owned(), (*self.output_rows).clone()); metrics } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "HashAggregateExec: mode={:?}", self.mode)?; + let g: Vec = self + .group_expr + .iter() + .map(|(e, alias)| { + let e = e.to_string(); + if &e != alias { + format!("{} as {}", e, alias) + } else { + e + } + }) + .collect(); + write!(f, ", gby=[{}]", g.join(", "))?; + + let a: Vec = self + .aggr_expr + .iter() + .map(|agg| agg.name().to_string()) + .collect(); + write!(f, ", aggr=[{}]", a.join(", "))?; + } + } + Ok(()) + } } /* diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 2682623d374a..0bf5a2857fde 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -58,7 +58,10 @@ use super::{ }; use crate::error::{DataFusionError, Result}; -use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream}; +use super::{ + DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, +}; use crate::physical_plan::coalesce_batches::concat_batches; use log::debug; @@ -393,6 +396,22 @@ impl ExecutionPlan for HashJoinExec { is_exhausted: false, })) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!( + f, + "HashJoinExec: mode={:?}, join_type={:?}, on={:?}", + self.mode, self.join_type, self.on + ) + } + } + } } /// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`, diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs index c091196483f4..c56dbe141b2d 100644 --- a/datafusion/src/physical_plan/limit.rs +++ b/datafusion/src/physical_plan/limit.rs @@ -26,7 +26,9 @@ use futures::stream::Stream; use futures::stream::StreamExt; use crate::error::{DataFusionError, Result}; -use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning}; +use crate::physical_plan::{ + DisplayFormatType, Distribution, ExecutionPlan, Partitioning, +}; use arrow::array::ArrayRef; use arrow::compute::limit; use arrow::datatypes::SchemaRef; @@ -121,6 +123,18 @@ impl ExecutionPlan for GlobalLimitExec { let stream = self.input.execute(0).await?; Ok(Box::pin(LimitStream::new(stream, self.limit))) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "GlobalLimitExec: limit={}", self.limit) + } + } + } } /// LocalLimitExec applies a limit to a single partition @@ -187,6 +201,18 @@ impl ExecutionPlan for LocalLimitExec { let stream = self.input.execute(partition).await?; Ok(Box::pin(LimitStream::new(stream, self.limit))) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "LocalLimitExec: limit={}", self.limit) + } + } + } } /// Truncate a RecordBatch to maximum of n rows diff --git a/datafusion/src/physical_plan/memory.rs b/datafusion/src/physical_plan/memory.rs index 9022077559ac..85d8aeef073c 100644 --- a/datafusion/src/physical_plan/memory.rs +++ b/datafusion/src/physical_plan/memory.rs @@ -22,7 +22,10 @@ use std::any::Any; use std::sync::Arc; use std::task::{Context, Poll}; -use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream}; +use super::{ + DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, +}; use crate::error::{DataFusionError, Result}; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; @@ -88,6 +91,25 @@ impl ExecutionPlan for MemoryExec { self.projection.clone(), )?)) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + let partitions: Vec<_> = + self.partitions.iter().map(|b| b.len()).collect(); + write!( + f, + "MemoryExec: partitions={}, partition_sizes={:?}", + partitions.len(), + partitions + ) + } + } + } } impl MemoryExec { diff --git a/datafusion/src/physical_plan/merge.rs b/datafusion/src/physical_plan/merge.rs index c66532b73ccf..c65227c16114 100644 --- a/datafusion/src/physical_plan/merge.rs +++ b/datafusion/src/physical_plan/merge.rs @@ -36,8 +36,7 @@ use arrow::{ use super::RecordBatchStream; use crate::error::{DataFusionError, Result}; -use crate::physical_plan::ExecutionPlan; -use crate::physical_plan::Partitioning; +use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; use super::SendableRecordBatchStream; use pin_project_lite::pin_project; @@ -151,6 +150,18 @@ impl ExecutionPlan for MergeExec { } } } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "MergeExec") + } + } + } } pin_project! { diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index a8f6f0c35f00..6ab9570790e7 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -17,7 +17,7 @@ //! Traits for physical query plan, supporting parallel execution for partitioned relations. -use std::fmt::{Debug, Display}; +use std::fmt::{self, Debug, Display}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::{any::Any, pin::Pin}; @@ -31,9 +31,10 @@ use arrow::record_batch::RecordBatch; use arrow::{array::ArrayRef, datatypes::Field}; use async_trait::async_trait; +pub use display::DisplayFormatType; use futures::stream::Stream; -use self::merge::MergeExec; +use self::{display::DisplayableExecutionPlan, merge::MergeExec}; use hashbrown::HashMap; /// Trait for types that stream [arrow::record_batch::RecordBatch] @@ -120,7 +121,16 @@ pub trait PhysicalPlanner { ) -> Result>; } -/// Partition-aware execution plan for a relation +/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan. +/// +/// Each `ExecutionPlan` is Partition-aware and is responsible for +/// creating the actual `async` [`SendableRecordBatchStream`]s +/// of [`RecordBatch`] that incrementally compute the operator's +/// output from its input partition. +/// +/// [`ExecutionPlan`] can be displayed in an simplified form using the +/// return value from [`displayable`] in addition to the (normally +/// quite verbose) `Debug` output. #[async_trait] pub trait ExecutionPlan: Debug + Send + Sync { /// Returns the execution plan as [`Any`](std::any::Any) so that it can be @@ -152,6 +162,137 @@ pub trait ExecutionPlan: Debug + Send + Sync { fn metrics(&self) -> HashMap { HashMap::new() } + + /// Format this `ExecutionPlan` to `f` in the specified type. + /// + /// Should not include a newline + /// + /// Note this function prints a placeholder by default to preserve + /// backwards compatibility. + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "ExecutionPlan(PlaceHolder)") + } +} + +/// Return a [wrapper](DisplayableExecutionPlan) around an +/// [`ExecutionPlan`] which can be displayed in various easier to +/// understand ways. +/// +/// ``` +/// use datafusion::prelude::*; +/// use datafusion::physical_plan::displayable; +/// +/// // Hard code concurrency as it appears in the RepartitionExec output +/// let config = ExecutionConfig::new() +/// .with_concurrency(3); +/// let mut ctx = ExecutionContext::with_config(config); +/// +/// // register the a table +/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).unwrap(); +/// +/// // create a plan to run a SQL query +/// let plan = ctx +/// .create_logical_plan("SELECT a FROM example WHERE a < 5") +/// .unwrap(); +/// let plan = ctx.optimize(&plan).unwrap(); +/// let physical_plan = ctx.create_physical_plan(&plan).unwrap(); +/// +/// // Format using display string +/// let displayable_plan = displayable(physical_plan.as_ref()); +/// let plan_string = format!("{}", displayable_plan.indent()); +/// +/// assert_eq!("ProjectionExec: expr=[a]\ +/// \n CoalesceBatchesExec: target_batch_size=4096\ +/// \n FilterExec: a < 5\ +/// \n RepartitionExec: partitioning=RoundRobinBatch(3)\ +/// \n CsvExec: source=Path(tests/example.csv: [tests/example.csv]), has_header=true", +/// plan_string.trim()); +/// ``` +/// +pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> { + DisplayableExecutionPlan::new(plan) +} + +/// Visit all children of this plan, according to the order defined on `ExecutionPlanVisitor`. +// Note that this would be really nice if it were a method on +// ExecutionPlan, but it can not be because it takes a generic +// parameter and `ExecutionPlan` is a trait +pub fn accept( + plan: &dyn ExecutionPlan, + visitor: &mut V, +) -> std::result::Result<(), V::Error> { + visitor.pre_visit(plan)?; + for child in plan.children() { + visit_execution_plan(child.as_ref(), visitor)?; + } + visitor.post_visit(plan)?; + Ok(()) +} + +/// Trait that implements the [Visitor +/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for a +/// depth first walk of `ExecutionPlan` nodes. `pre_visit` is called +/// before any children are visited, and then `post_visit` is called +/// after all children have been visited. +//// +/// To use, define a struct that implements this trait and then invoke +/// ['accept']. +/// +/// For example, for an execution plan that looks like: +/// +/// ```text +/// ProjectionExec: #id +/// FilterExec: state = CO +/// CsvExec: +/// ``` +/// +/// The sequence of visit operations would be: +/// ```text +/// visitor.pre_visit(ProjectionExec) +/// visitor.pre_visit(FilterExec) +/// visitor.pre_visit(CsvExec) +/// visitor.post_visit(CsvExec) +/// visitor.post_visit(FilterExec) +/// visitor.post_visit(ProjectionExec) +/// ``` +pub trait ExecutionPlanVisitor { + /// The type of error returned by this visitor + type Error; + + /// Invoked on an `ExecutionPlan` plan before any of its child + /// inputs have been visited. If Ok(true) is returned, the + /// recursion continues. If Err(..) or Ok(false) are returned, the + /// recursion stops immediately and the error, if any, is returned + /// to `accept` + fn pre_visit( + &mut self, + plan: &dyn ExecutionPlan, + ) -> std::result::Result; + + /// Invoked on an `ExecutionPlan` plan *after* all of its child + /// inputs have been visited. The return value is handled the same + /// as the return value of `pre_visit`. The provided default + /// implementation returns `Ok(true)`. + fn post_visit( + &mut self, + _plan: &dyn ExecutionPlan, + ) -> std::result::Result { + Ok(true) + } +} + +/// Recursively calls `pre_visit` and `post_visit` for this node and +/// all of its children, as described on [`ExecutionPlanVisitor`] +pub fn visit_execution_plan( + plan: &dyn ExecutionPlan, + visitor: &mut V, +) -> std::result::Result<(), V::Error> { + visitor.pre_visit(plan)?; + for child in plan.children() { + visit_execution_plan(child.as_ref(), visitor)?; + } + visitor.post_visit(plan)?; + Ok(()) } /// Execute the [ExecutionPlan] and collect the results in memory @@ -290,6 +431,12 @@ pub trait AggregateExpr: Send + Sync + Debug { /// expressions that are passed to the Accumulator. /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. fn expressions(&self) -> Vec>; + + /// Human readable name such as `"MIN(c2)"`. The default + /// implementation returns placeholder text. + fn name(&self) -> &str { + "AggregateExpr: default name" + } } /// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and @@ -351,6 +498,7 @@ pub mod cross_join; pub mod crypto_expressions; pub mod csv; pub mod datetime_expressions; +pub mod display; pub mod distinct_expressions; pub mod empty; pub mod explain; diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index dee0fc89a7a0..dd5e77bc21eb 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -27,7 +27,7 @@ use super::{ planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, }; -use crate::physical_plan::{common, ExecutionPlan, Partitioning}; +use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan, Partitioning}; use crate::{ error::{DataFusionError, Result}, execution::context::ExecutionContextState, @@ -864,6 +864,32 @@ impl ExecutionPlan for ParquetExec { inner: ReceiverStream::new(response_rx), })) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + let files: Vec<_> = self + .partitions + .iter() + .map(|pp| pp.filenames.iter()) + .flatten() + .map(|s| s.as_str()) + .collect(); + + write!( + f, + "ParquetExec: batch_size={}, limit={:?}, partitions=[{}]", + self.batch_size, + self.limit, + files.join(", ") + ) + } + } + } } fn send_result( diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 664e4dccbdf9..d11e8e93d199 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -23,7 +23,6 @@ use super::{ aggregates, cross_join::CrossJoinExec, empty::EmptyExec, expressions::binary, functions, hash_join::PartitionMode, udaf, union::UnionExec, }; -use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionContextState; use crate::logical_plan::{ DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType, @@ -45,6 +44,10 @@ use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalP use crate::prelude::JoinType; use crate::scalar::ScalarValue; use crate::variable::VarType; +use crate::{ + error::{DataFusionError, Result}, + physical_plan::displayable, +}; use arrow::compute::can_cast_types; use arrow::compute::SortOptions; @@ -383,7 +386,7 @@ impl DefaultPhysicalPlanner { if *verbose { stringified_plans.push(StringifiedPlan::new( PlanType::PhysicalPlan, - format!("{:#?}", input), + displayable(input.as_ref()).indent().to_string(), )); } Ok(Arc::new(ExplainExec::new( diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs index a881beb453a0..c0d78ff7168b 100644 --- a/datafusion/src/physical_plan/projection.rs +++ b/datafusion/src/physical_plan/projection.rs @@ -26,7 +26,9 @@ use std::sync::Arc; use std::task::{Context, Poll}; use crate::error::{DataFusionError, Result}; -use crate::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr}; +use crate::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, +}; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; @@ -130,6 +132,31 @@ impl ExecutionPlan for ProjectionExec { input: self.input.execute(partition).await?, })) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + let expr: Vec = self + .expr + .iter() + .map(|(e, alias)| { + let e = e.to_string(); + if &e != alias { + format!("{} as {}", e, alias) + } else { + e + } + }) + .collect(); + + write!(f, "ProjectionExec: expr=[{}]", expr.join(", ")) + } + } + } } fn batch_project( diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs index 7243550127bd..2599690bfc00 100644 --- a/datafusion/src/physical_plan/repartition.rs +++ b/datafusion/src/physical_plan/repartition.rs @@ -24,7 +24,7 @@ use std::task::{Context, Poll}; use std::{any::Any, collections::HashMap, vec}; use crate::error::{DataFusionError, Result}; -use crate::physical_plan::{ExecutionPlan, Partitioning}; +use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; use arrow::record_batch::RecordBatch; use arrow::{array::Array, error::Result as ArrowResult}; use arrow::{compute::take, datatypes::SchemaRef}; @@ -235,6 +235,18 @@ impl ExecutionPlan for RepartitionExec { input: UnboundedReceiverStream::new(channels.remove(&partition).unwrap().1), })) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "RepartitionExec: partitioning={:?}", self.partitioning) + } + } + } } impl RepartitionExec { diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs index 010e4068638b..822906019021 100644 --- a/datafusion/src/physical_plan/sort.rs +++ b/datafusion/src/physical_plan/sort.rs @@ -41,7 +41,7 @@ use super::{RecordBatchStream, SendableRecordBatchStream}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::{ - common, Distribution, ExecutionPlan, Partitioning, SQLMetric, + common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SQLMetric, }; /// Sort execution plan @@ -145,6 +145,19 @@ impl ExecutionPlan for SortExec { ))) } + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); + write!(f, "SortExec: [{}]", expr.join(",")) + } + } + } + fn metrics(&self) -> HashMap { let mut metrics = HashMap::new(); metrics.insert("outputRows".to_owned(), (*self.output_rows).clone()); diff --git a/datafusion/src/physical_plan/udaf.rs b/datafusion/src/physical_plan/udaf.rs index 3dc6aa402f52..f7515d326d0a 100644 --- a/datafusion/src/physical_plan/udaf.rs +++ b/datafusion/src/physical_plan/udaf.rs @@ -165,4 +165,8 @@ impl AggregateExpr for AggregateFunctionExpr { fn create_accumulator(&self) -> Result> { (self.fun.accumulator)() } + + fn name(&self) -> &str { + &self.name + } } diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs index a00dd6ac2821..b39f47bba07b 100644 --- a/datafusion/tests/custom_sources.rs +++ b/datafusion/tests/custom_sources.rs @@ -20,11 +20,14 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; -use datafusion::error::{DataFusionError, Result}; use datafusion::{ datasource::{datasource::Statistics, TableProvider}, physical_plan::collect, }; +use datafusion::{ + error::{DataFusionError, Result}, + physical_plan::DisplayFormatType, +}; use datafusion::execution::context::ExecutionContext; use datafusion::logical_plan::{col, Expr, LogicalPlan, LogicalPlanBuilder}; @@ -128,6 +131,18 @@ impl ExecutionPlan for CustomExecutionPlan { async fn execute(&self, _partition: usize) -> Result { Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 })) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "CustomExecutionPlan: projection={:#?}", self.projection) + } + } + } } impl TableProvider for CustomTableProvider { diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index c80ffe4d3467..0b9cc2ae18b9 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -31,9 +31,8 @@ use arrow::{ util::display::array_value_to_string, }; -use datafusion::execution::context::ExecutionContext; use datafusion::logical_plan::LogicalPlan; -use datafusion::prelude::create_udf; +use datafusion::prelude::*; use datafusion::{ datasource::{csv::CsvReadOptions, MemTable}, physical_plan::collect, @@ -42,6 +41,7 @@ use datafusion::{ error::{DataFusionError, Result}, physical_plan::ColumnarValue, }; +use datafusion::{execution::context::ExecutionContext, physical_plan::displayable}; #[tokio::test] async fn nyc() -> Result<()> { @@ -2932,3 +2932,47 @@ async fn test_cast_expressions_error() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_physical_plan_display_indent() { + // Hard code concurrency as it appears in the RepartitionExec output + let config = ExecutionConfig::new().with_concurrency(3); + let mut ctx = ExecutionContext::with_config(config); + register_aggregate_csv(&mut ctx).unwrap(); + let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \ + FROM aggregate_test_100 \ + WHERE c12 < 10 \ + GROUP BY c1 \ + ORDER BY the_min DESC \ + LIMIT 10"; + let plan = ctx.create_logical_plan(&sql).unwrap(); + let plan = ctx.optimize(&plan).unwrap(); + + let physical_plan = ctx.create_physical_plan(&plan).unwrap(); + let expected = vec![ + "GlobalLimitExec: limit=10", + " SortExec: [the_min DESC]", + " ProjectionExec: expr=[c1, MAX(c12), MIN(c12) as the_min]", + " HashAggregateExec: mode=Final, gby=[c1], aggr=[MAX(c12), MIN(c12)]", + " MergeExec", + " HashAggregateExec: mode=Partial, gby=[c1], aggr=[MAX(c12), MIN(c12)]", + " CoalesceBatchesExec: target_batch_size=4096", + " FilterExec: c12 < CAST(10 AS Float64)", + " RepartitionExec: partitioning=RoundRobinBatch(3)", + " CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true", + ]; + + let data_path = arrow::util::test_util::arrow_test_data(); + let actual = format!("{}", displayable(physical_plan.as_ref()).indent()) + .trim() + .lines() + // normalize paths + .map(|s| s.replace(&data_path, "ARROW_TEST_DATA")) + .collect::>(); + + assert_eq!( + expected, actual, + "expected:\n{:#?}\nactual:\n\n{:#?}\n", + expected, actual + ); +} diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index 5e38c57b6f1b..8914c05e8f88 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -75,8 +75,8 @@ use datafusion::{ optimizer::{optimizer::OptimizerRule, utils::optimize_children}, physical_plan::{ planner::{DefaultPhysicalPlanner, ExtensionPlanner}, - Distribution, ExecutionPlan, Partitioning, PhysicalPlanner, RecordBatchStream, - SendableRecordBatchStream, + DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalPlanner, + RecordBatchStream, SendableRecordBatchStream, }, prelude::{ExecutionConfig, ExecutionContext}, }; @@ -163,9 +163,9 @@ async fn topk_plan() -> Result<()> { let mut ctx = setup_table(make_topk_context()).await?; let expected = vec![ - "| logical_plan after topk | TopK: k=3 |", - "| | Projection: #customer_id, #revenue |", - "| | TableScan: sales projection=Some([0, 1]) |", + "| logical_plan after topk | TopK: k=3 |", + "| | Projection: #customer_id, #revenue |", + "| | TableScan: sales projection=Some([0, 1]) |", ].join("\n"); let explain_query = format!("EXPLAIN VERBOSE {}", QUERY); @@ -397,6 +397,18 @@ impl ExecutionPlan for TopKExec { state: BTreeMap::new(), })) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "TopKExec: k={}", self.k) + } + } + } } // A very specialized TopK implementation