Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 11 additions & 8 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2725,14 +2725,17 @@ impl DefaultPhysicalPlanner {
ExplainAnalyzeCategories::All => None,
ExplainAnalyzeCategories::Only(cats) => Some(cats),
};
Ok(Arc::new(AnalyzeExec::new(
a.verbose,
show_statistics,
metric_types,
metric_categories,
input,
schema,
)))
Ok(Arc::new(
AnalyzeExec::new(
a.verbose,
show_statistics,
metric_types,
metric_categories,
input,
schema,
)
.with_format(a.format.clone()),
))
}

/// Optimize a physical plan by applying each physical optimizer,
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,7 @@ impl LogicalPlanBuilder {
if explain_option.analyze {
Ok(Self::new(LogicalPlan::Analyze(Analyze {
verbose: explain_option.verbose,
format: explain_option.format,
input: self.plan,
schema,
})))
Expand Down
7 changes: 6 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,7 @@ impl LogicalPlan {
let input = self.only_input(inputs)?;
Ok(LogicalPlan::Analyze(Analyze {
verbose: a.verbose,
format: a.format.clone(),
schema: Arc::clone(&a.schema),
input: Arc::new(input),
}))
Expand Down Expand Up @@ -3299,13 +3300,17 @@ impl PartialOrd for Explain {
/// Logical plan node for `EXPLAIN ANALYZE`: wraps the plan whose execution
/// will be measured so the annotated plan (with runtime metrics) can be
/// rendered back to the user.
pub struct Analyze {
    /// Should extra detail be included?
    pub verbose: bool,
    /// Output syntax/format for the rendered physical plan + metrics.
    pub format: ExplainFormat,
    /// The logical plan that is being EXPLAIN ANALYZE'd
    pub input: Arc<LogicalPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
}

// Manual implementation needed because of `schema` field. Comparison excludes this field.
// Manual implementation needed because of `schema` field, and because
// `ExplainFormat` does not implement `PartialOrd`. Comparison excludes both
// fields.
impl PartialOrd for Analyze {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match self.verbose.partial_cmp(&other.verbose) {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,13 @@ impl TreeNode for LogicalPlan {
}),
LogicalPlan::Analyze(Analyze {
verbose,
format,
input,
schema,
}) => input.map_elements(f)?.update_data(|input| {
LogicalPlan::Analyze(Analyze {
verbose,
format,
input,
schema,
})
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ log = { workspace = true }
num-traits = { workspace = true }
parking_lot = { workspace = true }
pin-project-lite = "^0.2.7"
serde_json = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
Expand All @@ -81,6 +82,9 @@ insta = { workspace = true }
rand = { workspace = true }
rstest = { workspace = true }
rstest_reuse = "0.7.0"
# Ensure `pgjson_snapshot_of_sample_plan` sees insertion-order JSON output
# regardless of feature unification with upstream consumers.
serde_json = { workspace = true, features = ["preserve_order"] }
tokio = { workspace = true, features = [
"rt-multi-thread",
"fs",
Expand Down
130 changes: 91 additions & 39 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ use crate::metrics::{MetricCategory, MetricType};
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};

use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::format::ExplainFormat;
use datafusion_common::instant::Instant;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::{DataFusionError, Result, assert_eq_or_internal_err};
use datafusion_common::{
DataFusionError, Result, assert_eq_or_internal_err, internal_err,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::PhysicalExpr;
Expand All @@ -50,6 +53,8 @@ pub struct AnalyzeExec {
metric_types: Vec<MetricType>,
/// Optional filter by semantic category (rows / bytes / timing).
metric_categories: Option<Vec<MetricCategory>>,
/// Output format for the rendered plan + metrics.
format: ExplainFormat,
/// The input plan (the plan being analyzed)
pub(crate) input: Arc<dyn ExecutionPlan>,
/// The output schema for RecordBatches of this exec node
Expand All @@ -58,7 +63,7 @@ pub struct AnalyzeExec {
}

impl AnalyzeExec {
/// Create a new AnalyzeExec
/// Create a new AnalyzeExec with the default output format (indent).
pub fn new(
verbose: bool,
show_statistics: bool,
Expand All @@ -73,12 +78,19 @@ impl AnalyzeExec {
show_statistics,
metric_types,
metric_categories,
format: ExplainFormat::Indent,
input,
schema,
cache: Arc::new(cache),
}
}

/// Builder: set the output format (indent or pgjson).
pub fn with_format(mut self, format: ExplainFormat) -> Self {
self.format = format;
self
}

/// Access to verbose
pub fn verbose(&self) -> bool {
self.verbose
Expand All @@ -94,6 +106,11 @@ impl AnalyzeExec {
self.metric_categories.as_deref()
}

/// Returns the output format used when rendering the plan and metrics.
pub fn format(&self) -> &ExplainFormat {
    &self.format
}

/// The input plan
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
Expand Down Expand Up @@ -160,14 +177,17 @@ impl ExecutionPlan for AnalyzeExec {
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self::new(
self.verbose,
self.show_statistics,
self.metric_types.clone(),
self.metric_categories.clone(),
children.pop().unwrap(),
Arc::clone(&self.schema),
)))
Ok(Arc::new(
Self::new(
self.verbose,
self.show_statistics,
self.metric_types.clone(),
self.metric_categories.clone(),
children.pop().unwrap(),
Arc::clone(&self.schema),
)
.with_format(self.format.clone()),
))
}

fn execute(
Expand Down Expand Up @@ -204,6 +224,7 @@ impl ExecutionPlan for AnalyzeExec {
let show_statistics = self.show_statistics;
let metric_types = self.metric_types.clone();
let metric_categories = self.metric_categories.clone();
let format = self.format.clone();

// future that gathers the results from all the tasks in the
// JoinSet that computes the overall row count and final
Expand All @@ -225,6 +246,7 @@ impl ExecutionPlan for AnalyzeExec {
&captured_schema,
&metric_types,
metric_categories.as_deref(),
&format,
)
};

Expand All @@ -246,39 +268,69 @@ fn create_output_batch(
schema: &SchemaRef,
metric_types: &[MetricType],
metric_categories: Option<&[MetricCategory]>,
format: &ExplainFormat,
) -> Result<RecordBatch> {
let mut type_builder = StringBuilder::with_capacity(1, 1024);
let mut plan_builder = StringBuilder::with_capacity(1, 1024);

// TODO use some sort of enum rather than strings?
type_builder.append_value("Plan with Metrics");

let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
.set_metric_types(metric_types.to_vec())
.set_metric_categories(metric_categories.map(|c| c.to_vec()))
.set_show_statistics(show_statistics)
.indent(verbose)
.to_string();
plan_builder.append_value(annotated_plan);

// Verbose output
// TODO make this more sophisticated
if verbose {
type_builder.append_value("Plan with Full Metrics");

let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
.set_metric_types(metric_types.to_vec())
.set_metric_categories(metric_categories.map(|c| c.to_vec()))
.set_show_statistics(show_statistics)
.indent(verbose)
.to_string();
plan_builder.append_value(annotated_plan);

type_builder.append_value("Output Rows");
plan_builder.append_value(total_rows.to_string());

type_builder.append_value("Duration");
plan_builder.append_value(format!("{duration:?}"));
match format {
ExplainFormat::Indent => {
// TODO use some sort of enum rather than strings?
type_builder.append_value("Plan with Metrics");

let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
.set_metric_types(metric_types.to_vec())
.set_metric_categories(metric_categories.map(|c| c.to_vec()))
.set_show_statistics(show_statistics)
.indent(verbose)
.to_string();
plan_builder.append_value(annotated_plan);

// Verbose output
// TODO make this more sophisticated
if verbose {
type_builder.append_value("Plan with Full Metrics");

let annotated_plan =
DisplayableExecutionPlan::with_full_metrics(input.as_ref())
.set_metric_types(metric_types.to_vec())
.set_metric_categories(metric_categories.map(|c| c.to_vec()))
.set_show_statistics(show_statistics)
.indent(verbose)
.to_string();
plan_builder.append_value(annotated_plan);

type_builder.append_value("Output Rows");
plan_builder.append_value(total_rows.to_string());

type_builder.append_value("Duration");
plan_builder.append_value(format!("{duration:?}"));
}
}
ExplainFormat::PostgresJSON => {
// For pgjson we emit a single self-contained JSON array so the
// result remains parseable. Summary values (total rows / duration)
// are attached to the root object in verbose mode rather than
// emitted as separate rows.
type_builder.append_value("Plan with Metrics");

let mut displayable = if verbose {
DisplayableExecutionPlan::with_full_metrics(input.as_ref())
} else {
DisplayableExecutionPlan::with_metrics(input.as_ref())
};
displayable = displayable
.set_metric_types(metric_types.to_vec())
.set_metric_categories(metric_categories.map(|c| c.to_vec()))
.set_show_statistics(show_statistics);
if verbose {
displayable = displayable.set_summary(Some(total_rows), Some(duration));
}
plan_builder.append_value(displayable.pgjson(verbose).to_string());
}
ExplainFormat::Tree | ExplainFormat::Graphviz => {
return internal_err!("AnalyzeExec does not support {format} output format");
}
}

RecordBatch::try_new(
Expand Down
Loading
Loading