[SPARK-9082][SQL] Filter using non-deterministic expressions should not be pushed down #7446
cc @yhuai
Test build #37499 has finished for PR 7446 at commit
test("nondeterministic: can't push down filter through project") {
  val originalQuery = testRelation
    .select(Rand(10).as('rand))
    .where('rand > 5)
Maybe it is better to use a condition having both deterministic and non-deterministic expressions, e.g.
val originalQuery = testRelation
.select(Rand(10).as('rand), 'a)
.where('rand > 5 && 'a > 5)
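To see why the test needs the filter to stay above the project, here is a minimal sketch on plain Scala collections. It is not Catalyst code: `NondeterministicFilterSketch`, `relation`, and both method names are hypothetical, and `scala.util.Random` merely stands in for `Rand(10)`. The point is that inlining the alias into a pushed-down filter evaluates the random expression a second time, so the projected value is no longer the one the filter tested.

```scala
import scala.util.Random

object NondeterministicFilterSketch {
  // Hypothetical relation with a single integer column `a`.
  val relation: Seq[Int] = 1 to 20

  // Original plan: compute the random column once per row, then filter on it.
  def filterAboveProject(seed: Long): Seq[(Double, Int)] = {
    val rng = new Random(seed)
    relation
      .map(a => (rng.nextDouble() * 10, a))            // select(rand, a)
      .filter { case (rand, a) => rand > 5 && a > 5 }  // where(rand > 5 && a > 5)
  }

  // Unsafe rewrite: the filter runs below the project with the alias inlined,
  // so the projection draws a fresh random value that the filter never saw.
  def filterPushedBelowProject(seed: Long): Seq[(Double, Int)] = {
    val rng = new Random(seed)
    relation
      .filter(a => rng.nextDouble() * 10 > 5 && a > 5)
      .map(a => (rng.nextDouble() * 10, a))
  }
}
```

In the first plan every surviving row is guaranteed to have `rand > 5`; the second plan gives no such guarantee, although the deterministic part (`a > 5`) still holds in both.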
Test build #37518 has finished for PR 7446 at commit
Thanks for the fix! There is one thing I think is worth double-checking. Let's say I have a partitioned Parquet table, I select a few columns from it, and then I have predicates on top of that. These predicates include non-deterministic expressions, a predicate on the partition column, and some predicates that can be pushed down to the Parquet table scan (pushed into Parquet's reader). With our fix, will the partition column still be pruned correctly, and will those predicates still be pushed down to Parquet's reader?
test("nondeterministic: can't push down filter through project") {
  val originalQuery = testRelation
    .select(Rand(10).as('rand), 'a)
    .where('rand > 5 && 'a > 5)
Looks like we should optimize this into:
testRelation
.where('a > 5)
.select(Rand(10).as('rand), 'a)
.where('rand > 5)
Can we do this?
I feel it may require many more changes. I am fine with doing that in a separate PR in the future.
Test build #37527 has finished for PR 7446 at commit
@yhuai I believe expressions with UDF(s) are never pushed down to Parquet or any other data source. Only simple comparisons and string predicates against constants can be pushed down. I'm double-checking partition pruning.
Yeah. But my concern is whether this fix will prevent any legitimate predicates from being pushed down.
// We only push down filter if their overlapped expressions are all
// deterministic.
val hasNondeterministic = condition.collect {
  case a: Attribute if aliasMap.contains(a) => aliasMap(a)
We probably cannot use `contains` here, as we have to use `semanticEquals` to find the identical expression.
But `aliasMap` is an `AttributeMap`, so I think it should be safe to call `contains` here?
Oh, yes, the original code seems to have a bug: it uses `.toMap`.
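A toy model of the difference being discussed, assuming that `AttributeMap` looks attributes up by expression ID while a plain `Map` built with `.toMap` relies on full structural equality. The `Attr` and `ExprIdMap` types below are hypothetical stand-ins, not Catalyst's classes.

```scala
object AttributeLookupSketch {
  // Hypothetical attribute: `exprId` is the identity, `qualifier` is cosmetic.
  final case class Attr(name: String, exprId: Long, qualifier: Option[String] = None)

  // Minimal map that ignores everything except the expression ID on lookup.
  final class ExprIdMap[A](entries: Seq[(Attr, A)]) {
    private val byId: Map[Long, A] =
      entries.map { case (k, v) => k.exprId -> v }.toMap

    def contains(a: Attr): Boolean = byId.contains(a.exprId)
    def get(a: Attr): Option[A] = byId.get(a.exprId)
  }

  val rand: Attr = Attr("rand", exprId = 1L)
  // Same attribute, referenced through a table qualifier.
  val qualifiedRand: Attr = rand.copy(qualifier = Some("t"))

  // Plain Map: keyed by the whole case class, so the qualifier matters.
  val plainMap: Map[Attr, String] = Seq(rand -> "Rand(10)").toMap
  // ID-keyed map: the qualifier is irrelevant.
  val idMap: ExprIdMap[String] = new ExprIdMap(Seq(rand -> "Rand(10)"))
}
```

Under these assumptions the plain map misses the qualified reference while the ID-keyed map finds it, which is why `contains` is only safe on the latter.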
val hasNondeterministic = projectList1.flatMap(_.collect {
  case a: Attribute if aliasMap.contains(a) => aliasMap(a).child
}).exists(_.find(!_.deterministic).isDefined)
val canCollapse = projectList1.find(hasNondeterministic(_, aliasMap)).isEmpty
An unrelated but small change: we don't need to go through the whole project list; we can stop once we find a nondeterministic expression.
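A small sketch of the short-circuiting point, using plain Scala collections. The `projectList` of booleans is a hypothetical stand-in in which `true` marks an entry that references a nondeterministic expression; the counters only exist to show how many entries each variant visits.

```scala
object ShortCircuitSketch {
  // Hypothetical project list: `true` marks a nondeterministic entry.
  val projectList: Seq[Boolean] = Seq(false, true, false, false)

  // Eager variant: `flatMap` on a strict Seq processes every entry before `exists` runs.
  def eager(): (Boolean, Int) = {
    var visited = 0
    val found = projectList.flatMap { e => visited += 1; Seq(e) }.exists(identity)
    (found, visited)
  }

  // Short-circuiting variant: `exists` stops at the first matching entry.
  def shortCircuit(): (Boolean, Int) = {
    var visited = 0
    val found = projectList.exists { e => visited += 1; e }
    (found, visited)
  }
}
```

Both variants agree on the answer; only the amount of work differs. Note that `find(p).isEmpty` is equivalent to `!exists(p)` and also stops early.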
Test build #37618 has finished for PR 7446 at commit
val hasNondeterministic = projectList1.flatMap(_.collect {
  case a: Attribute if aliasMap.contains(a) => aliasMap(a).child
}).exists(_.find(!_.deterministic).isDefined)
val noCollapse = projectList1.exists(hasNondeterministic(_, aliasMap))
Test build #37630 has finished for PR 7446 at commit
    sourceAliases: AttributeMap[Alias]) = {
  project.exists {
    case a: Attribute =>
      sourceAliases.get(a).map(_.child.exists(!_.deterministic)).getOrElse(false)
sourceAliases.get(a).exists(_.child.exists(!_.deterministic))
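The suggestion relies on a standard-library identity: for any `Option`, `opt.map(p).getOrElse(false)` and `opt.exists(p)` return the same result. A minimal sketch with hypothetical helper names:

```scala
object OptionExistsSketch {
  // The longer form used in the diff.
  def viaMapGetOrElse(o: Option[Int])(p: Int => Boolean): Boolean =
    o.map(p).getOrElse(false)

  // The shorter form suggested in the review.
  def viaExists(o: Option[Int])(p: Int => Boolean): Boolean =
    o.exists(p)
}
```

Both return `false` for `None` and the predicate's result for `Some(x)`.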
Test build #37901 has finished for PR 7446 at commit
ping @yhuai
@yhuai Sorry that I misunderstood your question at first. I think this PR is safe for normal filter push-down. Verified that partition pruning and Parquet filter push-down are both working properly.
LGTM
LGTM. I am merging it to master.
// Split the condition into small conditions by `And`, so that we can push down part of this
// condition without nondeterministic expressions.
val andConditions = splitConjunctivePredicates(condition)
val nondeterministicConditions = andConditions.filter(hasNondeterministic(_, aliasMap))
Maybe using `partition` here would be better?
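A sketch of what that could look like, on a hypothetical `Cond` type rather than Catalyst expressions: `partition` splits the conjuncts into the nondeterministic and deterministic groups in one pass, instead of filtering the list twice with opposite predicates.

```scala
object PartitionSketch {
  // Hypothetical stand-in for one conjunct of the filter condition.
  final case class Cond(sql: String, deterministic: Boolean)

  val andConditions: Seq[Cond] = Seq(
    Cond("rand > 5", deterministic = false),
    Cond("a > 5", deterministic = true),
    Cond("b = 1", deterministic = true)
  )

  // Two passes over the list with opposite predicates.
  def twoFilters: (Seq[Cond], Seq[Cond]) =
    (andConditions.filter(!_.deterministic), andConditions.filterNot(!_.deterministic))

  // One pass: the first element holds matches, the second holds the rest.
  def onePartition: (Seq[Cond], Seq[Cond]) =
    andConditions.partition(!_.deterministic)
}
```

The first group would stay above the project, and the second is the candidate for push-down.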
No description provided.