Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: propagate EmptyRelation for more join types #10963

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions datafusion/core/tests/parquet/arrow_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! This file contains an end to end test of extracting statitics from parquet files.
//! This file contains an end to end test of extracting statistics from parquet files.
//! It writes data into a parquet file, reads statistics and verifies they are correct

use std::default::Default;
Expand Down Expand Up @@ -716,8 +716,8 @@ async fn test_timestamp() {
// "seconds_timezoned" --> TimestampSecondArray
// "names" --> StringArray
//
// The file is created by 4 record batches, each has 5 rowws.
// Since the row group isze is set to 5, those 4 batches will go into 4 row groups
// The file is created by 4 record batches, each has 5 rows.
// Since the row group size is set to 5, those 4 batches will go into 4 row groups
// This creates a parquet files of 4 columns named "nanos", "nanos_timezoned", "micros", "micros_timezoned", "millis", "millis_timezoned", "seconds", "seconds_timezoned"
let reader = TestReader {
scenario: Scenario::Timestamps,
Expand Down Expand Up @@ -2039,7 +2039,7 @@ async fn test_missing_statistics() {
expected_min: Arc::new(Int64Array::from(vec![None])),
expected_max: Arc::new(Int64Array::from(vec![None])),
expected_null_counts: UInt64Array::from(vec![None]),
expected_row_counts: Some(UInt64Array::from(vec![3])), // stil has row count statistics
expected_row_counts: Some(UInt64Array::from(vec![3])), // still has row count statistics
column_name: "i64",
check: Check::RowGroup,
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/eliminate_one_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ mod tests {
}

fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
assert_optimized_plan_eq_with_rules(
assert_optimized_plan_with_rules(
vec![Arc::new(EliminateOneUnion::new())],
plan,
expected,
true,
)
}

Expand Down
177 changes: 154 additions & 23 deletions datafusion/optimizer/src/propagate_empty_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::sync::Arc;

use datafusion_common::tree_node::Transformed;
use datafusion_common::JoinType::Inner;
use datafusion_common::JoinType;
use datafusion_common::{internal_err, plan_err, Result};
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::logical_plan::LogicalPlan;
Expand Down Expand Up @@ -94,29 +94,62 @@ impl OptimizerRule for PropagateEmptyRelation {
Ok(Transformed::no(LogicalPlan::CrossJoin(join.clone())))
}

LogicalPlan::Join(ref join) if join.join_type == Inner => {
LogicalPlan::Join(ref join) => {
// TODO: For Join, more join type need to be careful:
jackwener marked this conversation as resolved.
Show resolved Hide resolved
// For LeftOuter/LeftSemi/LeftAnti Join, only the left side is empty, the Join result is empty.
// For LeftSemi Join, if the right side is empty, the Join result is empty.
// For LeftAnti Join, if the right side is empty, the Join result is left side(should exclude null ??).
// For RightOuter/RightSemi/RightAnti Join, only the right side is empty, the Join result is empty.
// For RightSemi Join, if the left side is empty, the Join result is empty.
// For RightAnti Join, if the left side is empty, the Join result is right side(should exclude null ??).
// For Full Join, only both sides are empty, the Join result is empty.
// For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side
// columns + right side columns replaced with null values.
// For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side
// columns + left side columns replaced with null values.
let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
if left_empty || right_empty {
return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
EmptyRelation {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I double checked that this code faithfully implements the semantics in the comments, and I did some mental review and verification to convince myself that these semantics are correct

i wonder if @jackwener has a moment to double check too?

match join.join_type {
JoinType::Inner if left_empty || right_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
},
)));
}),
)),
JoinType::Left if left_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
}),
)),
JoinType::Right if right_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
}),
)),
JoinType::LeftSemi if left_empty || right_empty => Ok(
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
})),
),
JoinType::RightSemi if left_empty || right_empty => Ok(
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
})),
),
JoinType::LeftAnti if left_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
}),
)),
JoinType::RightAnti if right_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
}),
)),
jackwener marked this conversation as resolved.
Show resolved Hide resolved
_ => Ok(Transformed::no(LogicalPlan::Join(join.clone()))),
}
Ok(Transformed::no(LogicalPlan::Join(join.clone())))
}
LogicalPlan::Aggregate(ref agg) => {
if !agg.group_expr.is_empty() {
Expand Down Expand Up @@ -222,7 +255,7 @@ mod tests {
use crate::eliminate_filter::EliminateFilter;
use crate::eliminate_nested_union::EliminateNestedUnion;
use crate::test::{
assert_optimized_plan_eq, assert_optimized_plan_eq_with_rules, test_table_scan,
assert_optimized_plan_eq, assert_optimized_plan_with_rules, test_table_scan,
test_table_scan_fields, test_table_scan_with_name,
};

Expand All @@ -232,18 +265,20 @@ mod tests {
assert_optimized_plan_eq(Arc::new(PropagateEmptyRelation::new()), plan, expected)
}

fn assert_together_optimized_plan_eq(
fn assert_together_optimized_plan(
plan: LogicalPlan,
expected: &str,
eq: bool,
) -> Result<()> {
assert_optimized_plan_eq_with_rules(
assert_optimized_plan_with_rules(
vec![
Arc::new(EliminateFilter::new()),
Arc::new(EliminateNestedUnion::new()),
Arc::new(PropagateEmptyRelation::new()),
],
plan,
expected,
eq,
)
}

Expand Down Expand Up @@ -279,7 +314,7 @@ mod tests {
.build()?;

let expected = "EmptyRelation";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

#[test]
Expand All @@ -292,7 +327,7 @@ mod tests {
let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;

let expected = "TableScan: test";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

#[test]
Expand All @@ -317,7 +352,7 @@ mod tests {
let expected = "Union\
\n TableScan: test1\
\n TableScan: test4";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

#[test]
Expand All @@ -342,7 +377,7 @@ mod tests {
.build()?;

let expected = "EmptyRelation";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

#[test]
Expand All @@ -369,7 +404,7 @@ mod tests {
let expected = "Union\
\n TableScan: test2\
\n TableScan: test3";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

#[test]
Expand All @@ -382,7 +417,7 @@ mod tests {
let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;

let expected = "TableScan: test";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

#[test]
Expand All @@ -397,7 +432,103 @@ mod tests {
.build()?;

let expected = "EmptyRelation";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

fn assert_empty_left_empty_right_lp(
left_empty: bool,
right_empty: bool,
join_type: JoinType,
eq: bool,
) -> Result<()> {
let left_lp = if left_empty {
let left_table_scan = test_table_scan()?;

LogicalPlanBuilder::from(left_table_scan)
.filter(Expr::Literal(ScalarValue::Boolean(Some(false))))?
.build()
} else {
let scan = test_table_scan_with_name("left").unwrap();
LogicalPlanBuilder::from(scan).build()
}?;

let right_lp = if right_empty {
let right_table_scan = test_table_scan_with_name("right")?;

LogicalPlanBuilder::from(right_table_scan)
.filter(Expr::Literal(ScalarValue::Boolean(Some(false))))?
.build()
} else {
let scan = test_table_scan_with_name("right").unwrap();
LogicalPlanBuilder::from(scan).build()
}?;

let plan = LogicalPlanBuilder::from(left_lp)
.join_using(
right_lp,
join_type,
vec![Column::from_name("a".to_string())],
)?
.build()?;

let expected = "EmptyRelation";
assert_together_optimized_plan(plan, expected, eq)
}

#[test]
fn test_join_empty_propagation_rules() -> Result<()> {
// test left join with empty left
assert_empty_left_empty_right_lp(true, false, JoinType::Left, true)?;

// test right join with empty right
assert_empty_left_empty_right_lp(false, true, JoinType::Right, true)?;

// test left semi join with empty left
assert_empty_left_empty_right_lp(true, false, JoinType::LeftSemi, true)?;

// test left semi join with empty right
assert_empty_left_empty_right_lp(false, true, JoinType::LeftSemi, true)?;

// test right semi join with empty left
assert_empty_left_empty_right_lp(true, false, JoinType::RightSemi, true)?;

// test right semi join with empty right
assert_empty_left_empty_right_lp(false, true, JoinType::RightSemi, true)?;

// test left anti join empty left
assert_empty_left_empty_right_lp(true, false, JoinType::LeftAnti, true)?;

// test right anti join empty right
assert_empty_left_empty_right_lp(false, true, JoinType::RightAnti, true)
}

#[test]
fn test_join_empty_propagation_rules_noop() -> Result<()> {
// these cases should not result in an empty relation

// test left join with empty right
assert_empty_left_empty_right_lp(false, true, JoinType::Left, false)?;

// test right join with empty left
assert_empty_left_empty_right_lp(true, false, JoinType::Right, false)?;

// test left semi with non-empty left and right
assert_empty_left_empty_right_lp(false, false, JoinType::LeftSemi, false)?;

// test right semi with non-empty left and right
assert_empty_left_empty_right_lp(false, false, JoinType::RightSemi, false)?;

// test left anti join with non-empty left and right
assert_empty_left_empty_right_lp(false, false, JoinType::LeftAnti, false)?;

// test left anti with non-empty left and empty right
assert_empty_left_empty_right_lp(false, true, JoinType::LeftAnti, false)?;

// test right anti join with non-empty left and right
assert_empty_left_empty_right_lp(false, false, JoinType::RightAnti, false)?;

// test right anti with empty left and non-empty right
assert_empty_left_empty_right_lp(true, false, JoinType::RightAnti, false)
}

#[test]
Expand Down Expand Up @@ -430,6 +561,6 @@ mod tests {
let expected = "Projection: a, b, c\
\n TableScan: test";

assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}
}
39 changes: 33 additions & 6 deletions datafusion/optimizer/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,21 @@ pub fn assert_analyzed_plan_eq(

Ok(())
}

pub fn assert_analyzed_plan_ne(
rule: Arc<dyn AnalyzerRule + Send + Sync>,
plan: LogicalPlan,
expected: &str,
) -> Result<()> {
let options = ConfigOptions::default();
let analyzed_plan =
Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options, |_, _| {})?;
let formatted_plan = format!("{analyzed_plan:?}");
assert_ne!(formatted_plan, expected);

Ok(())
}

pub fn assert_analyzed_plan_eq_display_indent(
rule: Arc<dyn AnalyzerRule + Send + Sync>,
plan: LogicalPlan,
Expand Down Expand Up @@ -169,21 +184,33 @@ pub fn assert_optimized_plan_eq(
Ok(())
}

pub fn assert_optimized_plan_eq_with_rules(
fn generate_optimized_plan_with_rules(
rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
plan: LogicalPlan,
expected: &str,
) -> Result<()> {
) -> LogicalPlan {
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
let config = &mut OptimizerContext::new()
.with_max_passes(1)
.with_skip_failing_rules(false);
let optimizer = Optimizer::with_rules(rules);
let optimized_plan = optimizer
optimizer
.optimize(plan, config, observe)
.expect("failed to optimize plan");
.expect("failed to optimize plan")
}

pub fn assert_optimized_plan_with_rules(
rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
plan: LogicalPlan,
expected: &str,
eq: bool,
) -> Result<()> {
let optimized_plan = generate_optimized_plan_with_rules(rules, plan);
let formatted_plan = format!("{optimized_plan:?}");
assert_eq!(formatted_plan, expected);
if eq {
assert_eq!(formatted_plan, expected);
} else {
assert_ne!(formatted_plan, expected);
}
Ok(())
}

Expand Down
Loading