From c90bfc9f5a510ce010616a43955be9ce8749ab32 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Tue, 11 May 2021 23:24:50 +0200
Subject: [PATCH 01/16] Implement hash partitioned aggregation

---
 datafusion/src/execution/context.rs            | 10 -----
 .../src/physical_plan/hash_aggregate.rs        | 18 +++++---
 datafusion/src/physical_plan/planner.rs        | 45 +++++++++++++------
 .../src/physical_plan/unicode_expressions.rs   |  8 +---
 4 files changed, 45 insertions(+), 36 deletions(-)

diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index b53f7c15e3aa..518961f50098 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -1305,7 +1305,6 @@ mod tests {
     #[tokio::test]
     async fn aggregate_grouped() -> Result<()> {
         let results = execute("SELECT c1, SUM(c2) FROM test GROUP BY c1", 4).await?;
-        assert_eq!(results.len(), 1);
 
         let expected = vec![
             "+----+---------+",
@@ -1325,7 +1324,6 @@ mod tests {
     #[tokio::test]
     async fn aggregate_grouped_avg() -> Result<()> {
         let results = execute("SELECT c1, AVG(c2) FROM test GROUP BY c1", 4).await?;
-        assert_eq!(results.len(), 1);
 
         let expected = vec![
             "+----+---------+",
@@ -1346,7 +1344,6 @@ mod tests {
     async fn boolean_literal() -> Result<()> {
         let results =
             execute("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4).await?;
-        assert_eq!(results.len(), 1);
 
         let expected = vec![
             "+----+------+",
@@ -1368,7 +1365,6 @@ mod tests {
     async fn aggregate_grouped_empty() -> Result<()> {
         let results =
             execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;
-        assert_eq!(results.len(), 1);
 
         let expected = vec!["++", "||", "++", "++"];
         assert_batches_sorted_eq!(expected, &results);
@@ -1379,7 +1375,6 @@ mod tests {
     #[tokio::test]
     async fn aggregate_grouped_max() -> Result<()> {
         let results = execute("SELECT c1, MAX(c2) FROM test GROUP BY c1", 4).await?;
-        assert_eq!(results.len(), 1);
 
         let expected = vec![
             "+----+---------+",
@@ -1399,7 +1394,6 @@ mod tests {
     #[tokio::test]
     async fn aggregate_grouped_min() -> Result<()> {
         let results = execute("SELECT c1, MIN(c2) FROM test GROUP BY c1", 4).await?;
-        assert_eq!(results.len(), 1);
 
         let expected = vec![
             "+----+---------+",
@@ -1583,7 +1577,6 @@ mod tests {
     #[tokio::test]
     async fn count_aggregated() -> Result<()> {
         let results = execute("SELECT c1, COUNT(c2) FROM test GROUP BY c1", 4).await?;
-        assert_eq!(results.len(), 1);
 
         let expected = vec![
             "+----+-----------+",
@@ -1635,7 +1628,6 @@ mod tests {
             &mut ctx,
             "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)"
         ).await?;
-        assert_eq!(results.len(), 1);
 
         let expected = vec![
             "+---------------------+---------+",
@@ -1879,7 +1871,6 @@ mod tests {
         ];
 
         let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
-        assert_eq!(results.len(), 1);
 
         let expected = vec![
@@ -1907,7 +1898,6 @@ mod tests {
         ];
 
         let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
-        assert_eq!(results.len(), 1);
 
         let expected = vec![
            "+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index fad4fa585034..5f78e57e9e1c 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -76,6 +76,8 @@ pub enum AggregateMode {
     Partial,
     /// Final aggregate that produces a single partition of output
     Final,
+    /// Final aggregate that works on pre-partitioned data
+    FinalPartitioned,
 }
 
 /// Hash aggregate execution plan
@@ -121,7 +123,7 @@ fn create_schema(
                 fields.extend(expr.state_fields()?.iter().cloned())
             }
         }
-        AggregateMode::Final => {
+        AggregateMode::Final | AggregateMode::FinalPartitioned => {
             // in final mode, the field with the final result of the accumulator
             for expr in aggr_expr {
                 fields.push(expr.field()?)
@@ -201,7 +203,9 @@ impl ExecutionPlan for HashAggregateExec {
 
     fn required_child_distribution(&self) -> Distribution {
         match &self.mode {
-            AggregateMode::Partial => Distribution::UnspecifiedDistribution,
+            AggregateMode::Partial | AggregateMode::FinalPartitioned => {
+                Distribution::UnspecifiedDistribution
+            }
             AggregateMode::Final => Distribution::SinglePartition,
         }
     }
@@ -419,7 +423,7 @@ fn group_aggregate_batch(
             })
             .try_for_each(|(accumulator, values)| match mode {
                 AggregateMode::Partial => accumulator.update_batch(&values),
-                AggregateMode::Final => {
+                AggregateMode::FinalPartitioned | AggregateMode::Final => {
                     // note: the aggregation here is over states, not values, thus the merge
                     accumulator.merge_batch(&values)
                 }
@@ -772,7 +776,7 @@ fn aggregate_expressions(
             Ok(aggr_expr.iter().map(|agg| agg.expressions()).collect())
         }
         // in this mode, we build the merge expressions of the aggregation
-        AggregateMode::Final => Ok(aggr_expr
+        AggregateMode::Final | AggregateMode::FinalPartitioned => Ok(aggr_expr
             .iter()
            .map(|agg| merge_expressions(agg))
            .collect::<Result<Vec<_>>>()?),
@@ -866,7 +870,9 @@ fn aggregate_batch(
             // 1.3
             match mode {
                 AggregateMode::Partial => accum.update_batch(values),
-                AggregateMode::Final => accum.merge_batch(values),
+                AggregateMode::Final | AggregateMode::FinalPartitioned => {
+                    accum.merge_batch(values)
+                }
             }
         })
 }
@@ -1039,7 +1045,7 @@ fn finalize_aggregation(
                 .collect::<Result<Vec<_>>>()?;
             Ok(a.iter().flatten().cloned().collect::<Vec<_>>())
         }
-        AggregateMode::Final => {
+        AggregateMode::Final | AggregateMode::FinalPartitioned => {
             // merge the state to the final value
             accumulators
                 .iter()
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index acbb863c604b..46c715a27358 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -184,19 +184,38 @@ impl DefaultPhysicalPlanner {
                 let final_group: Vec<Arc<dyn PhysicalExpr>> =
                     (0..groups.len()).map(|i| col(&groups[i].1)).collect();
 
-                // construct a second aggregation, keeping the final column name equal to the first aggregation
-                // and the expressions corresponding to the respective aggregate
-                Ok(Arc::new(HashAggregateExec::try_new(
-                    AggregateMode::Final,
-                    final_group
-                        .iter()
-                        .enumerate()
-                        .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
-                        .collect(),
-                    aggregates,
-                    initial_aggr,
-                    input_schema,
-                )?))
+                if groups.len() > 0 {
+                    let hash_repartition = Arc::new(RepartitionExec::try_new(
+                        initial_aggr,
+                        Partitioning::Hash(final_group.clone(), 40),
+                    )?);
+                    Ok(Arc::new(HashAggregateExec::try_new(
+                        AggregateMode::FinalPartitioned,
+                        final_group
+                            .iter()
+                            .enumerate()
+                            .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
+                            .collect(),
+                        aggregates,
+                        hash_repartition,
+                        input_schema,
+                    )?))
+                } else {
+                    // construct a second aggregation, keeping the final column name equal to the first aggregation
+                    // and the expressions corresponding to the respective aggregate
+
+                    Ok(Arc::new(HashAggregateExec::try_new(
+                        AggregateMode::Final,
+                        final_group
+                            .iter()
+                            .enumerate()
+                            .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
+                            .collect(),
+                        aggregates,
+                        initial_aggr,
+                        input_schema,
+                    )?))
+                }
             }
             LogicalPlan::Projection { input, expr, .. } => {
                 let input_exec = self.create_initial_plan(input, ctx_state)?;
diff --git a/datafusion/src/physical_plan/unicode_expressions.rs b/datafusion/src/physical_plan/unicode_expressions.rs
index 787ea7ea2673..d451cfa74283 100644
--- a/datafusion/src/physical_plan/unicode_expressions.rs
+++ b/datafusion/src/physical_plan/unicode_expressions.rs
@@ -26,12 +26,7 @@ use std::cmp::Ordering;
 use std::sync::Arc;
 
 use crate::error::{DataFusionError, Result};
-use arrow::{
-    array::{
-        ArrayRef, GenericStringArray, Int64Array, PrimitiveArray, StringOffsetSizeTrait,
-    },
-    datatypes::{ArrowNativeType, ArrowPrimitiveType},
-};
+use arrow::{array::{ArrayRef, GenericStringArray, Int64Array, PrimitiveArray, StringArray, StringOffsetSizeTrait}, datatypes::{ArrowNativeType, ArrowPrimitiveType}};
 use hashbrown::HashMap;
 use unicode_segmentation::UnicodeSegmentation;
 
@@ -93,7 +88,6 @@ where
 pub fn left<T: StringOffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
     let string_array = downcast_string_arg!(args[0], "string", T);
     let n_array = downcast_arg!(args[1], "n", Int64Array);
-
     let result = string_array
         .iter()
         .zip(n_array.iter())
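The key idea in this first patch: rather than merging all partial aggregates into a single partition before the final aggregation, the partial output is hash-repartitioned on the group keys, and the new `FinalPartitioned` mode finalizes each partition independently. A minimal, self-contained sketch of that idea (an illustration only, not DataFusion's implementation, which works on Arrow arrays and accumulator state rather than `HashMap`s of tuples):

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Route (group key, partial sum) pairs to partitions by hashing the key.
/// All rows with the same key land in the same partition, so partitions can
/// be finalized independently and in parallel.
fn hash_repartition(partials: Vec<(String, i64)>, n: usize) -> Vec<Vec<(String, i64)>> {
    let mut parts = vec![Vec::new(); n];
    for (key, val) in partials {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        parts[(h.finish() as usize) % n].push((key, val));
    }
    parts
}

/// The "FinalPartitioned" step: merge partial sums within one partition only.
fn finalize_partition(part: Vec<(String, i64)>) -> HashMap<String, i64> {
    let mut out = HashMap::new();
    for (key, val) in part {
        *out.entry(key).or_insert(0) += val;
    }
    out
}
```

Because the repartition guarantees key-disjoint partitions, no cross-partition merge (and no single-partition bottleneck) is needed afterwards. Note the partition count is still hardcoded to 40 at this point in the series; it becomes configurable in patch 03/04.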
From 3dc2cf4bc66f40ba9c622d4fe130bebbf2722ba4 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Tue, 11 May 2021 23:34:39 +0200
Subject: [PATCH 02/16] Ballista

---
 ballista/rust/core/proto/ballista.proto                  | 1 +
 ballista/rust/core/src/serde/physical_plan/from_proto.rs | 3 +++
 ballista/rust/core/src/serde/physical_plan/to_proto.rs   | 3 +++
 3 files changed, 7 insertions(+)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 07419d09b7a9..3da0e85437d7 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -396,6 +396,7 @@ message ProjectionExecNode {
 enum AggregateMode {
   PARTIAL = 0;
   FINAL = 1;
+  FINAL_PARTITIONED = 2;
 }
 
 message HashAggregateExecNode {
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 6a33c6a43f62..7d943ae846e8 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -199,6 +199,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                 let agg_mode: AggregateMode = match mode {
                     protobuf::AggregateMode::Partial => AggregateMode::Partial,
                     protobuf::AggregateMode::Final => AggregateMode::Final,
+                    protobuf::AggregateMode::FinalPartitioned => {
+                        AggregateMode::FinalPartitioned
+                    }
                 };
 
                 let group = hash_agg
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 8a5fd71083f7..9571f3de2e76 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -172,6 +172,9 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
         let agg_mode = match exec.mode() {
             AggregateMode::Partial => protobuf::AggregateMode::Partial,
             AggregateMode::Final => protobuf::AggregateMode::Final,
+            AggregateMode::FinalPartitioned => {
+                protobuf::AggregateMode::FinalPartitioned
+            }
         };
         let input_schema = exec.input_schema();
         let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?;
From 381df9c3299d91103eb0c013e887c6190c4cd5a2 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Tue, 11 May 2021 23:45:19 +0200
Subject: [PATCH 03/16] Make configurable and use configured concurrency

---
 ballista/rust/scheduler/src/planner.rs | 2 +-
 datafusion/src/execution/context.rs    | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 20dd0d36d9ab..fdf4852a38de 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -128,7 +128,7 @@ impl DistributedPlanner {
             //TODO should insert query stages in more generic way based on partitioning metadata
             // and not specifically for this operator
             match agg.mode() {
-                AggregateMode::Final => {
+                AggregateMode::Final | AggregateMode::FinalPartitioned => {
                     let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
                     for child in &children {
                         let new_stage = create_query_stage(
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 518961f50098..7a2026865f37 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -628,6 +628,9 @@ pub struct ExecutionConfig {
     /// Should DataFusion repartition data using the join keys to execute joins in parallel
     /// using the provided `concurrency` level
     pub repartition_joins: bool,
+    /// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
+    /// using the provided `concurrency` level
+    pub repartition_aggregates: bool,
 }
 
 impl ExecutionConfig {
@@ -655,6 +658,7 @@ impl ExecutionConfig {
             create_default_catalog_and_schema: true,
             information_schema: false,
             repartition_joins: true,
+            repartition_aggregates: true,
         }
     }

From 9dd926f2a93d75c4265fa038af350d338a440086 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Wed, 12 May 2021 00:03:45 +0200
Subject: [PATCH 04/16] WIP

---
 datafusion/src/physical_plan/planner.rs | 22 +++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 46c715a27358..5f0858041fc9 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -45,7 +45,7 @@ use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalP
 use crate::prelude::JoinType;
 use crate::scalar::ScalarValue;
 use crate::variable::VarType;
-use arrow::compute::can_cast_types;
+use arrow::{compute::can_cast_types, datatypes::DataType};
 use arrow::compute::SortOptions;
 use arrow::datatypes::{Schema, SchemaRef};
@@ -184,11 +184,27 @@ impl DefaultPhysicalPlanner {
                 let final_group: Vec<Arc<dyn PhysicalExpr>> =
                     (0..groups.len()).map(|i| col(&groups[i].1)).collect();
 
+                // TODO: dictionary type not yet supported in Hash Repartition
+                let contains_dict = groups
+                    .iter()
+                    .flat_map(|x| x.0.data_type(physical_input_schema.as_ref()))
+                    .any(|x| matches!(x, DataType::Dictionary(_, _)));
+
-                if groups.len() > 0 {
+                if groups.len() > 0
+                    && ctx_state.config.concurrency > 1
+                    && ctx_state.config.repartition_aggregates
+                    && !contains_dict
+                {
+                    // Divide partial hash aggregates into multiple partitions by hash key
                     let hash_repartition = Arc::new(RepartitionExec::try_new(
                         initial_aggr,
-                        Partitioning::Hash(final_group.clone(), 40),
+                        Partitioning::Hash(
+                            final_group.clone(),
+                            ctx_state.config.concurrency,
+                        ),
                     )?);
+
+                    // Combine hash aggregates within the partition
                     Ok(Arc::new(HashAggregateExec::try_new(
                         AggregateMode::FinalPartitioned,
                         final_group

From 268eb1e9d3276a571e2129c6e3056aedf2068b65 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Wed, 12 May 2021 00:21:26 +0200
Subject: [PATCH 05/16] Add some hash types

---
 datafusion/src/physical_plan/hash_join.rs           | 98 +++++++++++++++++--
 .../src/physical_plan/unicode_expressions.rs        |  7 +-
 2 files changed, 95 insertions(+), 10 deletions(-)

diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 2682623d374a..28180dba289f 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -21,15 +21,7 @@
 use ahash::CallHasher;
 
 use ahash::RandomState;
-use arrow::{
-    array::{
-        ArrayData, ArrayRef, BooleanArray, LargeStringArray, PrimitiveArray,
-        TimestampMicrosecondArray, TimestampNanosecondArray, UInt32BufferBuilder,
-        UInt32Builder, UInt64BufferBuilder, UInt64Builder,
-    },
-    compute,
-    datatypes::{TimeUnit, UInt32Type, UInt64Type},
-};
+use arrow::{array::{ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, LargeStringArray, PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, UInt64Builder}, compute, datatypes::{TimeUnit, UInt32Type, UInt64Type}};
 use smallvec::{smallvec, SmallVec};
 use std::{any::Any, usize};
 use std::{hash::Hasher, sync::Arc};
@@ -843,6 +835,44 @@ macro_rules! hash_array_primitive {
     };
 }
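Besides replacing the hardcoded 40 partitions with the configured `concurrency`, patch 04 guards against group keys with dictionary-encoded types, which the hash repartitioner cannot handle yet. The guard reduces to a `matches!` over the resolved key types; roughly (a sketch using arrow-rs types directly, standalone rather than against the planner's `PhysicalExpr`s):

```rust
use arrow::datatypes::DataType;

/// True if any group-by key resolves to a dictionary-encoded type, in which
/// case the planner falls back to the single-partition Final mode.
fn contains_dict(group_key_types: &[DataType]) -> bool {
    group_key_types
        .iter()
        .any(|t| matches!(t, DataType::Dictionary(_, _)))
}

fn main() {
    let keys = vec![
        DataType::Utf8,
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
    ];
    assert!(contains_dict(&keys));
}
```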
+macro_rules! hash_array_float {
+    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
+        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+        let values = array.values();
+
+        if array.null_count() == 0 {
+            if $multi_col {
+                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
+                    *hash = combine_hashes($ty::get_hash(&value.to_le_bytes(), $random_state), *hash);
+                }
+            } else {
+                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
+                    *hash = $ty::get_hash(&value.to_le_bytes(), $random_state)
+                }
+            }
+        } else {
+            if $multi_col {
+                for (i, (hash, value)) in
+                    $hashes.iter_mut().zip(values.iter()).enumerate()
+                {
+                    if !array.is_null(i) {
+                        *hash =
+                            combine_hashes($ty::get_hash(&value.to_le_bytes(), $random_state), *hash);
+                    }
+                }
+            } else {
+                for (i, (hash, value)) in
+                    $hashes.iter_mut().zip(values.iter()).enumerate()
+                {
+                    if !array.is_null(i) {
+                        *hash = $ty::get_hash(&value.to_le_bytes(), $random_state);
+                    }
+                }
+            }
+        }
+    };
+}
+
 /// Creates hash values for every element in the row based on the values in the columns
 pub fn create_hashes<'a>(
     arrays: &[ArrayRef],
@@ -934,6 +964,36 @@ pub fn create_hashes<'a>(
                     multi_col
                 );
             }
+            DataType::Float32 => {
+                hash_array_float!(
+                    Float32Array,
+                    col,
+                    u32,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Float64 => {
+                hash_array_float!(
+                    Float64Array,
+                    col,
+                    u64,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Timestamp(TimeUnit::Millisecond, None) => {
+                hash_array_primitive!(
+                    TimestampMillisecondArray,
+                    col,
+                    i64,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
             DataType::Timestamp(TimeUnit::Microsecond, None) => {
                 hash_array_primitive!(
                     TimestampMicrosecondArray,
                     col,
                     i64,
                     hashes_buffer,
                     random_state,
                     multi_col
                 );
             }
             DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                 hash_array_primitive!(
                     TimestampNanosecondArray,
                     col,
                     i64,
                     hashes_buffer,
                     random_state,
                     multi_col
                 );
             }
+            DataType::Date32 => {
+                hash_array_primitive!(
+                    Date32Array,
+                    col,
+                    i32,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
+            DataType::Date64 => {
+                hash_array_primitive!(
+                    Date64Array,
+                    col,
+                    i64,
+                    hashes_buffer,
+                    random_state,
+                    multi_col
+                );
+            }
             DataType::Boolean => {
                 hash_array!(
                     BooleanArray,
diff --git a/datafusion/src/physical_plan/unicode_expressions.rs b/datafusion/src/physical_plan/unicode_expressions.rs
index d451cfa74283..3852fd7c931f 100644
--- a/datafusion/src/physical_plan/unicode_expressions.rs
+++ b/datafusion/src/physical_plan/unicode_expressions.rs
@@ -26,7 +26,12 @@ use std::cmp::Ordering;
 use std::sync::Arc;
 
 use crate::error::{DataFusionError, Result};
-use arrow::{array::{ArrayRef, GenericStringArray, Int64Array, PrimitiveArray, StringArray, StringOffsetSizeTrait}, datatypes::{ArrowNativeType, ArrowPrimitiveType}};
+use arrow::{
+    array::{
+        ArrayRef, GenericStringArray, Int64Array, PrimitiveArray, StringOffsetSizeTrait,
+    },
+    datatypes::{ArrowNativeType, ArrowPrimitiveType},
+};
 use hashbrown::HashMap;
 use unicode_segmentation::UnicodeSegmentation;
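Patch 05 teaches `create_hashes` about float, date, and millisecond-timestamp columns so they can be used as hash repartition keys. Floats are the interesting case: `f32`/`f64` do not implement `Hash` (NaN breaks the `Eq` contract), so `hash_array_float!` hashes the little-endian byte representation instead. The same trick in isolation, using the standard library hasher in place of the `ahash`-based `CallHasher`/`combine_hashes` machinery from the diff:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash an f64 by its raw little-endian bytes, mirroring
/// `$ty::get_hash(&value.to_le_bytes(), ...)` in the macro above.
fn hash_f64(value: f64) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.to_le_bytes().hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Identical bit patterns hash identically, so grouping and joining on
    // float columns stays consistent across batches and partitions.
    assert_eq!(hash_f64(1.5), hash_f64(1.5));
}
```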
From 358b21bd4a37b556c4b6d4a55761134465d98e74 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Wed, 12 May 2021 00:23:51 +0200
Subject: [PATCH 06/16] Fmt

---
 datafusion/src/physical_plan/hash_join.rs | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 28180dba289f..63ce40089635 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -21,7 +21,16 @@
 use ahash::CallHasher;
 
 use ahash::RandomState;
-use arrow::{array::{ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, LargeStringArray, PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, UInt64Builder}, compute, datatypes::{TimeUnit, UInt32Type, UInt64Type}};
+use arrow::{
+    array::{
+        ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array,
+        Float64Array, LargeStringArray, PrimitiveArray, TimestampMicrosecondArray,
+        TimestampMillisecondArray, TimestampNanosecondArray, UInt32BufferBuilder,
+        UInt32Builder, UInt64BufferBuilder, UInt64Builder,
+    },
+    compute,
+    datatypes::{TimeUnit, UInt32Type, UInt64Type},
+};
 use smallvec::{smallvec, SmallVec};
 use std::{any::Any, usize};
 use std::{hash::Hasher, sync::Arc};
@@ -843,7 +852,10 @@ macro_rules! hash_array_float {
         if array.null_count() == 0 {
             if $multi_col {
                 for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
-                    *hash = combine_hashes($ty::get_hash(&value.to_le_bytes(), $random_state), *hash);
+                    *hash = combine_hashes(
+                        $ty::get_hash(&value.to_le_bytes(), $random_state),
+                        *hash,
+                    );
                 }
             } else {
                 for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
@@ -856,8 +868,10 @@ macro_rules! hash_array_float {
                     $hashes.iter_mut().zip(values.iter()).enumerate()
                 {
                     if !array.is_null(i) {
-                        *hash =
-                            combine_hashes($ty::get_hash(&value.to_le_bytes(), $random_state), *hash);
+                        *hash = combine_hashes(
+                            $ty::get_hash(&value.to_le_bytes(), $random_state),
+                            *hash,
+                        );
                     }
                 }
             } else {

From 6824f1b1a2afce7f105681226e56da3eb3aae22b Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Wed, 12 May 2021 00:42:00 +0200
Subject: [PATCH 07/16] Disable repartition aggregations in ballista

---
 ballista/rust/core/src/utils.rs         |  1 +
 datafusion/src/execution/context.rs     | 10 ++++++++--
 datafusion/src/physical_plan/planner.rs |  2 +-
 3 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index ee9c9557e789..55541d5fd014 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -322,6 +322,7 @@ pub fn create_datafusion_context() -> ExecutionContext {
     let config = ExecutionConfig::new()
         .with_concurrency(1)
         .with_repartition_joins(false)
+        .with_repartition_aggregations(false)
         .with_physical_optimizer_rules(rules);
     ExecutionContext::with_config(config)
 }
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 7a2026865f37..236d2613abe9 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -630,7 +630,7 @@ pub struct ExecutionConfig {
     pub repartition_joins: bool,
     /// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
     /// using the provided `concurrency` level
-    pub repartition_aggregates: bool,
+    pub repartition_aggregations: bool,
 }
 
 impl ExecutionConfig {
@@ -658,7 +658,7 @@ impl ExecutionConfig {
             create_default_catalog_and_schema: true,
             information_schema: false,
             repartition_joins: true,
-            repartition_aggregates: true,
+            repartition_aggregations: true,
         }
     }
 
@@ -742,6 +742,12 @@ impl ExecutionConfig {
         self.repartition_joins = enabled;
         self
     }
+    /// Enables or disables the use of repartitioning for aggregations to improve parallelism
+    pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
+        self.repartition_aggregations = enabled;
+        self
+    }
+
 }
 
 /// Execution context for registering data sources and executing queries
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 5f0858041fc9..8add2011c952 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -192,7 +192,7 @@ impl DefaultPhysicalPlanner {
 
                 if groups.len() > 0
                     && ctx_state.config.concurrency > 1
-                    && ctx_state.config.repartition_aggregates
+                    && ctx_state.config.repartition_aggregations
                     && !contains_dict
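With patch 07 the optimization becomes a first-class `ExecutionConfig` option, renamed `repartition_aggregations` to match `repartition_joins`, and is switched off in Ballista, whose distributed planner is not ready for it yet. A sketch of toggling it from user code, using the builder methods that exist after this patch:

```rust
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};

fn main() {
    // `repartition_aggregations` defaults to true; Ballista's
    // create_datafusion_context() disables it, as the diff above shows.
    let config = ExecutionConfig::new()
        .with_concurrency(4)
        .with_repartition_aggregations(false);
    let _ctx = ExecutionContext::with_config(config);
}
```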
From 207dbf415a6add88266a85d491fa369788cae1e4 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Wed, 12 May 2021 00:56:23 +0200
Subject: [PATCH 08/16] fmt

---
 datafusion/src/execution/context.rs | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 236d2613abe9..006aefb2c78d 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -742,12 +742,11 @@ impl ExecutionConfig {
         self.repartition_joins = enabled;
         self
     }
-    /// Enables or disables the use of repartitioning for aggregations to improve parallelism
-    pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
-        self.repartition_aggregations = enabled;
-        self
-    }
-
+    /// Enables or disables the use of repartitioning for aggregations to improve parallelism
+    pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
+        self.repartition_aggregations = enabled;
+        self
+    }
 }
 
 /// Execution context for registering data sources and executing queries

From 6ea59c19469894fb33fa169c0f27b61cff6f2ae8 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Wed, 12 May 2021 01:25:15 +0200
Subject: [PATCH 09/16] Clippy, ballista

---
 ballista/rust/core/src/serde/physical_plan/from_proto.rs | 2 +-
 datafusion/src/physical_plan/planner.rs                  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 7d943ae846e8..e80dc79157e6 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -228,7 +228,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     scalar_functions: Default::default(),
                     var_provider: Default::default(),
                     aggregate_functions: Default::default(),
-                    config: ExecutionConfig::new(),
+                    config: ExecutionConfig::new().with_repartition_aggregations(false),
                 };
 
                 let input_schema = hash_agg
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 8add2011c952..6c797cf8bc21 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -190,7 +190,7 @@ impl DefaultPhysicalPlanner {
                     .flat_map(|x| x.0.data_type(physical_input_schema.as_ref()))
                     .any(|x| matches!(x, DataType::Dictionary(_, _)));
 
-                if groups.len() > 0
+                if !groups.is_empty()
                     && ctx_state.config.concurrency > 1
                     && ctx_state.config.repartition_aggregations
                     && !contains_dict

From a0db9edcd189974c11a95cbe6a04da29ec23ce28 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Wed, 12 May 2021 01:55:21 +0200
Subject: [PATCH 10/16] Fix test

---
 ballista/rust/scheduler/src/planner.rs    | 55 ++++++++++++-----------
 ballista/rust/scheduler/src/test_utils.rs |  6 +--
 2 files changed, 31 insertions(+), 30 deletions(-)

diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index fdf4852a38de..65113abdafa8 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -236,11 +236,12 @@ mod test {
     use ballista_core::execution_plans::UnresolvedShuffleExec;
     use ballista_core::serde::protobuf;
     use ballista_core::utils::format_plan;
-    use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
     use datafusion::physical_plan::merge::MergeExec;
-    use datafusion::physical_plan::projection::ProjectionExec;
     use datafusion::physical_plan::sort::SortExec;
     use datafusion::physical_plan::ExecutionPlan;
+    use datafusion::physical_plan::{
+        hash_aggregate::HashAggregateExec, repartition::RepartitionExec,
+    };
     use std::convert::TryInto;
     use std::sync::Arc;
     use uuid::Uuid;
@@ -275,38 +276,40 @@ mod test {
     }
 
     /* Expected result:
-    QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=1
-     HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
-      CsvExec: testdata/lineitem; partitions=2
-
-    QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
-     MergeExec
-      UnresolvedShuffleExec: stages=[1]
-
-    QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
-     SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
-      ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_extendedprice Multip
-       HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
-        UnresolvedShuffleExec: stages=[2]
-    */
-
-        let sort = stages[2].children()[0].clone();
+    QueryStageExec: job=38b8d8f8-4765-44c3-9d98-ba420d523e13, stage=1
+     HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
+      CsvExec: testdata/lineitem; partitions=2
+
+    QueryStageExec: job=38b8d8f8-4765-44c3-9d98-ba420d523e13, stage=2
+     RepartitionExec { input: UnresolvedShuffleExec { query_stage_ids: [1], schema: Schema { fields: [Field { name: "l_return
+      UnresolvedShuffleExec: stages=[1]
+
+    QueryStageExec: job=38b8d8f8-4765-44c3-9d98-ba420d523e13, stage=3
+     ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_extendedprice Multiply
+      HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
+       RepartitionExec { input: HashAggregateExec { mode: Partial, group_expr: [(Column { name: "l_returnflag" }, "l_returnflag
+        HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
+         CsvExec: testdata/lineitem; partitions=2
+
+    QueryStageExec: job=38b8d8f8-4765-44c3-9d98-ba420d523e13, stage=4
+     SortExec { input: MergeExec { input: UnresolvedShuffleExec { query_stage_ids: [3], schema: Schema { fields: [Field { nam
+      MergeExec
+       UnresolvedShuffleExec: stages=[3]
+    MergeExec { input: UnresolvedShuffleExec { query_stage_ids: [3], ..} */
+
+        let sort = stages[3].children()[0].clone();
         let sort = downcast_exec!(sort, SortExec);
 
         let projection = sort.children()[0].clone();
         println!("{:?}", projection);
-        let projection = downcast_exec!(projection, ProjectionExec);
+        let projection = downcast_exec!(projection, MergeExec);
 
         let final_hash = projection.children()[0].clone();
-        let final_hash = downcast_exec!(final_hash, HashAggregateExec);
-
-        let unresolved_shuffle = final_hash.children()[0].clone();
-        let unresolved_shuffle =
-            downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
-        assert_eq!(unresolved_shuffle.query_stage_ids, vec![2]);
+        let final_hash = downcast_exec!(final_hash, UnresolvedShuffleExec);
+        assert_eq!(final_hash.query_stage_ids, vec![3]);
 
         let merge_exec = stages[1].children()[0].clone();
-        let merge_exec = downcast_exec!(merge_exec, MergeExec);
+        let merge_exec = downcast_exec!(merge_exec, RepartitionExec);
 
         let unresolved_shuffle = merge_exec.children()[0].clone();
         let unresolved_shuffle =
diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs
index 330cc9a9332c..bf32943bd876 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -32,10 +32,8 @@ pub const TPCH_TABLES: &[&str] = &[
 pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
     // remove Repartition rule because that isn't supported yet
-    let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
-        Arc::new(CoalesceBatches::new()),
-        Arc::new(AddMergeExec::new()),
-    ];
+    let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> =
+        vec![Arc::new(AddMergeExec::new())];
     let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
     let mut ctx = ExecutionContext::with_config(config);
 
     for table in TPCH_TABLES {
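The updated Ballista test walks the stage tree by downcasting `Arc<dyn ExecutionPlan>` children to concrete operators via `downcast_exec!`. That macro is essentially `Any`-based downcasting; a minimal standalone equivalent (with a hypothetical `Plan` trait standing in for `ExecutionPlan`):

```rust
use std::any::Any;
use std::sync::Arc;

/// Stand-in for ExecutionPlan: a node exposes itself as `&dyn Any`
/// so tests can recover its concrete type.
trait Plan {
    fn as_any(&self) -> &dyn Any;
}

struct SortNode;
impl Plan for SortNode {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let node: Arc<dyn Plan> = Arc::new(SortNode);
    // Roughly what `downcast_exec!(plan, SortExec)` expands to:
    let _sort = node
        .as_any()
        .downcast_ref::<SortNode>()
        .expect("expected a SortNode at this position in the plan");
}
```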
From edf9396be4b4d8a9d30b96db50deb3a06aea1ce9 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Wed, 12 May 2021 07:29:59 +0200
Subject: [PATCH 11/16] Revert test code

---
 .../src/serde/physical_plan/from_proto.rs |  2 +-
 ballista/rust/scheduler/src/planner.rs    | 54 +++++++++----------
 ballista/rust/scheduler/src/test_utils.rs | 10 ++--
 3 files changed, 32 insertions(+), 34 deletions(-)

diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index e80dc79157e6..7d943ae846e8 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -228,7 +228,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     scalar_functions: Default::default(),
                     var_provider: Default::default(),
                     aggregate_functions: Default::default(),
-                    config: ExecutionConfig::new().with_repartition_aggregations(false),
+                    config: ExecutionConfig::new(),
                 };
 
                 let input_schema = hash_agg
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 65113abdafa8..b81d7de355ef 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -236,12 +236,10 @@ mod test {
     use ballista_core::execution_plans::UnresolvedShuffleExec;
     use ballista_core::serde::protobuf;
     use ballista_core::utils::format_plan;
-    use datafusion::physical_plan::merge::MergeExec;
+    use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
     use datafusion::physical_plan::sort::SortExec;
     use datafusion::physical_plan::ExecutionPlan;
-    use datafusion::physical_plan::{
-        hash_aggregate::HashAggregateExec, repartition::RepartitionExec,
-    };
+    use datafusion::physical_plan::{merge::MergeExec, projection::ProjectionExec};
     use std::convert::TryInto;
     use std::sync::Arc;
     use uuid::Uuid;
@@ -276,40 +274,36 @@ mod test {
     }
 
     /* Expected result:
-    QueryStageExec: job=38b8d8f8-4765-44c3-9d98-ba420d523e13, stage=1
-     HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
-      CsvExec: testdata/lineitem; partitions=2
-
-    QueryStageExec: job=38b8d8f8-4765-44c3-9d98-ba420d523e13, stage=2
-     RepartitionExec { input: UnresolvedShuffleExec { query_stage_ids: [1],
schema: Schema { fields: [Field { name: "l_return
-      UnresolvedShuffleExec: stages=[1]
-
-    QueryStageExec: job=38b8d8f8-4765-44c3-9d98-ba420d523e13, stage=3
-     ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_extendedprice Multiply
-      HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
-       RepartitionExec { input: HashAggregateExec { mode: Partial, group_expr: [(Column { name: "l_returnflag" }, "l_returnflag
-        HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
-         CsvExec: testdata/lineitem; partitions=2
-
-    QueryStageExec: job=38b8d8f8-4765-44c3-9d98-ba420d523e13, stage=4
-     SortExec { input: MergeExec { input: UnresolvedShuffleExec { query_stage_ids: [3], schema: Schema { fields: [Field { nam
-      MergeExec
-       UnresolvedShuffleExec: stages=[3]
-    MergeExec { input: UnresolvedShuffleExec { query_stage_ids: [3], ..} */
-
-        let sort = stages[3].children()[0].clone();
+    QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=1
+     HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
+      CsvExec: testdata/lineitem; partitions=2
+    QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
+     MergeExec
+      UnresolvedShuffleExec: stages=[1]
+    QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
+     SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
+      ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_extendedprice Multip
+       HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
+        UnresolvedShuffleExec: stages=[2]
+    */
+
+        let sort = stages[2].children()[0].clone();
         let sort = downcast_exec!(sort, SortExec);
 
         let projection = sort.children()[0].clone();
         println!("{:?}", projection);
-        let projection = downcast_exec!(projection, MergeExec);
+        let projection = downcast_exec!(projection, ProjectionExec);
 
         let final_hash = projection.children()[0].clone();
-        let final_hash = downcast_exec!(final_hash, UnresolvedShuffleExec);
-        assert_eq!(final_hash.query_stage_ids, vec![3]);
+        let final_hash = downcast_exec!(final_hash, HashAggregateExec);
+
+        let unresolved_shuffle = final_hash.children()[0].clone();
+        let unresolved_shuffle =
+            downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
+        assert_eq!(unresolved_shuffle.query_stage_ids, vec![2]);
 
         let merge_exec = stages[1].children()[0].clone();
-        let merge_exec = downcast_exec!(merge_exec, RepartitionExec);
+        let merge_exec = downcast_exec!(merge_exec, MergeExec);
 
         let unresolved_shuffle = merge_exec.children()[0].clone();
         let unresolved_shuffle =
diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs
index bf32943bd876..098906050386 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -32,9 +32,13 @@ pub const TPCH_TABLES: &[&str] = &[
 pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
     // remove Repartition rule because that isn't supported yet
-    let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> =
-        vec![Arc::new(AddMergeExec::new())];
-    let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
+    let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+        Arc::new(AddMergeExec::new()),
+        Arc::new(CoalesceBatches::new()),
+    ];
+    let config = ExecutionConfig::new()
+        .with_physical_optimizer_rules(rules)
+        .with_repartition_aggregations(false);
     let mut ctx = ExecutionContext::with_config(config);
 
     for table in TPCH_TABLES {

From f25800832cafbdf4db4bb1e80f68d21bdcbf378f Mon Sep 17 00:00:00 2001
From: Daniël Heres
Date: Wed, 12 May 2021 12:55:11 +0200
Subject: [PATCH 12/16] Update datafusion/src/physical_plan/hash_aggregate.rs

Co-authored-by: Andrew Lamb

---
 datafusion/src/physical_plan/hash_aggregate.rs | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 5f78e57e9e1c..4ca220a647b3 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -76,7 +76,12 @@ pub enum AggregateMode {
     Partial,
     /// Final aggregate that produces a single partition of output
     Final,
-    /// Final aggregate that works on pre-partitioned data
+    /// Final aggregate that works on pre-partitioned data.
+    ///
+    /// This requires the invariant that all rows with a particular
+    /// grouping key are in the same partitions, such as is the case
+    /// with Hash repartitioning on the group keys. If a group key is
+    /// duplicated, duplicate groups would be produced
     FinalPartitioned,
 }

From 96b5a96c7fa0a01d41b19eb39e2fdd89e048b016 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Wed, 12 May 2021 19:19:22 +0200
Subject: [PATCH 13/16] Add info about required child partitioning

---
 datafusion/src/physical_optimizer/merge_exec.rs  | 1 +
 datafusion/src/physical_optimizer/repartition.rs | 2 +-
 datafusion/src/physical_plan/hash_aggregate.rs   | 7 ++++---
 datafusion/src/physical_plan/mod.rs              | 5 ++++-
 4 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/datafusion/src/physical_optimizer/merge_exec.rs b/datafusion/src/physical_optimizer/merge_exec.rs
index 255d1bc24587..877c0be00e1b 100644
--- a/datafusion/src/physical_optimizer/merge_exec.rs
+++ b/datafusion/src/physical_optimizer/merge_exec.rs
@@ -52,6 +52,7 @@ impl PhysicalOptimizerRule for AddMergeExec {
                     .collect::<Result<Vec<_>>>()?;
                 match plan.required_child_distribution() {
                     Distribution::UnspecifiedDistribution => plan.with_new_children(children),
+                    Distribution::HashPartitioned(_) => plan.with_new_children(children),
                     Distribution::SinglePartition => plan.with_new_children(
                         children
                             .iter()
diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index 82f46f9cbbbb..5251b7e76b30 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -52,7 +52,7 @@ fn optimize_concurrency(
         .map(|child| {
             optimize_concurrency(
                 concurrency,
-                plan.required_child_distribution() == Distribution::SinglePartition,
+                matches!(plan.required_child_distribution(), Distribution::SinglePartition),
                 child.clone(),
             )
         })
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 5f78e57e9e1c..fc46d3de3498 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -203,9 +203,10 @@ impl ExecutionPlan for HashAggregateExec {
     fn required_child_distribution(&self) -> Distribution {
         match &self.mode {
-            AggregateMode::Partial | AggregateMode::FinalPartitioned => {
-                Distribution::UnspecifiedDistribution
-            }
+            AggregateMode::Partial => Distribution::UnspecifiedDistribution,
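After patch 13 the aggregate no longer claims an unspecified input distribution: `FinalPartitioned` advertises `Distribution::HashPartitioned(group keys)`, so optimizer rules such as `AddMergeExec` know they must not collapse its input back to one partition. The branching logic, sketched with a simplified enum (the real variant carries `Vec<Arc<dyn PhysicalExpr>>`, not column names):

```rust
/// Simplified mirror of the Distribution enum after this patch.
enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
    HashPartitioned(Vec<String>),
}

/// Sketch of the AddMergeExec decision: only a SinglePartition requirement
/// forces merging the child's partitions into one.
fn needs_merge(required: &Distribution) -> bool {
    match required {
        Distribution::SinglePartition => true,
        Distribution::UnspecifiedDistribution | Distribution::HashPartitioned(_) => false,
    }
}

fn main() {
    assert!(needs_merge(&Distribution::SinglePartition));
    assert!(!needs_merge(&Distribution::HashPartitioned(vec!["c1".into()])));
}
```

Deriving `PartialEq` had to be dropped from the real enum because `Arc<dyn PhysicalExpr>` is not comparable, which is why the `repartition` rule switches from `==` to `matches!` in the same patch.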
+            AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
+                self.group_expr.iter().map(|x| x.0.clone()).collect(),
+            ),
             AggregateMode::Final => Distribution::SinglePartition,
         }
     }
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index a8f6f0c35f00..0bb6b2a8a0b6 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -218,12 +218,15 @@ impl Partitioning {
 }
 
 /// Distribution schemes
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone)]
 pub enum Distribution {
     /// Unspecified distribution
     UnspecifiedDistribution,
     /// A single partition is required
     SinglePartition,
+    /// Requires children to be distributed in such a way that the same
+    /// values of the keys end up in the same partition
+    HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
 }
 
 /// Represents the result from an expression

From e13e3458f0be1b0d5ae8946fa4d83580568d0fde Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Wed, 12 May 2021 19:45:55 +0200
Subject: [PATCH 14/16] Add test

---
 .../src/physical_optimizer/repartition.rs |  5 ++++-
 .../src/physical_plan/hash_aggregate.rs   |  2 +-
 datafusion/src/physical_plan/planner.rs   | 22 ++++++++++++++++++++-
 3 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index 5251b7e76b30..fee4b3e11e5d 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -52,7 +52,10 @@ fn optimize_concurrency(
         .map(|child| {
             optimize_concurrency(
                 concurrency,
-                matches!(plan.required_child_distribution(), Distribution::SinglePartition),
+                matches!(
+                    plan.required_child_distribution(),
+                    Distribution::SinglePartition
+                ),
                 child.clone(),
             )
         })
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index c3b48cf7288c..e218ee4d413e 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -81,7 +81,7 @@ pub enum AggregateMode {
     /// This requires the invariant that all rows with a particular
     /// grouping key are in the same partitions, such as is the case
     /// with Hash repartitioning on the group keys. If a group key is
-    /// duplicated, duplicate groups would be produced 
+    /// duplicated, duplicate groups would be produced
     FinalPartitioned,
 }
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 6c797cf8bc21..4273216d705b 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -793,7 +793,7 @@ mod tests {
             scalar_functions: HashMap::new(),
             var_provider: HashMap::new(),
             aggregate_functions: HashMap::new(),
-            config: ExecutionConfig::new(),
+            config: ExecutionConfig::new().with_concurrency(4),
         }
     }
 
@@ -1035,6 +1035,26 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn hash_agg_group_by_partitioned() -> Result<()> {
+        let testdata = arrow::util::test_util::arrow_test_data();
+        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
+
+        let options = CsvReadOptions::new().schema_infer_max_records(100);
+        let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
+            .build()?;
+
+        let execution_plan = plan(&logical_plan)?;
+        let formatted = format!("{:?}", execution_plan);
+
+        // Make sure the plan contains a FinalPartitioned, which means it will not use the Final
+        // mode in HashAggregate (which is slower)
+        assert!(formatted.contains("FinalPartitioned"));
+
+        Ok(())
+    }
+
     /// An example extension node that doesn't do anything
     struct NoOpExtensionNode {
         schema: DFSchemaRef,

From 48c8b938fb742ed04656066763cabb12b71952c6 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Sat, 15 May 2021 18:43:59 +0200
Subject: [PATCH 15/16] Test fix

---
 datafusion/tests/sql.rs | 24 +++++++++++++-----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 0b9cc2ae18b9..6edb75733490 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2950,17 +2950,19 @@ async fn test_physical_plan_display_indent() {
     let physical_plan = ctx.create_physical_plan(&plan).unwrap();
 
     let expected = vec![
-        "GlobalLimitExec: limit=10",
-        "  SortExec: [the_min DESC]",
-        "    ProjectionExec: expr=[c1, MAX(c12), MIN(c12) as the_min]",
-        "      HashAggregateExec: mode=Final, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
-        "        MergeExec",
-        "          HashAggregateExec: mode=Partial, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
-        "            CoalesceBatchesExec: target_batch_size=4096",
-        "              FilterExec: c12 < CAST(10 AS Float64)",
-        "                RepartitionExec: partitioning=RoundRobinBatch(3)",
-        "                  CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
-    ];
+        "GlobalLimitExec: limit=10",
+        "  SortExec: [the_min DESC]",
+        "    MergeExec",
+        "      ProjectionExec: expr=[c1, MAX(c12), MIN(c12) as the_min]",
+        "        HashAggregateExec: mode=FinalPartitioned, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
+        "          CoalesceBatchesExec: target_batch_size=4096",
+        "            RepartitionExec: partitioning=Hash([Column { name: \"c1\" }], 3)",
+        "              HashAggregateExec: mode=Partial, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
+        "                CoalesceBatchesExec: target_batch_size=4096",
+        "                  FilterExec: c12 < CAST(10 AS Float64)",
+        "                    RepartitionExec: partitioning=RoundRobinBatch(3)",
+        "                      CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
+    ];
 
     let data_path = arrow::util::test_util::arrow_test_data();
     let actual = format!("{}", displayable(physical_plan.as_ref()).indent())

From 2fc12eb301901dfe5193bc6b54b4f7b896da6d7e Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Sun, 16 May 2021 21:09:18 +0200
Subject: [PATCH 16/16] Set concurrency

---
 datafusion/src/physical_plan/planner.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index e6f9aaf35a88..9e7dc7172b82 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -796,7 +796,8 @@ mod tests {
     }
 
     fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
-        let ctx_state = make_ctx_state();
+        let mut ctx_state = make_ctx_state();
+        ctx_state.config.concurrency = 4;
         let planner = DefaultPhysicalPlanner::default();
         planner.create_physical_plan(logical_plan, &ctx_state)
     }
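End to end, the series means a plain grouped query now fans out across cores without user intervention. A hedged usage sketch against the DataFusion API of this era (the CSV path and column names are illustrative):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // With concurrency > 1 (and repartition_aggregations left at its default
    // of true), the planner inserts RepartitionExec(Hash(group keys)) and a
    // FinalPartitioned aggregate, as in the expected plan of patch 15.
    let config = ExecutionConfig::new().with_concurrency(8);
    let mut ctx = ExecutionContext::with_config(config);
    ctx.register_csv("test", "testdata/example.csv", CsvReadOptions::new())?;

    let df = ctx.sql("SELECT c1, SUM(c2) FROM test GROUP BY c1")?;
    let results = df.collect().await?;
    println!("{} output batches", results.len());
    Ok(())
}
```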