Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 4 additions & 62 deletions datafusion/core/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_data_access::object_store::ObjectStore;
use datafusion_expr::utils::{
expand_qualified_wildcard, expand_wildcard, expr_to_columns,
};
use std::convert::TryFrom;
use std::iter;
use std::{
Expand Down Expand Up @@ -600,7 +603,7 @@ impl LogicalPlanBuilder {
.into_iter()
.try_for_each::<_, Result<()>>(|expr| {
let mut columns: HashSet<Column> = HashSet::new();
utils::expr_to_columns(&expr, &mut columns)?;
expr_to_columns(&expr, &mut columns)?;

columns.into_iter().for_each(|c| {
if schema.field_from_column(&c).is_err() {
Expand Down Expand Up @@ -1142,67 +1145,6 @@ pub fn project_with_alias(
}))
}

/// Expands an `Expr::Wildcard` into one `Expr::Column` per field of `schema`.
///
/// Columns that take part in a `USING` join of `plan` are emitted only once:
/// within each set of using-columns the columns are sorted and every column
/// except the first is left out of the expansion.
///
/// # Errors
///
/// Propagates any error returned by `plan.using_columns()`.
pub(crate) fn expand_wildcard(
    schema: &DFSchema,
    plan: &LogicalPlan,
) -> Result<Vec<Expr>> {
    let mut columns_to_skip = HashSet::new();
    for join_columns in plan.using_columns()? {
        // Sorting makes the choice of the surviving qualified column
        // deterministic for a given USING condition.
        let mut ordered = join_columns.into_iter().collect::<Vec<_>>();
        ordered.sort();
        columns_to_skip.extend(ordered.into_iter().skip(1));
    }

    // With an empty skip set the filter keeps every field, so a single pass
    // covers both the "no USING join" and the "USING join" cases.
    let expanded = schema
        .fields()
        .iter()
        .map(|field| field.qualified_column())
        .filter(|column| !columns_to_skip.contains(column))
        .map(Expr::Column)
        .collect::<Vec<Expr>>();

    Ok(expanded)
}

/// Expands a qualified wildcard (`qualifier.*`) into column expressions.
///
/// Only the fields of `schema` returned by `fields_with_qualified(qualifier)`
/// are considered; they are placed in a temporary schema (carrying the
/// original metadata) which is then expanded by `expand_wildcard`.
///
/// # Errors
///
/// Returns `DataFusionError::Plan` when no field matches `qualifier`, and
/// propagates errors from building the temporary schema or from
/// `expand_wildcard`.
pub(crate) fn expand_qualified_wildcard(
    qualifier: &str,
    schema: &DFSchema,
    plan: &LogicalPlan,
) -> Result<Vec<Expr>> {
    let mut qualified_fields: Vec<DFField> = Vec::new();
    for field in schema.fields_with_qualified(qualifier) {
        qualified_fields.push(field.clone());
    }

    if qualified_fields.is_empty() {
        Err(DataFusionError::Plan(format!(
            "Invalid qualifier {}",
            qualifier
        )))
    } else {
        let qualifier_schema = DFSchema::new_with_metadata(
            qualified_fields,
            schema.metadata().clone(),
        )?;
        expand_wildcard(&qualifier_schema, plan)
    }
}

#[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field};
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@ pub(crate) mod builder;
mod expr;
mod expr_rewriter;
mod expr_simplier;
mod expr_visitor;
pub mod plan;
mod registry;
pub mod window_frames;
pub use builder::{
build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
};
pub use datafusion_common::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
pub use datafusion_expr::{expr_fn::binary_expr, Operator};

pub use crate::logical_expr::ExprSchemable;
pub use datafusion_expr::{
expr_fn::binary_expr,
expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
ExprSchemable, Operator,
};
pub use expr::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col,
Expand All @@ -56,7 +57,6 @@ pub use expr_rewriter::{
ExprRewriter, RewriteRecursion,
};
pub use expr_simplier::{ExprSimplifiable, SimplifyInfo};
pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
pub use plan::{provider_as_source, source_as_provider};
pub use plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::logical_plan::{
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns};
use std::collections::{HashMap, HashSet};

/// Filter Push Down optimizer rule pushes filter clauses down the plan
Expand Down Expand Up @@ -276,7 +277,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
.into_iter()
.try_for_each::<_, Result<()>>(|predicate| {
let mut columns: HashSet<Column> = HashSet::new();
utils::expr_to_columns(predicate, &mut columns)?;
expr_to_columns(predicate, &mut columns)?;
if columns.is_empty() {
no_col_predicates.push(predicate)
} else {
Expand Down Expand Up @@ -327,7 +328,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
*predicate = rewrite(predicate, &projection)?;

columns.clear();
utils::expr_to_columns(predicate, columns)?;
expr_to_columns(predicate, columns)?;
}

// optimize inner
Expand All @@ -344,7 +345,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {

// construct set of columns that `aggr_expr` depends on
let mut used_columns = HashSet::new();
utils::exprlist_to_columns(aggr_expr, &mut used_columns)?;
exprlist_to_columns(aggr_expr, &mut used_columns)?;

let agg_columns = aggr_expr
.iter()
Expand Down Expand Up @@ -377,7 +378,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
*predicate = rewrite(predicate, &projection)?;

columns.clear();
utils::expr_to_columns(predicate, columns)?;
expr_to_columns(predicate, columns)?;
}
}

Expand Down
13 changes: 7 additions & 6 deletions datafusion/core/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::optimizer::utils;
use crate::sql::utils::find_sort_exprs;
use arrow::datatypes::{Field, Schema};
use arrow::error::Result as ArrowResult;
use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns};
use datafusion_expr::Expr;
use std::{
collections::{BTreeSet, HashSet},
Expand Down Expand Up @@ -159,7 +160,7 @@ fn optimize_plan(
new_fields.push(field.clone());

// gather the new set of required columns
utils::expr_to_columns(&expr[i], &mut new_required_columns)
expr_to_columns(&expr[i], &mut new_required_columns)
} else {
Ok(())
}
Expand Down Expand Up @@ -263,15 +264,15 @@ fn optimize_plan(
new_window_expr.push(expr.clone());
new_required_columns.insert(column);
// add to the new set of required columns
utils::expr_to_columns(expr, &mut new_required_columns)
expr_to_columns(expr, &mut new_required_columns)
} else {
Ok(())
}
})?;
}

// for all the retained window expr, find their sort expressions if any, and retain these
utils::exprlist_to_columns(
exprlist_to_columns(
&find_sort_exprs(&new_window_expr),
&mut new_required_columns,
)?;
Expand All @@ -296,7 +297,7 @@ fn optimize_plan(
// * remove any aggregate expression that is not required
// * construct the new set of required columns

utils::exprlist_to_columns(group_expr, &mut new_required_columns)?;
exprlist_to_columns(group_expr, &mut new_required_columns)?;

// Gather all columns needed for expressions in this Aggregate
let mut new_aggr_expr = Vec::new();
Expand All @@ -309,7 +310,7 @@ fn optimize_plan(
new_required_columns.insert(column);

// add to the new set of required columns
utils::expr_to_columns(expr, &mut new_required_columns)
expr_to_columns(expr, &mut new_required_columns)
} else {
Ok(())
}
Expand Down Expand Up @@ -484,7 +485,7 @@ fn optimize_plan(
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
// collect all required columns by this plan
utils::exprlist_to_columns(&expr, &mut new_required_columns)?;
exprlist_to_columns(&expr, &mut new_required_columns)?;

// apply the optimization to all inputs of the plan
let inputs = plan.inputs();
Expand Down
75 changes: 6 additions & 69 deletions datafusion/core/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,87 +24,22 @@ use datafusion_expr::logical_plan::{
SubqueryAlias, Window,
};

use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
and, build_join_schema, Column, CreateMemoryTable, CreateView, DFSchemaRef, Expr,
ExprVisitable, Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning,
Recursion, Repartition, Union, Values,
and, build_join_schema, CreateMemoryTable, CreateView, DFSchemaRef, Expr, Limit,
LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Repartition, Union, Values,
};
use crate::prelude::lit;
use crate::scalar::ScalarValue;
use crate::{
error::{DataFusionError, Result},
logical_plan::ExpressionVisitor,
};
use datafusion_common::DFSchema;
use datafusion_expr::expr::GroupingSet;
use std::{collections::HashSet, sync::Arc};
use std::sync::Arc;

const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__";
const CASE_ELSE_MARKER: &str = "__DATAFUSION_CASE_ELSE__";
const WINDOW_PARTITION_MARKER: &str = "__DATAFUSION_WINDOW_PARTITION__";
const WINDOW_SORT_MARKER: &str = "__DATAFUSION_WINDOW_SORT__";

/// Collects into `accum` every column referenced by any expression in `expr`.
///
/// Each expression is handed to `expr_to_columns` in order; the first error
/// stops the walk and is returned to the caller.
pub fn exprlist_to_columns(expr: &[Expr], accum: &mut HashSet<Column>) -> Result<()> {
    expr.iter()
        .try_for_each(|single_expr| expr_to_columns(single_expr, accum))
}

/// Expression visitor that records every column referenced by the visited
/// expressions into the borrowed `accum` set.
struct ColumnNameVisitor<'a> {
    accum: &'a mut HashSet<Column>,
}

impl ExpressionVisitor for ColumnNameVisitor<'_> {
    /// Records the column named by `expr`, if any, and always asks the
    /// traversal to continue.
    fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> {
        // Every variant is listed explicitly (no `_` arm) so that adding a new
        // `Expr` variant forces this visitor to be revisited.
        let referenced: Option<Column> = match expr {
            Expr::Column(column) => Some(column.clone()),
            // A scalar variable is recorded as an unqualified column whose
            // name is the dot-joined variable path.
            Expr::ScalarVariable(_, var_names) => {
                Some(Column::from_name(var_names.join(".")))
            }
            Expr::Alias(_, _)
            | Expr::Literal(_)
            | Expr::BinaryExpr { .. }
            | Expr::Not(_)
            | Expr::IsNotNull(_)
            | Expr::IsNull(_)
            | Expr::Negative(_)
            | Expr::Between { .. }
            | Expr::Case { .. }
            | Expr::Cast { .. }
            | Expr::TryCast { .. }
            | Expr::Sort { .. }
            | Expr::ScalarFunction { .. }
            | Expr::ScalarUDF { .. }
            | Expr::WindowFunction { .. }
            | Expr::AggregateFunction { .. }
            | Expr::GroupingSet(_)
            | Expr::AggregateUDF { .. }
            | Expr::InList { .. }
            | Expr::Exists { .. }
            | Expr::InSubquery { .. }
            | Expr::ScalarSubquery(_)
            | Expr::Wildcard
            | Expr::QualifiedWildcard { .. }
            | Expr::GetIndexedField { .. } => None,
        };

        if let Some(column) = referenced {
            self.accum.insert(column);
        }

        Ok(Recursion::Continue(self))
    }
}

/// Collects into `accum` every column referenced anywhere inside `expr`.
///
/// The expression is traversed with a `ColumnNameVisitor`; the visitor
/// returned by the traversal is discarded and only success or failure is
/// reported.
pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
    let visitor = ColumnNameVisitor { accum };
    expr.accept(visitor).map(|_| ())
}

/// Convenience rule for writing optimizers: recursively invoke
/// optimize on plan's children and then return a node of the same
/// type. Useful for optimizer rules which want to leave the type
Expand Down Expand Up @@ -623,6 +558,8 @@ mod tests {
use super::*;
use crate::logical_plan::col;
use arrow::datatypes::DataType;
use datafusion_common::Column;
use datafusion_expr::utils::expr_to_columns;
use std::collections::HashSet;

#[test]
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_expr::utils::expr_to_columns;

use crate::execution::context::ExecutionProps;
use crate::physical_plan::planner::create_physical_expr;
Expand Down Expand Up @@ -412,9 +413,9 @@ impl<'a> PruningExpressionBuilder<'a> {
) -> Result<Self> {
// find column name; input could be a more complicated expression
let mut left_columns = HashSet::<Column>::new();
utils::expr_to_columns(left, &mut left_columns)?;
expr_to_columns(left, &mut left_columns)?;
let mut right_columns = HashSet::<Column>::new();
utils::expr_to_columns(right, &mut right_columns)?;
expr_to_columns(right, &mut right_columns)?;
let (column_expr, scalar_expr, columns, correct_operator) =
match (left_columns.len(), right_columns.len()) {
(1, 0) => (left, right, left_columns, op),
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,24 @@ use crate::datasource::TableProvider;
use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
use crate::logical_plan::Expr::Alias;
use crate::logical_plan::{
and, builder::expand_qualified_wildcard, builder::expand_wildcard, col, lit,
normalize_col, normalize_col_with_schemas, union_with_alias, Column, CreateCatalog,
CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable,
and, col, lit, normalize_col, normalize_col_with_schemas, union_with_alias, Column,
CreateCatalog, CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable,
CreateMemoryTable, CreateView, DFSchema, DFSchemaRef, DropTable, Expr, FileType,
LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema, ToStringifiedPlan,
};
use crate::optimizer::utils::exprlist_to_columns;
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
use crate::sql::utils::{make_decimal_type, normalize_ident, resolve_columns};
use crate::{
error::{DataFusionError, Result},
logical_expr::utils::{expand_qualified_wildcard, expand_wildcard},
physical_plan::aggregates,
physical_plan::udaf::AggregateUDF,
physical_plan::udf::ScalarUDF,
sql::parser::{CreateExternalTable, Statement as DFStatement},
};
use arrow::datatypes::*;
use datafusion_expr::utils::exprlist_to_columns;
use datafusion_expr::{window_function::WindowFunction, BuiltinScalarFunction};
use hashbrown::HashMap;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

//! Expression visitor

use super::Expr;
use crate::{expr::GroupingSet, Expr};
use datafusion_common::Result;
use datafusion_expr::expr::GroupingSet;

/// Controls how the visitor recursion should proceed.
pub enum Recursion<V: ExpressionVisitor> {
Expand Down
Loading