Skip to content

Commit

Permalink
ARROW-8259: [Rust] [DataFusion] ProjectionPushDown now respects LIMIT
Browse files Browse the repository at this point in the history
ProjectionPushDown now respects LIMIT.

I had to update the tests because they were not referencing any columns in the input file and that caused the new arrow reader to fail. I will file a separate issue to add validation for this case.

Closes #6753 from andygrove/ARROW-8259

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>
  • Loading branch information
andygrove committed Mar 29, 2020
1 parent e33fec7 commit fd51e9d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
39 changes: 30 additions & 9 deletions rust/datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,13 @@ impl ProjectionPushDown {
projection: Some(projection),
})
}
LogicalPlan::Limit {
expr,
input,
schema,
} => Ok(LogicalPlan::Limit {
expr: expr.clone(),
input: input.clone(),
schema: schema.clone(),
}),
LogicalPlan::Limit { expr, input, .. } => {
// Note that limit expressions are scalar values so there is no need to
// rewrite them but we do need to optimize the input to the limit plan
LogicalPlanBuilder::from(&self.optimize_plan(&input, accum, mapping)?)
.limit(expr.clone())?
.build()
}
LogicalPlan::CreateExternalTable {
schema,
name,
Expand Down Expand Up @@ -255,6 +253,7 @@ mod tests {

use super::*;
use crate::logicalplan::Expr::*;
use crate::logicalplan::ScalarValue;
use crate::test::*;
use arrow::datatypes::DataType;
use std::sync::Arc;
Expand Down Expand Up @@ -348,6 +347,28 @@ mod tests {
Ok(())
}

#[test]
fn table_limit() -> Result<()> {
let table_scan = test_table_scan()?;
assert_eq!(3, table_scan.schema().fields().len());
assert_fields_eq(&table_scan, vec!["a", "b", "c"]);

let plan = LogicalPlanBuilder::from(&table_scan)
.project(vec![Column(2), Column(0)])?
.limit(Expr::Literal(ScalarValue::UInt32(5)))?
.build()?;

assert_fields_eq(&plan, vec!["c", "a"]);

let expected = "Limit: UInt32(5)\
\n Projection: #1, #0\
\n TableScan: test projection=Some([0, 2])";

assert_optimized_plan_eq(&plan, expected);

Ok(())
}

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let optimized_plan = optimize(plan).expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
Expand Down
6 changes: 3 additions & 3 deletions rust/datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ fn csv_query_cast_literal() {
fn csv_query_limit() {
let mut ctx = ExecutionContext::new();
register_aggregate_csv(&mut ctx);
let sql = "SELECT 0 FROM aggregate_test_100 LIMIT 2";
let sql = "SELECT c1 FROM aggregate_test_100 LIMIT 2";
let actual = execute(&mut ctx, sql).join("\n");
let expected = "0\n0".to_string();
let expected = "\"c\"\n\"d\"".to_string();
assert_eq!(expected, actual);
}

Expand Down Expand Up @@ -283,7 +283,7 @@ fn csv_query_limit_with_same_nbr_of_rows() {
fn csv_query_limit_zero() {
let mut ctx = ExecutionContext::new();
register_aggregate_csv(&mut ctx);
let sql = "SELECT 0 FROM aggregate_test_100 LIMIT 0";
let sql = "SELECT c1 FROM aggregate_test_100 LIMIT 0";
let actual = execute(&mut ctx, sql).join("\n");
let expected = "".to_string();
assert_eq!(expected, actual);
Expand Down

0 comments on commit fd51e9d

Please sign in to comment.