[SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates correctly in non-deterministic condition.#14012
[SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates correctly in non-deterministic condition.#14012jiangxb1987 wants to merge 7 commits intoapache:masterfrom
Conversation
…dicates currectly in non-deterministic condition.
|
ok to test |
|
Test build #61691 has finished for PR 14012 at commit
|
|
add to whitelist |
| val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => | ||
| cond.references.subsetOf(partitionAttrs) && cond.deterministic && | ||
| isPredicatePushdownAble = isPredicatePushdownAble && cond.deterministic | ||
| isPredicatePushdownAble && cond.references.subsetOf(partitionAttrs) && |
There was a problem hiding this comment.
The following can be easier to read:
val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(condition).span(_.deterministic)
val (pushDown, rest) = candidates.partition { cond =>
cond.references.subsetOf(partitionAttrs) && partitionAttrs.forall(_.isInstanceOf[Attribute])
}
val stayUp = rest ++ containingNonDeterministicThere was a problem hiding this comment.
And we should move the partitionAttrs.forall(_.isInstanceOf[Attribute]) predicate out of the closure.
There was a problem hiding this comment.
Sure, I'll update that!
|
Test build #61748 has finished for PR 14012 at commit
|
|
Test build #61751 has finished for PR 14012 at commit
|
|
Test build #61752 has finished for PR 14012 at commit
|
|
Test build #61753 has finished for PR 14012 at commit
|
|
cc @liancheng please review this PR, thanks! |
| @@ -1106,21 +1106,32 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { | |||
| // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be | |||
| // pushed beneath must satisfy the following two conditions: | |||
|
LGTM except for some minor comments. Thanks for improving this! |
|
One more thing, please complete the PR title. |
| // This is for ensuring all the partitioning expressions have been converted to alias | ||
| // in Analyzer. Thus, we do not need to check if the expressions in conditions are | ||
| // the same as the expressions used in partitioning columns. | ||
| if (partitionAttrs.forall(_.isInstanceOf[Attribute])) { |
There was a problem hiding this comment.
I don't think this check is necessary. partitionAttrs is AttributeSet and AttributeSet extends Traversable[Attribute].
There was a problem hiding this comment.
Yep, I'll remove that check, thanks!
|
is it a typo in PR title? |
|
Test build #61985 has finished for PR 14012 at commit
|
| * @return (pushDown, stayUp) | ||
| */ | ||
| private def splitPushdownPredicates( | ||
| condition: Expression)(specificRules: (Expression) => Boolean) = { |
There was a problem hiding this comment.
hmmm, looks like duplicating these codes is more readable, @liancheng what do you think?
There was a problem hiding this comment.
Yea... @jiangxb1987 Sorry that I had to agree with @cloud-fan. Seems that factoring out this method makes the code harder to understand, mostly because the semantics of specificRules is quite convoluted. Could you please revert this part? Sorry again for the trouble!
There was a problem hiding this comment.
I have reverted this part, thanks!
|
@liancheng please find some time to review the latest updates, thanks! |
|
Test build #62100 has finished for PR 14012 at commit
|
|
Thanks! Merged this to master. |
|
Since this is a correctness fix, albeit a minor one, I'd like to backport it. I'm going to cherry-pick this to branch-2.0. |
…dicates correctly in non-deterministic condition. ## What changes were proposed in this pull request? Currently our Optimizer may reorder the predicates to run them more efficient, but in non-deterministic condition, change the order between deterministic parts and non-deterministic parts may change the number of input rows. For example: ```SELECT a FROM t WHERE rand() < 0.1 AND a = 1``` And ```SELECT a FROM t WHERE a = 1 AND rand() < 0.1``` may call rand() for different times and therefore the output rows differ. This PR improved this condition by checking whether the predicate is placed before any non-deterministic predicates. ## How was this patch tested? Expanded related testcases in FilterPushdownSuite. Author: 蒋星博 <jiangxingbo@meituan.com> Closes #14012 from jiangxb1987/ppd. (cherry picked from commit f376c37) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
What changes were proposed in this pull request?
Currently our Optimizer may reorder the predicates to run them more efficient, but in non-deterministic condition, change the order between deterministic parts and non-deterministic parts may change the number of input rows. For example:
SELECT a FROM t WHERE rand() < 0.1 AND a = 1And
SELECT a FROM t WHERE a = 1 AND rand() < 0.1may call rand() for different times and therefore the output rows differ.
This PR improved this condition by checking whether the predicate is placed before any non-deterministic predicates.
How was this patch tested?
Expanded related testcases in FilterPushdownSuite.