[SPARK-13739] [SQL] Push Predicate Through Window #11635
@@ -958,6 +958,26 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))

// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
// pushed beneath must satisfy three conditions:
// 1. involving one and only one column that is part of window partitioning key.
I think we can push any deterministic predicate down as long as all of its references are part of the partition key. So why limit this?
For example, we are unable to push the predicate (key + value) > '2' in the following query. Predicate pushdown would change the value of sum(key):
select * from (SELECT key, value, sum(key) over(partition by key, value) as c1 from src)r1 where (key + value) > '2';
The example is copied from a Hive test case: https://issues.apache.org/jira/secure/attachment/12788757/HIVE-12808.05.patch
So I am not sure I agree with you here.
Let's take your example. We filter the rows with the predicate key + value > 2. Both key and value are used in the PARTITION BY clause, which means they are constant within each partition during window function evaluation. As a result, the value of key + value > 2 is also constant during window evaluation. This predicate would filter out entire partitions, so we can safely push it down.
I think we can safely push down any deterministic filter that only references partitioning columns. Let me know what you think.
I am not sure why Hive does not push this down, but it could well be due to the way Hive evaluates window functions (PTFs).
You are right. In our window function evaluation, both key and value are constant. It should be safe to push down key + value > 2. Will follow your idea. Thanks!
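The argument above can be checked on a toy model. The following is a minimal sketch in plain Scala collections, not Spark code: `Row`, `Windowed`, `windowSum`, `pred`, and the sample data are all hypothetical names introduced for illustration. It models `sum(key) OVER (PARTITION BY key, value)` and compares filtering above the window with filtering beneath it.

```scala
object PushDownSketch {
  // Hypothetical row types for illustration; not Spark classes.
  final case class Row(key: Int, value: Int)
  final case class Windowed(key: Int, value: Int, c1: Int)

  // Models sum(key) OVER (PARTITION BY key, value): every row receives the
  // sum of `key` over all rows that share its (key, value) pair.
  def windowSum(rows: Seq[Row]): Seq[Windowed] = {
    val sums: Map[(Int, Int), Int] =
      rows.groupBy(r => (r.key, r.value)).map { case (k, rs) => k -> rs.map(_.key).sum }
    rows.map(r => Windowed(r.key, r.value, sums((r.key, r.value))))
  }

  // Deterministic predicate that references only the partitioning columns.
  def pred(key: Int, value: Int): Boolean = key + value > 2

  // Made-up sample data (assumption, not taken from the PR or from Hive).
  val src: Seq[Row] = Seq(Row(1, 1), Row(1, 1), Row(1, 2), Row(2, 2), Row(2, 2), Row(0, 5))

  // Filter applied above the window (the original plan shape).
  val filterAfter: Seq[Windowed] = windowSum(src).filter(w => pred(w.key, w.value))

  // Filter applied beneath the window (the pushed-down plan shape).
  val filterBefore: Seq[Windowed] = windowSum(src.filter(r => pred(r.key, r.value)))

  def main(args: Array[String]): Unit = {
    // The predicate keeps or drops whole partitions, so both plans agree.
    assert(filterAfter == filterBefore)
    println(filterAfter)
  }
}
```

Because the predicate evaluates to the same result for every row of a partition, it removes whole partitions and leaves the surviving partitions' sums untouched.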
Test build #56329 has finished for PR 11635 at commit
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
  val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
    cond.references.size == 1 &&
    cond.references.subsetOf(AttributeSet(w.partitionSpec.flatMap(_.references))) &&
nit: we should build the attribute set ahead of time, and use partitionAttrs.contains(cond.references.head) here
Sure, will do. Thanks!
Test build #56358 has finished for PR 11635 at commit
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
  val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
  val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
    cond.references.subsetOf(partitionAttrs) && cond.deterministic
This is not right. Will fix it later.
Test build #56369 has finished for PR 11635 at commit
Test build #56383 has finished for PR 11635 at commit
if (pushDown.nonEmpty) {
  val pushDownPredicate = pushDown.reduce(And)
  val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
  if (stayUp.isEmpty) newWindow else Filter(stayUp.reduce(And), newWindow)
Math.pow(NIT, 100): What does the style guide say about ternary expression one-liners?
Never mind, I looked it up. It is allowed.
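For readers without the Catalyst sources at hand, here is a rough, self-contained sketch of the splitting step discussed in this review: flatten the filter condition into its conjuncts, then send the deterministic ones that reference only partition columns beneath the window. The `Expr`, `Pred`, and `And` types and the `splitConjuncts` and `split` helpers are hypothetical stand-ins written for this example. They are not the Catalyst classes, and the rule shown follows the review discussion rather than necessarily matching the merged code.

```scala
object SplitSketch {
  // Toy expression model (hypothetical; not Catalyst's Expression hierarchy).
  sealed trait Expr {
    def references: Set[String]
    def deterministic: Boolean
  }
  final case class Pred(sql: String, references: Set[String], deterministic: Boolean = true)
      extends Expr
  final case class And(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
    def deterministic: Boolean = left.deterministic && right.deterministic
  }

  // Flatten a tree of ANDs into its individual conjuncts, left to right.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // First element: conjuncts that may move beneath the window.
  // Second element: conjuncts that must stay above it.
  def split(condition: Expr, partitionAttrs: Set[String]): (Seq[Expr], Seq[Expr]) =
    splitConjuncts(condition).partition { c =>
      c.deterministic && c.references.subsetOf(partitionAttrs)
    }
}
```

With partition columns `key` and `value`, a conjunct such as `key + value > 2` would be pushed down, while a conjunct on the window output `c1` or a non-deterministic conjunct would stay above the window.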
@gatorsmile I left two minor comments in the tests. Could you touch these up? I'll merge after that.
Test build #56875 has finished for PR 11635 at commit
LGTM
Merging to master. Thanks!
What changes were proposed in this pull request?
For performance, predicates can be pushed through Window if and only if the following conditions are satisfied:
How was this patch tested?
TODO: