Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ballista-cli/src/tui/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl HttpClient {
"job/{}/stages{}",
self.url_encode(job_id),
if self.config.job.stage.plan.tree {
"?render_tree=true"
"?plan_format=tree"
} else {
""
}
Expand Down
57 changes: 36 additions & 21 deletions ballista/scheduler/src/api/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::display::format_stage_metrics;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::state::execution_graph::ExecutionStage;
use crate::state::execution_graph_dot::ExecutionGraphDot;
Expand Down Expand Up @@ -204,8 +205,20 @@ pub struct QueryStageSummary {

#[derive(Debug, serde::Deserialize, Default)]
pub struct JobQueryParams {
/// Flag to tree-style render for physical plan
pub render_tree: Option<bool>,
/// Controls plan format
pub plan_format: Option<PlanFormat>,
}

#[derive(Debug, serde::Deserialize, Default, Clone)]
#[serde(rename_all = "snake_case")]
pub enum PlanFormat {
/// ?plan_format=default => plain indent, no metrics
#[default]
Default,
/// ?plan_format=tree => tree render, no metrics
Comment thread
martin-g marked this conversation as resolved.
Tree,
/// ?plan_format=metrics => indent with aggregated metrics
Metrics,
}

pub async fn get_scheduler_state<
Expand Down Expand Up @@ -382,16 +395,17 @@ pub async fn get_job<
let percent_complete =
((completed_stages as f32 / num_stages as f32) * 100_f32) as u8;

let render_tree = query.render_tree.unwrap_or(false);
let plan_format = query.plan_format.clone().unwrap_or_default();

let physical_plan = if render_tree {
displayable(job.physical_plan().as_ref())
let physical_plan = match plan_format {
PlanFormat::Default | PlanFormat::Metrics => {
DisplayableExecutionPlan::new(job.physical_plan().as_ref())
.indent(false)
.to_string()
}
PlanFormat::Tree => displayable(job.physical_plan().as_ref())
.tree_render()
.to_string()
} else {
DisplayableExecutionPlan::new(job.physical_plan().as_ref())
.indent(false)
.to_string()
.to_string(),
};

Ok(Json(JobResponse {
Expand Down Expand Up @@ -490,7 +504,7 @@ pub async fn get_query_stages<
Path(job_id): Path<String>,
query: Query<JobQueryParams>,
) -> Result<impl IntoResponse, SchedulerErrorResponse> {
let render_tree = query.render_tree.unwrap_or(false);
let plan_format = query.plan_format.clone().unwrap_or_default();

if let Some(graph) = data_server
.state
Expand Down Expand Up @@ -523,11 +537,12 @@ pub async fn get_query_stages<
};
match stage {
ExecutionStage::Running(running_stage) => {
summary.stage_plan = if render_tree {
Some(displayable(running_stage.plan.as_ref()).tree_render().to_string())
} else {
Some(displayable(running_stage.plan.as_ref()).indent(false).to_string())
};
let metrics = running_stage.stage_metrics.as_deref().unwrap_or(&[]);
summary.stage_plan = Some(match plan_format {
PlanFormat::Default => displayable(running_stage.plan.as_ref()).indent(false).to_string(),
PlanFormat::Tree => displayable(running_stage.plan.as_ref()).tree_render().to_string(),
PlanFormat::Metrics => format_stage_metrics(running_stage.plan.as_ref(), metrics),
});
summary.input_rows = running_stage
.stage_metrics
.as_ref()
Expand Down Expand Up @@ -577,11 +592,11 @@ pub async fn get_query_stages<
.collect();
}
ExecutionStage::Successful(completed_stage) => {
summary.stage_plan = if render_tree {
Some(displayable(completed_stage.plan.as_ref()).tree_render().to_string())
} else {
Some(displayable(completed_stage.plan.as_ref()).indent(false).to_string())
};
summary.stage_plan = Some(match plan_format {
PlanFormat::Default => displayable(completed_stage.plan.as_ref()).indent(false).to_string(),
PlanFormat::Tree => displayable(completed_stage.plan.as_ref()).tree_render().to_string(),
PlanFormat::Metrics => format_stage_metrics(completed_stage.plan.as_ref(), &completed_stage.stage_metrics),
});
summary.input_rows = get_combined_count(
&completed_stage.stage_metrics,
"input_rows",
Expand Down
62 changes: 46 additions & 16 deletions ballista/scheduler/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,32 @@ use ballista_core::utils::collect_plan_metrics;
use datafusion::logical_expr::{StringifiedPlan, ToStringifiedPlan};
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, accept,
DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, accept, displayable,
};
use log::{error, info};
use std::fmt;

/// Merges executor-collected stage metrics into the plan's metric slots.
/// Returns None if the lengths don't match, which indicates a mismatch between two collections of metrics
fn merge_stage_metrics(
plan: &dyn ExecutionPlan,
stage_metrics: &[MetricsSet],
) -> Option<Vec<MetricsSet>> {
let mut plan_metrics = collect_plan_metrics(plan);
if plan_metrics.len() != stage_metrics.len() {
return None;
}
plan_metrics
.iter_mut()
.zip(stage_metrics)
.for_each(|(plan_metric, stage_metric)| {
stage_metric
.iter()
.for_each(|s| plan_metric.push(s.clone()));
});
Some(plan_metrics)
}

/// Prints the physical plan for a completed stage with its aggregated metrics.
pub fn print_stage_metrics(
job_id: &str,
Expand All @@ -37,25 +58,34 @@ pub fn print_stage_metrics(
) {
// The plan_metrics collected here is a snapshot clone from the plan metrics.
// They are all empty now and need to combine with the stage metrics in the ExecutionStages
let mut plan_metrics = collect_plan_metrics(plan);
if plan_metrics.len() == stage_metrics.len() {
plan_metrics.iter_mut().zip(stage_metrics).for_each(
|(plan_metric, stage_metric)| {
stage_metric
.iter()
.for_each(|s| plan_metric.push(s.clone()));
},
);

info!(
match merge_stage_metrics(plan, stage_metrics) {
Some(plan_metrics) => info!(
"\n=== [{}/{}] Stage finished, physical plan with metrics ===\n{}",
job_id,
stage_id,
DisplayableBallistaExecutionPlan::new(plan, &plan_metrics).indent()
);
} else {
error!("Fail to combine stage metrics to plan for stage [{}/{}], plan metrics array size {} does not equal
to the stage metrics array size {}", job_id, stage_id, plan_metrics.len(), stage_metrics.len());
),
None => error!(
"Fail to combine stage metrics to plan for stage [{}/{}], \
plan metrics array size {} does not equal stage metrics array size {}",
job_id,
stage_id,
collect_plan_metrics(plan).len(),
stage_metrics.len()
),
}
}

/// Formats the plan for a completed stage with its aggregated metrics.
pub fn format_stage_metrics(
plan: &dyn ExecutionPlan,
stage_metrics: &[MetricsSet],
) -> String {
match merge_stage_metrics(plan, stage_metrics) {
Some(plan_metrics) => DisplayableBallistaExecutionPlan::new(plan, &plan_metrics)
.indent()
.to_string(),
None => displayable(plan).indent(false).to_string(),
}
}

Expand Down
26 changes: 21 additions & 5 deletions dev/bin/showplan.sh
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@
# }
# ```
#
# Job and stages endpoint accept `render_tree=true` parameter which
# renders plans as tree.
# Job and stages endpoint accept `plan_format` optional parameter which
# can render plan as a tree or a stages' metrics (i.e plan_format=tree)
#
# Command actions and options
#
Expand All @@ -133,6 +133,7 @@
# (from stages info; combinable with other mode flags)
# -w displays result with no word wrap, horizontal scroll)
# -t display plans as tree render (where applicable)
# -m display stage metrics (if available)
#
# Multiple mode flags can be combined and each will be printed in order.
# Example: -p -l prints physical plan followed by logical plan.
Expand All @@ -146,6 +147,7 @@ DISPLAY_MODE=() # modes accumulate via flags; defaults to (physical) if
STAGE_ID=""
USE_PAGER=false
RENDER_TREE=false
RENDER_METRICS=false

usage() {
echo "Usage: $(basename "$0") [job_id] [-a address] [-p] [-l] [-e] [-s stage_id]"
Expand All @@ -157,6 +159,7 @@ usage() {
echo " -s [STAGE_ID] display stage_plan for the given stage, or all stages if omitted (combinable)"
echo " -w displays result with no word wrap (horizontal scroll)"
echo " -t display plans as tree render (where applicable)"
echp " -m display stage metrics (if available)"
echo ""
echo " Mode flags are cumulative — each selected mode is printed in order."
echo " Example: $(basename "$0") -p -l prints physical plan then logical plan."
Expand Down Expand Up @@ -208,6 +211,10 @@ while [[ $# -gt 0 ]]; do
RENDER_TREE=true
shift
;;
-m)
RENDER_METRICS=true
shift
;;
*)
echo "Unknown option: $1"
usage
Expand All @@ -220,6 +227,12 @@ if [[ ${#DISPLAY_MODE[@]} -eq 0 ]]; then
DISPLAY_MODE=("physical")
fi

# Checking for mutually exclusive params
if [[ "$RENDER_TREE" == "true" && "$RENDER_METRICS" == "true" ]]; then
echo "Error: -t and -m are mutually exclusive" >&2
usage
fi

# Check dependencies
for cmd in curl jq tput; do
if ! command -v "$cmd" &> /dev/null; then
Expand All @@ -240,10 +253,13 @@ if [[ -z "$JOB_ID" ]]; then
fi
fi

# Build job info and stages URLs (optionally with render_tree query param)
# Build job info and stages URLs (optionally with plan_format query param)
if [[ "$RENDER_TREE" == "true" ]]; then
JOB_URL="${API_BASE}api/job/${JOB_ID}?render_tree=true"
STAGES_URL="${API_BASE}api/job/${JOB_ID}/stages?render_tree=true"
JOB_URL="${API_BASE}api/job/${JOB_ID}?plan_format=tree"
STAGES_URL="${API_BASE}api/job/${JOB_ID}/stages?plan_format=tree"
elif [[ "$RENDER_METRICS" == "true" ]]; then
JOB_URL="${API_BASE}api/job/${JOB_ID}?plan_format=metrics"
STAGES_URL="${API_BASE}api/job/${JOB_ID}/stages?plan_format=metrics"
else
JOB_URL="${API_BASE}api/job/${JOB_ID}"
STAGES_URL="${API_BASE}api/job/${JOB_ID}/stages"
Expand Down
Loading