Skip to content

Commit

Permalink
Fix push down limit regression when meet SubqueryAlias (#4425)
Browse files Browse the repository at this point in the history
* support `SubqueryAlias` in `PushdownLimit`

* fix regression for push_down_limit meet subquery-alias
  • Loading branch information
jackwener committed Dec 2, 2022
1 parent fb8eeb2 commit 1aa645f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 21 deletions.
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/view.rs
Expand Up @@ -474,8 +474,7 @@ mod tests {
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
.to_string();
// TODO: limit_push_down support SubqueryAlias
assert!(formatted.contains("GlobalLimitExec: skip=0, fetch=10"));
assert!(formatted.contains("ParquetExec: limit=Some(10)"));
Ok(())
}

Expand Down
47 changes: 28 additions & 19 deletions datafusion/optimizer/src/limit_push_down.rs
Expand Up @@ -20,9 +20,7 @@
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::{
logical_plan::{
Join, JoinType, Limit, LogicalPlan, Projection, Sort, TableScan, Union,
},
logical_plan::{Join, JoinType, Limit, LogicalPlan, Sort, TableScan, Union},
CrossJoin,
};
use std::sync::Arc;
Expand Down Expand Up @@ -123,7 +121,8 @@ impl OptimizerRule for LimitPushDown {
};
let skip = limit.skip;

let plan = match &*limit.input {
let child_plan = &*limit.input;
let plan = match child_plan {
LogicalPlan::TableScan(scan) => {
let limit = if fetch != 0 { fetch + skip } else { 0 };
let new_input = LogicalPlan::TableScan(TableScan {
Expand All @@ -136,21 +135,6 @@ impl OptimizerRule for LimitPushDown {
});
plan.with_new_inputs(&[new_input])?
}

LogicalPlan::Projection(projection) => {
let new_input = LogicalPlan::Limit(Limit {
skip,
fetch: Some(fetch),
input: Arc::new((*projection.input).clone()),
});
// Push down limit directly (projection doesn't change number of rows)
LogicalPlan::Projection(Projection::try_new_with_schema(
projection.expr.clone(),
Arc::new(new_input),
projection.schema.clone(),
)?)
}

LogicalPlan::Union(union) => {
let new_inputs = union
.inputs
Expand Down Expand Up @@ -230,6 +214,14 @@ impl OptimizerRule for LimitPushDown {
});
plan.with_new_inputs(&[new_sort])?
}
LogicalPlan::Projection(_) | LogicalPlan::SubqueryAlias(_) => {
// commute
let new_limit =
plan.with_new_inputs(&[
(*(child_plan.inputs().get(0).unwrap())).clone()
])?;
child_plan.with_new_inputs(&[new_limit])?
}
_ => plan.clone(),
};

Expand Down Expand Up @@ -931,4 +923,21 @@ mod test {

assert_optimized_plan_eq(&plan, expected)
}

#[test]
fn push_down_subquery_alias() -> Result<()> {
let scan = test_table_scan()?;

let plan = LogicalPlanBuilder::from(scan)
.alias("a")?
.limit(0, Some(1))?
.limit(1000, None)?
.build()?;

let expected = "SubqueryAlias: a\
\n Limit: skip=1000, fetch=0\
\n TableScan: test, fetch=0";

assert_optimized_plan_eq(&plan, expected)
}
}

0 comments on commit 1aa645f

Please sign in to comment.