Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions datafusion/optimizer/src/limit_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ impl LimitPushDown {
}
}

/// Returns `true` when `join` carries no join condition at all: its `on`
/// collection is empty and its `filter` is `None`.
fn is_no_join_condition(join: &Join) -> bool {
    let has_on_keys = !join.on.is_empty();
    let has_filter = join.filter.is_some();
    // No condition means neither source of conditions is present.
    !(has_on_keys || has_filter)
}

fn push_down_join(
join: &Join,
left_limit: Option<usize>,
Expand Down Expand Up @@ -192,6 +196,24 @@ impl OptimizerRule for LimitPushDown {
LogicalPlan::Join(join) => {
let limit = fetch + skip;
let new_join = match join.join_type {
JoinType::Left | JoinType::Right | JoinType::Full
if is_no_join_condition(join) =>
Copy link
Contributor

@alamb alamb Nov 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder how often there will be no join conditions 🤔 that effectively then becomes a CROSS JOIN

To be clear I think the optimization is still correct, I just wonder how often it will appear in real queries

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder how often there will be no join conditions 🤔 that effectively then becomes a CROSS JOIN

To be clear I think the optimization is still correct, I just wonder how often it will appear in real queries

Good question. In the current optimizer framework in DataFusion, such a join query will be converted to a cross join by another rule.

The limit push-down rule just does its best to optimize the plan.

Some SQL is generated by programs, which add the join type and ON condition themselves; this can produce SQL like `select * from left_table right join right_table`.

{
// push left and right
push_down_join(join, Some(limit), Some(limit))
}
JoinType::LeftSemi | JoinType::LeftAnti
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you forget RightSemi/RightAnti?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 good catch

if is_no_join_condition(join) =>
{
// push left
push_down_join(join, Some(limit), None)
}
JoinType::RightSemi | JoinType::RightAnti
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the right semi and right anti cases.
cc @jackwener

if is_no_join_condition(join) =>
{
// push right
push_down_join(join, None, Some(limit))
}
JoinType::Left => push_down_join(join, Some(limit), None),
JoinType::Right => push_down_join(join, None, Some(limit)),
Comment on lines 217 to 218
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove these

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think those are the lesser restrictive versions of push down (only to left/right side)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove these

JoinType::Left | JoinType::Right | JoinType::Full
                        if is_no_join_condition(join) =>

When the ON condition is empty, the rule above can be applied; but if the ON condition is not empty,

     JoinType::Left => push_down_join(join, Some(limit), None),
                    JoinType::Right => push_down_join(join, None, Some(limit)),

can still be applied.

cc @jackwener

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood.

_ => push_down_join(join, None, None),
Expand Down Expand Up @@ -606,6 +628,142 @@ mod test {
assert_optimized_plan_eq(&outer_query, expected)
}

/// Checks how a `Limit` is pushed through joins built with empty key lists
/// and no filter.
///
/// Each case builds `test JOIN test2` with the given join type under
/// `limit(0, Some(1000))` and compares the optimized plan with the expected
/// text:
/// - `Left`, `Right`, `Full`: the limit appears on both inputs.
/// - `LeftSemi`, `LeftAnti`: the limit appears on the left input only.
/// - `RightSemi`, `RightAnti`: the limit appears on the right input only.
#[test]
fn limit_should_push_down_join_without_condition() -> Result<()> {
    let table_scan_1 = test_table_scan()?;
    let table_scan_2 = test_table_scan_with_name("test2")?;

    // (join type, expected optimized plan) pairs, checked in order.
    let cases = [
        (
            JoinType::Left,
            "Limit: skip=0, fetch=1000\
            \n Left Join: \
            \n Limit: skip=0, fetch=1000\
            \n TableScan: test, fetch=1000\
            \n Limit: skip=0, fetch=1000\
            \n TableScan: test2, fetch=1000",
        ),
        (
            JoinType::Right,
            "Limit: skip=0, fetch=1000\
            \n Right Join: \
            \n Limit: skip=0, fetch=1000\
            \n TableScan: test, fetch=1000\
            \n Limit: skip=0, fetch=1000\
            \n TableScan: test2, fetch=1000",
        ),
        (
            JoinType::Full,
            "Limit: skip=0, fetch=1000\
            \n Full Join: \
            \n Limit: skip=0, fetch=1000\
            \n TableScan: test, fetch=1000\
            \n Limit: skip=0, fetch=1000\
            \n TableScan: test2, fetch=1000",
        ),
        (
            JoinType::LeftSemi,
            "Limit: skip=0, fetch=1000\
            \n LeftSemi Join: \
            \n Limit: skip=0, fetch=1000\
            \n TableScan: test, fetch=1000\
            \n TableScan: test2",
        ),
        (
            JoinType::LeftAnti,
            "Limit: skip=0, fetch=1000\
            \n LeftAnti Join: \
            \n Limit: skip=0, fetch=1000\
            \n TableScan: test, fetch=1000\
            \n TableScan: test2",
        ),
        (
            JoinType::RightSemi,
            "Limit: skip=0, fetch=1000\
            \n RightSemi Join: \
            \n TableScan: test\
            \n Limit: skip=0, fetch=1000\
            \n TableScan: test2, fetch=1000",
        ),
        (
            JoinType::RightAnti,
            "Limit: skip=0, fetch=1000\
            \n RightAnti Join: \
            \n TableScan: test\
            \n Limit: skip=0, fetch=1000\
            \n TableScan: test2, fetch=1000",
        ),
    ];

    for (join_type, expected) in cases {
        // Empty key lists on both sides: the join has no `on` condition.
        let no_left_keys: Vec<&str> = Vec::new();
        let no_right_keys: Vec<&str> = Vec::new();

        let right_input = LogicalPlanBuilder::from(table_scan_2.clone()).build()?;
        let plan = LogicalPlanBuilder::from(table_scan_1.clone())
            .join(&right_input, join_type, (no_left_keys, no_right_keys), None)?
            .limit(0, Some(1000))?
            .build()?;

        assert_optimized_plan_eq(&plan, expected)?;
    }

    Ok(())
}

#[test]
fn limit_should_push_down_left_outer_join() -> Result<()> {
let table_scan_1 = test_table_scan()?;
Expand Down