Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ impl OptimizerRule for MyRule {
match plan {
LogicalPlan::Filter(filter) => {
let mut expr_rewriter = MyExprRewriter {};
let predicate = filter.predicate().clone();
let predicate = filter.predicate.clone();
let predicate = predicate.rewrite(&mut expr_rewriter)?;
Ok(LogicalPlan::Filter(Filter::try_new(
predicate,
filter.input().clone(),
filter.input,
)?))
}
_ => Ok(plan.clone()),
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,12 +756,12 @@ impl DefaultPhysicalPlanner {
)?))
}
LogicalPlan::Filter(filter) => {
let physical_input = self.create_initial_plan(filter.input(), session_state).await?;
let physical_input = self.create_initial_plan(&filter.input, session_state).await?;
let input_schema = physical_input.as_ref().schema();
let input_dfschema = filter.input().schema();
let input_dfschema = filter.input.schema();

let runtime_expr = self.create_physical_expr(
filter.predicate(),
&filter.predicate,
input_dfschema,
&input_schema,
session_state,
Expand Down
14 changes: 2 additions & 12 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1193,9 +1193,9 @@ pub struct SubqueryAlias {
#[derive(Clone)]
pub struct Filter {
/// The predicate expression, which must have Boolean type.
predicate: Expr,
pub predicate: Expr,
Copy link
Contributor

@tustvold tustvold Dec 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would allow side-stepping the schema validation in try_new? I don't know how important this is

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I don't think that's a problem.
Projection, Aggregate, Subquery, and others are also like this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can at least document which invariants must hold between the fields.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would allow side-stepping the schema validation in try_new? I don't know how important this is

Yes, the motivation for the current design (introduced in #3796) was to prevent invalid filters from being created.

/// The incoming logical plan
input: Arc<LogicalPlan>,
pub input: Arc<LogicalPlan>,
}

impl Filter {
Expand Down Expand Up @@ -1236,16 +1236,6 @@ impl Filter {
_ => plan_err!("Could not coerce into Filter!"),
}
}

/// Access the filter predicate expression
pub fn predicate(&self) -> &Expr {
&self.predicate
}

/// Access the filter input plan
pub fn input(&self) -> &Arc<LogicalPlan> {
&self.input
}
}

/// Window its input based on a set of window specs and window functions (e.g. SUM or RANK)
Expand Down
6 changes: 3 additions & 3 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ impl OptimizerRule for CommonSubexprEliminate {
)?))
}
LogicalPlan::Filter(filter) => {
let input = filter.input();
let predicate = filter.predicate();
let input = &filter.input;
let predicate = &filter.predicate;
let input_schema = Arc::clone(input.schema());
let mut id_array = vec![];
expr_to_identifier(
Expand All @@ -134,7 +134,7 @@ impl OptimizerRule for CommonSubexprEliminate {
let (mut new_expr, new_input) = self.rewrite_expr(
&[&[predicate.clone()]],
&[&[id_array]],
filter.input(),
&filter.input,
&mut expr_set,
optimizer_config,
)?;
Expand Down
13 changes: 6 additions & 7 deletions datafusion/optimizer/src/decorrelate_where_exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ impl OptimizerRule for DecorrelateWhereExists {
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = filter.predicate();
let filter_input = filter.input();
let predicate = &filter.predicate;
let filter_input = &filter.input;

// Apply optimizer rule to current input
let optimized_input = self.optimize(filter_input, optimizer_config)?;
Expand Down Expand Up @@ -173,22 +173,21 @@ fn optimize_exists(
.map_err(|e| context!("cannot optimize non-correlated subquery", e))?;

// split into filters
let subqry_filter_exprs = split_conjunction(subqry_filter.predicate());
let subqry_filter_exprs = split_conjunction(&subqry_filter.predicate);
verify_not_disjunction(&subqry_filter_exprs)?;

// Grab column names to join on
let (col_exprs, other_subqry_exprs) =
find_join_exprs(subqry_filter_exprs, subqry_filter.input().schema())?;
find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema())?;
let (outer_cols, subqry_cols, join_filters) =
exprs_to_join_cols(&col_exprs, subqry_filter.input().schema(), false)?;
exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), false)?;
if subqry_cols.is_empty() || outer_cols.is_empty() {
// cannot optimize non-correlated subquery
return Ok(None);
}

// build subquery side of join - the thing the subquery was querying
let mut subqry_plan =
LogicalPlanBuilder::from(subqry_filter.input().as_ref().clone());
let mut subqry_plan = LogicalPlanBuilder::from(subqry_filter.input.as_ref().clone());
if let Some(expr) = conjunction(other_subqry_exprs) {
subqry_plan = subqry_plan.filter(expr)? // if the subquery had additional expressions, restore them
}
Expand Down
12 changes: 6 additions & 6 deletions datafusion/optimizer/src/decorrelate_where_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ impl OptimizerRule for DecorrelateWhereIn {
) -> datafusion_common::Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = filter.predicate();
let filter_input = filter.input();
let predicate = &filter.predicate;
let filter_input = &filter.input;

// Apply optimizer rule to current input
let optimized_input = self.optimize(filter_input, optimizer_config)?;
Expand Down Expand Up @@ -150,18 +150,18 @@ fn optimize_where_in(
let mut other_subqry_exprs = vec![];
if let LogicalPlan::Filter(subqry_filter) = (*subqry_input).clone() {
// split into filters
let subqry_filter_exprs = split_conjunction(subqry_filter.predicate());
let subqry_filter_exprs = split_conjunction(&subqry_filter.predicate);
verify_not_disjunction(&subqry_filter_exprs)?;

// Grab column names to join on
let (col_exprs, other_exprs) =
find_join_exprs(subqry_filter_exprs, subqry_filter.input().schema())
find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema())
.map_err(|e| context!("column correlation not found", e))?;
if !col_exprs.is_empty() {
// it's correlated
subqry_input = subqry_filter.input().clone();
subqry_input = subqry_filter.input.clone();
(outer_cols, subqry_cols, join_filters) =
exprs_to_join_cols(&col_exprs, subqry_filter.input().schema(), false)
exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), false)
.map_err(|e| context!("column correlation not found", e))?;
other_subqry_exprs = other_exprs;
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl OptimizerRule for EliminateCrossJoin {
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(filter) => {
let input = (**filter.input()).clone();
let input = filter.input.as_ref().clone();

let mut possible_join_keys: Vec<(Column, Column)> = vec![];
let mut all_inputs: Vec<LogicalPlan> = vec![];
Expand All @@ -86,7 +86,7 @@ impl OptimizerRule for EliminateCrossJoin {
}
}

let predicate = filter.predicate();
let predicate = &filter.predicate;
// join keys are handled locally
let mut all_join_keys: HashSet<(Column, Column)> = HashSet::new();

Expand Down
6 changes: 2 additions & 4 deletions datafusion/optimizer/src/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ impl OptimizerRule for EliminateFilter {
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
let predicate_and_input = match plan {
LogicalPlan::Filter(filter) => match filter.predicate() {
Expr::Literal(ScalarValue::Boolean(Some(v))) => {
Some((*v, filter.input()))
}
LogicalPlan::Filter(filter) => match filter.predicate {
Expr::Literal(ScalarValue::Boolean(Some(v))) => Some((v, &filter.input)),
_ => None,
},
_ => None,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ impl OptimizerRule for EliminateOuterJoin {
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(filter) => match filter.input().as_ref() {
LogicalPlan::Filter(filter) => match filter.input.as_ref() {
LogicalPlan::Join(join) => {
let mut non_nullable_cols: Vec<Column> = vec![];

extract_non_nullable_columns(
filter.predicate(),
&filter.predicate,
&mut non_nullable_cols,
join.left.schema(),
join.right.schema(),
Expand Down
29 changes: 13 additions & 16 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,14 +524,14 @@ impl OptimizerRule for PushDownFilter {
_ => return utils::optimize_children(self, plan, optimizer_config),
};

let child_plan = &**filter.input();
let child_plan = filter.input.as_ref();
let new_plan = match child_plan {
LogicalPlan::Filter(child_filter) => {
let new_predicate =
and(filter.predicate().clone(), child_filter.predicate().clone());
and(filter.predicate.clone(), child_filter.predicate.clone());
let new_plan = LogicalPlan::Filter(Filter::try_new(
new_predicate,
child_filter.input().clone(),
child_filter.input.clone(),
)?);
return self.optimize(&new_plan, optimizer_config);
}
Expand Down Expand Up @@ -561,7 +561,7 @@ impl OptimizerRule for PushDownFilter {
);
}
let new_predicate =
replace_cols_by_name(filter.predicate().clone(), &replace_map)?;
replace_cols_by_name(filter.predicate.clone(), &replace_map)?;
let new_filter = LogicalPlan::Filter(Filter::try_new(
new_predicate,
subquery_alias.input.clone(),
Expand Down Expand Up @@ -590,7 +590,7 @@ impl OptimizerRule for PushDownFilter {
// re-write all filters based on this projection
// E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap them, but the filter must be "a > 1"
let new_filter = LogicalPlan::Filter(Filter::try_new(
replace_cols_by_name(filter.predicate().clone(), &replace_map)?,
replace_cols_by_name(filter.predicate.clone(), &replace_map)?,
projection.input.clone(),
)?);

Expand All @@ -608,7 +608,7 @@ impl OptimizerRule for PushDownFilter {
}

let push_predicate =
replace_cols_by_name(filter.predicate().clone(), &replace_map)?;
replace_cols_by_name(filter.predicate.clone(), &replace_map)?;
inputs.push(Arc::new(LogicalPlan::Filter(Filter::try_new(
push_predicate,
input.clone(),
Expand All @@ -635,7 +635,7 @@ impl OptimizerRule for PushDownFilter {
used_columns.extend(agg_columns);

let predicates = utils::split_conjunction_owned(utils::cnf_rewrite(
filter.predicate().clone(),
filter.predicate.clone(),
));

let mut keep_predicates = vec![];
Expand All @@ -661,11 +661,8 @@ impl OptimizerRule for PushDownFilter {
)?),
None => (*agg.input).clone(),
};
let new_agg = from_plan(
filter.input(),
&filter.input().expressions(),
&vec![child],
)?;
let new_agg =
from_plan(&filter.input, &filter.input.expressions(), &vec![child])?;
match conjunction(keep_predicates) {
Some(predicate) => LogicalPlan::Filter(Filter::try_new(
predicate,
Expand All @@ -675,24 +672,24 @@ impl OptimizerRule for PushDownFilter {
}
}
LogicalPlan::Join(join) => {
match push_down_join(filter.input(), join, Some(filter.predicate()))? {
match push_down_join(&filter.input, join, Some(&filter.predicate))? {
Some(optimized_plan) => optimized_plan,
None => plan.clone(),
}
}
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
let predicates = utils::split_conjunction_owned(utils::cnf_rewrite(
filter.predicate().clone(),
filter.predicate.clone(),
));

push_down_all_join(predicates, filter.input(), left, right, vec![])?
push_down_all_join(predicates, &filter.input, left, right, vec![])?
}
LogicalPlan::TableScan(scan) => {
let mut new_scan_filters = scan.filters.clone();
let mut new_predicate = vec![];

let filter_predicates = utils::split_conjunction_owned(
utils::cnf_rewrite(filter.predicate().clone()),
utils::cnf_rewrite(filter.predicate.clone()),
);

for filter_expr in &filter_predicates {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ impl OptimizerRule for RewriteDisjunctivePredicate {
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = predicate(filter.predicate())?;
let predicate = predicate(&filter.predicate)?;
let rewritten_predicate = rewrite_predicate(predicate);
let rewritten_expr = normalize_predicate(rewritten_predicate);
Ok(LogicalPlan::Filter(Filter::try_new(
rewritten_expr,
Arc::new(Self::optimize(self, filter.input(), _optimizer_config)?),
Arc::new(Self::optimize(self, &filter.input, _optimizer_config)?),
)?))
}
_ => utils::optimize_children(self, plan, _optimizer_config),
Expand Down
14 changes: 7 additions & 7 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,21 @@ impl OptimizerRule for ScalarSubqueryToJoin {
match plan {
LogicalPlan::Filter(filter) => {
// Apply optimizer rule to current input
let optimized_input = self.optimize(filter.input(), optimizer_config)?;
let optimized_input = self.optimize(&filter.input, optimizer_config)?;

let (subqueries, other_exprs) =
self.extract_subquery_exprs(filter.predicate(), optimizer_config)?;
self.extract_subquery_exprs(&filter.predicate, optimizer_config)?;

if subqueries.is_empty() {
// regular filter, no subquery exists clause here
return Ok(LogicalPlan::Filter(Filter::try_new(
filter.predicate().clone(),
filter.predicate.clone(),
Arc::new(optimized_input),
)?));
}

// iterate through all subqueries in predicate, turning each into a join
let mut cur_input = filter.input().as_ref().clone();
let mut cur_input = filter.input.as_ref().clone();
for subquery in subqueries {
if let Some(optimized_subquery) = optimize_scalar(
&subquery,
Expand All @@ -123,7 +123,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
} else {
// if we can't handle all of the subqueries then bail for now
return Ok(LogicalPlan::Filter(Filter::try_new(
filter.predicate().clone(),
filter.predicate.clone(),
Arc::new(optimized_input),
)?));
}
Expand Down Expand Up @@ -228,14 +228,14 @@ fn optimize_scalar(

// if there were filters, we use that logical plan, otherwise the plan from the aggregate
let input = if let Some(filter) = filter {
filter.input()
&filter.input
} else {
&aggr.input
};

// if there were filters, split and capture them
let subqry_filter_exprs = if let Some(filter) = filter {
split_conjunction(filter.predicate())
split_conjunction(&filter.predicate)
} else {
vec![]
};
Expand Down
8 changes: 4 additions & 4 deletions datafusion/optimizer/src/subquery_filter_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ impl OptimizerRule for SubqueryFilterToJoin {
match plan {
LogicalPlan::Filter(filter) => {
// Apply optimizer rule to current input
let optimized_input = self.optimize(filter.input(), optimizer_config)?;
let optimized_input = self.optimize(&filter.input, optimizer_config)?;

// Splitting filter expression into components by AND
let filters = utils::split_conjunction(filter.predicate());
let filters = utils::split_conjunction(&filter.predicate);

// Searching for subquery-based filters
let (subquery_filters, regular_filters): (Vec<&Expr>, Vec<&Expr>) =
Expand All @@ -79,7 +79,7 @@ impl OptimizerRule for SubqueryFilterToJoin {

if !subqueries_in_regular.is_empty() {
return Ok(LogicalPlan::Filter(Filter::try_new(
filter.predicate().clone(),
filter.predicate.clone(),
Arc::new(optimized_input),
)?));
};
Expand Down Expand Up @@ -151,7 +151,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
Ok(plan) => plan,
Err(_) => {
return Ok(LogicalPlan::Filter(Filter::try_new(
filter.predicate().clone(),
filter.predicate.clone(),
Arc::new(optimized_input),
)?))
}
Expand Down
Loading