Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions datafusion/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ pub struct IndentVisitor<'a, 'b> {
f: &'a mut fmt::Formatter<'b>,
/// If true, includes summarized schema information
with_schema: bool,
indent: u32,
/// The current indent
indent: usize,
}

impl<'a, 'b> IndentVisitor<'a, 'b> {
Expand All @@ -42,13 +43,6 @@ impl<'a, 'b> IndentVisitor<'a, 'b> {
indent: 0,
}
}

/// Emits one padding string per current indentation level to the wrapped
/// formatter, stopping at (and returning) the first write error.
fn write_indent(&mut self) -> fmt::Result {
    let levels = self.indent;
    (0..levels).try_for_each(|_| write!(self.f, " "))
}
}

impl<'a, 'b> PlanVisitor for IndentVisitor<'a, 'b> {
Expand All @@ -58,8 +52,7 @@ impl<'a, 'b> PlanVisitor for IndentVisitor<'a, 'b> {
if self.indent > 0 {
writeln!(self.f)?;
}
self.write_indent()?;

write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a better way to make indents that I found while googling around

write!(self.f, "{}", plan.display())?;
if self.with_schema {
write!(
Expand Down
4 changes: 3 additions & 1 deletion datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,15 @@ pub enum Partitioning {
/// after all children have been visited.
///
/// To use, define a struct that implements this trait and then invoke
/// "LogicalPlan::accept".
/// [`LogicalPlan::accept`].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: What does this change do? Does it make the docs look better?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes it a hyperlink in the API docs :)

///
/// For example, for a logical plan like:
///
/// ```text
/// Projection: #id
///   Filter: #state Eq Utf8("CO")
///     CsvScan: employee.csv projection=Some([0, 3])
/// ```
///
/// The sequence of visit operations would be:
/// ```text
Expand Down
19 changes: 18 additions & 1 deletion datafusion/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use std::task::{Context, Poll};

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
};

use arrow::compute::kernels::concat::concat;
Expand Down Expand Up @@ -114,6 +115,22 @@ impl ExecutionPlan for CoalesceBatchesExec {
is_closed: false,
}))
}

/// Writes this operator's one-line description to `f`.
///
/// For the `Default` format this is the operator name followed by the
/// value of `target_batch_size`.
fn fmt_as(
    &self,
    t: DisplayFormatType,
    f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
    let batch_size = &self.target_batch_size;
    match t {
        DisplayFormatType::Default => {
            write!(f, "CoalesceBatchesExec: target_batch_size={}", batch_size)
        }
    }
}
}

struct CoalesceBatchesStream {
Expand Down
19 changes: 16 additions & 3 deletions datafusion/src/physical_plan/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
use futures::{lock::Mutex, StreamExt};
use std::{any::Any, sync::Arc, task::Poll};

use crate::physical_plan::memory::MemoryStream;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
Expand All @@ -36,8 +35,10 @@ use crate::{
use async_trait::async_trait;
use std::time::Instant;

use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
use crate::physical_plan::coalesce_batches::concat_batches;
use super::{
coalesce_batches::concat_batches, memory::MemoryStream, DisplayFormatType,
ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
};
use log::debug;

/// Data of the left side
Expand Down Expand Up @@ -192,6 +193,18 @@ impl ExecutionPlan for CrossJoinExec {
join_time: 0,
}))
}

/// Writes this operator's one-line description to `f`.
///
/// The `Default` format is just the operator name; no fields are printed.
fn fmt_as(
    &self,
    t: DisplayFormatType,
    f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
    match t {
        DisplayFormatType::Default => f.write_str("CrossJoinExec"),
    }
}
}

/// A stream that issues [RecordBatch]es as they arrive from the right of the join.
Expand Down
32 changes: 30 additions & 2 deletions datafusion/src/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
//! Execution plan for reading CSV files

use crate::error::{DataFusionError, Result};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{common, Partitioning};
use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan, Partitioning};
use arrow::csv;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
Expand Down Expand Up @@ -135,6 +134,19 @@ impl std::fmt::Debug for Source {
}
}

/// Human-readable rendering of a `Source`.
///
/// File-backed sources print their path and a comma-separated list of file
/// names; reader-backed sources print a fixed placeholder, since the reader
/// itself is not inspected.
impl std::fmt::Display for Source {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Source::Reader(_) => f.write_str("Reader(...)"),
            Source::PartitionedFiles { path, filenames } => {
                let file_list = filenames.join(",");
                write!(f, "Path({}: [{}])", path, file_list)
            }
        }
    }
}

impl Clone for Source {
fn clone(&self) -> Self {
match self {
Expand Down Expand Up @@ -405,6 +417,22 @@ impl ExecutionPlan for CsvExec {
}
}
}

/// Writes this operator's one-line description to `f`.
///
/// For the `Default` format this is the operator name followed by the
/// `Display` form of `source` and the value of `has_header`.
fn fmt_as(
    &self,
    t: DisplayFormatType,
    f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
    match t {
        DisplayFormatType::Default => {
            let (source, has_header) = (&self.source, &self.has_header);
            write!(f, "CsvExec: source={}, has_header={}", source, has_header)
        }
    }
}
}

/// Iterator over batches
Expand Down
90 changes: 90 additions & 0 deletions datafusion/src/physical_plan/display.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Implementation of physical plan display. See
//! [`crate::physical_plan::displayable`] for examples of how to
//! format

use std::fmt;

use super::{accept, ExecutionPlan, ExecutionPlanVisitor};

/// Options for controlling how each [`ExecutionPlan`] should format itself
#[derive(Debug, Clone, Copy)]
pub enum DisplayFormatType {
/// Default, compact format. Example: `FilterExec: c12 < 10.0`
Default,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I envision adding more types (e.g. Graphviz) as needs evolve

}

/// Wraps an `ExecutionPlan` with various ways to display this plan
pub struct DisplayableExecutionPlan<'a> {
/// The borrowed plan that the display methods format
inner: &'a dyn ExecutionPlan,
}

impl<'a> DisplayableExecutionPlan<'a> {
/// Create a wrapper around an [`ExecutionPlan`] which can be
/// pretty printed in a variety of ways
pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
Self { inner }
}

/// Return a `format`able structure that produces a single line
/// per node.
///
/// ```text
/// ProjectionExec: expr=[a]
///   CoalesceBatchesExec: target_batch_size=4096
///     FilterExec: a < 5
///       RepartitionExec: partitioning=RoundRobinBatch(16)
///         CsvExec: source=...
/// ```
pub fn indent(&self) -> impl fmt::Display + 'a {
// Local adapter type: lets the returned value implement `Display`
// without exposing a named type to callers.
struct Wrapper<'a>(&'a dyn ExecutionPlan);
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let t = DisplayFormatType::Default;
// Start at depth 0 and hand the visitor to `accept` to drive it over the plan.
let mut visitor = IndentVisitor { t, f, indent: 0 };
accept(self.0, &mut visitor)
}
}
Wrapper(self.inner)
}
}

/// Formats plans with a single line per node.
struct IndentVisitor<'a, 'b> {
/// How to format each node
t: DisplayFormatType,
/// Write to this formatter
f: &'a mut fmt::Formatter<'b>,
/// Current nesting depth; each level is rendered as two columns of padding
indent: usize,
}

/// Visitor that prints each plan node on its own line, padded according to
/// its depth in the plan tree.
impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
    type Error = fmt::Error;

    /// Writes the padding for the current depth, then the node's own
    /// description (via `fmt_as`), then a newline, and finally deepens the
    /// indentation so that nodes visited next are nested one level further.
    ///
    /// Always returns `Ok(true)` unless a write fails.
    fn pre_visit(
        &mut self,
        plan: &dyn ExecutionPlan,
    ) -> std::result::Result<bool, Self::Error> {
        // Two columns of padding per nesting level.
        write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
        plan.fmt_as(self.t, self.f)?;
        writeln!(self.f)?;
        self.indent += 1;
        Ok(true)
    }

    /// Undoes the indentation added by `pre_visit` for this node.
    ///
    /// Without this, the depth only ever grows, so the second child of a
    /// multi-child node would be printed deeper than its first sibling.
    ///
    /// NOTE(review): assumes the traversal calls `post_visit` once per node
    /// after its children have been visited — confirm against `accept`.
    fn post_visit(
        &mut self,
        _plan: &dyn ExecutionPlan,
    ) -> std::result::Result<bool, Self::Error> {
        // Saturating so an unbalanced call cannot underflow.
        self.indent = self.indent.saturating_sub(1);
        Ok(true)
    }
}
4 changes: 4 additions & 0 deletions datafusion/src/physical_plan/distinct_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ impl AggregateExpr for DistinctCount {
count_data_type: self.data_type.clone(),
}))
}

/// Returns the name stored in this aggregate expression's `name` field.
fn name(&self) -> &str {
&self.name
}
}

#[derive(Debug)]
Expand Down
17 changes: 15 additions & 2 deletions datafusion/src/physical_plan/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ use std::any::Any;
use std::sync::Arc;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::memory::MemoryStream;
use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning};
use crate::physical_plan::{
memory::MemoryStream, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
};
use arrow::array::NullArray;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -120,6 +121,18 @@ impl ExecutionPlan for EmptyExec {
None,
)?))
}

/// Writes this operator's one-line description to `f`.
///
/// For the `Default` format this is the operator name followed by the
/// value of `produce_one_row`.
fn fmt_as(
    &self,
    t: DisplayFormatType,
    f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
    let one_row = &self.produce_one_row;
    match t {
        DisplayFormatType::Default => {
            write!(f, "EmptyExec: produce_one_row={}", one_row)
        }
    }
}
}

#[cfg(test)]
Expand Down
19 changes: 15 additions & 4 deletions datafusion/src/physical_plan/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
use std::any::Any;
use std::sync::Arc;

use crate::error::{DataFusionError, Result};
use crate::{
error::{DataFusionError, Result},
logical_plan::StringifiedPlan,
physical_plan::{common::SizedRecordBatchStream, ExecutionPlan},
physical_plan::Partitioning,
physical_plan::{common::SizedRecordBatchStream, DisplayFormatType, ExecutionPlan},
};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};

use crate::physical_plan::Partitioning;

use super::SendableRecordBatchStream;
use async_trait::async_trait;

Expand Down Expand Up @@ -122,4 +121,16 @@ impl ExecutionPlan for ExplainExec {
vec![Arc::new(record_batch)],
)))
}

/// Writes this operator's one-line description to `f`.
///
/// The `Default` format is just the operator name; no fields are printed.
fn fmt_as(
    &self,
    t: DisplayFormatType,
    f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
    match t {
        DisplayFormatType::Default => f.write_str("ExplainExec"),
    }
}
}
4 changes: 4 additions & 0 deletions datafusion/src/physical_plan/expressions/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ impl AggregateExpr for Avg {
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.expr.clone()]
}

/// Returns the name stored in this aggregate expression's `name` field.
fn name(&self) -> &str {
&self.name
}
}

/// An accumulator to compute the average
Expand Down
4 changes: 4 additions & 0 deletions datafusion/src/physical_plan/expressions/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ impl AggregateExpr for Count {
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(CountAccumulator::new()))
}

/// Returns the name stored in this aggregate expression's `name` field.
fn name(&self) -> &str {
&self.name
}
}

#[derive(Debug)]
Expand Down
8 changes: 8 additions & 0 deletions datafusion/src/physical_plan/expressions/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ impl AggregateExpr for Max {
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(MaxAccumulator::try_new(&self.data_type)?))
}

/// Returns the name stored in this aggregate expression's `name` field.
fn name(&self) -> &str {
&self.name
}
}

// Statically-typed version of min/max(array) -> ScalarValue for string types.
Expand Down Expand Up @@ -387,6 +391,10 @@ impl AggregateExpr for Min {
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(MinAccumulator::try_new(&self.data_type)?))
}

/// Returns the name stored in this aggregate expression's `name` field.
fn name(&self) -> &str {
&self.name
}
}

#[derive(Debug)]
Expand Down
Loading