diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b539544d8372..52d976c45cf3 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1259,7 +1259,7 @@ impl DefaultPhysicalPlanner { let join_filter = match filter { Some(expr) => { // Extract columns from filter expression and saved in a HashSet - let cols = expr.to_columns()?; + let cols = expr.column_refs(); // Collect left & right field indices, the field indices are sorted in ascending order let left_field_indices = cols diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2f1ece32ab15..434f4dace1de 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -534,11 +534,11 @@ impl LogicalPlanBuilder { .clone() .into_iter() .try_for_each::<_, Result<()>>(|expr| { - let columns = expr.to_columns()?; + let columns = expr.column_refs(); columns.into_iter().for_each(|c| { - if schema.field_from_column(&c).is_err() { - missing_cols.push(c); + if !schema.has_column(c) { + missing_cols.push(c.clone()); } }); diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 6baabfcc7130..9018b6f5ce62 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -872,7 +872,7 @@ pub fn can_hash(data_type: &DataType) -> bool { /// Check whether all columns are from the schema. 
pub fn check_all_columns_from_schema( - columns: &HashSet<Column>, + columns: &HashSet<&Column>, schema: &DFSchema, ) -> Result<bool> { for col in columns.iter() { @@ -900,8 +900,8 @@ pub fn find_valid_equijoin_key_pair( left_schema: &DFSchema, right_schema: &DFSchema, ) -> Result<Option<(Expr, Expr)>> { - let left_using_columns = left_key.to_columns()?; - let right_using_columns = right_key.to_columns()?; + let left_using_columns = left_key.column_refs(); + let right_using_columns = right_key.column_refs(); // Conditions like a = 10, will be added to non-equijoin. if left_using_columns.is_empty() || right_using_columns.is_empty() { diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index 7d1290204eb7..5725a725e64a 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -249,7 +249,7 @@ fn check_aggregation_in_scalar_subquery( let mut group_columns = agg .group_expr .iter() - .map(|group| Ok(group.to_columns()?.into_iter().collect::<Vec<Column>>())) + .map(|group| Ok(group.column_refs().into_iter().cloned().collect::<Vec<Column>>())) .collect::<Result<Vec<_>>>()?
.into_iter() .flatten(); diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 013e74c8fcc0..fa432ad76de5 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -41,6 +41,7 @@ use datafusion_expr::{ }; use crate::optimizer::ApplyOrder; +use crate::utils::has_all_column_refs; use crate::{OptimizerConfig, OptimizerRule}; /// Optimizer rule for pushing (moving) filter expressions down in a plan so @@ -199,13 +200,7 @@ fn can_pushdown_join_predicate(predicate: &Expr, schema: &DFSchema) -> Result<bool> { .collect::<HashSet<_>>(); - let columns = predicate.to_columns()?; - - Ok(schema_columns .intersection(&columns) .collect::<HashSet<_>>() .len() == columns.len()) + Ok(has_all_column_refs(predicate, &schema_columns)) } /// Determine whether the predicate can evaluate as the join conditions @@ -372,14 +367,7 @@ fn extract_or_clause(expr: &Expr, schema_columns: &HashSet<Column>) -> Option<Expr> { - let columns = expr.to_columns().ok().unwrap(); - - if schema_columns .intersection(&columns) .collect::<HashSet<_>>() .len() == columns.len() - { + if has_all_column_refs(expr, schema_columns) { predicate = Some(expr.clone()); } } diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index 00aaff196c3b..05b1744d90c5 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -17,7 +17,7 @@ //!
Utility functions leveraged by the query optimizer rules -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeSet, HashMap, HashSet}; use crate::{OptimizerConfig, OptimizerRule}; @@ -66,6 +66,17 @@ pub fn optimize_children( } } +/// Returns true if `expr` contains all columns in `schema_cols` +pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols: &HashSet<Column>) -> bool { + let column_refs = expr.column_refs(); + // note can't use HashSet::intersect because of different types (owned vs References) + schema_cols + .iter() + .filter(|c| column_refs.contains(c)) + .count() + == column_refs.len() +} + pub(crate) fn collect_subquery_cols( exprs: &[Expr], subquery_schema: DFSchemaRef,