From 94bfe0eb7c4ba6c7c84d45a5d49a28ceac64b428 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 22 Jun 2024 08:22:53 -0400 Subject: [PATCH 1/3] Migrate code from `Expr::to_columns` to `Expr::column_refs` --- datafusion/core/src/physical_planner.rs | 2 +- datafusion/expr/src/logical_plan/builder.rs | 6 +++--- datafusion/expr/src/utils.rs | 6 +++--- datafusion/optimizer/src/analyzer/subquery.rs | 2 +- datafusion/optimizer/src/push_down_filter.rs | 19 +++++-------------- datafusion/optimizer/src/utils.rs | 12 +++++++++++- 6 files changed, 24 insertions(+), 23 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b539544d8372..52d976c45cf3 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1259,7 +1259,7 @@ impl DefaultPhysicalPlanner { let join_filter = match filter { Some(expr) => { // Extract columns from filter expression and saved in a HashSet - let cols = expr.to_columns()?; + let cols = expr.column_refs(); // Collect left & right field indices, the field indices are sorted in ascending order let left_field_indices = cols diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2f1ece32ab15..434f4dace1de 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -534,11 +534,11 @@ impl LogicalPlanBuilder { .clone() .into_iter() .try_for_each::<_, Result<()>>(|expr| { - let columns = expr.to_columns()?; + let columns = expr.column_refs(); columns.into_iter().for_each(|c| { - if schema.field_from_column(&c).is_err() { - missing_cols.push(c); + if !schema.has_column(c) { + missing_cols.push(c.clone()); } }); diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 6baabfcc7130..9018b6f5ce62 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -872,7 +872,7 @@ pub fn can_hash(data_type: &DataType) -> 
bool { /// Check whether all columns are from the schema. pub fn check_all_columns_from_schema( - columns: &HashSet, + columns: &HashSet<&Column>, schema: &DFSchema, ) -> Result { for col in columns.iter() { @@ -900,8 +900,8 @@ pub fn find_valid_equijoin_key_pair( left_schema: &DFSchema, right_schema: &DFSchema, ) -> Result> { - let left_using_columns = left_key.to_columns()?; - let right_using_columns = right_key.to_columns()?; + let left_using_columns = left_key.column_refs(); + let right_using_columns = right_key.column_refs(); // Conditions like a = 10, will be added to non-equijoin. if left_using_columns.is_empty() || right_using_columns.is_empty() { diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index 7d1290204eb7..5725a725e64a 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -249,7 +249,7 @@ fn check_aggregation_in_scalar_subquery( let mut group_columns = agg .group_expr .iter() - .map(|group| Ok(group.to_columns()?.into_iter().collect::>())) + .map(|group| Ok(group.column_refs().into_iter().cloned().collect::>())) .collect::>>()? 
.into_iter() .flatten(); diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 89bcd6085cca..d19fafb99ae2 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -41,6 +41,7 @@ use datafusion_expr::{ }; use crate::optimizer::ApplyOrder; +use crate::utils::has_all_refs; use crate::{OptimizerConfig, OptimizerRule}; /// Optimizer rule for pushing (moving) filter expressions down in a plan so @@ -199,13 +200,9 @@ fn can_pushdown_join_predicate(predicate: &Expr, schema: &DFSchema) -> Result>(); - let columns = predicate.to_columns()?; + let columns = predicate.column_refs(); - Ok(schema_columns - .intersection(&columns) - .collect::>() - .len() - == columns.len()) + Ok(has_all_refs(&schema_columns, &columns)) } /// Determine whether the predicate can evaluate as the join conditions @@ -372,14 +369,8 @@ fn extract_or_clause(expr: &Expr, schema_columns: &HashSet) -> Option { - let columns = expr.to_columns().ok().unwrap(); - - if schema_columns - .intersection(&columns) - .collect::>() - .len() - == columns.len() - { + let columns = expr.column_refs(); + if has_all_refs(schema_columns, &columns) { predicate = Some(expr.clone()); } } diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index 00aaff196c3b..982ed3bc1776 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -17,7 +17,7 @@ //! Utility functions leveraged by the query optimizer rules -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeSet, HashMap, HashSet}; use crate::{OptimizerConfig, OptimizerRule}; @@ -66,6 +66,16 @@ pub fn optimize_children( } } +/// Returns true if all columns in col_refs are in `schema_cols` +/// +/// Note: can't use `HashSet::intersect` here because they have different types. 
+pub(crate) fn has_all_refs( + schema_cols: &HashSet, + col_refs: &HashSet<&Column>, +) -> bool { + schema_cols.iter().filter(|c| col_refs.contains(c)).count() == col_refs.len() +} + pub(crate) fn collect_subquery_cols( exprs: &[Expr], subquery_schema: DFSchemaRef, From d4346f384b5a843997a9c0fcb37878b5007a024e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 23 Jun 2024 08:10:14 -0400 Subject: [PATCH 2/3] Improve naming and DRY --- datafusion/optimizer/src/push_down_filter.rs | 9 +++------ datafusion/optimizer/src/utils.rs | 12 ++++++------ 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index d19fafb99ae2..db64cb86db45 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -41,7 +41,7 @@ use datafusion_expr::{ }; use crate::optimizer::ApplyOrder; -use crate::utils::has_all_refs; +use crate::utils::has_all_column_refs; use crate::{OptimizerConfig, OptimizerRule}; /// Optimizer rule for pushing (moving) filter expressions down in a plan so @@ -200,9 +200,7 @@ fn can_pushdown_join_predicate(predicate: &Expr, schema: &DFSchema) -> Result>(); - let columns = predicate.column_refs(); - - Ok(has_all_refs(&schema_columns, &columns)) + Ok(has_all_column_refs(predicate, &schema_columns)) } /// Determine whether the predicate can evaluate as the join conditions @@ -369,8 +367,7 @@ fn extract_or_clause(expr: &Expr, schema_columns: &HashSet) -> Option { - let columns = expr.column_refs(); - if has_all_refs(schema_columns, &columns) { + if has_all_column_refs(expr, schema_columns) { predicate = Some(expr.clone()); } } diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index 982ed3bc1776..aeaed8d899db 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -66,14 +66,14 @@ pub fn optimize_children( } } -/// Returns true if all columns in col_refs 
are in `schema_cols` -/// -/// Note: can't use `HashSet::intersect` here because they have different types. -pub(crate) fn has_all_refs( +/// Returns true if all columns referenced by `expr` are in `schema_cols` +pub(crate) fn has_all_column_refs( + expr: &Expr, schema_cols: &HashSet, - col_refs: &HashSet<&Column>, ) -> bool { - schema_cols.iter().filter(|c| col_refs.contains(c)).count() == col_refs.len() + let column_refs = expr.column_refs(); + // note can't use HashSet::intersection because of different types (owned vs References) + schema_cols.iter().filter(|c| column_refs.contains(c)).count() == column_refs.len() } pub(crate) fn collect_subquery_cols( From 98adb5e64a5084d08f2c7a766a8d49a96d1c0fa9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 23 Jun 2024 09:46:15 -0400 Subject: [PATCH 3/3] fmt --- datafusion/optimizer/src/utils.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index aeaed8d899db..05b1744d90c5 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -67,13 +67,14 @@ pub fn optimize_children( } /// Returns true if all columns referenced by `expr` are in `schema_cols` -pub(crate) fn has_all_column_refs( - expr: &Expr, - schema_cols: &HashSet, -) -> bool { +pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols: &HashSet) -> bool { let column_refs = expr.column_refs(); // note can't use HashSet::intersection because of different types (owned vs References) - schema_cols - .iter() - .filter(|c| column_refs.contains(c)) - .count() - == column_refs.len() + schema_cols + .iter() + .filter(|c| column_refs.contains(c)) + .count() + == column_refs.len() } pub(crate) fn collect_subquery_cols(