From 62cfab293c4e9debc24cee1721f0180b64cdf5d1 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 17 May 2024 08:44:24 +0800 Subject: [PATCH 1/2] move test to slt Signed-off-by: jayzhan211 --- .../aggregate_statistics.rs | 358 ------------------ .../test_files/aggregate_optimization.slt | 82 ++++ 2 files changed, 82 insertions(+), 358 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/aggregate_optimization.slt diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index 1a82dac4658c..f6315c6ab880 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -304,361 +304,3 @@ fn take_optimizable_max( } None } - -#[cfg(test)] -pub(crate) mod tests { - - use super::*; - use crate::logical_expr::Operator; - use crate::physical_plan::aggregates::PhysicalGroupBy; - use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; - use crate::physical_plan::common; - use crate::physical_plan::expressions::Count; - use crate::physical_plan::filter::FilterExec; - use crate::physical_plan::memory::MemoryExec; - use crate::prelude::SessionContext; - - use arrow::array::Int32Array; - use arrow::datatypes::{DataType, Field, Schema}; - use arrow::record_batch::RecordBatch; - use datafusion_common::cast::as_int64_array; - use datafusion_physical_expr::expressions::cast; - use datafusion_physical_expr::PhysicalExpr; - use datafusion_physical_plan::aggregates::AggregateMode; - - /// Mock data using a MemoryExec which has an exact count statistic - fn mock_data() -> Result> { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - ])); - - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![ - Arc::new(Int32Array::from(vec![Some(1), Some(2), None])), - Arc::new(Int32Array::from(vec![Some(4), None, Some(6)])), - ], - )?; - - Ok(Arc::new(MemoryExec::try_new( - &[vec![batch]], - Arc::clone(&schema), - None, - )?)) - } - - /// Checks that the count optimization was applied and we still get the right result - async fn assert_count_optim_success( - plan: AggregateExec, - agg: TestAggregate, - ) -> Result<()> { - let session_ctx = SessionContext::new(); - let state = session_ctx.state(); - let plan: Arc = Arc::new(plan); - - let optimized = AggregateStatistics::new() - .optimize(Arc::clone(&plan), state.config_options())?; - - // A ProjectionExec is a sign that the count optimization was applied - assert!(optimized.as_any().is::()); - - // run both the optimized and nonoptimized plan - let optimized_result = - common::collect(optimized.execute(0, session_ctx.task_ctx())?).await?; - let nonoptimized_result = - common::collect(plan.execute(0, session_ctx.task_ctx())?).await?; - assert_eq!(optimized_result.len(), nonoptimized_result.len()); - - // and validate the results are the same and expected - assert_eq!(optimized_result.len(), 1); - check_batch(optimized_result.into_iter().next().unwrap(), &agg); - // check the non optimized one too to ensure types and names remain the same - assert_eq!(nonoptimized_result.len(), 1); - check_batch(nonoptimized_result.into_iter().next().unwrap(), &agg); - - Ok(()) - } - - fn check_batch(batch: RecordBatch, agg: &TestAggregate) { - let schema = batch.schema(); - let fields = schema.fields(); - assert_eq!(fields.len(), 1); - - let field = &fields[0]; - assert_eq!(field.name(), agg.column_name()); - assert_eq!(field.data_type(), &DataType::Int64); - // note that nullabiolity differs - - assert_eq!( - as_int64_array(batch.column(0)).unwrap().values(), - &[agg.expected_count()] - ); - } - - /// Describe the type of aggregate being tested - pub(crate) enum TestAggregate { - /// Testing COUNT(*) type aggregates - CountStar, - - /// Testing for COUNT(column) aggregate - ColumnA(Arc), - } - - impl TestAggregate { - pub(crate) fn new_count_star() -> Self { - Self::CountStar - } - - fn new_count_column(schema: &Arc) -> Self { - Self::ColumnA(schema.clone()) - } - - /// Return appropriate expr depending if COUNT is for col or table (*) - pub(crate) fn count_expr(&self) -> Arc { - Arc::new(Count::new( - self.column(), - self.column_name(), - DataType::Int64, - )) - } - - /// what argument would this aggregate need in the plan? - fn column(&self) -> Arc { - match self { - Self::CountStar => expressions::lit(COUNT_STAR_EXPANSION), - Self::ColumnA(s) => expressions::col("a", s).unwrap(), - } - } - - /// What name would this aggregate produce in a plan? - fn column_name(&self) -> &'static str { - match self { - Self::CountStar => "COUNT(*)", - Self::ColumnA(_) => "COUNT(a)", - } - } - - /// What is the expected count? - fn expected_count(&self) -> i64 { - match self { - TestAggregate::CountStar => 3, - TestAggregate::ColumnA(_) => 2, - } - } - } - - #[tokio::test] - async fn test_count_partial_direct_child() -> Result<()> { - // basic test case with the aggregation applied on a source with exact statistics - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_star(); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![agg.count_expr()], - vec![None], - source, - Arc::clone(&schema), - )?; - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![agg.count_expr()], - vec![None], - Arc::new(partial_agg), - Arc::clone(&schema), - )?; - - assert_count_optim_success(final_agg, agg).await?; - - Ok(()) - } - - #[tokio::test] - async fn test_count_partial_with_nulls_direct_child() -> Result<()> { - // basic test case with the aggregation applied on a source with exact statistics - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_column(&schema); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![agg.count_expr()], - vec![None], - source, - Arc::clone(&schema), - )?; - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![agg.count_expr()], - vec![None], - Arc::new(partial_agg), - Arc::clone(&schema), - )?; - - assert_count_optim_success(final_agg, agg).await?; - - Ok(()) - } - - #[tokio::test] - async fn test_count_partial_indirect_child() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_star(); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![agg.count_expr()], - vec![None], - source, - Arc::clone(&schema), - )?; - - // We introduce an intermediate optimization step between the partial and final aggregtator - let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg)); - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![agg.count_expr()], - vec![None], - Arc::new(coalesce), - Arc::clone(&schema), - )?; - - assert_count_optim_success(final_agg, agg).await?; - - Ok(()) - } - - #[tokio::test] - async fn test_count_partial_with_nulls_indirect_child() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_column(&schema); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![agg.count_expr()], - vec![None], - source, - Arc::clone(&schema), - )?; - - // We introduce an intermediate optimization step between the partial and final aggregtator - let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg)); - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![agg.count_expr()], - vec![None], - Arc::new(coalesce), - Arc::clone(&schema), - )?; - - assert_count_optim_success(final_agg, agg).await?; - - Ok(()) - } - - #[tokio::test] - async fn test_count_inexact_stat() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_star(); - - // adding a filter makes the statistics inexact - let filter = Arc::new(FilterExec::try_new( - expressions::binary( - expressions::col("a", &schema)?, - Operator::Gt, - cast(expressions::lit(1u32), &schema, DataType::Int32)?, - &schema, - )?, - source, - )?); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![agg.count_expr()], - vec![None], - filter, - Arc::clone(&schema), - )?; - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![agg.count_expr()], - vec![None], - Arc::new(partial_agg), - Arc::clone(&schema), - )?; - - let conf = ConfigOptions::new(); - let optimized = - AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; - - // check that the original ExecutionPlan was not replaced - assert!(optimized.as_any().is::()); - - Ok(()) - } - - #[tokio::test] - async fn test_count_with_nulls_inexact_stat() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_column(&schema); - - // adding a filter makes the statistics inexact - let filter = Arc::new(FilterExec::try_new( - expressions::binary( - expressions::col("a", &schema)?, - Operator::Gt, - cast(expressions::lit(1u32), &schema, DataType::Int32)?, - &schema, - )?, - source, - )?); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![agg.count_expr()], - vec![None], - filter, - Arc::clone(&schema), - )?; - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![agg.count_expr()], - vec![None], - Arc::new(partial_agg), - Arc::clone(&schema), - )?; - - let conf = ConfigOptions::new(); - let optimized = - AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; - - // check that the original ExecutionPlan was not replaced - assert!(optimized.as_any().is::()); - - Ok(()) - } -} diff --git a/datafusion/sqllogictest/test_files/aggregate_optimization.slt b/datafusion/sqllogictest/test_files/aggregate_optimization.slt new file mode 100644 index 000000000000..d5dfec1e9493 --- /dev/null +++ b/datafusion/sqllogictest/test_files/aggregate_optimization.slt @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +## Test for Aggregate function optimization + +# Aggregate statistics + +statement ok +create table t(a int, b int) as values (1, 2), (null, 4), (5, null); + +# A ProjectionExec is a sign that the count optimization was applied +query TT +explain select count(a) from t; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[COUNT(t.a)]] +02)--TableScan: t projection=[a] +physical_plan +01)ProjectionExec: expr=[2 as COUNT(t.a)] +02)--PlaceholderRowExec + +query TT +explain select count(*) from t; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1)) AS COUNT(*)]] +02)--TableScan: t projection=[] +physical_plan +01)ProjectionExec: expr=[3 as COUNT(*)] +02)--PlaceholderRowExec + +# adding a filter makes the statistics inexact, AggregateExec is not replaced +query TT +explain select count(*) from t where a > 2; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1)) AS COUNT(*)]] +02)--Projection: +03)----Filter: t.a > Int32(2) +04)------TableScan: t projection=[a] +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)] +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------ProjectionExec: expr=[] +06)----------CoalesceBatchesExec: target_batch_size=8192 +07)------------FilterExec: a@0 > 2 +08)--------------MemoryExec: partitions=1, partition_sizes=[1] + +query TT +explain select count(a) from t where a > 2; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[COUNT(t.a)]] +02)--Filter: t.a > Int32(2) +03)----TableScan: t projection=[a] +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[COUNT(t.a)] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(t.a)] +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------FilterExec: a@0 > 2 +07)------------MemoryExec: partitions=1, partition_sizes=[1] + +statement ok +drop table t; From 770be4d1b55e0f27554b69e7c7e494567053e3df Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 17 May 2024 19:17:37 +0800 Subject: [PATCH 2/2] remove more Signed-off-by: jayzhan211 --- .../combine_partial_final_agg.rs | 279 ------------------ .../limited_distinct_aggregation.rs | 72 +---- .../test_files/aggregate_optimization.slt | 75 +++++ 3 files changed, 76 insertions(+), 350 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index e41e4dd31647..b51855949569 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -194,282 +194,3 @@ fn discard_column_index(group_expr: Arc) -> Arc { - let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); - - // run optimizer - let optimizer = CombinePartialFinalAggregate {}; - let config = ConfigOptions::new(); - let optimized = optimizer.optimize($PLAN, &config)?; - // Now format correctly - let plan = displayable(optimized.as_ref()).indent(true).to_string(); - let actual_lines = trim_plan_display(&plan); - - assert_eq!( - &expected_lines, &actual_lines, - "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", - expected_lines, actual_lines - ); - }; - } - - fn trim_plan_display(plan: &str) -> Vec<&str> { - plan.split('\n') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect() - } - - fn schema() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Int64, true), - Field::new("c", DataType::Int64, true), - ])) - } - - fn parquet_exec(schema: &SchemaRef) -> Arc { - Arc::new(ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], - statistics: Statistics::new_unknown(schema), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, - None, - None, - Default::default(), - )) - } - - fn partial_aggregate_exec( - input: Arc, - group_by: PhysicalGroupBy, - aggr_expr: Vec>, - ) -> Arc { - let schema = input.schema(); - let n_aggr = aggr_expr.len(); - Arc::new( - AggregateExec::try_new( - AggregateMode::Partial, - group_by, - aggr_expr, - vec![None; n_aggr], - input, - schema, - ) - .unwrap(), - ) - } - - fn final_aggregate_exec( - input: Arc, - group_by: PhysicalGroupBy, - aggr_expr: Vec>, - ) -> Arc { - let schema = input.schema(); - let n_aggr = aggr_expr.len(); - Arc::new( - AggregateExec::try_new( - AggregateMode::Final, - group_by, - aggr_expr, - vec![None; n_aggr], - input, - schema, - ) - .unwrap(), - ) - } - - fn repartition_exec(input: Arc) -> Arc { - Arc::new( - RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap(), - ) - } - - #[test] - fn aggregations_not_combined() -> Result<()> { - let schema = schema(); - - let aggr_expr = vec![Arc::new(Count::new( - lit(1i8), - "COUNT(1)".to_string(), - DataType::Int64, - )) as _]; - let plan = final_aggregate_exec( - repartition_exec(partial_aggregate_exec( - parquet_exec(&schema), - PhysicalGroupBy::default(), - aggr_expr.clone(), - )), - PhysicalGroupBy::default(), - aggr_expr, - ); - // should not combine the Partial/Final AggregateExecs - let expected = &[ - "AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]", - "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]", - ]; - assert_optimized!(expected, plan); - - let aggr_expr1 = vec![Arc::new(Count::new( - lit(1i8), - "COUNT(1)".to_string(), - DataType::Int64, - )) as _]; - let aggr_expr2 = vec![Arc::new(Count::new( - lit(1i8), - "COUNT(2)".to_string(), - DataType::Int64, - )) as _]; - - let plan = final_aggregate_exec( - partial_aggregate_exec( - parquet_exec(&schema), - PhysicalGroupBy::default(), - aggr_expr1, - ), - PhysicalGroupBy::default(), - aggr_expr2, - ); - // should not combine the Partial/Final AggregateExecs - let expected = &[ - "AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]", - "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]", - "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]", - ]; - - assert_optimized!(expected, plan); - - Ok(()) - } - - #[test] - fn aggregations_combined() -> Result<()> { - let schema = schema(); - let aggr_expr = vec![Arc::new(Count::new( - lit(1i8), - "COUNT(1)".to_string(), - DataType::Int64, - )) as _]; - - let plan = final_aggregate_exec( - partial_aggregate_exec( - parquet_exec(&schema), - PhysicalGroupBy::default(), - aggr_expr.clone(), - ), - PhysicalGroupBy::default(), - aggr_expr, - ); - // should combine the Partial/Final AggregateExecs to tne Single AggregateExec - let expected = &[ - "AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)]", - "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]", - ]; - - assert_optimized!(expected, plan); - Ok(()) - } - - #[test] - fn aggregations_with_group_combined() -> Result<()> { - let schema = schema(); - let aggr_expr = vec![Arc::new(Sum::new( - col("b", &schema)?, - "Sum(b)".to_string(), - DataType::Int64, - )) as _]; - - let groups: Vec<(Arc, String)> = - vec![(col("c", &schema)?, "c".to_string())]; - - let partial_group_by = PhysicalGroupBy::new_single(groups); - let partial_agg = partial_aggregate_exec( - parquet_exec(&schema), - partial_group_by, - aggr_expr.clone(), - ); - - let groups: Vec<(Arc, String)> = - vec![(col("c", &partial_agg.schema())?, "c".to_string())]; - let final_group_by = PhysicalGroupBy::new_single(groups); - - let plan = final_aggregate_exec(partial_agg, final_group_by, aggr_expr); - // should combine the Partial/Final AggregateExecs to tne Single AggregateExec - let expected = &[ - "AggregateExec: mode=Single, gby=[c@2 as c], aggr=[Sum(b)]", - "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]", - ]; - - assert_optimized!(expected, plan); - Ok(()) - } - - #[test] - fn aggregations_with_limit_combined() -> Result<()> { - let schema = schema(); - let aggr_expr = vec![]; - - let groups: Vec<(Arc, String)> = - vec![(col("c", &schema)?, "c".to_string())]; - - let partial_group_by = PhysicalGroupBy::new_single(groups); - let partial_agg = partial_aggregate_exec( - parquet_exec(&schema), - partial_group_by, - aggr_expr.clone(), - ); - - let groups: Vec<(Arc, String)> = - vec![(col("c", &partial_agg.schema())?, "c".to_string())]; - let final_group_by = PhysicalGroupBy::new_single(groups); - - let schema = partial_agg.schema(); - let final_agg = Arc::new( - AggregateExec::try_new( - AggregateMode::Final, - final_group_by, - aggr_expr, - vec![], - partial_agg, - schema, - ) - .unwrap() - .with_limit(Some(5)), - ); - let plan: Arc = final_agg; - // should combine the Partial/Final AggregateExecs to a Single AggregateExec - // with the final limit preserved - let expected = &[ - "AggregateExec: mode=Single, gby=[c@2 as c], aggr=[], lim=[5]", - "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]", - ]; - - assert_optimized!(expected, plan); - Ok(()) - } -} diff --git a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs index 950bb3c8eeb2..0abdec49dc6d 100644 --- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs @@ -193,7 +193,6 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation { mod tests { use super::*; - use crate::physical_optimizer::aggregate_statistics::tests::TestAggregate; use crate::physical_optimizer::enforce_distribution::tests::{ parquet_exec_with_sort, schema, trim_plan_display, }; @@ -209,8 +208,7 @@ mod tests { use arrow::util::pretty::pretty_format_batches; use arrow_schema::SchemaRef; use datafusion_execution::config::SessionConfig; - use datafusion_expr::Operator; - use datafusion_physical_expr::expressions::{cast, col}; + use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::{expressions, PhysicalExpr, PhysicalSortExpr}; use datafusion_physical_plan::aggregates::AggregateMode; use datafusion_physical_plan::displayable; @@ -507,74 +505,6 @@ mod tests { Ok(()) } - #[test] - fn test_has_aggregate_expression() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_star(); - - // `SELECT FROM MemoryExec LIMIT 10;`, Single AggregateExec - let single_agg = AggregateExec::try_new( - AggregateMode::Single, - build_group_by(&schema.clone(), vec!["a".to_string()]), - vec![agg.count_expr()], /* aggr_expr */ - vec![None], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ - )?; - let limit_exec = LocalLimitExec::new( - Arc::new(single_agg), - 10, // fetch - ); - // expected not to push the limit to the AggregateExec - let expected = [ - "LocalLimitExec: fetch=10", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", - "MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - Ok(()) - } - - #[test] - fn test_has_filter() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - - // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec - // the `a > 1` filter is applied in the AggregateExec - let filter_expr = Some(expressions::binary( - expressions::col("a", &schema)?, - Operator::Gt, - cast(expressions::lit(1u32), &schema, DataType::Int32)?, - &schema, - )?); - let agg = TestAggregate::new_count_star(); - let single_agg = AggregateExec::try_new( - AggregateMode::Single, - build_group_by(&schema.clone(), vec!["a".to_string()]), - vec![agg.count_expr()], /* aggr_expr */ - vec![filter_expr], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ - )?; - let limit_exec = LocalLimitExec::new( - Arc::new(single_agg), - 10, // fetch - ); - // expected not to push the limit to the AggregateExec - // TODO(msirek): open an issue for `filter_expr` of `AggregateExec` not printing out - let expected = [ - "LocalLimitExec: fetch=10", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", - "MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - Ok(()) - } - #[test] fn test_has_order_by() -> Result<()> { let sort_key = vec![PhysicalSortExpr { diff --git a/datafusion/sqllogictest/test_files/aggregate_optimization.slt b/datafusion/sqllogictest/test_files/aggregate_optimization.slt index d5dfec1e9493..e1bc300ef921 100644 --- a/datafusion/sqllogictest/test_files/aggregate_optimization.slt +++ b/datafusion/sqllogictest/test_files/aggregate_optimization.slt @@ -78,5 +78,80 @@ physical_plan 06)----------FilterExec: a@0 > 2 07)------------MemoryExec: partitions=1, partition_sizes=[1] +# Limited Distinct Aggregate + +# Limit is not pushed to AggregateExec +query TT +explain select count(*) from t group by a limit 1; +---- +logical_plan +01)Projection: COUNT(*) +02)--Limit: skip=0, fetch=1 +03)----Aggregate: groupBy=[[t.a]], aggr=[[COUNT(Int64(1)) AS COUNT(*)]] +04)------TableScan: t projection=[a] +physical_plan +01)ProjectionExec: expr=[COUNT(*)@1 as COUNT(*)] +02)--GlobalLimitExec: skip=0, fetch=1 +03)----CoalescePartitionsExec +04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[COUNT(*)] +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 +07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +08)--------------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[COUNT(*)] +09)----------------MemoryExec: partitions=1, partition_sizes=[1] + +# filter is applied in the AggregateExec +query TT +explain select count(*) from t where a > 1 group by a limit 1; +---- +logical_plan +01)Projection: COUNT(*) +02)--Limit: skip=0, fetch=1 +03)----Aggregate: groupBy=[[t.a]], aggr=[[COUNT(Int64(1)) AS COUNT(*)]] +04)------Filter: t.a > Int32(1) +05)--------TableScan: t projection=[a] +physical_plan +01)ProjectionExec: expr=[COUNT(*)@1 as COUNT(*)] +02)--GlobalLimitExec: skip=0, fetch=1 +03)----CoalescePartitionsExec +04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[COUNT(*)] +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 +07)------------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[COUNT(*)] +08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +09)----------------CoalesceBatchesExec: target_batch_size=8192 +10)------------------FilterExec: a@0 > 1 +11)--------------------MemoryExec: partitions=1, partition_sizes=[1] + +# Partial / Final not combined + +query TT +explain select count(1) from t group by a; +---- +logical_plan +01)Projection: COUNT(Int64(1)) +02)--Aggregate: groupBy=[[t.a]], aggr=[[COUNT(Int64(1))]] +03)----TableScan: t projection=[a] +physical_plan +01)ProjectionExec: expr=[COUNT(Int64(1))@1 as COUNT(Int64(1))] +02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[COUNT(Int64(1))] +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[COUNT(Int64(1))] +07)------------MemoryExec: partitions=1, partition_sizes=[1] + +# Aggregate Combined + +query TT +explain select count(1), count(2) from t; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1)), COUNT(Int64(2))]] +02)--TableScan: t projection=[] +physical_plan +01)AggregateExec: mode=Single, gby=[], aggr=[COUNT(Int64(1)), COUNT(Int64(2))] +02)--MemoryExec: partitions=1, partition_sizes=[1] + statement ok drop table t;