diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 07419d09b7a9..3da0e85437d7 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -396,6 +396,7 @@ message ProjectionExecNode {
 enum AggregateMode {
   PARTIAL = 0;
   FINAL = 1;
+  FINAL_PARTITIONED = 2;
 }

 message HashAggregateExecNode {
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 9c35c9d88941..97f03948f7bd 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -201,6 +201,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                 let agg_mode: AggregateMode = match mode {
                     protobuf::AggregateMode::Partial => AggregateMode::Partial,
                     protobuf::AggregateMode::Final => AggregateMode::Final,
+                    protobuf::AggregateMode::FinalPartitioned => {
+                        AggregateMode::FinalPartitioned
+                    }
                 };

                 let group = hash_agg
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 8a5fd71083f7..9571f3de2e76 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -172,6 +172,9 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
             let agg_mode = match exec.mode() {
                 AggregateMode::Partial => protobuf::AggregateMode::Partial,
                 AggregateMode::Final => protobuf::AggregateMode::Final,
+                AggregateMode::FinalPartitioned => {
+                    protobuf::AggregateMode::FinalPartitioned
+                }
             };
             let input_schema = exec.input_schema();
             let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?;
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index ee9c9557e789..55541d5fd014 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -322,6 +322,7 @@ pub fn create_datafusion_context() -> ExecutionContext {
     let config = ExecutionConfig::new()
         .with_concurrency(1)
         .with_repartition_joins(false)
+        .with_repartition_aggregations(false)
        .with_physical_optimizer_rules(rules);
     ExecutionContext::with_config(config)
 }
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 20dd0d36d9ab..b81d7de355ef 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -128,7 +128,7 @@ impl DistributedPlanner {
             //TODO should insert query stages in more generic way based on partitioning metadata
             // and not specifically for this operator
             match agg.mode() {
-                AggregateMode::Final => {
+                AggregateMode::Final | AggregateMode::FinalPartitioned => {
                     let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
                     for child in &children {
                         let new_stage = create_query_stage(
@@ -237,10 +237,9 @@ mod test {
     use ballista_core::serde::protobuf;
     use ballista_core::utils::format_plan;
     use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
-    use datafusion::physical_plan::merge::MergeExec;
-    use datafusion::physical_plan::projection::ProjectionExec;
     use datafusion::physical_plan::sort::SortExec;
     use datafusion::physical_plan::ExecutionPlan;
+    use datafusion::physical_plan::{merge::MergeExec, projection::ProjectionExec};
     use std::convert::TryInto;
     use std::sync::Arc;
     use uuid::Uuid;
@@ -278,11 +277,9 @@ mod test {
     QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=1
      HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
       CsvExec: testdata/lineitem; partitions=2
-
     QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
      MergeExec
       UnresolvedShuffleExec: stages=[1]
-
     QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
      SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
      ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_extendedprice Multip
diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs
index 330cc9a9332c..098906050386 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -33,10 +33,12 @@ pub const TPCH_TABLES: &[&str] = &[
 pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
     // remove Repartition rule because that isn't supported yet
     let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
-        Arc::new(CoalesceBatches::new()),
         Arc::new(AddMergeExec::new()),
+        Arc::new(CoalesceBatches::new()),
     ];
-    let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
+    let config = ExecutionConfig::new()
+        .with_physical_optimizer_rules(rules)
+        .with_repartition_aggregations(false);
     let mut ctx = ExecutionContext::with_config(config);

     for table in TPCH_TABLES {
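Both Ballista contexts above opt out of the new behavior, while the DataFusion config introduced below enables it by default. As orientation for reviewers, a minimal sketch of how the new flag is toggled from user code (`with_repartition_aggregations` is the builder added in this diff; the rest is existing DataFusion API):

    use datafusion::execution::context::{ExecutionConfig, ExecutionContext};

    fn main() {
        // Sketch only: hash-repartitioned aggregates also require
        // concurrency > 1 and at least one group key at plan time.
        let config = ExecutionConfig::new()
            .with_concurrency(8)
            .with_repartition_aggregations(true); // `true` is the default
        let _ctx = ExecutionContext::with_config(config);
    }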
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 9c7a6217d7d9..272e75acba6f 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -636,6 +636,9 @@ pub struct ExecutionConfig {
     /// Should DataFusion repartition data using the join keys to execute joins in parallel
     /// using the provided `concurrency` level
     pub repartition_joins: bool,
+    /// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
+    /// using the provided `concurrency` level
+    pub repartition_aggregations: bool,
 }

 impl ExecutionConfig {
@@ -663,6 +666,7 @@ impl ExecutionConfig {
             create_default_catalog_and_schema: true,
             information_schema: false,
             repartition_joins: true,
+            repartition_aggregations: true,
         }
     }

@@ -746,6 +750,11 @@ impl ExecutionConfig {
         self.repartition_joins = enabled;
         self
     }
+    /// Enables or disables the use of repartitioning for aggregations to improve parallelism
+    pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
+        self.repartition_aggregations = enabled;
+        self
+    }
 }

 /// Holds per-execution properties and data (such as starting timestamps, etc).
@@ -1351,7 +1360,6 @@ mod tests {
     #[tokio::test]
     async fn aggregate_grouped() -> Result<()> {
         let results = execute("SELECT c1, SUM(c2) FROM test GROUP BY c1", 4).await?;
-        assert_eq!(results.len(), 1);

         let expected = vec![
             "+----+---------+",
@@ -1371,7 +1379,6 @@ mod tests {
     #[tokio::test]
     async fn aggregate_grouped_avg() -> Result<()> {
         let results = execute("SELECT c1, AVG(c2) FROM test GROUP BY c1", 4).await?;
-        assert_eq!(results.len(), 1);

         let expected = vec![
             "+----+---------+",
@@ -1392,7 +1399,6 @@ mod tests {
     async fn boolean_literal() -> Result<()> {
         let results =
             execute("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4).await?;
-        assert_eq!(results.len(), 1);

         let expected = vec![
             "+----+------+",
@@ -1414,7 +1420,6 @@ mod tests {
     async fn aggregate_grouped_empty() -> Result<()> {
         let results =
             execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;
-        assert_eq!(results.len(), 1);

         let expected = vec!["++", "||", "++", "++"];
         assert_batches_sorted_eq!(expected, &results);
@@ -1425,7 +1430,6 @@ mod tests {
     #[tokio::test]
     async fn aggregate_grouped_max() -> Result<()> {
         let results = execute("SELECT c1, MAX(c2) FROM test GROUP BY c1", 4).await?;
-        assert_eq!(results.len(), 1);

         let expected = vec![
             "+----+---------+",
@@ -1445,7 +1449,6 @@ mod tests {
     #[tokio::test]
     async fn aggregate_grouped_min() -> Result<()> {
         let results = execute("SELECT c1, MIN(c2) FROM test GROUP BY c1", 4).await?;
-        assert_eq!(results.len(), 1);

         let expected = vec![
             "+----+---------+",
@@ -1629,7 +1632,6 @@ mod tests {
     #[tokio::test]
     async fn count_aggregated() -> Result<()> {
         let results = execute("SELECT c1, COUNT(c2) FROM test GROUP BY c1", 4).await?;
-        assert_eq!(results.len(), 1);

         let expected = vec![
             "+----+-----------+",
@@ -1681,7 +1683,6 @@ mod tests {
             &mut ctx,
             "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)",
         ).await?;
-        assert_eq!(results.len(), 1);

         let expected = vec![
             "+---------------------+---------+",
@@ -1925,7 +1926,6 @@ mod tests {
         ];

         let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
-        assert_eq!(results.len(), 1);

         let expected = vec![
             "+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
@@ -1952,7 +1952,6 @@ mod tests {
         ];

         let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
-        assert_eq!(results.len(), 1);

         let expected = vec![
             "+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
diff --git a/datafusion/src/physical_optimizer/merge_exec.rs b/datafusion/src/physical_optimizer/merge_exec.rs
index 255d1bc24587..877c0be00e1b 100644
--- a/datafusion/src/physical_optimizer/merge_exec.rs
+++ b/datafusion/src/physical_optimizer/merge_exec.rs
@@ -52,6 +52,7 @@ impl PhysicalOptimizerRule for AddMergeExec {
                 .collect::<Result<Vec<_>>>()?;
             match plan.required_child_distribution() {
                 Distribution::UnspecifiedDistribution => plan.with_new_children(children),
+                Distribution::HashPartitioned(_) => plan.with_new_children(children),
                 Distribution::SinglePartition => plan.with_new_children(
                     children
                         .iter()
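The repartition.rs hunk below swaps `==` for `matches!`. That is not a style change: once `Distribution` grows the `HashPartitioned(Vec<Arc<dyn PhysicalExpr>>)` variant (see the mod.rs hunk later in this diff), the enum can no longer derive `PartialEq`, because trait objects do not implement it, so the old `==` comparison stops compiling. A standalone sketch of the pattern, with placeholder types rather than the real DataFusion ones:

    use std::sync::Arc;

    // Placeholder stand-ins for the real PhysicalExpr / Distribution types.
    trait PhysicalExpr {}

    enum Distribution {
        UnspecifiedDistribution,
        SinglePartition,
        // A trait object has no `PartialEq`, so the enum cannot derive it.
        HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
    }

    fn requires_single_partition(d: &Distribution) -> bool {
        // `matches!` checks the variant without needing `PartialEq`.
        matches!(d, Distribution::SinglePartition)
    }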
index 82f46f9cbbbb..fee4b3e11e5d 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -52,7 +52,10 @@ fn optimize_concurrency(
         .map(|child| {
             optimize_concurrency(
                 concurrency,
-                plan.required_child_distribution() == Distribution::SinglePartition,
+                matches!(
+                    plan.required_child_distribution(),
+                    Distribution::SinglePartition
+                ),
                 child.clone(),
             )
         })
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 3059e2f746ce..0a822dc898af 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -78,6 +78,13 @@ pub enum AggregateMode {
     Partial,
     /// Final aggregate that produces a single partition of output
     Final,
+    /// Final aggregate that works on pre-partitioned data.
+    ///
+    /// This requires the invariant that all rows with a particular
+    /// grouping key are in the same partition, such as is the case
+    /// with hash repartitioning on the group keys. If a grouping key
+    /// appears in more than one partition, duplicate groups are produced.
+    FinalPartitioned,
 }

 /// Hash aggregate execution plan
@@ -123,7 +130,7 @@ fn create_schema(
                 fields.extend(expr.state_fields()?.iter().cloned())
             }
         }
-        AggregateMode::Final => {
+        AggregateMode::Final | AggregateMode::FinalPartitioned => {
            // in final mode, the field with the final result of the accumulator
            for expr in aggr_expr {
                fields.push(expr.field()?)
@@ -204,6 +211,9 @@ impl ExecutionPlan for HashAggregateExec {
     fn required_child_distribution(&self) -> Distribution {
         match &self.mode {
             AggregateMode::Partial => Distribution::UnspecifiedDistribution,
+            AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
+                self.group_expr.iter().map(|x| x.0.clone()).collect(),
+            ),
             AggregateMode::Final => Distribution::SinglePartition,
         }
     }
@@ -454,7 +464,7 @@ fn group_aggregate_batch(
             })
             .try_for_each(|(accumulator, values)| match mode {
                 AggregateMode::Partial => accumulator.update_batch(&values),
-                AggregateMode::Final => {
+                AggregateMode::FinalPartitioned | AggregateMode::Final => {
                     // note: the aggregation here is over states, not values, thus the merge
                     accumulator.merge_batch(&values)
                 }
@@ -807,7 +817,7 @@ fn aggregate_expressions(
             Ok(aggr_expr.iter().map(|agg| agg.expressions()).collect())
         }
         // in this mode, we build the merge expressions of the aggregation
-        AggregateMode::Final => Ok(aggr_expr
+        AggregateMode::Final | AggregateMode::FinalPartitioned => Ok(aggr_expr
             .iter()
             .map(|agg| merge_expressions(agg))
             .collect::<Result<Vec<_>>>()?),
@@ -901,7 +911,9 @@ fn aggregate_batch(
             // 1.3
             match mode {
                 AggregateMode::Partial => accum.update_batch(values),
-                AggregateMode::Final => accum.merge_batch(values),
+                AggregateMode::Final | AggregateMode::FinalPartitioned => {
+                    accum.merge_batch(values)
+                }
             }
         })
 }
@@ -1074,7 +1086,7 @@ fn finalize_aggregation(
             .collect::<Result<Vec<_>>>()?;
         Ok(a.iter().flatten().cloned().collect::<Vec<_>>())
     }
-    AggregateMode::Final => {
+    AggregateMode::Final | AggregateMode::FinalPartitioned => {
         // merge the state to the final value
         accumulators
             .iter()
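The `FinalPartitioned` doc comment above carries the key invariant: every row with a given grouping key must land in exactly one partition. Hash partitioning guarantees this because hashing is deterministic; a toy illustration using std's hasher (DataFusion itself hashes with ahash, as the hash_join.rs hunks below show):

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    fn partition_for(key: &str, partitions: u64) -> u64 {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        hasher.finish() % partitions
    }

    fn main() {
        // The same grouping key always maps to the same partition, so each
        // FinalPartitioned instance can merge partial states for its own
        // disjoint set of groups, with no global merge step in between.
        assert_eq!(partition_for("A", 4), partition_for("A", 4));
    }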
diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 0bf5a2857fde..01551cd4daf4 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -23,8 +23,9 @@ use ahash::RandomState;

 use arrow::{
     array::{
-        ArrayData, ArrayRef, BooleanArray, LargeStringArray, PrimitiveArray,
-        TimestampMicrosecondArray, TimestampNanosecondArray, UInt32BufferBuilder,
+        ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array,
+        Float64Array, LargeStringArray, PrimitiveArray, TimestampMicrosecondArray,
+        TimestampMillisecondArray, TimestampNanosecondArray, UInt32BufferBuilder,
         UInt32Builder, UInt64BufferBuilder, UInt64Builder,
     },
     compute,
@@ -862,6 +863,49 @@ macro_rules! hash_array_primitive {
     };
 }

+macro_rules! hash_array_float {
+    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
+        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+        let values = array.values();
+
+        if array.null_count() == 0 {
+            if $multi_col {
+                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
+                    *hash = combine_hashes(
+                        $ty::get_hash(&value.to_le_bytes(), $random_state),
+                        *hash,
+                    );
+                }
+            } else {
+                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
+                    *hash = $ty::get_hash(&value.to_le_bytes(), $random_state)
+                }
+            }
+        } else {
+            if $multi_col {
+                for (i, (hash, value)) in
+                    $hashes.iter_mut().zip(values.iter()).enumerate()
+                {
+                    if !array.is_null(i) {
+                        *hash = combine_hashes(
+                            $ty::get_hash(&value.to_le_bytes(), $random_state),
+                            *hash,
+                        );
+                    }
+                }
+            } else {
+                for (i, (hash, value)) in
+                    $hashes.iter_mut().zip(values.iter()).enumerate()
+                {
+                    if !array.is_null(i) {
+                        *hash = $ty::get_hash(&value.to_le_bytes(), $random_state);
+                    }
+                }
+            }
+        }
+    };
+}
+
 /// Creates hash values for every element in the row based on the values in the columns
 pub fn create_hashes<'a>(
     arrays: &[ArrayRef],
@@ -953,6 +997,36 @@ pub fn create_hashes<'a>(
                     multi_col
                 );
             }
+            DataType::Float32 => {
+                hash_array_float!(
+                    Float32Array,
+                    col,
+                    u32,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Float64 => {
+                hash_array_float!(
+                    Float64Array,
+                    col,
+                    u64,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Timestamp(TimeUnit::Millisecond, None) => {
+                hash_array_primitive!(
+                    TimestampMillisecondArray,
+                    col,
+                    i64,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
             DataType::Timestamp(TimeUnit::Microsecond, None) => {
                 hash_array_primitive!(
                     TimestampMicrosecondArray,
                     col,
                     i64,
                     hashes_buffer,
                     random_state,
                     multi_col
                 );
             }
@@ -973,6 +1047,26 @@ pub fn create_hashes<'a>(
                     multi_col
                 );
             }
+            DataType::Date32 => {
+                hash_array_primitive!(
+                    Date32Array,
+                    col,
+                    i32,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Date64 => {
+                hash_array_primitive!(
+                    Date64Array,
+                    col,
+                    i64,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
             DataType::Boolean => {
                 hash_array!(
                     BooleanArray,
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 6ab9570790e7..e915b2c257dd 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -359,12 +359,15 @@ impl Partitioning {
 }

 /// Distribution schemes
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone)]
 pub enum Distribution {
     /// Unspecified distribution
     UnspecifiedDistribution,
     /// A single partition is required
     SinglePartition,
+    /// Requires children to be distributed in such a way that the same
+    /// values of the keys end up in the same partition
+    HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
 }

 /// Represents the result from an expression
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index d11e8e93d199..9e7dc7172b82 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -48,7 +48,7 @@ use crate::{
     error::{DataFusionError, Result},
     physical_plan::displayable,
 };
-use arrow::compute::can_cast_types;
+use arrow::{compute::can_cast_types, datatypes::DataType};
 use arrow::compute::SortOptions;
 use arrow::datatypes::{Schema, SchemaRef};
@@ -187,19 +187,54 @@ impl DefaultPhysicalPlanner {
                 let final_group: Vec<Arc<dyn PhysicalExpr>> =
                     (0..groups.len()).map(|i| col(&groups[i].1)).collect();

-                // construct a second aggregation, keeping the final column name equal to the first aggregation
-                // and the expressions corresponding to the respective aggregate
-                Ok(Arc::new(HashAggregateExec::try_new(
-                    AggregateMode::Final,
-                    final_group
-                        .iter()
-                        .enumerate()
-                        .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
-                        .collect(),
-                    aggregates,
-                    initial_aggr,
-                    input_schema,
-                )?))
+                // TODO: dictionary type not yet supported in Hash Repartition
+                let contains_dict = groups
+                    .iter()
+                    .flat_map(|x| x.0.data_type(physical_input_schema.as_ref()))
+                    .any(|x| matches!(x, DataType::Dictionary(_, _)));
+
+                if !groups.is_empty()
+                    && ctx_state.config.concurrency > 1
+                    && ctx_state.config.repartition_aggregations
+                    && !contains_dict
+                {
+                    // Divide partial hash aggregates into multiple partitions by hash key
+                    let hash_repartition = Arc::new(RepartitionExec::try_new(
+                        initial_aggr,
+                        Partitioning::Hash(
+                            final_group.clone(),
+                            ctx_state.config.concurrency,
+                        ),
+                    )?);
+
+                    // Combine the hash aggregates within each partition
+                    Ok(Arc::new(HashAggregateExec::try_new(
+                        AggregateMode::FinalPartitioned,
+                        final_group
+                            .iter()
+                            .enumerate()
+                            .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
+                            .collect(),
+                        aggregates,
+                        hash_repartition,
+                        input_schema,
+                    )?))
+                } else {
+                    // construct a second aggregation, keeping the final column name equal to the first aggregation
+                    // and the expressions corresponding to the respective aggregate
+
+                    Ok(Arc::new(HashAggregateExec::try_new(
+                        AggregateMode::Final,
+                        final_group
+                            .iter()
+                            .enumerate()
+                            .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
+                            .collect(),
+                        aggregates,
+                        initial_aggr,
+                        input_schema,
+                    )?))
+                }
             }
             LogicalPlan::Projection { input, expr, .. } => {
                 let input_exec = self.create_initial_plan(input, ctx_state)?;
@@ -761,7 +796,8 @@ mod tests {
     }

     fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
-        let ctx_state = make_ctx_state();
+        let mut ctx_state = make_ctx_state();
+        ctx_state.config.concurrency = 4;
         let planner = DefaultPhysicalPlanner::default();
         planner.create_physical_plan(logical_plan, &ctx_state)
     }
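When the new branch above is taken, the final aggregate consumes hash-partitioned partial aggregates instead of a single merged stream. The updated sql.rs expectation at the end of this diff shows the resulting shape; the aggregation-relevant slice is:

    HashAggregateExec: mode=FinalPartitioned, gby=[c1], aggr=[MAX(c12), MIN(c12)]
      CoalesceBatchesExec: target_batch_size=4096
        RepartitionExec: partitioning=Hash([Column { name: "c1" }], 3)
          HashAggregateExec: mode=Partial, gby=[c1], aggr=[MAX(c12), MIN(c12)]

Each output partition of the RepartitionExec holds a disjoint set of group keys, so the partitions finalize in parallel; the single MergeExec moves above the projection, where only the sort still requires one partition.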
@@ -998,6 +1034,26 @@ mod tests {
         Ok(())
     }

+    #[test]
+    fn hash_agg_group_by_partitioned() -> Result<()> {
+        let testdata = arrow::util::test_util::arrow_test_data();
+        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
+
+        let options = CsvReadOptions::new().schema_infer_max_records(100);
+        let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
+            .build()?;
+
+        let execution_plan = plan(&logical_plan)?;
+        let formatted = format!("{:?}", execution_plan);
+
+        // Make sure the plan contains a FinalPartitioned HashAggregate, which
+        // means it did not fall back to the slower single-partition Final mode
+        assert!(formatted.contains("FinalPartitioned"));
+
+        Ok(())
+    }
+
     /// An example extension node that doesn't do anything
     struct NoOpExtensionNode {
         schema: DFSchemaRef,
diff --git a/datafusion/src/physical_plan/unicode_expressions.rs b/datafusion/src/physical_plan/unicode_expressions.rs
index 787ea7ea2673..3852fd7c931f 100644
--- a/datafusion/src/physical_plan/unicode_expressions.rs
+++ b/datafusion/src/physical_plan/unicode_expressions.rs
@@ -93,7 +93,6 @@ where
 pub fn left<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
     let string_array = downcast_string_arg!(args[0], "string", T);
     let n_array = downcast_arg!(args[1], "n", Int64Array);
-
     let result = string_array
         .iter()
         .zip(n_array.iter())
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 0b9cc2ae18b9..6edb75733490 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2950,17 +2950,19 @@ async fn test_physical_plan_display_indent() {
     let physical_plan = ctx.create_physical_plan(&plan).unwrap();

     let expected = vec![
-        "GlobalLimitExec: limit=10",
-        "  SortExec: [the_min DESC]",
-        "    ProjectionExec: expr=[c1, MAX(c12), MIN(c12) as the_min]",
-        "      HashAggregateExec: mode=Final, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
-        "        MergeExec",
-        "          HashAggregateExec: mode=Partial, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
-        "            CoalesceBatchesExec: target_batch_size=4096",
-        "              FilterExec: c12 < CAST(10 AS Float64)",
-        "                RepartitionExec: partitioning=RoundRobinBatch(3)",
-        "                  CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
-    ];
+        "GlobalLimitExec: limit=10",
+        "  SortExec: [the_min DESC]",
+        "    MergeExec",
+        "      ProjectionExec: expr=[c1, MAX(c12), MIN(c12) as the_min]",
+        "        HashAggregateExec: mode=FinalPartitioned, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
+        "          CoalesceBatchesExec: target_batch_size=4096",
+        "            RepartitionExec: partitioning=Hash([Column { name: \"c1\" }], 3)",
+        "              HashAggregateExec: mode=Partial, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
+        "                CoalesceBatchesExec: target_batch_size=4096",
+        "                  FilterExec: c12 < CAST(10 AS Float64)",
+        "                    RepartitionExec: partitioning=RoundRobinBatch(3)",
+        "                      CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
+    ];

     let data_path = arrow::util::test_util::arrow_test_data();
     let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
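A closing note on the hash_join.rs additions earlier in this diff: `f32`/`f64` implement neither `Hash` nor `Eq`, which is why `hash_array_float!` hashes `value.to_le_bytes()` rather than the float itself. A minimal standalone sketch of the same idea, using std's hasher in place of the ahash calls in the macro:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Mirrors the `to_le_bytes()` trick in hash_array_float!: hash the raw
    // byte representation, since floats themselves are not hashable.
    fn hash_f64(value: f64) -> u64 {
        let mut hasher = DefaultHasher::new();
        value.to_le_bytes().hash(&mut hasher);
        hasher.finish()
    }

    fn main() {
        // Bit-identical floats hash identically, which is all that hash
        // partitioning and the hash-table probes need.
        assert_eq!(hash_f64(1.5), hash_f64(1.5));
    }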