From 67412fb75e077b1ceb912de9c77c89c1930e9667 Mon Sep 17 00:00:00 2001
From: Tobias Schwarzinger
Date: Tue, 19 Aug 2025 22:13:09 +0200
Subject: [PATCH 1/5] Check-in NestedLoopJoinProjectionPushDown

---
 datafusion/physical-optimizer/Cargo.toml      |   1 +
 datafusion/physical-optimizer/src/lib.rs      |   1 +
 .../src/nl_join_projection_pushdown.rs        | 809 ++++++++++++++++++
 .../physical-optimizer/src/optimizer.rs       |   4 +
 4 files changed, 815 insertions(+)
 create mode 100644 datafusion/physical-optimizer/src/nl_join_projection_pushdown.rs

diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml
index 15466cd86bb0..4df011fc0a05 100644
--- a/datafusion/physical-optimizer/Cargo.toml
+++ b/datafusion/physical-optimizer/Cargo.toml
@@ -52,5 +52,6 @@ recursive = { workspace = true, optional = true }

 [dev-dependencies]
 datafusion-expr = { workspace = true }
+datafusion-functions = { workspace = true }
 insta = { workspace = true }
 tokio = { workspace = true }
diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index 79db43c1cbe9..c5d135e90d9e 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -44,5 +44,6 @@ pub mod sanity_checker;
 pub mod topk_aggregation;
 pub mod update_aggr_exprs;
 pub mod utils;
+mod nl_join_projection_pushdown;

 pub use optimizer::PhysicalOptimizerRule;
diff --git a/datafusion/physical-optimizer/src/nl_join_projection_pushdown.rs b/datafusion/physical-optimizer/src/nl_join_projection_pushdown.rs
new file mode 100644
index 000000000000..0205ffb882b9
--- /dev/null
+++ b/datafusion/physical-optimizer/src/nl_join_projection_pushdown.rs
@@ -0,0 +1,809 @@
+use crate::PhysicalOptimizerRule;
+use arrow::datatypes::{Fields, Schema, SchemaRef};
+use datafusion_common::alias::AliasGenerator;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::{JoinSide, JoinType, Result};
+use datafusion_expr_common::signature::Volatility;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::ScalarFunctionExpr;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
+use datafusion_physical_plan::joins::NestedLoopJoinExec;
+use datafusion_physical_plan::projection::ProjectionExec;
+use datafusion_physical_plan::ExecutionPlan;
+use std::collections::HashSet;
+use std::sync::Arc;
+
+/// Tries to push down projections from join filters that only depend on one side of the join.
+///
+/// This can be a crucial optimization for nested loop joins. Without this push-down, even
+/// functions that only depend on one side of the join must be evaluated for all row
+/// combinations.
+#[derive(Debug)]
+pub struct NestedLoopJoinProjectionPushDown;
+
+impl NestedLoopJoinProjectionPushDown {
+    /// Creates a new [NestedLoopJoinProjectionPushDown].
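+    ///
+    /// A minimal usage sketch (assuming `plan` is an `Arc<dyn ExecutionPlan>` already in
+    /// scope; the rule is normally registered with the physical optimizer, see the
+    /// `optimizer.rs` change below):
+    ///
+    /// ```ignore
+    /// let rule = NestedLoopJoinProjectionPushDown::new();
+    /// let optimized = rule.optimize(plan, &ConfigOptions::default())?;
+    /// ```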
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl Default for NestedLoopJoinProjectionPushDown {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl PhysicalOptimizerRule for NestedLoopJoinProjectionPushDown {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let alias_generator = AliasGenerator::new();
+        plan.transform_up(|plan| {
+            match plan.as_any().downcast_ref::<NestedLoopJoinExec>() {
+                None => Ok(Transformed::no(plan)),
+                Some(nl_join) => try_push_down_join_filter(
+                    Arc::clone(&plan),
+                    nl_join,
+                    config,
+                    &alias_generator,
+                ),
+            }
+        })
+        .map(|t| t.data)
+    }
+
+    fn name(&self) -> &str {
+        "nl-join-projection-push-down"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// Tries to push down parts of the filter.
+///
+/// See [JoinFilterRewriter] for details.
+fn try_push_down_join_filter(
+    original_plan: Arc<dyn ExecutionPlan>,
+    join: &NestedLoopJoinExec,
+    config: &ConfigOptions,
+    alias_generator: &AliasGenerator,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    // Mark joins are currently not supported.
+    if matches!(join.join_type(), JoinType::LeftMark | JoinType::RightMark) {
+        return Ok(Transformed::no(original_plan));
+    }
+
+    let projections = join.projection();
+    let Some(filter) = join.filter() else {
+        return Ok(Transformed::no(original_plan));
+    };
+
+    let original_lhs_length = join.left().schema().fields().len();
+    let original_rhs_length = join.right().schema().fields().len();
+
+    let lhs_rewrite = try_push_down_projection(
+        Arc::clone(&join.right().schema()),
+        Arc::clone(join.left()),
+        JoinSide::Left,
+        filter.clone(),
+        config,
+        alias_generator,
+    )?;
+    let rhs_rewrite = try_push_down_projection(
+        Arc::clone(&lhs_rewrite.data.0.schema()),
+        Arc::clone(join.right()),
+        JoinSide::Right,
+        lhs_rewrite.data.1,
+        config,
+        alias_generator,
+    )?;
+    if !lhs_rewrite.transformed && !rhs_rewrite.transformed {
+        return Ok(Transformed::no(original_plan));
+    }
+
+    let join_filter = minimize_join_filter(
+        Arc::clone(rhs_rewrite.data.1.expression()),
+        rhs_rewrite.data.1.column_indices().to_vec(),
+        lhs_rewrite.data.0.schema().as_ref(),
+        rhs_rewrite.data.0.schema().as_ref(),
+    );
+
+    let new_lhs_length = lhs_rewrite.data.0.schema().fields.len();
+    let projections = match projections {
+        None => match join.join_type() {
+            JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
+                // Build projections that ignore the newly projected columns.
+                let mut projections = Vec::new();
+                projections.extend(0..original_lhs_length);
+                projections.extend(new_lhs_length..new_lhs_length + original_rhs_length);
+                projections
+            }
+            JoinType::LeftSemi | JoinType::LeftAnti => {
+                // Only return original left columns
+                let mut projections = Vec::new();
+                projections.extend(0..original_lhs_length);
+                projections
+            }
+            JoinType::RightSemi | JoinType::RightAnti => {
+                // Only return original right columns
+                let mut projections = Vec::new();
+                projections.extend(0..original_rhs_length);
+                projections
+            }
+            _ => unreachable!("Unsupported join type"),
+        },
+        Some(projections) => {
+            let rhs_offset = new_lhs_length - original_lhs_length;
+            projections
+                .iter()
+                .map(|idx| {
+                    if *idx >= original_lhs_length {
+                        idx + rhs_offset
+                    } else {
+                        *idx
+                    }
+                })
+                .collect()
+        }
+    };
+
+    Ok(Transformed::yes(Arc::new(NestedLoopJoinExec::try_new(
+        lhs_rewrite.data.0,
+        rhs_rewrite.data.0,
+        Some(join_filter),
+        join.join_type(),
+        Some(projections),
+    )?)))
+}
+
+/// Tries to push down parts of the filter expression into the given `join_side`.
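+///
+/// Schematically, for `join_side = JoinSide::Left` and a filter like `a + 1 > x + 1` (with
+/// `a` coming from the left input), the left-hand computation `a + 1` is replaced in the
+/// filter by a reference to a new column `join_proj_push_down_1`, which is materialized by a
+/// `ProjectionExec` on top of the left input (see the `simple_push_down` test below for a
+/// full before/after plan).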
+fn try_push_down_projection(
+    other_schema: SchemaRef,
+    plan: Arc<dyn ExecutionPlan>,
+    join_side: JoinSide,
+    join_filter: JoinFilter,
+    config: &ConfigOptions,
+    alias_generator: &AliasGenerator,
+) -> Result<Transformed<(Arc<dyn ExecutionPlan>, JoinFilter)>> {
+    let expr = join_filter.expression().clone();
+    let original_plan_schema = plan.schema();
+    let mut rewriter = JoinFilterRewriter::new(
+        join_side,
+        original_plan_schema.as_ref(),
+        join_filter.column_indices().to_vec(),
+        alias_generator,
+    );
+    let new_expr = rewriter.rewrite(expr)?;
+
+    if new_expr.transformed {
+        let plan = ensure_batch_size(plan, config);
+        let new_join_side =
+            ProjectionExec::try_new(rewriter.join_side_projections, plan)?;
+        let new_schema = Arc::clone(&new_join_side.schema());
+
+        let (lhs_schema, rhs_schema) = match join_side {
+            JoinSide::Left => (new_schema, other_schema),
+            JoinSide::Right => (other_schema, new_schema),
+            JoinSide::None => unreachable!("Mark join not supported"),
+        };
+        let intermediate_schema = rewriter
+            .intermediate_column_indices
+            .iter()
+            .map(|ci| match ci.side {
+                JoinSide::Left => lhs_schema.fields[ci.index].clone(),
+                JoinSide::Right => rhs_schema.fields[ci.index].clone(),
+                JoinSide::None => unreachable!("Mark join not supported"),
+            })
+            .collect::<Fields>();
+
+        let join_filter = JoinFilter::new(
+            new_expr.data,
+            rewriter.intermediate_column_indices,
+            Arc::new(Schema::new(intermediate_schema)),
+        );
+        Ok(Transformed::yes((Arc::new(new_join_side), join_filter)))
+    } else {
+        Ok(Transformed::no((plan, join_filter)))
+    }
+}
+
+/// Adds a [CoalesceBatchesExec], if necessary.
+///
+/// The nested loop join can handle small batches quite efficiently, but the scalar functions
+/// evaluated in the pushed-down projections suffer greatly from small batches.
+fn ensure_batch_size(
+    plan: Arc<dyn ExecutionPlan>,
+    config: &ConfigOptions,
+) -> Arc<dyn ExecutionPlan> {
+    if plan
+        .as_any()
+        .downcast_ref::<CoalesceBatchesExec>()
+        .is_some()
+    {
+        plan
+    } else {
+        Arc::new(CoalesceBatchesExec::new(plan, config.execution.batch_size))
+    }
+}
+
+/// Creates a new [JoinFilter] and tries to minimize the internal schema.
+fn minimize_join_filter(
+    expr: Arc<dyn PhysicalExpr>,
+    old_column_indices: Vec<ColumnIndex>,
+    lhs_schema: &Schema,
+    rhs_schema: &Schema,
+) -> JoinFilter {
+    let mut used_columns = HashSet::new();
+    expr.apply(|expr| {
+        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+            used_columns.insert(col.index());
+        }
+        Ok(TreeNodeRecursion::Continue)
+    })
+    .expect("Closure cannot fail");
+
+    let new_column_indices = old_column_indices
+        .iter()
+        .enumerate()
+        .filter(|(idx, _)| used_columns.contains(idx))
+        .map(|(_, ci)| ci.clone())
+        .collect::<Vec<_>>();
+    let fields = new_column_indices
+        .iter()
+        .map(|ci| match ci.side {
+            JoinSide::Left => lhs_schema.field(ci.index).clone(),
+            JoinSide::Right => rhs_schema.field(ci.index).clone(),
+            JoinSide::None => unreachable!("Mark join not supported"),
+        })
+        .collect::<Fields>();
+
+    let final_expr = expr
+        .transform_up(|expr| match expr.as_any().downcast_ref::<Column>() {
+            None => Ok(Transformed::no(expr)),
+            Some(column) => {
+                let new_idx = used_columns
+                    .iter()
+                    .filter(|idx| **idx < column.index())
+                    .count();
+                let new_column = Column::new(column.name(), new_idx);
+                Ok(Transformed::yes(
+                    Arc::new(new_column) as Arc<dyn PhysicalExpr>
+                ))
+            }
+        })
+        .expect("Closure cannot fail");
+
+    JoinFilter::new(
+        final_expr.data,
+        new_column_indices,
+        Arc::new(Schema::new(fields)),
+    )
+}
+
+/// Implements the push-down machinery.
+///
+/// The rewriter starts at the top of the filter expression and traverses the expression tree.
+/// For each (sub-)expression, the rewriter checks whether it only refers to one side of the
+/// join. If this is never the case, no subexpression of the filter can be pushed down. If
+/// there is a subexpression that can be computed using only one side of the join, the entire
+/// subexpression is pushed down to that join side.
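+///
+/// For example, when rewriting `a + 1 > x + 1` for the left side (`a` left, `x` right): the
+/// root `>` refers to both sides, so the rewriter recurses into its children; `a + 1` refers
+/// only to the left side and is replaced by a projected column; `x + 1` does not refer to
+/// the left side and is left untouched.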
+struct JoinFilterRewriter<'a> {
+    join_side: JoinSide,
+    join_side_schema: &'a Schema,
+    join_side_projections: Vec<(Arc<dyn PhysicalExpr>, String)>,
+    intermediate_column_indices: Vec<ColumnIndex>,
+    alias_generator: &'a AliasGenerator,
+}
+
+impl<'a> JoinFilterRewriter<'a> {
+    /// Creates a new [JoinFilterRewriter].
+    fn new(
+        join_side: JoinSide,
+        join_side_schema: &'a Schema,
+        column_indices: Vec<ColumnIndex>,
+        alias_generator: &'a AliasGenerator,
+    ) -> Self {
+        let projections = join_side_schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(idx, field)| {
+                (
+                    Arc::new(Column::new(field.name(), idx)) as Arc<dyn PhysicalExpr>,
+                    field.name().to_string(),
+                )
+            })
+            .collect();
+
+        Self {
+            join_side,
+            join_side_schema,
+            join_side_projections: projections,
+            intermediate_column_indices: column_indices,
+            alias_generator,
+        }
+    }
+
+    /// Executes the push-down machinery on `expr`.
+    ///
+    /// See [JoinFilterRewriter] for further information.
+    fn rewrite(
+        &mut self,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+        let depends_on_this_side = self.depends_on_join_side(&expr, self.join_side)?;
+        // We don't push down things that do not depend on this side (other side or no side).
+        if !depends_on_this_side {
+            return Ok(Transformed::no(expr));
+        }
+
+        // Recurse if there is a dependency on both sides or if the entire expression is volatile.
+        let depends_on_other_side =
+            self.depends_on_join_side(&expr, self.join_side.negate())?;
+        let is_volatile = is_volatile(expr.as_ref());
+        if depends_on_other_side || is_volatile {
+            return expr.map_children(|expr| self.rewrite(expr));
+        }
+
+        // There is only a dependency on this side.
+
+        // If this expression has no children, we do not push down, as it should already be a
+        // column reference.
+        if expr.children().is_empty() {
+            return Ok(Transformed::no(expr));
+        }
+
+        // Otherwise, we push down a projection.
+        let alias = self.alias_generator.next("join_proj_push_down");
+        let idx = self.create_new_column(alias.clone(), expr)?;
+
+        Ok(Transformed::yes(
+            Arc::new(Column::new(&alias, idx)) as Arc<dyn PhysicalExpr>
+        ))
+    }
+
+    /// Creates a new column in the current join side.
+    fn create_new_column(
+        &mut self,
+        name: String,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> Result<usize> {
+        // First, add a new projection. The expression must be rewritten, as it is no longer
+        // executed against the filter schema.
+        let new_idx = self.join_side_projections.len();
+        let rewritten_expr = expr.transform_up(|expr| {
+            Ok(match expr.as_any().downcast_ref::<Column>() {
+                None => Transformed::no(expr),
+                Some(column) => {
+                    let intermediate_column =
+                        &self.intermediate_column_indices[column.index()];
+                    assert_eq!(intermediate_column.side, self.join_side);
+
+                    let join_side_index = intermediate_column.index;
+                    let field = self.join_side_schema.field(join_side_index);
+                    let new_column = Column::new(field.name(), join_side_index);
+                    Transformed::yes(Arc::new(new_column) as Arc<dyn PhysicalExpr>)
+                }
+            })
+        })?;
+        self.join_side_projections.push((rewritten_expr.data, name));
+
+        // Then, update the column indices.
+        let new_intermediate_idx = self.intermediate_column_indices.len();
+        let idx = ColumnIndex {
+            index: new_idx,
+            side: self.join_side,
+        };
+        self.intermediate_column_indices.push(idx);
+
+        Ok(new_intermediate_idx)
+    }
+
+    /// Checks whether any part of the expression depends on the given `join_side`.
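+    ///
+    /// For example, with intermediate columns `[a (left), x (right)]`, `a@0 + 1` depends on
+    /// `JoinSide::Left` but not on `JoinSide::Right`, while `a@0 > x@1` depends on both.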
+    fn depends_on_join_side(
+        &mut self,
+        expr: &Arc<dyn PhysicalExpr>,
+        join_side: JoinSide,
+    ) -> Result<bool> {
+        let mut result = false;
+        expr.apply(|expr| match expr.as_any().downcast_ref::<Column>() {
+            None => Ok(TreeNodeRecursion::Continue),
+            Some(c) => {
+                let column_index = &self.intermediate_column_indices[c.index()];
+                if column_index.side == join_side {
+                    result = true;
+                    return Ok(TreeNodeRecursion::Stop);
+                }
+                Ok(TreeNodeRecursion::Continue)
+            }
+        })?;
+
+        Ok(result)
+    }
+}
+
+fn is_volatile(expr: &dyn PhysicalExpr) -> bool {
+    match expr.as_any().downcast_ref::<ScalarFunctionExpr>() {
+        None => expr
+            .children()
+            .iter()
+            .map(|expr| is_volatile(expr.as_ref()))
+            .reduce(|lhs, rhs| lhs || rhs)
+            .unwrap_or(false),
+        Some(expr) => expr.fun().signature().volatility == Volatility::Volatile,
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use arrow::datatypes::{DataType, Field, FieldRef, Schema};
+    use datafusion_expr::{ScalarUDF, ScalarUDFImpl};
+    use datafusion_expr_common::operator::Operator;
+    use datafusion_physical_expr::expressions::{binary, lit};
+    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+    use datafusion_physical_plan::displayable;
+    use datafusion_physical_plan::empty::EmptyExec;
+    use insta::assert_snapshot;
+    use std::sync::Arc;
+    use datafusion_functions::math::random;
+
+    #[tokio::test]
+    async fn no_computation_does_not_project() -> Result<()> {
+        let (left_schema, right_schema) = create_simple_schemas();
+        let optimized_plan = run_test(
+            left_schema,
+            right_schema,
+            a_x(),
+            None,
+            a_greater_than_x,
+            JoinType::Inner,
+        )?;
+
+        assert_snapshot!(optimized_plan, @r"
+        NestedLoopJoinExec: join_type=Inner, filter=a@0 > x@1
+          EmptyExec
+          EmptyExec
+        ");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn simple_push_down() -> Result<()> {
+        let (left_schema, right_schema) = create_simple_schemas();
+        let optimized_plan = run_test(
+            left_schema,
+            right_schema,
+            a_x(),
+            None,
+            a_plus_one_greater_than_x_plus_one,
+            JoinType::Inner,
+        )?;
+
+        assert_snapshot!(optimized_plan, @r"
+        NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0, x@2]
+          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
+            CoalesceBatchesExec: target_batch_size=8192
+              EmptyExec
+          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
+            CoalesceBatchesExec: target_batch_size=8192
+              EmptyExec
+        ");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn does_not_push_down_short_circuiting_expressions() -> Result<()> {
+        let (left_schema, right_schema) = create_simple_schemas();
+        let optimized_plan = run_test(
+            left_schema,
+            right_schema,
+            a_x(),
+            None,
+            |schema| {
+                binary(
+                    lit(false),
+                    Operator::And,
+                    a_plus_one_greater_than_x_plus_one(schema)?,
+                    schema,
+                )
+            },
+            JoinType::Inner,
+        )?;
+
+        assert_snapshot!(optimized_plan, @r"
+        NestedLoopJoinExec: join_type=Inner, filter=false AND join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0, x@2]
+          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
+            CoalesceBatchesExec: target_batch_size=8192
+              EmptyExec
+          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
+            CoalesceBatchesExec: target_batch_size=8192
+              EmptyExec
+        ");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn does_not_push_down_volatile_functions() -> Result<()> {
+        let (left_schema, right_schema) = create_simple_schemas();
+        let optimized_plan = run_test(
+            left_schema,
+            right_schema,
+            a_x(),
+            None,
+            a_plus_rand_greater_than_x,
+            JoinType::Inner,
+        )?;
+
+        assert_snapshot!(optimized_plan, @r"
@r" + NestedLoopJoinExec: join_type=Inner, filter=a@0 + rand() > x@1 + EmptyExec + EmptyExec + "); + Ok(()) + } + + #[tokio::test] + async fn complex_schema_push_down() -> Result<()> { + let (left_schema, right_schema) = create_complex_schemas(); + + let optimized_plan = run_test( + left_schema, + right_schema, + a_b_x_z(), + None, + a_plus_b_greater_than_x_plus_z, + JoinType::Inner, + )?; + + assert_snapshot!(optimized_plan, @r" + NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0, b@1, c@2, x@4, y@5, z@6] + ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, a@0 + b@1 as join_proj_push_down_1] + CoalesceBatchesExec: target_batch_size=8192 + EmptyExec + ProjectionExec: expr=[x@0 as x, y@1 as y, z@2 as z, x@0 + z@2 as join_proj_push_down_2] + CoalesceBatchesExec: target_batch_size=8192 + EmptyExec + "); + Ok(()) + } + + #[tokio::test] + async fn push_down_with_existing_projections() -> Result<()> { + let (left_schema, right_schema) = create_complex_schemas(); + + let optimized_plan = run_test( + left_schema, + right_schema, + a_b_x_z(), + Some(vec![1, 3, 5]), // ("b", "x", "z") + a_plus_b_greater_than_x_plus_z, + JoinType::Inner, + )?; + + assert_snapshot!(optimized_plan, @r" + NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[b@1, x@4, z@6] + ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, a@0 + b@1 as join_proj_push_down_1] + CoalesceBatchesExec: target_batch_size=8192 + EmptyExec + ProjectionExec: expr=[x@0 as x, y@1 as y, z@2 as z, x@0 + z@2 as join_proj_push_down_2] + CoalesceBatchesExec: target_batch_size=8192 + EmptyExec + "); + Ok(()) + } + + #[tokio::test] + async fn left_semi_join_projection() -> Result<()> { + let (left_schema, right_schema) = create_simple_schemas(); + + let left_semi_join_plan = run_test( + left_schema.clone(), + right_schema.clone(), + a_x(), + None, + a_plus_one_greater_than_x_plus_one, + JoinType::LeftSemi, + )?; + + assert_snapshot!(left_semi_join_plan, @r" + NestedLoopJoinExec: join_type=LeftSemi, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0] + ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1] + CoalesceBatchesExec: target_batch_size=8192 + EmptyExec + ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2] + CoalesceBatchesExec: target_batch_size=8192 + EmptyExec + "); + Ok(()) + } + + #[tokio::test] + async fn right_semi_join_projection() -> Result<()> { + let (left_schema, right_schema) = create_simple_schemas(); + let right_semi_join_plan = run_test( + left_schema, + right_schema, + a_x(), + None, + a_plus_one_greater_than_x_plus_one, + JoinType::RightSemi, + )?; + assert_snapshot!(right_semi_join_plan, @r" + NestedLoopJoinExec: join_type=RightSemi, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[x@0] + ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1] + CoalesceBatchesExec: target_batch_size=8192 + EmptyExec + ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2] + CoalesceBatchesExec: target_batch_size=8192 + EmptyExec + "); + Ok(()) + } + + fn run_test( + left_schema: Schema, + right_schema: Schema, + column_indices: Vec, + existing_projections: Option>, + filter_expr_builder: impl FnOnce(&Schema) -> Result>, + join_type: JoinType, + ) -> Result { + let left = Arc::new(EmptyExec::new(Arc::new(left_schema.clone()))); + let right = Arc::new(EmptyExec::new(Arc::new(right_schema.clone()))); + + let join_fields: Vec<_> = 
+            .iter()
+            .map(|ci| match ci.side {
+                JoinSide::Left => left_schema.field(ci.index).clone(),
+                JoinSide::Right => right_schema.field(ci.index).clone(),
+                JoinSide::None => unreachable!(),
+            })
+            .collect();
+        let join_schema = Arc::new(Schema::new(join_fields));
+
+        let filter_expr = filter_expr_builder(join_schema.as_ref())?;
+
+        let join_filter = JoinFilter::new(filter_expr, column_indices, join_schema);
+
+        let join = NestedLoopJoinExec::try_new(
+            left,
+            right,
+            Some(join_filter),
+            &join_type,
+            existing_projections,
+        )?;
+
+        let optimizer = NestedLoopJoinProjectionPushDown::new();
+        let optimized_plan = optimizer.optimize(Arc::new(join), &Default::default())?;
+
+        let displayable_plan = displayable(optimized_plan.as_ref()).indent(false);
+        Ok(displayable_plan.to_string())
+    }
+
+    fn create_simple_schemas() -> (Schema, Schema) {
+        let left_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let right_schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+
+        (left_schema, right_schema)
+    }
+
+    fn create_complex_schemas() -> (Schema, Schema) {
+        let left_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, false),
+        ]);
+
+        let right_schema = Schema::new(vec![
+            Field::new("x", DataType::Int32, false),
+            Field::new("y", DataType::Int32, false),
+            Field::new("z", DataType::Int32, false),
+        ]);
+
+        (left_schema, right_schema)
+    }
+
+    fn a_x() -> Vec<ColumnIndex> {
+        vec![
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Left,
+            },
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Right,
+            },
+        ]
+    }
+
+    fn a_b_x_z() -> Vec<ColumnIndex> {
+        vec![
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Left,
+            },
+            ColumnIndex {
+                index: 1,
+                side: JoinSide::Left,
+            },
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Right,
+            },
+            ColumnIndex {
+                index: 2,
+                side: JoinSide::Right,
+            },
+        ]
+    }
+
+    fn a_plus_one_greater_than_x_plus_one(
+        join_schema: &Schema,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        let left_expr = binary(
+            Arc::new(Column::new("a", 0)),
+            Operator::Plus,
+            lit(1),
+            &join_schema,
+        )?;
+        let right_expr = binary(
+            Arc::new(Column::new("x", 1)),
+            Operator::Plus,
+            lit(1),
+            &join_schema,
+        )?;
+        binary(left_expr, Operator::Gt, right_expr, &join_schema)
+    }
+
+    fn a_plus_rand_greater_than_x(join_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
+        let left_expr = binary(
+            Arc::new(Column::new("a", 0)),
+            Operator::Plus,
+            Arc::new(ScalarFunctionExpr::new(
+                "rand",
+                random(),
+                vec![],
+                FieldRef::new(Field::new("out", DataType::Float64, false)),
+                Arc::new(ConfigOptions::default()),
+            )),
+            join_schema,
+        )?;
+        let right_expr = Arc::new(Column::new("x", 1));
+        binary(left_expr, Operator::Gt, right_expr, &join_schema)
+    }
+
+    fn a_greater_than_x(join_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
+        binary(
+            Arc::new(Column::new("a", 0)),
+            Operator::Gt,
+            Arc::new(Column::new("x", 1)),
+            &join_schema,
+        )
+    }
+
+    fn a_plus_b_greater_than_x_plus_z(
+        join_schema: &Schema,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        let lhs = binary(
+            Arc::new(Column::new("a", 0)),
+            Operator::Plus,
+            Arc::new(Column::new("b", 1)),
+            &join_schema,
+        )?;
+        let rhs = binary(
+            Arc::new(Column::new("x", 2)),
+            Operator::Plus,
+            Arc::new(Column::new("z", 3)),
+            &join_schema,
+        )?;
+        binary(lhs, Operator::Gt, rhs, &join_schema)
+    }
+}
diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs
index 4d00f1029db7..f749b4745b03 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -38,6 +38,7 @@ use crate::update_aggr_exprs::OptimizeAggregateOrder;

 use crate::coalesce_async_exec_input::CoalesceAsyncExecInput;
 use crate::limit_pushdown_past_window::LimitPushPastWindows;
+use crate::nl_join_projection_pushdown::NestedLoopJoinProjectionPushDown;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::Result;
 use datafusion_physical_plan::ExecutionPlan;
@@ -94,6 +95,9 @@ impl PhysicalOptimizer {
             // repartitioning and local sorting steps to meet distribution and ordering requirements.
             // Therefore, it should run before EnforceDistribution and EnforceSorting.
             Arc::new(JoinSelection::new()),
+            // After the join type has been determined, the NestedLoopJoinProjectionPushDown rule
+            // will try to push parts of nested loop join filter expressions down as projections.
+            Arc::new(NestedLoopJoinProjectionPushDown::new()),
             // The LimitedDistinctAggregation rule should be applied before the EnforceDistribution rule,
             // as that rule may inject other operations in between the different AggregateExecs.
             // Applying the rule early means only directly-connected AggregateExecs must be examined.

From c414159fca5db9b17ecf9f3a810430fc29ca8562 Mon Sep 17 00:00:00 2001
From: Tobias Schwarzinger
Date: Thu, 2 Oct 2025 19:00:32 +0200
Subject: [PATCH 2/5] Update Cargo.lock

---
 Cargo.lock | 1 +
 1 file changed, 1 insertion(+)

diff --git a/Cargo.lock b/Cargo.lock
index a2939f425712..2e4f9a0089c5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2451,6 +2451,7 @@ dependencies = [
  "datafusion-execution",
  "datafusion-expr",
  "datafusion-expr-common",
+ "datafusion-functions",
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "datafusion-physical-plan",

From 135a9462ac9a7fd8801104c951ed80b78beadd33 Mon Sep 17 00:00:00 2001
From: Tobias Schwarzinger
Date: Thu, 2 Oct 2025 19:25:31 +0200
Subject: [PATCH 3/5] Add some comments

---
 .../src/nl_join_projection_pushdown.rs | 36 ++++++++++---------
 1 file changed, 19 insertions(+), 17 deletions(-)

diff --git a/datafusion/physical-optimizer/src/nl_join_projection_pushdown.rs b/datafusion/physical-optimizer/src/nl_join_projection_pushdown.rs
index 0205ffb882b9..0055fbd9f507 100644
--- a/datafusion/physical-optimizer/src/nl_join_projection_pushdown.rs
+++ b/datafusion/physical-optimizer/src/nl_join_projection_pushdown.rs
@@ -4,9 +4,7 @@ use datafusion_common::alias::AliasGenerator;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
 use datafusion_common::{JoinSide, JoinType, Result};
-use datafusion_expr_common::signature::Volatility;
 use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::ScalarFunctionExpr;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
@@ -18,9 +16,9 @@ use std::sync::Arc;

 /// Tries to push down projections from join filters that only depend on one side of the join.
 ///
-/// This can be a crucial optimization for nested loop joins. Without this push-down, even
-/// functions that only depend on one side of the join must be evaluated for all row
-/// combinations.
+/// This optimization is currently only applied to nested loop joins. Without the push-down,
+/// functions that only depend on one side of the join would have to be evaluated for the
+/// cartesian product of the two sides.
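+///
+/// For example (mirroring the `simple_push_down` test), a join filter `a@0 + 1 > x@1 + 1`
+/// is rewritten to `join_proj_push_down_1 > join_proj_push_down_2`, where the two sums are
+/// materialized by `ProjectionExec`s below the join and thus computed once per input row
+/// rather than once per row combination:
+///
+/// ```text
+/// NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0, x@2]
+///   ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
+///     ...
+///   ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
+///     ...
+/// ```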
 #[derive(Debug)]
 pub struct NestedLoopJoinProjectionPushDown;

@@ -234,6 +232,10 @@ fn ensure_batch_size(
 }

 /// Creates a new [JoinFilter] and tries to minimize the internal schema.
+///
+/// This can eliminate columns that were only part of a computation that has been pushed
+/// down. As that computation is now materialized on one side of the join, the original input
+/// columns are not needed anymore.
 fn minimize_join_filter(
     expr: Arc<dyn PhysicalExpr>,
     old_column_indices: Vec<ColumnIndex>,
@@ -347,7 +349,7 @@ impl<'a> JoinFilterRewriter<'a> {
         // Recurse if there is a dependency on both sides or if the entire expression is volatile.
         let depends_on_other_side =
             self.depends_on_join_side(&expr, self.join_side.negate())?;
-        let is_volatile = is_volatile(expr.as_ref());
+        let is_volatile = is_volatile_expression_tree(expr.as_ref());
         if depends_on_other_side || is_volatile {
             return expr.map_children(|expr| self.rewrite(expr));
         }
@@ -429,31 +431,31 @@ impl<'a> JoinFilterRewriter<'a> {
     }
 }

-fn is_volatile(expr: &dyn PhysicalExpr) -> bool {
-    match expr.as_any().downcast_ref::<ScalarFunctionExpr>() {
-        None => expr
-            .children()
-            .iter()
-            .map(|expr| is_volatile(expr.as_ref()))
-            .reduce(|lhs, rhs| lhs || rhs)
-            .unwrap_or(false),
-        Some(expr) => expr.fun().signature().volatility == Volatility::Volatile,
+fn is_volatile_expression_tree(expr: &dyn PhysicalExpr) -> bool {
+    if expr.is_volatile_node() {
+        return true;
     }
+
+    expr.children()
+        .iter()
+        .map(|expr| is_volatile_expression_tree(expr.as_ref()))
+        .reduce(|lhs, rhs| lhs || rhs)
+        .unwrap_or(false)
 }

 #[cfg(test)]
 mod test {
     use super::*;
     use arrow::datatypes::{DataType, Field, FieldRef, Schema};
-    use datafusion_expr::{ScalarUDF, ScalarUDFImpl};
     use datafusion_expr_common::operator::Operator;
+    use datafusion_functions::math::random;
     use datafusion_physical_expr::expressions::{binary, lit};
+    use datafusion_physical_expr::ScalarFunctionExpr;
     use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
     use datafusion_physical_plan::displayable;
     use datafusion_physical_plan::empty::EmptyExec;
     use insta::assert_snapshot;
     use std::sync::Arc;
-    use datafusion_functions::math::random;

From 5508a8de5e191f53393b117584ce616541982c02 Mon Sep 17 00:00:00 2001
From: Tobias Schwarzinger
Date: Thu, 2 Oct 2025 20:01:22 +0200
Subject: [PATCH 4/5] Update slts that are affected by the
 nl-join-projection-push-down

---
 .../sqllogictest/test_files/explain.slt       |  4 +
 .../sqllogictest/test_files/join.slt.part     |  6 +-
 datafusion/sqllogictest/test_files/joins.slt  | 14 ++-
 .../test_files/tpch/plans/q11.slt.part        | 86 ++++++++++---------
 .../test_files/tpch/plans/q22.slt.part        | 36 ++++----
 5 files changed, 81 insertions(+), 65 deletions(-)

diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index a3b6d40aea2d..1fa63c2fb12e 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -230,6 +230,7 @@ physical_plan after OutputRequirements
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
+physical_plan after nl-join-projection-push-down SAME TEXT AS ABOVE
 physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
 physical_plan after FilterPushdown SAME TEXT AS ABOVE
 physical_plan after
EnforceDistribution SAME TEXT AS ABOVE @@ -309,6 +310,7 @@ physical_plan after OutputRequirements 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after aggregate_statistics SAME TEXT AS ABOVE physical_plan after join_selection SAME TEXT AS ABOVE +physical_plan after nl-join-projection-push-down SAME TEXT AS ABOVE physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE physical_plan after FilterPushdown SAME TEXT AS ABOVE physical_plan after EnforceDistribution SAME TEXT AS ABOVE @@ -354,6 +356,7 @@ physical_plan after OutputRequirements 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan after aggregate_statistics SAME TEXT AS ABOVE physical_plan after join_selection SAME TEXT AS ABOVE +physical_plan after nl-join-projection-push-down SAME TEXT AS ABOVE physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE physical_plan after FilterPushdown SAME TEXT AS ABOVE physical_plan after EnforceDistribution SAME TEXT AS ABOVE @@ -594,6 +597,7 @@ physical_plan after OutputRequirements 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true physical_plan after aggregate_statistics SAME TEXT AS ABOVE physical_plan after join_selection SAME TEXT AS ABOVE +physical_plan after nl-join-projection-push-down SAME TEXT AS ABOVE physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE physical_plan after FilterPushdown SAME TEXT AS ABOVE physical_plan after EnforceDistribution SAME TEXT AS ABOVE diff --git a/datafusion/sqllogictest/test_files/join.slt.part b/datafusion/sqllogictest/test_files/join.slt.part index 2abe654a96c8..3ca5f1d981d3 100644 --- a/datafusion/sqllogictest/test_files/join.slt.part +++ b/datafusion/sqllogictest/test_files/join.slt.part @@ -849,9 +849,11 @@ logical_plan 05)----TableScan: department projection=[dept_name] physical_plan 01)ProjectionExec: expr=[emp_id@1 as emp_id, name@2 as name, dept_name@0 as dept_name] -02)--NestedLoopJoinExec: join_type=Right, filter=name@0 = Alice OR name@0 = Bob +02)--NestedLoopJoinExec: join_type=Right, filter=join_proj_push_down_1@0, projection=[dept_name@0, emp_id@1, name@2] 03)----DataSourceExec: partitions=1, partition_sizes=[1] -04)----DataSourceExec: partitions=1, partition_sizes=[1] +04)----ProjectionExec: expr=[emp_id@0 as emp_id, name@1 as name, name@1 = Alice OR name@1 = Bob as join_proj_push_down_1] +05)------CoalesceBatchesExec: target_batch_size=8192 +06)--------DataSourceExec: partitions=1, partition_sizes=[1] query ITT rowsort SELECT e.emp_id, e.name, d.dept_name diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 96d2bad086e6..a8f634655a73 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3519,10 +3519,16 @@ logical_plan 04)--SubqueryAlias: t2 
05)----TableScan: annotated_data projection=[a0, a, b, c, d] physical_plan -01)NestedLoopJoinExec: join_type=Inner, filter=example(CAST(a@0 AS Float64), CAST(a@1 AS Float64)) > 3 -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true -03)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true +01)NestedLoopJoinExec: join_type=Inner, filter=example(join_proj_push_down_1@0, join_proj_push_down_2@1) > 3, projection=[a0@0, a@1, b@2, c@3, d@4, a0@6, a@7, b@8, c@9, d@10] +02)--CoalescePartitionsExec +03)----ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, CAST(a@1 AS Float64) as join_proj_push_down_1] +04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +05)--------CoalesceBatchesExec: target_batch_size=2 +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true +07)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, CAST(a@1 AS Float64) as join_proj_push_down_2] +08)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +09)------CoalesceBatchesExec: target_batch_size=2 +10)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true #### # Config teardown diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part index a6225daae436..0320958ed44b 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part @@ -75,51 +75,53 @@ logical_plan physical_plan 01)SortExec: TopK(fetch=10), expr=[value@1 DESC], preserve_partitioning=[false] 02)--ProjectionExec: expr=[ps_partkey@0 as ps_partkey, sum(partsupp.ps_supplycost * partsupp.ps_availqty)@1 as value] -03)----NestedLoopJoinExec: join_type=Inner, filter=CAST(sum(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS Decimal128(38, 15)) > sum(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)@1, projection=[ps_partkey@0, sum(partsupp.ps_supplycost * partsupp.ps_availqty)@1] +03)----NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@1 > sum(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)@0, projection=[ps_partkey@0, sum(partsupp.ps_supplycost * partsupp.ps_availqty)@1, sum(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)@3] 04)------CoalescePartitionsExec -05)--------AggregateExec: mode=FinalPartitioned, gby=[ps_partkey@0 as ps_partkey], aggr=[sum(partsupp.ps_supplycost * partsupp.ps_availqty)] +05)--------ProjectionExec: expr=[ps_partkey@0 as ps_partkey, sum(partsupp.ps_supplycost * partsupp.ps_availqty)@1 as sum(partsupp.ps_supplycost * partsupp.ps_availqty), CAST(sum(partsupp.ps_supplycost * partsupp.ps_availqty)@1 AS Decimal128(38, 15)) as join_proj_push_down_1] 
06)----------CoalesceBatchesExec: target_batch_size=8192 -07)------------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), input_partitions=4 -08)--------------AggregateExec: mode=Partial, gby=[ps_partkey@0 as ps_partkey], aggr=[sum(partsupp.ps_supplycost * partsupp.ps_availqty)] -09)----------------CoalesceBatchesExec: target_batch_size=8192 -10)------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_nationkey@3, n_nationkey@0)], projection=[ps_partkey@0, ps_availqty@1, ps_supplycost@2] +07)------------AggregateExec: mode=FinalPartitioned, gby=[ps_partkey@0 as ps_partkey], aggr=[sum(partsupp.ps_supplycost * partsupp.ps_availqty)] +08)--------------CoalesceBatchesExec: target_batch_size=8192 +09)----------------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), input_partitions=4 +10)------------------AggregateExec: mode=Partial, gby=[ps_partkey@0 as ps_partkey], aggr=[sum(partsupp.ps_supplycost * partsupp.ps_availqty)] 11)--------------------CoalesceBatchesExec: target_batch_size=8192 -12)----------------------RepartitionExec: partitioning=Hash([s_nationkey@3], 4), input_partitions=4 +12)----------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_nationkey@3, n_nationkey@0)], projection=[ps_partkey@0, ps_availqty@1, ps_supplycost@2] 13)------------------------CoalesceBatchesExec: target_batch_size=8192 -14)--------------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_suppkey@1, s_suppkey@0)], projection=[ps_partkey@0, ps_availqty@2, ps_supplycost@3, s_nationkey@5] +14)--------------------------RepartitionExec: partitioning=Hash([s_nationkey@3], 4), input_partitions=4 15)----------------------------CoalesceBatchesExec: target_batch_size=8192 -16)------------------------------RepartitionExec: partitioning=Hash([ps_suppkey@1], 4), input_partitions=4 -17)--------------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]}, projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost], file_type=csv, has_header=false -18)----------------------------CoalesceBatchesExec: target_batch_size=8192 -19)------------------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), input_partitions=4 -20)--------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -21)----------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_nationkey], file_type=csv, has_header=false -22)--------------------CoalesceBatchesExec: target_batch_size=8192 -23)----------------------RepartitionExec: partitioning=Hash([n_nationkey@0], 4), input_partitions=4 +16)------------------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_suppkey@1, s_suppkey@0)], projection=[ps_partkey@0, ps_availqty@2, ps_supplycost@3, s_nationkey@5] +17)--------------------------------CoalesceBatchesExec: target_batch_size=8192 +18)----------------------------------RepartitionExec: partitioning=Hash([ps_suppkey@1], 4), input_partitions=4 +19)------------------------------------DataSourceExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]}, projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost], file_type=csv, has_header=false +20)--------------------------------CoalesceBatchesExec: target_batch_size=8192 +21)----------------------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), input_partitions=4 +22)------------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +23)--------------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_nationkey], file_type=csv, has_header=false 24)------------------------CoalesceBatchesExec: target_batch_size=8192 -25)--------------------------FilterExec: n_name@1 = GERMANY, projection=[n_nationkey@0] -26)----------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -27)------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, projection=[n_nationkey, n_name], file_type=csv, has_header=false -28)------ProjectionExec: expr=[CAST(CAST(sum(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS Float64) * 0.0001 AS Decimal128(38, 15)) as sum(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)] -29)--------AggregateExec: mode=Final, gby=[], aggr=[sum(partsupp.ps_supplycost * partsupp.ps_availqty)] -30)----------CoalescePartitionsExec -31)------------AggregateExec: mode=Partial, gby=[], aggr=[sum(partsupp.ps_supplycost * partsupp.ps_availqty)] -32)--------------CoalesceBatchesExec: target_batch_size=8192 -33)----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_nationkey@2, n_nationkey@0)], projection=[ps_availqty@0, ps_supplycost@1] -34)------------------CoalesceBatchesExec: target_batch_size=8192 -35)--------------------RepartitionExec: partitioning=Hash([s_nationkey@2], 4), input_partitions=4 -36)----------------------CoalesceBatchesExec: target_batch_size=8192 -37)------------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_suppkey@0, s_suppkey@0)], projection=[ps_availqty@1, ps_supplycost@2, s_nationkey@4] -38)--------------------------CoalesceBatchesExec: target_batch_size=8192 -39)----------------------------RepartitionExec: partitioning=Hash([ps_suppkey@0], 4), input_partitions=4 -40)------------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]}, projection=[ps_suppkey, ps_availqty, ps_supplycost], file_type=csv, has_header=false -41)--------------------------CoalesceBatchesExec: target_batch_size=8192 -42)----------------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), input_partitions=4 -43)------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 
-44)--------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_nationkey], file_type=csv, has_header=false -45)------------------CoalesceBatchesExec: target_batch_size=8192 -46)--------------------RepartitionExec: partitioning=Hash([n_nationkey@0], 4), input_partitions=4 -47)----------------------CoalesceBatchesExec: target_batch_size=8192 -48)------------------------FilterExec: n_name@1 = GERMANY, projection=[n_nationkey@0] -49)--------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -50)----------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, projection=[n_nationkey, n_name], file_type=csv, has_header=false +25)--------------------------RepartitionExec: partitioning=Hash([n_nationkey@0], 4), input_partitions=4 +26)----------------------------CoalesceBatchesExec: target_batch_size=8192 +27)------------------------------FilterExec: n_name@1 = GERMANY, projection=[n_nationkey@0] +28)--------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +29)----------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, projection=[n_nationkey, n_name], file_type=csv, has_header=false +30)------ProjectionExec: expr=[CAST(CAST(sum(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS Float64) * 0.0001 AS Decimal128(38, 15)) as sum(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)] +31)--------AggregateExec: mode=Final, gby=[], aggr=[sum(partsupp.ps_supplycost * partsupp.ps_availqty)] +32)----------CoalescePartitionsExec +33)------------AggregateExec: mode=Partial, gby=[], aggr=[sum(partsupp.ps_supplycost * partsupp.ps_availqty)] +34)--------------CoalesceBatchesExec: target_batch_size=8192 +35)----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_nationkey@2, n_nationkey@0)], projection=[ps_availqty@0, ps_supplycost@1] +36)------------------CoalesceBatchesExec: target_batch_size=8192 +37)--------------------RepartitionExec: partitioning=Hash([s_nationkey@2], 4), input_partitions=4 +38)----------------------CoalesceBatchesExec: target_batch_size=8192 +39)------------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_suppkey@0, s_suppkey@0)], projection=[ps_availqty@1, ps_supplycost@2, s_nationkey@4] +40)--------------------------CoalesceBatchesExec: target_batch_size=8192 +41)----------------------------RepartitionExec: partitioning=Hash([ps_suppkey@0], 4), input_partitions=4 +42)------------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]}, projection=[ps_suppkey, ps_availqty, ps_supplycost], file_type=csv, has_header=false +43)--------------------------CoalesceBatchesExec: target_batch_size=8192 +44)----------------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), input_partitions=4 +45)------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 
+46)--------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_nationkey], file_type=csv, has_header=false +47)------------------CoalesceBatchesExec: target_batch_size=8192 +48)--------------------RepartitionExec: partitioning=Hash([n_nationkey@0], 4), input_partitions=4 +49)----------------------CoalesceBatchesExec: target_batch_size=8192 +50)------------------------FilterExec: n_name@1 = GERMANY, projection=[n_nationkey@0] +51)--------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +52)----------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, projection=[n_nationkey, n_name], file_type=csv, has_header=false diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part index fc9c01843cc7..dc615028d232 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part @@ -83,23 +83,25 @@ physical_plan 07)------------AggregateExec: mode=Partial, gby=[cntrycode@0 as cntrycode], aggr=[count(Int64(1)), sum(custsale.c_acctbal)] 08)--------------ProjectionExec: expr=[substr(c_phone@0, 1, 2) as cntrycode, c_acctbal@1 as c_acctbal] 09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -10)------------------NestedLoopJoinExec: join_type=Inner, filter=CAST(c_acctbal@0 AS Decimal128(19, 6)) > avg(customer.c_acctbal)@1 +10)------------------NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@1 > avg(customer.c_acctbal)@0, projection=[c_phone@0, c_acctbal@1, avg(customer.c_acctbal)@3] 11)--------------------CoalescePartitionsExec -12)----------------------CoalesceBatchesExec: target_batch_size=8192 -13)------------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2] +12)----------------------ProjectionExec: expr=[c_phone@0 as c_phone, c_acctbal@1 as c_acctbal, CAST(c_acctbal@1 AS Decimal128(19, 6)) as join_proj_push_down_1] +13)------------------------CoalesceBatchesExec: target_batch_size=8192 14)--------------------------CoalesceBatchesExec: target_batch_size=8192 -15)----------------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=4 +15)----------------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2] 16)------------------------------CoalesceBatchesExec: target_batch_size=8192 -17)--------------------------------FilterExec: substr(c_phone@1, 1, 2) IN ([13, 31, 23, 29, 30, 18, 17]) -18)----------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -19)------------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, projection=[c_custkey, c_phone, c_acctbal], file_type=csv, has_header=false -20)--------------------------CoalesceBatchesExec: target_batch_size=8192 -21)----------------------------RepartitionExec: partitioning=Hash([o_custkey@0], 4), input_partitions=4 -22)------------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]}, projection=[o_custkey], file_type=csv, has_header=false -23)--------------------AggregateExec: mode=Final, gby=[], aggr=[avg(customer.c_acctbal)] -24)----------------------CoalescePartitionsExec -25)------------------------AggregateExec: mode=Partial, gby=[], aggr=[avg(customer.c_acctbal)] -26)--------------------------CoalesceBatchesExec: target_batch_size=8192 -27)----------------------------FilterExec: c_acctbal@1 > Some(0),15,2 AND substr(c_phone@0, 1, 2) IN ([13, 31, 23, 29, 30, 18, 17]), projection=[c_acctbal@1] -28)------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -29)--------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, projection=[c_phone, c_acctbal], file_type=csv, has_header=false +17)--------------------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=4 +18)----------------------------------CoalesceBatchesExec: target_batch_size=8192 +19)------------------------------------FilterExec: substr(c_phone@1, 1, 2) IN ([13, 31, 23, 29, 30, 18, 17]) +20)--------------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +21)----------------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, projection=[c_custkey, c_phone, c_acctbal], file_type=csv, has_header=false +22)------------------------------CoalesceBatchesExec: target_batch_size=8192 +23)--------------------------------RepartitionExec: partitioning=Hash([o_custkey@0], 4), input_partitions=4 +24)----------------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]}, projection=[o_custkey], file_type=csv, has_header=false +25)--------------------AggregateExec: mode=Final, gby=[], aggr=[avg(customer.c_acctbal)] +26)----------------------CoalescePartitionsExec +27)------------------------AggregateExec: mode=Partial, gby=[], aggr=[avg(customer.c_acctbal)] +28)--------------------------CoalesceBatchesExec: target_batch_size=8192 +29)----------------------------FilterExec: c_acctbal@1 > Some(0),15,2 AND substr(c_phone@0, 1, 2) IN ([13, 31, 23, 29, 30, 18, 17]), projection=[c_acctbal@1] +30)------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +31)--------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, projection=[c_phone, c_acctbal], file_type=csv, has_header=false From 61999f3324e5e68695180d818f84b73655e9d4d5 Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 2 Oct 2025 20:05:07 +0200 Subject: [PATCH 5/5] please lints --- datafusion/physical-optimizer/src/lib.rs | 2 +- .../src/nl_join_projection_pushdown.rs | 39 +++++++++++++------ 2 files changed, 29 
insertions(+), 12 deletions(-)

diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index c5d135e90d9e..0b987bc9aa5b 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -40,10 +40,10 @@ pub mod optimizer;
 pub mod output_requirements;
 pub mod projection_pushdown;
 pub use datafusion_pruning as pruning;
+mod nl_join_projection_pushdown;
 pub mod sanity_checker;
 pub mod topk_aggregation;
 pub mod update_aggr_exprs;
 pub mod utils;
-mod nl_join_projection_pushdown;

 pub use optimizer::PhysicalOptimizerRule;
diff --git a/datafusion/physical-optimizer/src/nl_join_projection_pushdown.rs b/datafusion/physical-optimizer/src/nl_join_projection_pushdown.rs
index 0055fbd9f507..4f0e7d4e5f1b 100644
--- a/datafusion/physical-optimizer/src/nl_join_projection_pushdown.rs
+++ b/datafusion/physical-optimizer/src/nl_join_projection_pushdown.rs
@@ -1,3 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
 use crate::PhysicalOptimizerRule;
 use arrow::datatypes::{Fields, Schema, SchemaRef};
 use datafusion_common::alias::AliasGenerator;
@@ -170,7 +187,7 @@ fn try_push_down_projection(
     config: &ConfigOptions,
     alias_generator: &AliasGenerator,
 ) -> Result<Transformed<(Arc<dyn ExecutionPlan>, JoinFilter)>> {
-    let expr = join_filter.expression().clone();
+    let expr = Arc::clone(join_filter.expression());
     let original_plan_schema = plan.schema();
     let mut rewriter = JoinFilterRewriter::new(
         join_side,
@@ -195,8 +212,8 @@ fn try_push_down_projection(
             .intermediate_column_indices
             .iter()
             .map(|ci| match ci.side {
-                JoinSide::Left => lhs_schema.fields[ci.index].clone(),
-                JoinSide::Right => rhs_schema.fields[ci.index].clone(),
+                JoinSide::Left => Arc::clone(&lhs_schema.fields[ci.index]),
+                JoinSide::Right => Arc::clone(&rhs_schema.fields[ci.index]),
                 JoinSide::None => unreachable!("Mark join not supported"),
             })
             .collect::<Fields>();
@@ -754,15 +771,15 @@
         Arc::new(Column::new("a", 0)),
             Operator::Plus,
             lit(1),
-            &join_schema,
+            join_schema,
         )?;
         let right_expr = binary(
             Arc::new(Column::new("x", 1)),
             Operator::Plus,
             lit(1),
-            &join_schema,
+            join_schema,
         )?;
-        binary(left_expr, Operator::Gt, right_expr, &join_schema)
+        binary(left_expr, Operator::Gt, right_expr, join_schema)
     }

     fn a_plus_rand_greater_than_x(join_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
@@ -779,7 +796,7 @@
             join_schema,
         )?;
         let right_expr = Arc::new(Column::new("x", 1));
-        binary(left_expr, Operator::Gt, right_expr, &join_schema)
+        binary(left_expr, Operator::Gt, right_expr, join_schema)
     }

     fn a_greater_than_x(join_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
@@ -787,7 +804,7 @@
         binary(
             Arc::new(Column::new("a", 0)),
             Operator::Gt,
             Arc::new(Column::new("x", 1)),
-            &join_schema,
+            join_schema,
         )
     }

@@ -798,14 +815,14 @@
         Arc::new(Column::new("a", 0)),
Operator::Plus, Arc::new(Column::new("b", 1)), - &join_schema, + join_schema, )?; let rhs = binary( Arc::new(Column::new("x", 2)), Operator::Plus, Arc::new(Column::new("z", 3)), - &join_schema, + join_schema, )?; - binary(lhs, Operator::Gt, rhs, &join_schema) + binary(lhs, Operator::Gt, rhs, join_schema) } }