Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion::prelude::*;

use datafusion::parquet::basic::Compression;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use structopt::StructOpt;

#[cfg(feature = "snmalloc")]
Expand Down Expand Up @@ -343,21 +344,27 @@ async fn execute_query(
debug: bool,
) -> Result<Vec<RecordBatch>> {
if debug {
println!("Logical plan:\n{:?}", plan);
println!("=== Logical plan ===\n{:?}\n", plan);
}
let plan = ctx.optimize(plan)?;
if debug {
println!("Optimized logical plan:\n{:?}", plan);
println!("=== Optimized logical plan ===\n{:?}\n", plan);
}
let physical_plan = ctx.create_physical_plan(&plan)?;
if debug {
println!(
"Physical plan:\n{}",
"=== Physical plan ===\n{}\n",
displayable(physical_plan.as_ref()).indent().to_string()
);
}
let result = collect(physical_plan).await?;
let result = collect(physical_plan.clone()).await?;
if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
.indent()
.to_string()
);
pretty::print_batches(&result)?;
}
Ok(result)
Expand Down
50 changes: 44 additions & 6 deletions datafusion/src/physical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,27 @@ pub enum DisplayFormatType {
/// Wraps an `ExecutionPlan` with various ways to display this plan
pub struct DisplayableExecutionPlan<'a> {
inner: &'a dyn ExecutionPlan,
/// whether to show metrics or not
with_metrics: bool,
}

impl<'a> DisplayableExecutionPlan<'a> {
/// Create a wrapper around an [`'ExecutionPlan'] which can be
/// pretty printed in a variety of ways
pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
Self { inner }
Self {
inner,
with_metrics: false,
}
}

/// Create a wrapper around an [`'ExecutionPlan'] which can be
/// pretty printed in a variety of ways
pub fn with_metrics(inner: &'a dyn ExecutionPlan) -> Self {
Self {
inner,
with_metrics: true,
}
}

/// Return a `format`able structure that produces a single line
Expand All @@ -53,15 +67,26 @@ impl<'a> DisplayableExecutionPlan<'a> {
/// CsvExec: source=...",
/// ```
pub fn indent(&self) -> impl fmt::Display + 'a {
struct Wrapper<'a>(&'a dyn ExecutionPlan);
struct Wrapper<'a> {
plan: &'a dyn ExecutionPlan,
with_metrics: bool,
}
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let t = DisplayFormatType::Default;
let mut visitor = IndentVisitor { t, f, indent: 0 };
accept(self.0, &mut visitor)
let mut visitor = IndentVisitor {
t,
f,
indent: 0,
with_metrics: self.with_metrics,
};
accept(self.plan, &mut visitor)
}
}
Wrapper(self.inner)
Wrapper {
plan: self.inner,
with_metrics: self.with_metrics,
}
}
}

Expand All @@ -71,8 +96,10 @@ struct IndentVisitor<'a, 'b> {
t: DisplayFormatType,
/// Write to this formatter
f: &'a mut fmt::Formatter<'b>,
///with_schema: bool,
/// Indent size
indent: usize,
/// whether to show metrics or not
with_metrics: bool,
}

impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
Expand All @@ -83,6 +110,17 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
) -> std::result::Result<bool, Self::Error> {
write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
plan.fmt_as(self.t, self.f)?;
if self.with_metrics {
write!(
self.f,
", metrics=[{}]",
plan.metrics()
.iter()
.map(|(k, v)| format!("{}={:?}", k, v.value))
.collect::<Vec<_>>()
.join(", ")
)?;
}
writeln!(self.f)?;
self.indent += 1;
Ok(true)
Expand Down