Implement hash partitioned aggregation (#320)
* Implement hash partitioned aggregation

* Ballista

* Make configurable and use configured concurrency

* WIP

* Add some hash types

* Fmt

* Disable repartition aggregations in ballista

* fmt

* Clippy, ballista

* Fix test

* Revert test code

* Update datafusion/src/physical_plan/hash_aggregate.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Add info about required child partitioning

* Add test

* Test fix

* Set concurrency

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Dandandan and alamb committed May 16, 2021
1 parent 1c50371 commit ed92673
Showing 15 changed files with 229 additions and 53 deletions.
1 change: 1 addition & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -396,6 +396,7 @@ message ProjectionExecNode {
enum AggregateMode {
PARTIAL = 0;
FINAL = 1;
FINAL_PARTITIONED = 2;
}

message HashAggregateExecNode {
3 changes: 3 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -201,6 +201,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
let agg_mode: AggregateMode = match mode {
protobuf::AggregateMode::Partial => AggregateMode::Partial,
protobuf::AggregateMode::Final => AggregateMode::Final,
protobuf::AggregateMode::FinalPartitioned => {
AggregateMode::FinalPartitioned
}
};

let group = hash_agg
3 changes: 3 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -172,6 +172,9 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
let agg_mode = match exec.mode() {
AggregateMode::Partial => protobuf::AggregateMode::Partial,
AggregateMode::Final => protobuf::AggregateMode::Final,
AggregateMode::FinalPartitioned => {
protobuf::AggregateMode::FinalPartitioned
}
};
let input_schema = exec.input_schema();
let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?;
1 change: 1 addition & 0 deletions ballista/rust/core/src/utils.rs
@@ -322,6 +322,7 @@ pub fn create_datafusion_context() -> ExecutionContext {
let config = ExecutionConfig::new()
.with_concurrency(1)
.with_repartition_joins(false)
.with_repartition_aggregations(false)
.with_physical_optimizer_rules(rules);
ExecutionContext::with_config(config)
}
7 changes: 2 additions & 5 deletions ballista/rust/scheduler/src/planner.rs
@@ -128,7 +128,7 @@ impl DistributedPlanner {
//TODO should insert query stages in more generic way based on partitioning metadata
// and not specifically for this operator
match agg.mode() {
AggregateMode::Final => {
AggregateMode::Final | AggregateMode::FinalPartitioned => {
let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
for child in &children {
let new_stage = create_query_stage(
@@ -237,10 +237,9 @@ mod test {
use ballista_core::serde::protobuf;
use ballista_core::utils::format_plan;
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use datafusion::physical_plan::merge::MergeExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{merge::MergeExec, projection::ProjectionExec};
use std::convert::TryInto;
use std::sync::Arc;
use uuid::Uuid;
@@ -278,11 +277,9 @@
QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=1
HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
CsvExec: testdata/lineitem; partitions=2
QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
MergeExec
UnresolvedShuffleExec: stages=[1]
QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_extendedprice Multip
6 changes: 4 additions & 2 deletions ballista/rust/scheduler/src/test_utils.rs
@@ -33,10 +33,12 @@ pub const TPCH_TABLES: &[&str] = &[
pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(CoalesceBatches::new()),
Arc::new(AddMergeExec::new()),
Arc::new(CoalesceBatches::new()),
];
let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
let config = ExecutionConfig::new()
.with_physical_optimizer_rules(rules)
.with_repartition_aggregations(false);
let mut ctx = ExecutionContext::with_config(config);

for table in TPCH_TABLES {
19 changes: 9 additions & 10 deletions datafusion/src/execution/context.rs
@@ -636,6 +636,9 @@ pub struct ExecutionConfig {
/// Should DataFusion repartition data using the join keys to execute joins in parallel
/// using the provided `concurrency` level
pub repartition_joins: bool,
/// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
/// using the provided `concurrency` level
pub repartition_aggregations: bool,
}

impl ExecutionConfig {
@@ -663,6 +666,7 @@
create_default_catalog_and_schema: true,
information_schema: false,
repartition_joins: true,
repartition_aggregations: true,
}
}

@@ -746,6 +750,11 @@ impl ExecutionConfig {
self.repartition_joins = enabled;
self
}
/// Enables or disables the use of repartitioning for aggregations to improve parallelism
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
self.repartition_aggregations = enabled;
self
}
}

/// Holds per-execution properties and data (such as starting timestamps, etc).
@@ -1351,7 +1360,6 @@ mod tests {
#[tokio::test]
async fn aggregate_grouped() -> Result<()> {
let results = execute("SELECT c1, SUM(c2) FROM test GROUP BY c1", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----+---------+",
@@ -1371,7 +1379,6 @@
#[tokio::test]
async fn aggregate_grouped_avg() -> Result<()> {
let results = execute("SELECT c1, AVG(c2) FROM test GROUP BY c1", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----+---------+",
@@ -1392,7 +1399,6 @@
async fn boolean_literal() -> Result<()> {
let results =
execute("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----+------+",
@@ -1414,7 +1420,6 @@
async fn aggregate_grouped_empty() -> Result<()> {
let results =
execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec!["++", "||", "++", "++"];
assert_batches_sorted_eq!(expected, &results);
@@ -1425,7 +1430,6 @@
#[tokio::test]
async fn aggregate_grouped_max() -> Result<()> {
let results = execute("SELECT c1, MAX(c2) FROM test GROUP BY c1", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----+---------+",
@@ -1445,7 +1449,6 @@
#[tokio::test]
async fn aggregate_grouped_min() -> Result<()> {
let results = execute("SELECT c1, MIN(c2) FROM test GROUP BY c1", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----+---------+",
@@ -1629,7 +1632,6 @@ mod tests {
#[tokio::test]
async fn count_aggregated() -> Result<()> {
let results = execute("SELECT c1, COUNT(c2) FROM test GROUP BY c1", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----+-----------+",
@@ -1681,7 +1683,6 @@
&mut ctx,
"SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)",
).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+---------------------+---------+",
@@ -1925,7 +1926,6 @@
];

let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
@@ -1952,7 +1952,6 @@
];

let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
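For reference, a minimal sketch of how the new setting composes with the builder methods that appear in this diff; the concurrency value is illustrative, and the import path assumes datafusion::execution::context as defined in this file.

use datafusion::execution::context::{ExecutionConfig, ExecutionContext};

fn main() {
    // Repartition aggregates on the group keys across 4 target partitions;
    // passing false falls back to the single-partition Final mode.
    let config = ExecutionConfig::new()
        .with_concurrency(4)
        .with_repartition_aggregations(true);
    let _ctx = ExecutionContext::with_config(config);
}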
1 change: 1 addition & 0 deletions datafusion/src/physical_optimizer/merge_exec.rs
@@ -52,6 +52,7 @@ impl PhysicalOptimizerRule for AddMergeExec {
.collect::<Result<Vec<_>>>()?;
match plan.required_child_distribution() {
Distribution::UnspecifiedDistribution => plan.with_new_children(children),
Distribution::HashPartitioned(_) => plan.with_new_children(children),
Distribution::SinglePartition => plan.with_new_children(
children
.iter()
5 changes: 4 additions & 1 deletion datafusion/src/physical_optimizer/repartition.rs
@@ -52,7 +52,10 @@ fn optimize_concurrency(
.map(|child| {
optimize_concurrency(
concurrency,
plan.required_child_distribution() == Distribution::SinglePartition,
matches!(
plan.required_child_distribution(),
Distribution::SinglePartition
),
child.clone(),
)
})
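The switch from == to matches! in this hunk lets the rule test for SinglePartition without an equality comparison, which matters now that Distribution has a data-carrying HashPartitioned variant. A small standalone illustration; the enum below is a simplified stand-in, not DataFusion's type.

#[derive(Debug)]
enum Distribution {
    SinglePartition,
    HashPartitioned(Vec<String>), // stand-in for Vec<Arc<dyn PhysicalExpr>>
}

fn requires_single_partition(d: &Distribution) -> bool {
    // Only the variant is inspected; no PartialEq impl or constructed value is needed.
    matches!(d, Distribution::SinglePartition)
}

fn main() {
    let hashed = Distribution::HashPartitioned(vec!["l_returnflag".to_string()]);
    assert!(!requires_single_partition(&hashed));
    assert!(requires_single_partition(&Distribution::SinglePartition));
    println!("{:?}", hashed);
}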
22 changes: 17 additions & 5 deletions datafusion/src/physical_plan/hash_aggregate.rs
@@ -78,6 +78,13 @@ pub enum AggregateMode {
Partial,
/// Final aggregate that produces a single partition of output
Final,
/// Final aggregate that works on pre-partitioned data.
///
/// This requires the invariant that all rows with a particular
/// grouping key are in the same partition, as is the case with hash
/// repartitioning on the group keys. If a grouping key appeared in more
/// than one partition, duplicate groups would be produced.
FinalPartitioned,
}

/// Hash aggregate execution plan
@@ -123,7 +130,7 @@ fn create_schema(
fields.extend(expr.state_fields()?.iter().cloned())
}
}
AggregateMode::Final => {
AggregateMode::Final | AggregateMode::FinalPartitioned => {
// in final mode, the field with the final result of the accumulator
for expr in aggr_expr {
fields.push(expr.field()?)
@@ -204,6 +211,9 @@ impl ExecutionPlan for HashAggregateExec {
fn required_child_distribution(&self) -> Distribution {
match &self.mode {
AggregateMode::Partial => Distribution::UnspecifiedDistribution,
AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
self.group_expr.iter().map(|x| x.0.clone()).collect(),
),
AggregateMode::Final => Distribution::SinglePartition,
}
}
@@ -454,7 +464,7 @@ fn group_aggregate_batch(
})
.try_for_each(|(accumulator, values)| match mode {
AggregateMode::Partial => accumulator.update_batch(&values),
AggregateMode::Final => {
AggregateMode::FinalPartitioned | AggregateMode::Final => {
// note: the aggregation here is over states, not values, thus the merge
accumulator.merge_batch(&values)
}
@@ -807,7 +817,7 @@ fn aggregate_expressions(
Ok(aggr_expr.iter().map(|agg| agg.expressions()).collect())
}
// in this mode, we build the merge expressions of the aggregation
AggregateMode::Final => Ok(aggr_expr
AggregateMode::Final | AggregateMode::FinalPartitioned => Ok(aggr_expr
.iter()
.map(|agg| merge_expressions(agg))
.collect::<Result<Vec<_>>>()?),
@@ -901,7 +911,9 @@ fn aggregate_batch(
// 1.3
match mode {
AggregateMode::Partial => accum.update_batch(values),
AggregateMode::Final => accum.merge_batch(values),
AggregateMode::Final | AggregateMode::FinalPartitioned => {
accum.merge_batch(values)
}
}
})
}
@@ -1074,7 +1086,7 @@ fn finalize_aggregation(
.collect::<Result<Vec<_>>>()?;
Ok(a.iter().flatten().cloned().collect::<Vec<_>>())
}
AggregateMode::Final => {
AggregateMode::Final | AggregateMode::FinalPartitioned => {
// merge the state to the final value
accumulators
.iter()
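The doc comment on FinalPartitioned above describes the invariant this mode relies on. Below is a self-contained sketch of that routing property in plain Rust, not DataFusion's API; in the real plan a Partial aggregate runs first and the repartitioned rows carry accumulator states rather than raw values, but the routing argument is the same.

use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Hash routing: every row with the same group key maps to the same partition.
fn partition_for(key: &str, partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % partitions
}

fn main() {
    let rows = vec![("A", 1i64), ("B", 2), ("A", 3), ("C", 4), ("B", 5)];
    let partitions = 4;

    // Repartition on the group key, as Distribution::HashPartitioned requires.
    let mut buckets: Vec<Vec<(&str, i64)>> = vec![Vec::new(); partitions];
    for row in rows {
        buckets[partition_for(row.0, partitions)].push(row);
    }

    // Each partition finalizes its own groups independently. Because no group
    // key appears in more than one partition, concatenating the per-partition
    // results yields every group exactly once, with no duplicates to merge.
    let mut combined: HashMap<&str, i64> = HashMap::new();
    for bucket in &buckets {
        let mut local: HashMap<&str, i64> = HashMap::new();
        for &(key, value) in bucket {
            *local.entry(key).or_insert(0) += value;
        }
        for (key, sum) in local {
            let previous = combined.insert(key, sum);
            assert!(previous.is_none(), "a group key crossed partitions");
        }
    }
    assert_eq!(combined["A"], 4);
    assert_eq!(combined["B"], 7);
    println!("{:?}", combined);
}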