diff --git a/ballista-cli/src/tui/http_client.rs b/ballista-cli/src/tui/http_client.rs index c0820a976..71c35fb08 100644 --- a/ballista-cli/src/tui/http_client.rs +++ b/ballista-cli/src/tui/http_client.rs @@ -129,7 +129,7 @@ impl HttpClient { "job/{}/stages{}", self.url_encode(job_id), if self.config.job.stage.plan.tree { - "?render_tree=true" + "?plan_format=tree" } else { "" } diff --git a/ballista/scheduler/src/api/handlers.rs b/ballista/scheduler/src/api/handlers.rs index 2a6d674aa..16a134e25 100644 --- a/ballista/scheduler/src/api/handlers.rs +++ b/ballista/scheduler/src/api/handlers.rs @@ -10,6 +10,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::display::format_stage_metrics; use crate::scheduler_server::event::QueryStageSchedulerEvent; use crate::state::execution_graph::ExecutionStage; use crate::state::execution_graph_dot::ExecutionGraphDot; @@ -204,8 +205,20 @@ pub struct QueryStageSummary { #[derive(Debug, serde::Deserialize, Default)] pub struct JobQueryParams { - /// Flag to tree-style render for physical plan - pub render_tree: Option<bool>, + /// Controls plan format + pub plan_format: Option<PlanFormat>, +} + +#[derive(Debug, serde::Deserialize, Default, Clone)] +#[serde(rename_all = "snake_case")] +pub enum PlanFormat { + /// ?plan_format=default => plain indent, no metrics + #[default] + Default, + /// ?plan_format=tree => tree render, no metrics + Tree, + /// ?plan_format=metrics => indent with aggregated metrics + Metrics, } pub async fn get_scheduler_state< @@ -382,16 +395,17 @@ pub async fn get_job< let percent_complete = ((completed_stages as f32 / num_stages as f32) * 100_f32) as u8; - let render_tree = query.render_tree.unwrap_or(false); + let plan_format = query.plan_format.clone().unwrap_or_default(); - let physical_plan = if render_tree { - displayable(job.physical_plan().as_ref()) + let physical_plan = match plan_format { + PlanFormat::Default | PlanFormat::Metrics => {
DisplayableExecutionPlan::new(job.physical_plan().as_ref()) + .indent(false) + .to_string() + } + PlanFormat::Tree => displayable(job.physical_plan().as_ref()) .tree_render() - .to_string() - } else { - DisplayableExecutionPlan::new(job.physical_plan().as_ref()) - .indent(false) - .to_string() + .to_string(), }; Ok(Json(JobResponse { @@ -490,7 +504,7 @@ pub async fn get_query_stages< Path(job_id): Path, query: Query, ) -> Result { - let render_tree = query.render_tree.unwrap_or(false); + let plan_format = query.plan_format.clone().unwrap_or_default(); if let Some(graph) = data_server .state @@ -523,11 +537,12 @@ pub async fn get_query_stages< }; match stage { ExecutionStage::Running(running_stage) => { - summary.stage_plan = if render_tree { - Some(displayable(running_stage.plan.as_ref()).tree_render().to_string()) - } else { - Some(displayable(running_stage.plan.as_ref()).indent(false).to_string()) - }; + let metrics = running_stage.stage_metrics.as_deref().unwrap_or(&[]); + summary.stage_plan = Some(match plan_format { + PlanFormat::Default => displayable(running_stage.plan.as_ref()).indent(false).to_string(), + PlanFormat::Tree => displayable(running_stage.plan.as_ref()).tree_render().to_string(), + PlanFormat::Metrics => format_stage_metrics(running_stage.plan.as_ref(), metrics), + }); summary.input_rows = running_stage .stage_metrics .as_ref() @@ -577,11 +592,11 @@ pub async fn get_query_stages< .collect(); } ExecutionStage::Successful(completed_stage) => { - summary.stage_plan = if render_tree { - Some(displayable(completed_stage.plan.as_ref()).tree_render().to_string()) - } else { - Some(displayable(completed_stage.plan.as_ref()).indent(false).to_string()) - }; + summary.stage_plan = Some(match plan_format { + PlanFormat::Default => displayable(completed_stage.plan.as_ref()).indent(false).to_string(), + PlanFormat::Tree => displayable(completed_stage.plan.as_ref()).tree_render().to_string(), + PlanFormat::Metrics => 
format_stage_metrics(completed_stage.plan.as_ref(), &completed_stage.stage_metrics), + }); summary.input_rows = get_combined_count( &completed_stage.stage_metrics, "input_rows", diff --git a/ballista/scheduler/src/display.rs b/ballista/scheduler/src/display.rs index 59dc2f475..f29f9e6dd 100644 --- a/ballista/scheduler/src/display.rs +++ b/ballista/scheduler/src/display.rs @@ -23,11 +23,32 @@ use ballista_core::utils::collect_plan_metrics; use datafusion::logical_expr::{StringifiedPlan, ToStringifiedPlan}; use datafusion::physical_plan::metrics::MetricsSet; use datafusion::physical_plan::{ - DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, accept, + DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, accept, displayable, }; use log::{error, info}; use std::fmt; +/// Merges executor-collected stage metrics into the plan's metric slots. +/// Returns `None` when the plan metrics and the stage metrics differ in length. +fn merge_stage_metrics( + plan: &dyn ExecutionPlan, + stage_metrics: &[MetricsSet], +) -> Option<Vec<MetricsSet>> { + let mut plan_metrics = collect_plan_metrics(plan); + if plan_metrics.len() != stage_metrics.len() { + return None; + } + plan_metrics + .iter_mut() + .zip(stage_metrics) + .for_each(|(plan_metric, stage_metric)| { + stage_metric + .iter() + .for_each(|s| plan_metric.push(s.clone())); + }); + Some(plan_metrics) +} + /// Prints the physical plan for a completed stage with its aggregated metrics. pub fn print_stage_metrics( job_id: &str, @@ -37,25 +58,34 @@ pub fn print_stage_metrics( ) { // The plan_metrics collected here is a snapshot clone from the plan metrics.
// They are all empty now and need to combine with the stage metrics in the ExecutionStages - let mut plan_metrics = collect_plan_metrics(plan); - if plan_metrics.len() == stage_metrics.len() { - plan_metrics.iter_mut().zip(stage_metrics).for_each( - |(plan_metric, stage_metric)| { - stage_metric - .iter() - .for_each(|s| plan_metric.push(s.clone())); - }, - ); - - info!( + match merge_stage_metrics(plan, stage_metrics) { + Some(plan_metrics) => info!( "\n=== [{}/{}] Stage finished, physical plan with metrics ===\n{}", job_id, stage_id, DisplayableBallistaExecutionPlan::new(plan, &plan_metrics).indent() - ); - } else { - error!("Fail to combine stage metrics to plan for stage [{}/{}], plan metrics array size {} does not equal - to the stage metrics array size {}", job_id, stage_id, plan_metrics.len(), stage_metrics.len()); + ), + None => error!( + "Fail to combine stage metrics to plan for stage [{}/{}], \ + plan metrics array size {} does not equal stage metrics array size {}", + job_id, + stage_id, + collect_plan_metrics(plan).len(), + stage_metrics.len() + ), + } +} + +/// Formats the plan for a completed stage with its aggregated metrics. +pub fn format_stage_metrics( + plan: &dyn ExecutionPlan, + stage_metrics: &[MetricsSet], +) -> String { + match merge_stage_metrics(plan, stage_metrics) { + Some(plan_metrics) => DisplayableBallistaExecutionPlan::new(plan, &plan_metrics) + .indent() + .to_string(), + None => displayable(plan).indent(false).to_string(), } } diff --git a/dev/bin/showplan.sh b/dev/bin/showplan.sh index 891c3c199..bff9b2423 100755 --- a/dev/bin/showplan.sh +++ b/dev/bin/showplan.sh @@ -114,8 +114,8 @@ # } # ``` # -# Job and stages endpoint accept `render_tree=true` parameter which -# renders plans as tree. 
+# Job and stages endpoints accept an optional `plan_format` parameter which +# renders the plan as a tree or with stage metrics (e.g. plan_format=tree) # # Command actions and options # @@ -133,6 +133,7 @@ # (from stages info; combinable with other mode flags) # -w displays result with no word wrap, horizontal scroll) # -t display plans as tree render (where applicable) +# -m display stage metrics (if available) # # Multiple mode flags can be combined and each will be printed in order. # Example: -p -l prints physical plan followed by logical plan. @@ -146,6 +147,7 @@ DISPLAY_MODE=() # modes accumulate via flags; defaults to (physical) if STAGE_ID="" USE_PAGER=false RENDER_TREE=false +RENDER_METRICS=false usage() { echo "Usage: $(basename "$0") [job_id] [-a address] [-p] [-l] [-e] [-s stage_id]" @@ -157,6 +159,7 @@ usage() { echo " -s [STAGE_ID] display stage_plan for the given stage, or all stages if omitted (combinable)" echo " -w displays result with no word wrap (horizontal scroll)" echo " -t display plans as tree render (where applicable)" + echo " -m display stage metrics (if available)" echo "" echo " Mode flags are cumulative — each selected mode is printed in order." echo " Example: $(basename "$0") -p -l prints physical plan then logical plan." @@ -208,6 +211,10 @@ while [[ $# -gt 0 ]]; do RENDER_TREE=true shift ;; + -m) + RENDER_METRICS=true + shift + ;; *) echo "Unknown option: $1" usage @@ -220,6 +227,12 @@ if [[ ${#DISPLAY_MODE[@]} -eq 0 ]]; then DISPLAY_MODE=("physical") fi +# Checking for mutually exclusive params +if [[ "$RENDER_TREE" == "true" && "$RENDER_METRICS" == "true" ]]; then + echo "Error: -t and -m are mutually exclusive" >&2 + usage +fi + # Check dependencies for cmd in curl jq tput; do if !
command -v "$cmd" &> /dev/null; then @@ -240,10 +253,13 @@ if [[ -z "$JOB_ID" ]]; then fi fi -# Build job info and stages URLs (optionally with render_tree query param) +# Build job info and stages URLs (optionally with plan_format query param) if [[ "$RENDER_TREE" == "true" ]]; then - JOB_URL="${API_BASE}api/job/${JOB_ID}?render_tree=true" - STAGES_URL="${API_BASE}api/job/${JOB_ID}/stages?render_tree=true" + JOB_URL="${API_BASE}api/job/${JOB_ID}?plan_format=tree" + STAGES_URL="${API_BASE}api/job/${JOB_ID}/stages?plan_format=tree" +elif [[ "$RENDER_METRICS" == "true" ]]; then + JOB_URL="${API_BASE}api/job/${JOB_ID}?plan_format=metrics" + STAGES_URL="${API_BASE}api/job/${JOB_ID}/stages?plan_format=metrics" else JOB_URL="${API_BASE}api/job/${JOB_ID}" STAGES_URL="${API_BASE}api/job/${JOB_ID}/stages"