diff --git a/datafusion/src/physical_plan/display.rs b/datafusion/src/physical_plan/display.rs index bfc3cd951d21..e178ea18bb43 100644 --- a/datafusion/src/physical_plan/display.rs +++ b/datafusion/src/physical_plan/display.rs @@ -87,4 +87,9 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> { self.indent += 1; Ok(true) } + + fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> Result { + self.indent -= 1; + Ok(true) + } } diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index eb50661b42e6..17e0f13609a3 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -2989,3 +2989,52 @@ async fn test_physical_plan_display_indent() { expected, actual ); } + +#[tokio::test] +async fn test_physical_plan_display_indent_multi_children() { + // Hard code concurrency as it appears in the RepartitionExec output + let config = ExecutionConfig::new().with_concurrency(3); + let mut ctx = ExecutionContext::with_config(config); + // ensure indenting works for nodes with multiple children + register_aggregate_csv(&mut ctx).unwrap(); + let sql = "SELECT c1 \ + FROM (select c1 from aggregate_test_100)\ + JOIN\ + (select c1 as c2 from aggregate_test_100)\ + ON c1=c2\ + "; + + let plan = ctx.create_logical_plan(&sql).unwrap(); + let plan = ctx.optimize(&plan).unwrap(); + + let physical_plan = ctx.create_physical_plan(&plan).unwrap(); + let expected = vec![ + "ProjectionExec: expr=[c1]", + " CoalesceBatchesExec: target_batch_size=4096", + " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(\"c1\", \"c2\")]", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=Hash([Column { name: \"c1\" }], 3)", + " ProjectionExec: expr=[c1]", + " RepartitionExec: partitioning=RoundRobinBatch(3)", + " CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=Hash([Column { name: \"c2\" }], 3)", + " ProjectionExec: expr=[c1 as c2]", + " RepartitionExec: partitioning=RoundRobinBatch(3)", + " CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true", + ]; + + let data_path = arrow::util::test_util::arrow_test_data(); + let actual = format!("{}", displayable(physical_plan.as_ref()).indent()) + .trim() + .lines() + // normalize paths + .map(|s| s.replace(&data_path, "ARROW_TEST_DATA")) + .collect::>(); + + assert_eq!( + expected, actual, + "expected:\n{:#?}\nactual:\n\n{:#?}\n", + expected, actual + ); +}