Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,7 @@ impl DefaultPhysicalPlanner {
let join_filter = match filter {
Some(expr) => {
// Extract the columns referenced by the filter expression and save them in a HashSet
let cols = expr.to_columns()?;
let cols = expr.column_refs();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This avoids copying all of the `Column`s (each of which contains owned strings).


// Collect left & right field indices, the field indices are sorted in ascending order
let left_field_indices = cols
Expand Down
6 changes: 3 additions & 3 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,11 +534,11 @@ impl LogicalPlanBuilder {
.clone()
.into_iter()
.try_for_each::<_, Result<()>>(|expr| {
let columns = expr.to_columns()?;
let columns = expr.column_refs();

columns.into_iter().for_each(|c| {
if schema.field_from_column(&c).is_err() {
missing_cols.push(c);
if !schema.has_column(c) {
missing_cols.push(c.clone());
}
});

Expand Down
6 changes: 3 additions & 3 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ pub fn can_hash(data_type: &DataType) -> bool {

/// Check whether all columns are from the schema.
pub fn check_all_columns_from_schema(
columns: &HashSet<Column>,
columns: &HashSet<&Column>,
schema: &DFSchema,
) -> Result<bool> {
for col in columns.iter() {
Expand Down Expand Up @@ -900,8 +900,8 @@ pub fn find_valid_equijoin_key_pair(
left_schema: &DFSchema,
right_schema: &DFSchema,
) -> Result<Option<(Expr, Expr)>> {
let left_using_columns = left_key.to_columns()?;
let right_using_columns = right_key.to_columns()?;
let left_using_columns = left_key.column_refs();
let right_using_columns = right_key.column_refs();

// Conditions like a = 10, will be added to non-equijoin.
if left_using_columns.is_empty() || right_using_columns.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/analyzer/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ fn check_aggregation_in_scalar_subquery(
let mut group_columns = agg
.group_expr
.iter()
.map(|group| Ok(group.to_columns()?.into_iter().collect::<Vec<_>>()))
.map(|group| Ok(group.column_refs().into_iter().cloned().collect::<Vec<_>>()))
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten();
Expand Down
18 changes: 3 additions & 15 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use datafusion_expr::{
};

use crate::optimizer::ApplyOrder;
use crate::utils::has_all_column_refs;
use crate::{OptimizerConfig, OptimizerRule};

/// Optimizer rule for pushing (moving) filter expressions down in a plan so
Expand Down Expand Up @@ -199,13 +200,7 @@ fn can_pushdown_join_predicate(predicate: &Expr, schema: &DFSchema) -> Result<bo
]
})
.collect::<HashSet<_>>();
let columns = predicate.to_columns()?;

Ok(schema_columns
.intersection(&columns)
.collect::<HashSet<_>>()
.len()
== columns.len())
Ok(has_all_column_refs(predicate, &schema_columns))
}

/// Determine whether the predicate can evaluate as the join conditions
Expand Down Expand Up @@ -372,14 +367,7 @@ fn extract_or_clause(expr: &Expr, schema_columns: &HashSet<Column>) -> Option<Ex
}
}
_ => {
let columns = expr.to_columns().ok().unwrap();

if schema_columns
.intersection(&columns)
.collect::<HashSet<_>>()
.len()
== columns.len()
{
if has_all_column_refs(expr, schema_columns) {
predicate = Some(expr.clone());
}
}
Expand Down
13 changes: 12 additions & 1 deletion datafusion/optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Utility functions leveraged by the query optimizer rules

use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap, HashSet};

use crate::{OptimizerConfig, OptimizerRule};

Expand Down Expand Up @@ -66,6 +66,17 @@ pub fn optimize_children(
}
}

/// Returns true if every column referenced by `expr` is present in
/// `schema_cols`.
///
/// An expression that references no columns trivially satisfies this and
/// yields `true`.
pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols: &HashSet<Column>) -> bool {
    // `column_refs` yields borrowed `&Column`s while `schema_cols` owns its
    // `Column`s, so `HashSet::intersection` cannot be used directly (the
    // element types differ). Checking each reference individually avoids
    // cloning and stops at the first column missing from the schema.
    expr.column_refs()
        .into_iter()
        .all(|col| schema_cols.contains(col))
}

pub(crate) fn collect_subquery_cols(
exprs: &[Expr],
subquery_schema: DFSchemaRef,
Expand Down