diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 212db653f713..a633cacd0eb1 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -909,12 +909,16 @@ config_namespace! {
         /// into the file scan phase.
         pub enable_join_dynamic_filter_pushdown: bool, default = true

-        /// When set to true attempts to push down dynamic filters generated by operators (topk & join) into the file scan phase.
+        /// When set to true, the optimizer will attempt to push down Aggregate dynamic filters
+        /// into the file scan phase.
+        pub enable_aggregate_dynamic_filter_pushdown: bool, default = true
+
+        /// When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase.
         /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
         /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
         /// This means that if we already have 10 timestamps in the year 2025
         /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
-        /// The config will suppress `enable_join_dynamic_filter_pushdown` & `enable_topk_dynamic_filter_pushdown`
+        /// The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown`
        /// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
         pub enable_dynamic_filter_pushdown: bool, default = true

@@ -1265,6 +1269,7 @@ impl ConfigOptions {
             self.optimizer.enable_dynamic_filter_pushdown = bool_value;
             self.optimizer.enable_topk_dynamic_filter_pushdown = bool_value;
             self.optimizer.enable_join_dynamic_filter_pushdown = bool_value;
+            self.optimizer.enable_aggregate_dynamic_filter_pushdown = bool_value;
         }
         return Ok(());
     }
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index 0903194b15a9..c498ac5b56d2 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -41,9 +41,13 @@ use datafusion_datasource::{
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::ScalarUDF;
 use datafusion_functions::math::random::RandomFunc;
-use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_functions_aggregate::{
+    count::count_udaf,
+    min_max::{max_udaf, min_udaf},
+};
 use datafusion_physical_expr::{
-    aggregate::AggregateExprBuilder, Partitioning, ScalarFunctionExpr,
+    aggregate::{AggregateExprBuilder, AggregateFunctionExpr},
+    Partitioning, ScalarFunctionExpr,
 };
 use datafusion_physical_expr::{expressions::col, LexOrdering, PhysicalSortExpr};
 use datafusion_physical_optimizer::{
@@ -63,6 +67,7 @@ use datafusion_physical_plan::{
 use datafusion_physical_plan::union::UnionExec;
 use futures::StreamExt;
 use object_store::{memory::InMemory, ObjectStore};
+use regex::Regex;
 use util::{format_plan_for_test, OptimizationTest, TestNode, TestScanBuilder};

 use crate::physical_optimizer::filter_pushdown::util::TestSource;
@@ -1889,6 +1894,427 @@ fn col_lit_predicate(
     ))
 }

+// ==== Aggregate Dynamic Filter tests ====
+
+// ---- Test Utilities ----
+struct AggregateDynFilterCase<'a> {
+    schema: SchemaRef,
+    batches: Vec<RecordBatch>,
+    aggr_exprs: Vec<AggregateFunctionExpr>,
+    expected_before: Option<&'a str>,
+    expected_after: Option<&'a str>,
+    scan_support: bool,
+}
+
+async fn run_aggregate_dyn_filter_case(case: AggregateDynFilterCase<'_>) {
+    let AggregateDynFilterCase {
+        schema,
+        batches,
+        aggr_exprs,
+        expected_before,
+        expected_after,
+        scan_support,
+    } = case;
+
+    let scan = TestScanBuilder::new(Arc::clone(&schema))
+        .with_support(scan_support)
+        .with_batches(batches)
+        .build();
+
+    let aggr_exprs: Vec<_> = aggr_exprs
+        .into_iter()
+        .map(|expr| Arc::new(expr) as Arc<AggregateFunctionExpr>)
+        .collect();
+    let aggr_len = aggr_exprs.len();
+
+    let plan: Arc<dyn ExecutionPlan> = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Partial,
+            PhysicalGroupBy::new_single(vec![]),
+            aggr_exprs,
+            vec![None; aggr_len],
+            scan,
+            Arc::clone(&schema),
+        )
+        .unwrap(),
+    );
+
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    let optimized = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+
+    let before = format_plan_for_test(&optimized);
+    if let Some(expected) = expected_before {
+        assert!(
+            before.contains(expected),
+            "expected `{expected}` before execution, got: {before}"
+        );
+    } else {
+        assert!(
+            !before.contains("DynamicFilter ["),
+            "dynamic filter unexpectedly present before execution: {before}"
+        );
+    }
+
+    let session_ctx = SessionContext::new();
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let task_ctx = session_ctx.state().task_ctx();
+    let mut stream = optimized.execute(0, Arc::clone(&task_ctx)).unwrap();
+    let _ = stream.next().await.transpose().unwrap();
+
+    let after = format_plan_for_test(&optimized);
+    if let Some(expected) = expected_after {
+        assert!(
+            after.contains(expected),
+            "expected `{expected}` after execution, got: {after}"
+        );
+    } else {
+        assert!(
+            !after.contains("DynamicFilter ["),
+            "dynamic filter unexpectedly present after execution: {after}"
+        );
+    }
+}
+
+// ---- Test Cases ----
+// Cases covered below:
+// 1. `min(a)` and `max(a)` baseline.
+// 2. Unsupported expression input (`min(a+1)`).
+// 3. Multiple supported columns (same column vs different columns).
+// 4. Mixed supported + unsupported aggregates.
+// 5. Entirely NULL input to surface current bound behavior.
+// 6. End-to-end tests on parquet files.
+
+/// `MIN(a)`: able to push down the dynamic filter
+#[tokio::test]
+async fn test_aggregate_dynamic_filter_min_simple() {
+    // Single min(a) showcases the base case.
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
+    let batches = vec![record_batch!(("a", Int32, [5, 1, 3, 8])).unwrap()];
+
+    let min_expr =
+        AggregateExprBuilder::new(min_udaf(), vec![col("a", &schema).unwrap()])
+            .schema(Arc::clone(&schema))
+            .alias("min_a")
+            .build()
+            .unwrap();
+
+    run_aggregate_dyn_filter_case(AggregateDynFilterCase {
+        schema,
+        batches,
+        aggr_exprs: vec![min_expr],
+        expected_before: Some("DynamicFilter [ empty ]"),
+        expected_after: Some("DynamicFilter [ a@0 < 1 ]"),
+        scan_support: true,
+    })
+    .await;
+}
+
+/// `MAX(a)`: able to push down the dynamic filter
+#[tokio::test]
+async fn test_aggregate_dynamic_filter_max_simple() {
+    // Single max(a) mirrors the base case on the upper bound.
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
+    let batches = vec![record_batch!(("a", Int32, [5, 1, 3, 8])).unwrap()];
+
+    let max_expr =
+        AggregateExprBuilder::new(max_udaf(), vec![col("a", &schema).unwrap()])
+            .schema(Arc::clone(&schema))
+            .alias("max_a")
+            .build()
+            .unwrap();
+
+    run_aggregate_dyn_filter_case(AggregateDynFilterCase {
+        schema,
+        batches,
+        aggr_exprs: vec![max_expr],
+        expected_before: Some("DynamicFilter [ empty ]"),
+        expected_after: Some("DynamicFilter [ a@0 > 8 ]"),
+        scan_support: true,
+    })
+    .await;
+}
+
+/// `MIN(a+1)`: can't push down the dynamic filter
+#[tokio::test]
+async fn test_aggregate_dynamic_filter_min_expression_not_supported() {
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
+    let batches = vec![record_batch!(("a", Int32, [5, 1, 3, 8])).unwrap()];
+
+    let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+        col("a", &schema).unwrap(),
+        Operator::Plus,
+        Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
+    ));
+    let min_expr = AggregateExprBuilder::new(min_udaf(), vec![expr])
+        .schema(Arc::clone(&schema))
+        .alias("min_a_plus_one")
+        .build()
+        .unwrap();
+
+    run_aggregate_dyn_filter_case(AggregateDynFilterCase {
+        schema,
+        batches,
+        aggr_exprs: vec![min_expr],
+        expected_before: None,
+        expected_after: None,
+        scan_support: true,
+    })
+    .await;
+}
+
+/// `MIN(a), MAX(a)`: pushes down a dynamic filter like `(a < 1) OR (a > 8)`
+#[tokio::test]
+async fn test_aggregate_dynamic_filter_min_max_same_column() {
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
+    let batches = vec![record_batch!(("a", Int32, [5, 1, 3, 8])).unwrap()];
+
+    let min_expr =
+        AggregateExprBuilder::new(min_udaf(), vec![col("a", &schema).unwrap()])
+            .schema(Arc::clone(&schema))
+            .alias("min_a")
+            .build()
+            .unwrap();
+    let max_expr =
+        AggregateExprBuilder::new(max_udaf(), vec![col("a", &schema).unwrap()])
+            .schema(Arc::clone(&schema))
+            .alias("max_a")
+            .build()
+            .unwrap();
+
+    run_aggregate_dyn_filter_case(AggregateDynFilterCase {
+        schema,
+        batches,
+        aggr_exprs: vec![min_expr, max_expr],
+        expected_before: Some("DynamicFilter [ empty ]"),
+        expected_after: Some("DynamicFilter [ a@0 < 1 OR a@0 > 8 ]"),
+        scan_support: true,
+    })
+    .await;
+}
+
+/// `MIN(a), MAX(b)`: pushes down a dynamic filter like `(a < 1) OR (b > 9)`
+#[tokio::test]
+async fn test_aggregate_dynamic_filter_min_max_different_columns() {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Int32, true),
+        Field::new("b", DataType::Int32, true),
+    ]));
+    let batches =
+        vec![
+            record_batch!(("a", Int32, [5, 1, 3, 8]), ("b", Int32, [7, 2, 4, 9]))
+                .unwrap(),
+        ];
+
+    let min_expr =
+        AggregateExprBuilder::new(min_udaf(), vec![col("a", &schema).unwrap()])
+            .schema(Arc::clone(&schema))
+            .alias("min_a")
+            .build()
+            .unwrap();
+    let max_expr =
+        AggregateExprBuilder::new(max_udaf(), vec![col("b", &schema).unwrap()])
+            .schema(Arc::clone(&schema))
+            .alias("max_b")
+            .build()
+            .unwrap();
+
+    run_aggregate_dyn_filter_case(AggregateDynFilterCase {
+        schema,
+        batches,
+        aggr_exprs: vec![min_expr, max_expr],
+        expected_before: Some("DynamicFilter [ empty ]"),
+        expected_after: Some("DynamicFilter [ a@0 < 1 OR b@1 > 9 ]"),
+        scan_support: true,
+    })
+    .await;
+}
+
+/// Mix of supported/unsupported aggregates retains only the valid ones.
+/// `MIN(a), MAX(a), MAX(b), MIN(c+1)`: pushes down a dynamic filter like `(a < 1) OR (a > 8) OR (b > 12)`
+#[tokio::test]
+async fn test_aggregate_dynamic_filter_multiple_mixed_expressions() {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Int32, true),
+        Field::new("b", DataType::Int32, true),
+        Field::new("c", DataType::Int32, true),
+    ]));
+    let batches = vec![record_batch!(
+        ("a", Int32, [5, 1, 3, 8]),
+        ("b", Int32, [10, 4, 6, 12]),
+        ("c", Int32, [100, 70, 90, 110])
+    )
+    .unwrap()];
+
+    let min_a = AggregateExprBuilder::new(min_udaf(), vec![col("a", &schema).unwrap()])
+        .schema(Arc::clone(&schema))
+        .alias("min_a")
+        .build()
+        .unwrap();
+    let max_a = AggregateExprBuilder::new(max_udaf(), vec![col("a", &schema).unwrap()])
+        .schema(Arc::clone(&schema))
+        .alias("max_a")
+        .build()
+        .unwrap();
+    let max_b = AggregateExprBuilder::new(max_udaf(), vec![col("b", &schema).unwrap()])
+        .schema(Arc::clone(&schema))
+        .alias("max_b")
+        .build()
+        .unwrap();
+    let expr_c: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+        col("c", &schema).unwrap(),
+        Operator::Plus,
+        Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
+    ));
+    let min_c_expr = AggregateExprBuilder::new(min_udaf(), vec![expr_c])
+        .schema(Arc::clone(&schema))
+        .alias("min_c_plus_one")
+        .build()
+        .unwrap();
+
+    run_aggregate_dyn_filter_case(AggregateDynFilterCase {
+        schema,
+        batches,
+        aggr_exprs: vec![min_a, max_a, max_b, min_c_expr],
+        expected_before: Some("DynamicFilter [ empty ]"),
+        expected_after: Some("DynamicFilter [ a@0 < 1 OR a@0 > 8 OR b@1 > 12 ]"),
+        scan_support: true,
+    })
+    .await;
+}
+
+/// Don't tighten the dynamic filter if all inputs are null
+#[tokio::test]
+async fn test_aggregate_dynamic_filter_min_all_nulls() {
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
+    let batches = vec![record_batch!(("a", Int32, [None, None, None, None])).unwrap()];
+
+    let min_expr =
+        AggregateExprBuilder::new(min_udaf(), vec![col("a", &schema).unwrap()])
+            .schema(Arc::clone(&schema))
+            .alias("min_a")
+            .build()
+            .unwrap();
+
+    run_aggregate_dyn_filter_case(AggregateDynFilterCase {
+        schema,
+        batches,
+        aggr_exprs: vec![min_expr],
+        expected_before: Some("DynamicFilter [ empty ]"),
+        // After reading the input there is no meaningful bound to update, so the
+        // predicate stays `true`, meaning nothing is filtered out
+        expected_after: Some("DynamicFilter [ true ]"),
+        scan_support: true,
+    })
+    .await;
+}
+
+/// Test that the aggregate dynamic filter works when reading parquet files
+/// 
+/// Runs `select max(id) from test_table where id > 1`, and ensures some file ranges
+/// are pruned by the dynamic filter.
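+///
+/// Pruning is asserted from the `EXPLAIN ANALYZE` metrics; the line the test
+/// matches looks roughly like (exact counts depend on the file layout):
+/// `files_ranges_pruned_statistics=4 total → 2 matched`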
+#[tokio::test]
+async fn test_aggregate_dynamic_filter_parquet_e2e() {
+    let config = SessionConfig::new()
+        .with_collect_statistics(true)
+        .with_target_partitions(2)
+        .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true)
+        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
+    let ctx = SessionContext::new_with_config(config);
+
+    let data_path = format!(
+        "{}/tests/data/test_statistics_per_partition/",
+        env!("CARGO_MANIFEST_DIR")
+    );
+
+    ctx.register_parquet("test_table", &data_path, ParquetReadOptions::default())
+        .await
+        .unwrap();
+
+    // partition 1:
+    //   files: ..03-01(id=4), ..03-02(id=3)
+    // partition 2:
+    //   files: ..03-03(id=2), ..03-04(id=1)
+    //
+    // In partition 1, after reading the first file, the dynamic filter will be
+    // updated to `id > 4`, so the `..03-02` file can be pruned
+    let df = ctx
+        .sql("explain analyze select max(id) from test_table where id > 1")
+        .await
+        .unwrap();
+
+    let result = df.collect().await.unwrap();
+
+    let formatted = pretty_format_batches(&result).unwrap();
+    let explain_analyze = format!("{formatted}");
+
+    // Capture "2" from "files_ranges_pruned_statistics=4 total → 2 matched"
+    let re = Regex::new(
+        r"files_ranges_pruned_statistics\s*=\s*(\d+)\s*total\s*[→>\-]\s*(\d+)\s*matched",
+    )
+    .unwrap();
+
+    if let Some(caps) = re.captures(&explain_analyze) {
+        let matched_num: i32 = caps[2].parse().unwrap();
+        assert!(
+            matched_num < 4,
+            "4 files in total; if any were pruned, the matched count must be < 4"
+        );
+    } else {
+        panic!("pruning metrics should exist in the explain analyze output")
+    }
+}
+
+/// Non-partial (Single) aggregates should skip dynamic filter initialization.
+#[test]
+fn test_aggregate_dynamic_filter_not_created_for_single_mode() {
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
+    let batches = vec![record_batch!(("a", Int32, [5, 1, 3, 8])).unwrap()];
+
+    let scan = TestScanBuilder::new(Arc::clone(&schema))
+        .with_support(true)
+        .with_batches(batches)
+        .build();
+
+    let min_expr =
+        AggregateExprBuilder::new(min_udaf(), vec![col("a", &schema).unwrap()])
+            .schema(Arc::clone(&schema))
+            .alias("min_a")
+            .build()
+            .unwrap();
+
+    let plan: Arc<dyn ExecutionPlan> = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Single,
+            PhysicalGroupBy::new_single(vec![]),
+            vec![min_expr.into()],
+            vec![None],
+            scan,
+            Arc::clone(&schema),
+        )
+        .unwrap(),
+    );
+
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    let optimized = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+
+    let formatted = format_plan_for_test(&optimized);
+    assert!(
+        !formatted.contains("DynamicFilter ["),
+        "dynamic filter should not be created for AggregateMode::Single: {formatted}"
+    );
+}
+
 #[tokio::test]
 async fn test_aggregate_filter_pushdown() {
     // Test that filters can pass through AggregateExec even with aggregate functions
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 5fafce0bea16..12ede9df61c8 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -27,7 +27,8 @@ use crate::aggregates::{
 };
 use crate::execution_plan::{CardinalityEffect, EmissionType};
 use crate::filter_pushdown::{
-    ChildFilterDescription, FilterDescription, FilterPushdownPhase, PushedDownPredicate,
+    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
+    FilterPushdownPropagation, PushedDownPredicate,
 };
 use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::windows::get_ordered_partition_by_indices;
@@ -37,6 +38,7 @@ use crate::{
 };
 use datafusion_common::config::ConfigOptions;
 use datafusion_physical_expr::utils::collect_columns;
+use parking_lot::Mutex;
 use std::collections::HashSet;

 use arrow::array::{ArrayRef, UInt16Array, UInt32Array, UInt64Array, UInt8Array};
@@ -46,13 +48,13 @@ use arrow_schema::FieldRef;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
     assert_eq_or_internal_err, not_impl_err, Constraint, Constraints, DataFusionError,
-    Result,
+    Result, ScalarValue,
 };
 use datafusion_execution::TaskContext;
 use datafusion_expr::{Accumulator, Aggregate};
 use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
-use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::expressions::{lit, Column, DynamicFilterPhysicalExpr};
 use datafusion_physical_expr::{
     physical_exprs_contains, ConstExpr, EquivalenceProperties,
 };
@@ -393,6 +395,88 @@ impl From<StreamType> for SendableRecordBatchStream {
 }

+/// # Aggregate Dynamic Filter Pushdown Overview
+///
+/// Consider a query like
+/// -- `example_table(type TEXT, val INT)`
+/// SELECT min(val)
+/// FROM example_table
+/// WHERE type='A';
+///
+/// where `example_table`'s physical representation is a set of partitioned parquet
+/// files with column statistics:
+/// - part-0.parquet: val {min=0, max=100}
+/// - part-1.parquet: val {min=100, max=200}
+/// - ...
+/// - part-100.parquet: val {min=10000, max=10100}
+///
+/// After scanning the 1st file, we only have to read the remaining files whose
+/// minimum value in the `val` column is less than 0 (the minimal `val` value in
+/// the 1st file).
+///
+/// We can skip scanning the remaining files by implementing a dynamic filter: the
+/// intuition is to keep a shared data structure for the current min in both
+/// `AggregateExec` and `DataSourceExec`, and update it during execution, so the
+/// scanner can know at runtime whether it's possible to skip scanning certain
+/// files. See the physical optimizer rule `FilterPushdown` for details.
+///
+/// # Implementation
+///
+/// ## Enable Condition
+/// - No grouping (no `GROUP BY` clause in the sql, only a single global group to aggregate)
+/// - The aggregate expression must be `min`/`max`, and evaluate directly on columns.
+///   Note multiple aggregate expressions that satisfy this requirement are allowed,
+///   and a dynamic filter will be constructed combining all applicable exprs'
+///   states. See more in the following example with a dynamic filter on multiple columns.
+///
+/// ## Filter Construction
+/// The filter is kept in the `DataSourceExec` and is updated during execution.
+/// The reader interprets it as "the upstream only needs rows for which this filter
+/// predicate evaluates to true", and certain scanner implementations like `parquet`
+/// can evaluate column statistics against those dynamic filters to decide whether
+/// they can prune a whole range.
+///
+/// ### Examples
+/// - Expr: `min(a)`, Dynamic Filter: `a < a_cur_min`
+/// - Expr: `min(a), max(a), min(b)`, Dynamic Filter: `(a < a_cur_min) OR (a > a_cur_max) OR (b < b_cur_min)`
+#[derive(Debug, Clone)]
+struct AggrDynFilter {
+    /// The physical expr for the dynamic filter shared between the `AggregateExec`
+    /// and the parquet scanner.
+    filter: Arc<DynamicFilterPhysicalExpr>,
+    /// The current bounds for the dynamic filter, updated during execution to
+    /// tighten the bounds for more effective pruning.
+    ///
+    /// Each vector element corresponds to an accumulator that supports the dynamic
+    /// filter. e.g. if this `AggregateExec` has accumulators:
+    ///     min(a), avg(a), max(b)
+    /// this field stores [PerAccumulatorDynFilter(min(a)), PerAccumulatorDynFilter(max(b))]
+    supported_accumulators_info: Vec<PerAccumulatorDynFilter>,
+}
+
+// ---- Aggregate Dynamic Filter Utility Structs ----
+
+/// Aggregate expressions that support the dynamic filter pushdown in aggregation.
+/// See comments in [`AggrDynFilter`] for conditions.
+#[derive(Debug, Clone)]
+struct PerAccumulatorDynFilter {
+    aggr_type: DynamicFilterAggregateType,
+    /// During planning and optimization, the parent structure is kept in `AggregateExec`,
+    /// and this index is into the `aggr_expr` vec inside `AggregateExec`.
+    /// During execution, the parent struct is moved into `AggregateStream` (the stream
+    /// for no-grouping aggregate execution), and this index is into the
+    /// `aggregate_expressions` vec inside `AggregateStreamInner`
+    aggr_index: usize,
+    // The current bound. Shared among all streams.
+    shared_bound: Arc<Mutex<ScalarValue>>,
+}
+
+/// Aggregate types that are supported for dynamic filter in `AggregateExec`
+#[derive(Debug, Clone)]
+enum DynamicFilterAggregateType {
+    Min,
+    Max,
+}
+
 /// Hash aggregate execution plan
 #[derive(Debug, Clone)]
 pub struct AggregateExec {
@@ -422,6 +506,13 @@ pub struct AggregateExec {
     /// Describes how the input is ordered relative to the group by columns
     input_order_mode: InputOrderMode,
     cache: PlanProperties,
+    /// During initialization, if the plan supports dynamic filtering (see [`AggrDynFilter`]),
+    /// it is set to `Some(..)` regardless of whether it can be pushed down to a child node.
+    ///
+    /// During filter pushdown optimization, if a child node can accept this filter,
+    /// it remains `Some(..)` to enable dynamic filtering during aggregate execution;
+    /// otherwise, it is cleared to `None`.
+    dynamic_filter: Option<Arc<AggrDynFilter>>,
 }

 impl AggregateExec {
@@ -446,6 +537,7 @@ impl AggregateExec {
             input: Arc::clone(&self.input),
             schema: Arc::clone(&self.schema),
             input_schema: Arc::clone(&self.input_schema),
+            dynamic_filter: self.dynamic_filter.clone(),
         }
     }

@@ -562,7 +654,7 @@ impl AggregateExec {
             aggr_expr.as_slice(),
         )?;

-        Ok(AggregateExec {
+        let mut exec = AggregateExec {
             mode,
             group_by,
             aggr_expr,
@@ -575,7 +667,12 @@ impl AggregateExec {
             limit: None,
             input_order_mode,
             cache,
-        })
+            dynamic_filter: None,
+        };
+
+        exec.init_dynamic_filter();
+
+        Ok(exec)
     }

     /// Aggregation mode (full, partial)
@@ -823,6 +920,65 @@ impl AggregateExec {
             }
         }
     }
+
+    /// Check if a dynamic filter is possible for the current plan node.
+    /// - If yes, init one inside `AggregateExec`'s `dynamic_filter` field.
+    /// - If not supported, `self.dynamic_filter` should be kept `None`
+    fn init_dynamic_filter(&mut self) {
+        if (!self.group_by.is_empty()) || (!matches!(self.mode, AggregateMode::Partial)) {
+            debug_assert!(
+                self.dynamic_filter.is_none(),
+                "The current operator node does not support dynamic filter"
+            );
+            return;
+        }
+
+        // Already initialized.
+        if self.dynamic_filter.is_some() {
+            return;
+        }
+
+        // Collect supported accumulators.
+        // It is assumed the order of aggregate expressions is not changed from
+        // `AggregateExec` to `AggregateStream`
+        let mut aggr_dyn_filters = Vec::new();
+        // All column references in the dynamic filter, used when initializing the
+        // dynamic filter, and also to decide whether this dynamic filter can be
+        // pushed through a certain node during optimization.
+        let mut all_cols: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
+        for (i, aggr_expr) in self.aggr_expr.iter().enumerate() {
+            // 1. Only `min` or `max` aggregate function
+            let fun_name = aggr_expr.fun().name();
+            // HACK: Should check the function type more precisely
+            // Issue:
+            let aggr_type = if fun_name.eq_ignore_ascii_case("min") {
+                DynamicFilterAggregateType::Min
+            } else if fun_name.eq_ignore_ascii_case("max") {
+                DynamicFilterAggregateType::Max
+            } else {
+                continue;
+            };
+
+            // 2. The argument must be a single column reference
+            if let [arg] = aggr_expr.expressions().as_slice() {
+                if arg.as_any().is::<Column>() {
+                    all_cols.push(Arc::clone(arg));
+                    aggr_dyn_filters.push(PerAccumulatorDynFilter {
+                        aggr_type,
+                        aggr_index: i,
+                        shared_bound: Arc::new(Mutex::new(ScalarValue::Null)),
+                    });
+                }
+            }
+        }
+
+        if !aggr_dyn_filters.is_empty() {
+            self.dynamic_filter = Some(Arc::new(AggrDynFilter {
+                filter: Arc::new(DynamicFilterPhysicalExpr::new(all_cols, lit(true))),
+                supported_accumulators_info: aggr_dyn_filters,
+            }))
+        }
+    }
 }

 impl DisplayAs for AggregateExec {
@@ -1011,6 +1167,7 @@ impl ExecutionPlan for AggregateExec {
             Arc::clone(&self.schema),
         )?;
         me.limit = self.limit;
+        me.dynamic_filter = self.dynamic_filter.clone();
         Ok(Arc::new(me))
     }

@@ -1041,12 +1198,12 @@ impl ExecutionPlan for AggregateExec {
     }

     /// Push down parent filters when possible (see implementation comment for details),
-    /// but do not introduce any new self filters.
+    /// and also push down this node's own dynamic filter (see `AggrDynFilter` for details)
     fn gather_filters_for_pushdown(
         &self,
-        _phase: FilterPushdownPhase,
+        phase: FilterPushdownPhase,
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
-        _config: &ConfigOptions,
+        config: &ConfigOptions,
     ) -> Result<FilterDescription> {
         // It's safe to push down filters through aggregates when filters only reference
         // grouping columns, because such filters determine which groups to compute, not
@@ -1119,8 +1276,68 @@ impl ExecutionPlan for AggregateExec {
                 .map(PushedDownPredicate::unsupported),
         );

+        // Include this node's own dynamic filter when possible
+        if matches!(phase, FilterPushdownPhase::Post)
+            && config.optimizer.enable_aggregate_dynamic_filter_pushdown
+        {
+            if let Some(self_dyn_filter) = &self.dynamic_filter {
+                let dyn_filter = Arc::clone(&self_dyn_filter.filter);
+                child_desc = child_desc.with_self_filter(dyn_filter);
+            }
+        }
+
         Ok(FilterDescription::new().with_child(child_desc))
     }
+
+    /// If the child accepts this node's dynamic filter, keep `self.dynamic_filter`
+    /// as `Some`; otherwise clear it to `None`.
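+    ///
+    /// Note: acceptance is currently inferred from the `Arc` strong count of the
+    /// shared dynamic filter (a child that kept the filter holds another reference)
+    /// rather than from the child's `PushedDown` reply; see the HACK note in the
+    /// body below.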
+    fn handle_child_pushdown_result(
+        &self,
+        phase: FilterPushdownPhase,
+        child_pushdown_result: ChildPushdownResult,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
+
+        // If this node tried to push down its dynamic filter before, now we check
+        // whether the child accepted the filter
+        if matches!(phase, FilterPushdownPhase::Post) && self.dynamic_filter.is_some() {
+            // let child_accepts_dyn_filter = child_pushdown_result
+            //     .self_filters
+            //     .first()
+            //     .map(|filters| {
+            //         assert_eq_or_internal_err!(
+            //             filters.len(),
+            //             1,
+            //             "Aggregate only pushes down one self dynamic filter"
+            //         );
+            //         let filter = filters.get(0).unwrap(); // Asserted above
+            //         Ok(matches!(filter.discriminant, PushedDown::Yes))
+            //     })
+            //     .unwrap_or_else(|| internal_err!("The length of self filters equals the number of children of this ExecutionPlan, so it must be 1"))?;
+
+            // HACK: The above snippet should be used; however, a child replying
+            // `PushedDown::No` can currently mean it is unable to push down a
+            // row-level filter while still keeping the filter for statistics pruning.
+            // So here, we use the ref count to determine whether the dynamic filter
+            // has actually been pushed down.
+            // Issue:
+            let dyn_filter = self.dynamic_filter.as_ref().unwrap();
+            let child_accepts_dyn_filter = Arc::strong_count(dyn_filter) > 1;
+
+            if !child_accepts_dyn_filter {
+                // Child can't consume this node's dynamic filter, so disable it by
+                // setting it to `None`
+                let mut new_node = self.clone();
+                new_node.dynamic_filter = None;
+
+                result = result
+                    .with_updated_node(Arc::new(new_node) as Arc<dyn ExecutionPlan>);
+            }
+        }
+
+        Ok(result)
+    }
 }

 fn create_schema(
diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs
index fc398427ac1f..4adccd6de797 100644
--- a/datafusion/physical-plan/src/aggregates/no_grouping.rs
+++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -19,17 +19,20 @@
 use crate::aggregates::{
     aggregate_expressions, create_accumulators, finalize_aggregation, AccumulatorItem,
-    AggregateMode,
+    AggrDynFilter, AggregateMode, DynamicFilterAggregateType,
 };
 use crate::metrics::{BaselineMetrics, RecordOutput};
 use crate::{RecordBatchStream, SendableRecordBatchStream};
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
-use datafusion_common::Result;
+use datafusion_common::{internal_datafusion_err, internal_err, Result, ScalarValue};
 use datafusion_execution::TaskContext;
+use datafusion_expr::Operator;
+use datafusion_physical_expr::expressions::{lit, BinaryExpr};
 use datafusion_physical_expr::PhysicalExpr;
 use futures::stream::BoxStream;
 use std::borrow::Cow;
+use std::cmp::Ordering;
 use std::sync::Arc;
 use std::task::{Context, Poll};

@@ -53,15 +56,197 @@ pub(crate) struct AggregateStream {
 ///
 /// The latter requires a state object, which is [`AggregateStreamInner`].
 struct AggregateStreamInner {
+    // ==== Properties ====
     schema: SchemaRef,
     mode: AggregateMode,
     input: SendableRecordBatchStream,
-    baseline_metrics: BaselineMetrics,
     aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
     filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
+
+    // ==== Runtime States/Buffers ====
     accumulators: Vec<AccumulatorItem>,
-    reservation: MemoryReservation,
+    // None if the dynamic filter is not applicable. See details in `AggrDynFilter`.
+    agg_dyn_filter_state: Option<Arc<AggrDynFilter>>,
     finished: bool,
+
+    // ==== Execution Resources ====
+    baseline_metrics: BaselineMetrics,
+    reservation: MemoryReservation,
+}
+
+impl AggregateStreamInner {
+    // TODO: check that Null handling is correct
+    /// # Examples
+    /// - Example 1
+    ///   Accumulators: min(c1)
+    ///   Current Bounds: min(c1)=10
+    ///   --> dynamic filter PhysicalExpr: c1 < 10
+    ///
+    /// - Example 2
+    ///   Accumulators: min(c1), max(c1), min(c2)
+    ///   Current Bounds: min(c1)=10, max(c1)=100, min(c2)=20
+    ///   --> dynamic filter PhysicalExpr: (c1 < 10) OR (c1 > 100) OR (c2 < 20)
+    ///
+    /// # Errors
+    /// Returns internal errors if the dynamic filter is not enabled, or another
+    /// invariant check fails.
+    fn build_dynamic_filter_from_accumulator_bounds(
+        &self,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        let Some(filter_state) = self.agg_dyn_filter_state.as_ref() else {
+            return internal_err!("`build_dynamic_filter_from_accumulator_bounds()` is only called when dynamic filter is enabled");
+        };
+
+        let mut predicates: Vec<Arc<dyn PhysicalExpr>> =
+            Vec::with_capacity(filter_state.supported_accumulators_info.len());
+
+        for acc_info in &filter_state.supported_accumulators_info {
+            // Skip if we don't yet have a meaningful bound
+            let bound = {
+                let guard = acc_info.shared_bound.lock();
+                if (*guard).is_null() {
+                    continue;
+                }
+                guard.clone()
+            };
+
+            let agg_exprs = self
+                .aggregate_expressions
+                .get(acc_info.aggr_index)
+                .ok_or_else(|| {
+                    internal_datafusion_err!(
+                        "Invalid aggregate expression index {} for dynamic filter",
+                        acc_info.aggr_index
+                    )
+                })?;
+            // Only aggregates with a single argument are supported.
+            let column_expr = agg_exprs.first().ok_or_else(|| {
+                internal_datafusion_err!(
+                    "Aggregate expression at index {} expected a single argument",
+                    acc_info.aggr_index
+                )
+            })?;
+
+            let literal = lit(bound);
+            let predicate: Arc<dyn PhysicalExpr> = match acc_info.aggr_type {
+                DynamicFilterAggregateType::Min => Arc::new(BinaryExpr::new(
+                    Arc::clone(column_expr),
+                    Operator::Lt,
+                    literal,
+                )),
+                DynamicFilterAggregateType::Max => Arc::new(BinaryExpr::new(
+                    Arc::clone(column_expr),
+                    Operator::Gt,
+                    literal,
+                )),
+            };
+            predicates.push(predicate);
+        }
+
+        let combined = predicates.into_iter().reduce(|acc, pred| {
+            Arc::new(BinaryExpr::new(acc, Operator::Or, pred)) as Arc<dyn PhysicalExpr>
+        });
+
+        Ok(combined.unwrap_or_else(|| lit(true)))
+    }
+
+    // If the dynamic filter is enabled, update it using the current accumulators'
+    // values
+    fn maybe_update_dyn_filter(&mut self) -> Result<()> {
+        // Step 1: Update each partition's current bound
+        let Some(filter_state) = self.agg_dyn_filter_state.as_ref() else {
+            return Ok(());
+        };
+
+        for acc_info in &filter_state.supported_accumulators_info {
+            let acc =
+                self.accumulators
+                    .get_mut(acc_info.aggr_index)
+                    .ok_or_else(|| {
+                        internal_datafusion_err!(
+                            "Invalid accumulator index {} for dynamic filter",
+                            acc_info.aggr_index
+                        )
+                    })?;
+            // First get the current partition's bound, then update the shared bound
+            // among all partitions.
+            let current_bound = acc.evaluate()?;
+            {
+                let mut bound = acc_info.shared_bound.lock();
+                match acc_info.aggr_type {
+                    DynamicFilterAggregateType::Max => {
+                        *bound = scalar_max(&bound, &current_bound)?;
+                    }
+                    DynamicFilterAggregateType::Min => {
+                        *bound = scalar_min(&bound, &current_bound)?;
+                    }
+                }
+            }
+        }
+
+        // Step 2: Sync the dynamic filter physical expression with the reader
+        let predicate = self.build_dynamic_filter_from_accumulator_bounds()?;
+        filter_state.filter.update(predicate)?;
+
+        Ok(())
+    }
+}
+
+/// Returns the element-wise minimum of two `ScalarValue`s.
+///
+/// # Null semantics
+/// - `min(NULL, NULL) = NULL`
+/// - `min(NULL, x) = x`
+/// - `min(x, NULL) = x`
+///
+/// # Errors
+/// Returns an internal error if `v1` and `v2` have incompatible types.
+fn scalar_min(v1: &ScalarValue, v2: &ScalarValue) -> Result<ScalarValue> {
+    if let Some(result) = scalar_cmp_null_short_circuit(v1, v2) {
+        return Ok(result);
+    }
+
+    match v1.partial_cmp(v2) {
+        Some(Ordering::Less | Ordering::Equal) => Ok(v1.clone()),
+        Some(Ordering::Greater) => Ok(v2.clone()),
+        None => datafusion_common::internal_err!(
+            "cannot compare values of different or incompatible types: {v1:?} vs {v2:?}"
+        ),
+    }
+}
+
+/// Returns the element-wise maximum of two `ScalarValue`s.
+///
+/// # Null semantics
+/// - `max(NULL, NULL) = NULL`
+/// - `max(NULL, x) = x`
+/// - `max(x, NULL) = x`
+///
+/// # Errors
+/// Returns an internal error if `v1` and `v2` have incompatible types.
+fn scalar_max(v1: &ScalarValue, v2: &ScalarValue) -> Result<ScalarValue> {
+    if let Some(result) = scalar_cmp_null_short_circuit(v1, v2) {
+        return Ok(result);
+    }
+
+    match v1.partial_cmp(v2) {
+        Some(Ordering::Greater | Ordering::Equal) => Ok(v1.clone()),
+        Some(Ordering::Less) => Ok(v2.clone()),
+        None => datafusion_common::internal_err!(
+            "cannot compare values of different or incompatible types: {v1:?} vs {v2:?}"
+        ),
+    }
+}
+
+fn scalar_cmp_null_short_circuit(
+    v1: &ScalarValue,
+    v2: &ScalarValue,
+) -> Option<ScalarValue> {
+    match (v1, v2) {
+        (ScalarValue::Null, ScalarValue::Null) => Some(ScalarValue::Null),
+        (ScalarValue::Null, other) | (other, ScalarValue::Null) => Some(other.clone()),
+        _ => None,
+    }
 }

 impl AggregateStream {
@@ -91,6 +276,24 @@ impl AggregateStream {
         let reservation = MemoryConsumer::new(format!("AggregateStream[{partition}]"))
             .register(context.memory_pool());

+        // Enable the dynamic filter if:
+        // 1. `AggregateExec` checked and confirmed that it supports the dynamic
+        //    filter (its `dynamic_filter` field will be `Some(..)`)
+        // 2. The aggregate dynamic filter is enabled in the config
+        let mut maybe_dynamic_filter = match agg.dynamic_filter.as_ref() {
+            Some(filter) => Some(Arc::clone(filter)),
+            _ => None,
+        };
+
+        if !context
+            .session_config()
+            .options()
+            .optimizer
+            .enable_aggregate_dynamic_filter_pushdown
+        {
+            maybe_dynamic_filter = None;
+        }
+
         let inner = AggregateStreamInner {
             schema: Arc::clone(&agg.schema),
             mode: agg.mode,
@@ -101,27 +304,33 @@ impl AggregateStream {
             accumulators,
             reservation,
             finished: false,
+            agg_dyn_filter_state: maybe_dynamic_filter,
         };
+
         let stream = futures::stream::unfold(inner, |mut this| async move {
             if this.finished {
                 return None;
             }

-            let elapsed_compute = this.baseline_metrics.elapsed_compute();
-
             loop {
                 let result = match this.input.next().await {
                     Some(Ok(batch)) => {
-                        let timer = elapsed_compute.timer();
-                        let result = aggregate_batch(
-                            &this.mode,
-                            batch,
-                            &mut this.accumulators,
-                            &this.aggregate_expressions,
-                            &this.filter_expressions,
-                        );
+                        let result = {
+                            let elapsed_compute = this.baseline_metrics.elapsed_compute();
+                            let _timer = elapsed_compute.timer(); // Stops on drop
+                            aggregate_batch(
+                                &this.mode,
+                                batch,
+                                &mut this.accumulators,
+                                &this.aggregate_expressions,
+                                &this.filter_expressions,
+                            )
+                        };

-                        timer.done();
+                        let result = result.and_then(|allocated| {
+                            this.maybe_update_dyn_filter()?;
+                            Ok(allocated)
+                        });

                         // allocate memory
                         // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 8a05bfcee3ab..b47ae740fab4 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -494,6 +494,7 @@ impl ExecutionPlan for FilterExec {
             .into_iter()
             .map(PushedDownPredicate::supported)
             .collect();
+
         return Ok(FilterDescription::new().with_child(ChildFilterDescription {
             parent_filters: filter_supports,
             self_filters: vec![],
diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
index e5cd6d88b08f..5b30599c4103 100644
--- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
+++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
@@ -18,7 +18,8 @@
 # Tests for dynamic filter pushdown configuration options
 # - enable_topk_dynamic_filter_pushdown (for TopK dynamic filters)
 # - enable_join_dynamic_filter_pushdown (for Join dynamic filters)
-# - enable_dynamic_filter_pushdown (controls both)
+# - enable_aggregate_dynamic_filter_pushdown (for Aggregate dynamic filters)
+# - enable_dynamic_filter_pushdown (controls all three)

 # Setup: Create parquet test files
 statement ok
@@ -213,7 +214,80 @@ physical_plan
 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
 05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet

-# Test 4: Backward compatibility
+# Test 4: Aggregate dynamic filter pushdown
+
+# Prepare aggregate-specific parquet data without statistics so the aggregate
+# statistics optimizer doesn't pre-compute results.
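+# (With statistics present, the aggregate statistics optimizer could answer
+# `MAX(score)` from the parquet metadata alone, and the scan below would never
+# exercise the dynamic filter.)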
+statement ok
+CREATE TABLE agg_source(category VARCHAR, score INT) AS VALUES
+('alpha', 10),
+('alpha', 25),
+('beta', 5),
+('beta', 12),
+('gamma', 42),
+('gamma', 8);
+
+statement ok
+SET datafusion.execution.parquet.statistics_enabled = 'none';
+
+statement ok
+COPY agg_source TO 'test_files/scratch/dynamic_filter_pushdown_config/agg_data.parquet' STORED AS PARQUET;
+
+statement ok
+SET datafusion.execution.parquet.statistics_enabled = 'page';
+
+statement ok
+CREATE EXTERNAL TABLE agg_parquet(category VARCHAR, score INT)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/agg_data.parquet';
+
+statement ok
+SET datafusion.execution.parquet.pushdown_filters = true;
+
+# The aggregate dynamic filter should be pushed into the scan when enabled.
+# Expecting a `DynamicFilter` inside the parquet scanner's predicate
+query TT
+EXPLAIN SELECT MAX(score) FROM agg_parquet WHERE category = 'alpha'
+----
+logical_plan
+01)Aggregate: groupBy=[[]], aggr=[[max(agg_parquet.score)]]
+02)--Projection: agg_parquet.score
+03)----Filter: agg_parquet.category = Utf8View("alpha")
+04)------TableScan: agg_parquet projection=[category, score], partial_filters=[agg_parquet.category = Utf8View("alpha")]
+physical_plan
+01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_parquet.score)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_parquet.score)]
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/agg_data.parquet]]}, projection=[score], file_type=parquet, predicate=category@0 = alpha AND DynamicFilter [ empty ], pruning_predicate=category_null_count@2 != row_count@3 AND category_min@0 <= alpha AND alpha <= category_max@1, required_guarantees=[category in (alpha)]
+
+# Disable aggregate dynamic filters only
+statement ok
+SET datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = false;
+
+# Expecting no `DynamicFilter` inside the parquet scanner's predicate
+query TT
+EXPLAIN SELECT MAX(score) FROM agg_parquet WHERE category = 'alpha'
+----
+logical_plan
+01)Aggregate: groupBy=[[]], aggr=[[max(agg_parquet.score)]]
+02)--Projection: agg_parquet.score
+03)----Filter: agg_parquet.category = Utf8View("alpha")
+04)------TableScan: agg_parquet projection=[category, score], partial_filters=[agg_parquet.category = Utf8View("alpha")]
+physical_plan
+01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_parquet.score)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_parquet.score)]
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/agg_data.parquet]]}, projection=[score], file_type=parquet, predicate=category@0 = alpha, pruning_predicate=category_null_count@2 != row_count@3 AND category_min@0 <= alpha AND alpha <= category_max@1, required_guarantees=[category in (alpha)]
+
+statement ok
+SET datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true;
+
+statement ok
+SET datafusion.execution.parquet.pushdown_filters = false;
+
+# Test 5: Backward compatibility

 # First, set both new configs to specific values
 statement ok
@@ -229,7 +303,7 @@ set datafusion.catalog.information_schema = true
 statement ok
 SET datafusion.optimizer.enable_dynamic_filter_pushdown = false;

-# Verify
both configs are now false +# Verify all configs are now false query T SELECT value FROM information_schema.df_settings WHERE name = 'datafusion.optimizer.enable_topk_dynamic_filter_pushdown'; @@ -242,6 +316,12 @@ WHERE name = 'datafusion.optimizer.enable_join_dynamic_filter_pushdown'; ---- false +query T +SELECT value FROM information_schema.df_settings +WHERE name = 'datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown'; +---- +false + statement ok set datafusion.catalog.information_schema = false @@ -272,7 +352,7 @@ SET datafusion.optimizer.enable_dynamic_filter_pushdown = true; statement ok set datafusion.catalog.information_schema = true -# Verify both configs are now true +# Verify all configs are now true query T SELECT value FROM information_schema.df_settings WHERE name = 'datafusion.optimizer.enable_topk_dynamic_filter_pushdown'; @@ -285,6 +365,12 @@ WHERE name = 'datafusion.optimizer.enable_join_dynamic_filter_pushdown'; ---- true +query T +SELECT value FROM information_schema.df_settings +WHERE name = 'datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown'; +---- +true + statement ok set datafusion.catalog.information_schema = false @@ -328,6 +414,12 @@ DROP TABLE left_parquet; statement ok DROP TABLE right_parquet; +statement ok +DROP TABLE agg_source; + +statement ok +DROP TABLE agg_parquet; + # Reset configs to defaults statement ok SET datafusion.optimizer.enable_topk_dynamic_filter_pushdown = true; @@ -335,5 +427,8 @@ SET datafusion.optimizer.enable_topk_dynamic_filter_pushdown = true; statement ok SET datafusion.optimizer.enable_join_dynamic_filter_pushdown = true; +statement ok +SET datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true; + statement ok SET datafusion.optimizer.enable_dynamic_filter_pushdown = true; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index eba527ed2b21..610fc0f6683a 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -290,6 +290,7 @@ datafusion.format.timestamp_tz_format NULL datafusion.format.types_info false datafusion.optimizer.allow_symmetric_joins_without_pruning true datafusion.optimizer.default_filter_selectivity 20 +datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_dynamic_filter_pushdown true datafusion.optimizer.enable_join_dynamic_filter_pushdown true @@ -412,8 +413,9 @@ datafusion.format.timestamp_tz_format NULL Timestamp format for timestamp with t datafusion.format.types_info false Show types in visual representation batches datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. 
Valid values are between 0 (no selectivity) and 100 (all rows are selected). +datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. -datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (topk & join) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown` & `enable_topk_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. +datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index c3eda544a1de..59c002a793b8 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -136,7 +136,8 @@ The following configuration settings are available: | datafusion.optimizer.enable_window_limits | true | When set to true, the optimizer will attempt to push limit operations past window functions, if possible | | datafusion.optimizer.enable_topk_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. 
| | datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. | -| datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators (topk & join) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown` & `enable_topk_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. | +| datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. | +| datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | | datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. |