ARROW-4589: [Rust] Projection push down query optimizer rule #3664
Conversation
@sunchao @paddyhoran @nevi-me This is ready for review

use arrow::error::Result;
use std::rc::Rc;
/// An optimizer rule performs a transformation on a logical plan to produce an optimized logical plan.
nit: can we also restrict the comments to 90 characters?
This seems to still be over 90 characters.
I pushed a second commit to fix this
// sort the projection otherwise we get non-deterministic behavior
projection.sort();
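The sort matters because the column indexes are collected while walking the plan, and if they are accumulated in an unordered container the resulting projection (and hence the rewritten column indexes) would differ from run to run. A minimal sketch of the idea, assuming the indexes are gathered into a `HashSet` (the container choice here is illustrative):

```rust
use std::collections::HashSet;

/// Turn the set of referenced column indexes into a deterministic projection.
fn projection_from(columns: &HashSet<usize>) -> Vec<usize> {
    let mut projection: Vec<usize> = columns.iter().cloned().collect();
    // sort the projection otherwise we get non-deterministic behavior:
    // HashSet iteration order is arbitrary, so without the sort the pruned
    // schema's column order would vary between runs
    projection.sort();
    projection
}

fn main() {
    let columns: HashSet<usize> = [250, 5, 42].iter().cloned().collect();
    assert_eq!(projection_from(&columns), vec![5, 42, 250]);
}
```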
// now that the table scan is returning a different schema we need to create a |
Why is it "returning a different schema"? It seems the same schema is still returned.
Also, to help me understand: does each plan operator have its own schema, and could it be different from the global schema (e.g., the schema of the input source)? If so, is the column index an index into the current operator's schema (e.g., a column index in the expr of a Sort will point to the schema of the Sort plan)?
Looks like you just found a bug. Good catch. It should be returning the schema after the projection has been applied.
Yes, each plan operator has its own schema (for its output). In some cases (filter, limit, sort) the schema does not change, so they can just delegate to their input relation.
Column indexes are always for the schema of the input relation.
I pushed a fix for the bug and added a test
> Column indexes are always for the schema of the input relation.
Does the schema of the input relation change? I'm still not sure why we need to rewrite the column indexes - can they always point to the complete schema of the input source?
The logical plan created via the SQL query planner (and the DataFrame API when we have one) does just refer to the original table schema.
The query optimizer transforms the plan and pushes the projection down to the TableScan so that we basically pretend the table only contains the columns we care about. The rest of the plan is then rewritten to be relative to that.
Each operator in the plan is relative to its input and doesn't know about the underlying table schema which could be many levels down, especially once we have joins and subqueries.
I guess one of the reasons for doing this, other than having a concise and simple to comprehend plan, is that ultimately these are indexes into RecordBatch instances.
Let's say we have a csv/parquet file with 300 columns and the query only references 12 of them. If we don't do this rewriting then we are going to have to load a RecordBatch with 300 columns where 288 of them are empty arrays or empty options, or we would need some special implementation of RecordBatch which does a mapping.
OK. Makes sense. Thanks.
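The index-rewriting step described in this thread can be sketched in miniature. This is not the actual DataFusion code; `Expr` and `rewrite_expr` are hypothetical stand-ins for the real plan types, and the sketch only shows the core idea: after the projection is pushed down to the TableScan, every column index is remapped from the original table schema to its position in the pruned schema.

```rust
/// Hypothetical stand-in for a plan expression; the real type has many
/// more variants (literals, binary exprs, etc.).
#[derive(Debug, PartialEq)]
enum Expr {
    Column(usize),
}

/// Rewrite a column index so it points into the pruned (projected) schema.
/// `projection` is the sorted list of original column indexes that survive
/// the push-down; an expression's new index is its position in that list.
fn rewrite_expr(expr: &Expr, projection: &[usize]) -> Expr {
    match expr {
        Expr::Column(i) => {
            let new_index = projection
                .iter()
                .position(|p| p == i)
                .expect("column must be present in the projection");
            Expr::Column(new_index)
        }
    }
}

fn main() {
    // Table has 300 columns; the query only touches columns 5, 42 and 250.
    let projection = vec![5, 42, 250];
    // After push-down, original column 42 is column 1 of the pruned schema.
    assert_eq!(rewrite_expr(&Expr::Column(42), &projection), Expr::Column(1));
}
```

This is why each operator can stay relative to its input: once the TableScan pretends the table only has the projected columns, every downstream RecordBatch carries exactly those columns and the remapped indexes line up.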
}

fn optimize(plan: &LogicalPlan) -> Rc<LogicalPlan> {
    let rule: Rc<RefCell<OptimizerRule>> =
Why do we need Rc and RefCell here? Can we do:

let mut rule = ProjectionPushDown::new();
rule.optimize(plan).unwrap()
You are correct. I was just trying to cast from ProjectionPushDown to OptimizerRule since eventually there will be a list of rules to apply. I will simplify this for now though.
LGTM.
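For the eventual "list of rules" case, trait objects are enough on their own; owned `Box<dyn OptimizerRule>` values avoid the `Rc<RefCell<...>>` wrapping since the optimizer owns each rule exclusively. A hedged sketch (the trait shape and names are illustrative, not the PR's actual API):

```rust
/// Stand-in for the real logical plan tree.
#[derive(Clone, Debug, PartialEq)]
struct LogicalPlan {
    description: String,
}

/// Illustrative rule trait; the real one returns a Result.
trait OptimizerRule {
    fn optimize(&mut self, plan: &LogicalPlan) -> LogicalPlan;
}

struct ProjectionPushDown;

impl OptimizerRule for ProjectionPushDown {
    fn optimize(&mut self, plan: &LogicalPlan) -> LogicalPlan {
        // a real rule would rewrite the plan tree; here we just tag it
        LogicalPlan {
            description: format!("{} + projection push down", plan.description),
        }
    }
}

fn optimize(plan: &LogicalPlan) -> LogicalPlan {
    // Boxed trait objects hold heterogeneous rules without Rc<RefCell<...>>,
    // because each rule is owned (and mutated) only by the optimizer.
    let mut rules: Vec<Box<dyn OptimizerRule>> = vec![Box::new(ProjectionPushDown)];
    rules
        .iter_mut()
        .fold(plan.clone(), |p, rule| rule.optimize(&p))
}

fn main() {
    let plan = LogicalPlan { description: "scan".to_string() };
    let optimized = optimize(&plan);
    assert_eq!(optimized.description, "scan + projection push down");
}
```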
Hi @andygrove, I'll be able to take a look on Monday GMT morning |
This PR adds the first query optimizer rule, which rewrites a logical plan to push the projection down to the TableScan. Once this is merged, I will create a follow up PR to integrate this into the query engine so that only the necessary columns are loaded from disk.

Author: Andy Grove <andygrove73@gmail.com>

Closes apache#3664 from andygrove/ARROW-4589-wip and squashes the following commits:

b876f28 <Andy Grove> revert formatting change that broke the tests
2051deb <Andy Grove> formatting comments and test strings to be < 90 columns wide
8effde3 <Andy Grove> Address PR feedback, fix bug, add extra unit test
ecdd32a <Andy Grove> refactor code to reduce duplication
6229b32 <Andy Grove> refactor code to reduce duplication
f959500 <Andy Grove> implement projection push down for rest of logical plan variants
5fd5382 <Andy Grove> implement collect_expr and rewrite_expr for all expression types
bd49f17 <Andy Grove> improve error handling
92918dd <Andy Grove> Implement projection push-down for selection and make projection deterministic
a80cfdf <Andy Grove> Implement mapping and expression rewrite logic
26fd3b4 <Andy Grove> revert change
d7c4822 <Andy Grove> formatting and add assertion to test
e81af14 <Andy Grove> Roughing out projection push down rule