-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Eliminate multi limit-offset nodes to emptyRelation #2823
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f94f008
f179e3f
bcdd42f
ff837eb
1616314
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,7 +15,9 @@ | |
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! Optimizer rule to replace `LIMIT 0` on a plan with an empty relation. | ||
| //! Optimizer rule to replace `LIMIT 0` or | ||
| //! `LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch` | ||
| //! on a plan with an empty relation. | ||
| //! This saves time in planning and executing the query. | ||
| use crate::{OptimizerConfig, OptimizerRule}; | ||
| use datafusion_common::Result; | ||
|
|
@@ -24,7 +26,9 @@ use datafusion_expr::{ | |
| utils::from_plan, | ||
| }; | ||
|
|
||
| /// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation] | ||
| /// Optimization rule that replaces LIMIT 0 or | ||
| /// LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch | ||
| /// with an [LogicalPlan::EmptyRelation] | ||
| #[derive(Default)] | ||
| pub struct EliminateLimit; | ||
|
|
||
|
|
@@ -35,35 +39,98 @@ impl EliminateLimit { | |
| } | ||
| } | ||
|
|
||
| /// Ancestor indicates the current ancestor in the LogicalPlan tree | ||
| /// when traversing down related to "eliminate limit". | ||
| enum Ancestor { | ||
| /// Limit | ||
| FromLimit { skip: Option<usize> }, | ||
| /// Other nodes that don't affect the adjustment of "Limit" | ||
| NotRelevant, | ||
| } | ||
|
|
||
| /// replaces LIMIT 0 with an [LogicalPlan::EmptyRelation] | ||
| /// replaces LIMIT node whose ancestor LIMIT's skip is greater than or equal to current's fetch | ||
| /// with an [LogicalPlan::EmptyRelation] | ||
| fn eliminate_limit( | ||
| _optimizer: &EliminateLimit, | ||
| ancestor: &Ancestor, | ||
| plan: &LogicalPlan, | ||
| _optimizer_config: &OptimizerConfig, | ||
| ) -> Result<LogicalPlan> { | ||
| match plan { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Limit with only skip should be handled here. what do you think about this?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I thought when the current node is
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The ancestor LIMIT node is OFFSET 5 for query "SELECT * FROM (SELECT * FROM test_a LIMIT 5) AS test_b OFFSET 5".
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I see, I misunderstood your code. |
||
| LogicalPlan::Limit(Limit { | ||
| skip, fetch, input, .. | ||
| }) => { | ||
| let ancestor_skip = match ancestor { | ||
| Ancestor::FromLimit { skip, .. } => skip.unwrap_or(0), | ||
| _ => 0, | ||
| }; | ||
| // If ancestor's skip is equal or greater than current's fetch, | ||
| // replaces with an [LogicalPlan::EmptyRelation]. | ||
| // For such query, the inner query(select * from xxx limit 5) should be optimized as an EmptyRelation: | ||
| // select * from (select * from xxx limit 5) a limit 2 offset 5; | ||
| match fetch { | ||
| Some(fetch) => { | ||
| if *fetch == 0 || ancestor_skip >= *fetch { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| return Ok(LogicalPlan::EmptyRelation(EmptyRelation { | ||
| produce_one_row: false, | ||
| schema: input.schema().clone(), | ||
| })); | ||
| } | ||
| } | ||
| None => {} | ||
| } | ||
|
|
||
| let expr = plan.expressions(); | ||
|
|
||
| // apply the optimization to all inputs of the plan | ||
| let inputs = plan.inputs(); | ||
| let new_inputs = inputs | ||
| .iter() | ||
| .map(|plan| { | ||
| eliminate_limit( | ||
| _optimizer, | ||
| &Ancestor::FromLimit { skip: *skip }, | ||
| plan, | ||
| _optimizer_config, | ||
| ) | ||
| }) | ||
| .collect::<Result<Vec<_>>>()?; | ||
|
|
||
| from_plan(plan, &expr, &new_inputs) | ||
| } | ||
| // Rest: recurse and find possible LIMIT 0/Multi LIMIT OFFSET nodes | ||
| _ => { | ||
| // For those plans(projection/sort/..) which do not affect the output rows of sub-plans, we still use ancestor; | ||
| // otherwise, use NotRelevant instead. | ||
| let ancestor = match plan { | ||
| LogicalPlan::Projection { .. } | LogicalPlan::Sort { .. } => ancestor, | ||
| _ => &Ancestor::NotRelevant, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is consistent with limit_push_down. |
||
| }; | ||
|
|
||
| let expr = plan.expressions(); | ||
|
|
||
| // apply the optimization to all inputs of the plan | ||
| let inputs = plan.inputs(); | ||
| let new_inputs = inputs | ||
| .iter() | ||
| .map(|plan| { | ||
| eliminate_limit(_optimizer, ancestor, plan, _optimizer_config) | ||
| }) | ||
| .collect::<Result<Vec<_>>>()?; | ||
|
|
||
| from_plan(plan, &expr, &new_inputs) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl OptimizerRule for EliminateLimit { | ||
| fn optimize( | ||
| &self, | ||
| plan: &LogicalPlan, | ||
| optimizer_config: &OptimizerConfig, | ||
| ) -> Result<LogicalPlan> { | ||
| match plan { | ||
| LogicalPlan::Limit(Limit { | ||
| fetch: Some(0), | ||
| input, | ||
| .. | ||
| }) => Ok(LogicalPlan::EmptyRelation(EmptyRelation { | ||
| produce_one_row: false, | ||
| schema: input.schema().clone(), | ||
| })), | ||
| // Rest: recurse and find possible LIMIT 0 nodes | ||
| _ => { | ||
| let expr = plan.expressions(); | ||
|
|
||
| // apply the optimization to all inputs of the plan | ||
| let inputs = plan.inputs(); | ||
| let new_inputs = inputs | ||
| .iter() | ||
| .map(|plan| self.optimize(plan, optimizer_config)) | ||
| .collect::<Result<Vec<_>>>()?; | ||
|
|
||
| from_plan(plan, &expr, &new_inputs) | ||
| } | ||
| } | ||
| eliminate_limit(self, &Ancestor::NotRelevant, plan, optimizer_config) | ||
| } | ||
|
|
||
| fn name(&self) -> &str { | ||
|
|
@@ -75,7 +142,12 @@ impl OptimizerRule for EliminateLimit { | |
| mod tests { | ||
| use super::*; | ||
| use crate::test::*; | ||
| use datafusion_expr::{col, logical_plan::builder::LogicalPlanBuilder, sum}; | ||
| use datafusion_common::Column; | ||
| use datafusion_expr::{ | ||
| col, | ||
| logical_plan::{builder::LogicalPlanBuilder, JoinType}, | ||
| sum, | ||
| }; | ||
|
|
||
| fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { | ||
| let rule = EliminateLimit::new(); | ||
|
|
@@ -128,4 +200,114 @@ mod tests { | |
| \n TableScan: test"; | ||
| assert_optimized_plan_eq(&plan, expected); | ||
| } | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By the way, can we add more tests here to handle all combination of
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, I'll add more test cases. |
||
| #[test] | ||
| fn limit_fetch_with_ancestor_limit_skip() { | ||
| let table_scan = test_table_scan().unwrap(); | ||
| let plan = LogicalPlanBuilder::from(table_scan) | ||
| .aggregate(vec![col("a")], vec![sum(col("b"))]) | ||
| .unwrap() | ||
| .limit(None, Some(2)) | ||
| .unwrap() | ||
| .limit(Some(2), None) | ||
| .unwrap() | ||
| .build() | ||
| .unwrap(); | ||
|
|
||
| // No aggregate / scan / limit | ||
| let expected = "Limit: skip=2, fetch=None\ | ||
| \n EmptyRelation"; | ||
| assert_optimized_plan_eq(&plan, expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn multi_limit_offset_sort_eliminate() { | ||
| let table_scan = test_table_scan().unwrap(); | ||
| let plan = LogicalPlanBuilder::from(table_scan) | ||
| .aggregate(vec![col("a")], vec![sum(col("b"))]) | ||
| .unwrap() | ||
| .limit(None, Some(2)) | ||
| .unwrap() | ||
| .sort(vec![col("a")]) | ||
| .unwrap() | ||
| .limit(Some(2), Some(1)) | ||
| .unwrap() | ||
| .build() | ||
| .unwrap(); | ||
|
|
||
| let expected = "Limit: skip=2, fetch=1\ | ||
| \n Sort: #test.a\ | ||
| \n EmptyRelation"; | ||
| assert_optimized_plan_eq(&plan, expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn limit_fetch_with_ancestor_limit_fetch() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| let table_scan = test_table_scan().unwrap(); | ||
| let plan = LogicalPlanBuilder::from(table_scan) | ||
| .aggregate(vec![col("a")], vec![sum(col("b"))]) | ||
| .unwrap() | ||
| .limit(None, Some(2)) | ||
| .unwrap() | ||
| .sort(vec![col("a")]) | ||
| .unwrap() | ||
| .limit(None, Some(1)) | ||
| .unwrap() | ||
| .build() | ||
| .unwrap(); | ||
|
|
||
| let expected = "Limit: skip=None, fetch=1\ | ||
| \n Sort: #test.a\ | ||
| \n Limit: skip=None, fetch=2\ | ||
| \n Aggregate: groupBy=[[#test.a]], aggr=[[SUM(#test.b)]]\ | ||
| \n TableScan: test"; | ||
| assert_optimized_plan_eq(&plan, expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn limit_with_ancestor_limit() { | ||
| let table_scan = test_table_scan().unwrap(); | ||
| let plan = LogicalPlanBuilder::from(table_scan) | ||
| .aggregate(vec![col("a")], vec![sum(col("b"))]) | ||
| .unwrap() | ||
| .limit(Some(2), Some(1)) | ||
| .unwrap() | ||
| .sort(vec![col("a")]) | ||
| .unwrap() | ||
| .limit(Some(3), Some(1)) | ||
| .unwrap() | ||
| .build() | ||
| .unwrap(); | ||
|
|
||
| let expected = "Limit: skip=3, fetch=1\ | ||
| \n Sort: #test.a\ | ||
| \n EmptyRelation"; | ||
| assert_optimized_plan_eq(&plan, expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn limit_join_with_ancestor_limit() { | ||
| let table_scan = test_table_scan().unwrap(); | ||
| let table_scan_inner = test_table_scan_with_name("test1").unwrap(); | ||
| let plan = LogicalPlanBuilder::from(table_scan) | ||
| .limit(Some(2), Some(1)) | ||
| .unwrap() | ||
| .join_using( | ||
| &table_scan_inner, | ||
| JoinType::Inner, | ||
| vec![Column::from_name("a".to_string())], | ||
| ) | ||
| .unwrap() | ||
| .limit(Some(3), Some(1)) | ||
| .unwrap() | ||
| .build() | ||
| .unwrap(); | ||
|
|
||
| let expected = "Limit: skip=3, fetch=1\ | ||
| \n Inner Join: Using #test.a = #test1.a\ | ||
| \n Limit: skip=2, fetch=1\ | ||
| \n TableScan: test\ | ||
| \n TableScan: test1"; | ||
| assert_optimized_plan_eq(&plan, expected); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should also add some comments in top this file to clarify the intention of this optimizer rule in line 18 and line 27.