Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 27 additions & 22 deletions datafusion/optimizer/src/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan

use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::{Column, DFSchema, Result};
use datafusion_common::{Column, DFSchema, DataFusionError, Result};
use datafusion_expr::{
col,
expr_rewriter::{replace_col, ExprRewritable, ExprRewriter},
Expand Down Expand Up @@ -165,41 +165,46 @@ fn issue_filters(
// non-preserved side it can be more tricky.
//
// Returns a tuple of booleans - (left_preserved, right_preserved).
fn lr_is_preserved(plan: &LogicalPlan) -> (bool, bool) {
fn lr_is_preserved(plan: &LogicalPlan) -> Result<(bool, bool)> {
match plan {
LogicalPlan::Join(Join { join_type, .. }) => match join_type {
JoinType::Inner => (true, true),
JoinType::Left => (true, false),
JoinType::Right => (false, true),
JoinType::Full => (false, false),
JoinType::Inner => Ok((true, true)),
JoinType::Left => Ok((true, false)),
JoinType::Right => Ok((false, true)),
JoinType::Full => Ok((false, false)),
// No columns from the right side of the join can be referenced in output
// predicates for semi/anti joins, so whether we specify t/f doesn't matter.
JoinType::Semi | JoinType::Anti => (true, false),
JoinType::Semi | JoinType::Anti => Ok((true, false)),
},
LogicalPlan::CrossJoin(_) => (true, true),
_ => unreachable!("lr_is_preserved only valid for JOIN nodes"),
LogicalPlan::CrossJoin(_) => Ok((true, true)),
_ => Err(DataFusionError::Internal(
"lr_is_preserved only valid for JOIN nodes".to_string(),
)),
}
}

// For a given JOIN logical plan, determine whether each side of the join is preserved
// in terms on join filtering.
// Predicates from join filter can only be pushed to preserved join side.
fn on_lr_is_preserved(plan: &LogicalPlan) -> (bool, bool) {
fn on_lr_is_preserved(plan: &LogicalPlan) -> Result<(bool, bool)> {
match plan {
LogicalPlan::Join(Join { join_type, .. }) => match join_type {
JoinType::Inner => (true, true),
JoinType::Left => (false, true),
JoinType::Right => (true, false),
JoinType::Full => (false, false),
JoinType::Inner => Ok((true, true)),
JoinType::Left => Ok((false, true)),
JoinType::Right => Ok((true, false)),
JoinType::Full => Ok((false, false)),
// Semi/Anti joins can not have join filter.
JoinType::Semi | JoinType::Anti => unreachable!(
JoinType::Semi | JoinType::Anti => Err(DataFusionError::Internal(
"on_lr_is_preserved cannot be appplied to SEMI/ANTI-JOIN nodes"
),
.to_string(),
)),
},
LogicalPlan::CrossJoin(_) => {
unreachable!("on_lr_is_preserved cannot be applied to CROSSJOIN nodes")
}
_ => unreachable!("on_lr_is_preserved only valid for JOIN nodes"),
LogicalPlan::CrossJoin(_) => Err(DataFusionError::Internal(
"on_lr_is_preserved cannot be applied to CROSSJOIN nodes".to_string(),
)),
_ => Err(DataFusionError::Internal(
"on_lr_is_preserved only valid for JOIN nodes".to_string(),
)),
}
}

Expand Down Expand Up @@ -251,7 +256,7 @@ fn optimize_join(
on_filter: Vec<Predicate>,
) -> Result<LogicalPlan> {
// Get pushable predicates from current optimizer state
let (left_preserved, right_preserved) = lr_is_preserved(plan);
let (left_preserved, right_preserved) = lr_is_preserved(plan)?;
let to_left =
get_pushable_join_predicates(&state.filters, left.schema(), left_preserved);
let to_right =
Expand All @@ -267,7 +272,7 @@ fn optimize_join(
let (on_to_left, on_to_right, on_to_keep) = if on_filter.is_empty() {
((vec![], vec![]), (vec![], vec![]), vec![])
} else {
let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(plan);
let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(plan)?;
let on_to_left =
get_pushable_join_predicates(&on_filter, left.schema(), on_left_preserved);
let on_to_right =
Expand Down