-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-20758][SQL] Add Constant propagation optimization #17993
[SPARK-20758][SQL] Add Constant propagation optimization #17993
Conversation
Jenkins test this please |
ok to test |
it is weird that jenkins is not kicking off |
*/ | ||
object ConstantPropagation extends Rule[LogicalPlan] with PredicateHelper { | ||
|
||
def containsNonConjunctionPredicates(expression: Expression): Boolean = expression match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be more straightfoward:
expression.find {
case _: Not | _: Or => true
}.isDefined
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did this change
|
||
def apply(plan: LogicalPlan): LogicalPlan = plan transform { | ||
case q: LogicalPlan => q transformExpressionsUp { | ||
case and @ (left And right) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case and: And if containsNonConjunctionPredicates(and)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did this change
case and @ (left And right) | ||
if !containsNonConjunctionPredicates(left) && !containsNonConjunctionPredicates(right) => | ||
|
||
val leftEntries = left.collect { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets put the collect in a function, so we can avoid the repetition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
val predicates = (leftEntries.map(_._2) ++ rightEntries.map(_._2)).toSet | ||
|
||
def replaceConstants(expression: Expression) = expression transform { | ||
case a: AttributeReference if constantsMap.contains(a) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the double lookup is necessary. constantsMap.get(a).getOrElse(a)
should cover this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if I do something stupid like i = 1 and ((j = 1) = (j = i))
? I think j = 1
might replaced by 1 = 1
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch !!! I changed the logic to handle that.
Test build #76964 has finished for PR 17993 at commit
|
|
||
and transform { | ||
case e @ EqualTo(_, _) if !predicates.contains(e) && | ||
e.references.exists(ref => constantsMap.contains(ref)) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Building the references
map is more expensive, shall we just skip this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
skipped it
if !containsNonConjunctionPredicates(left) && !containsNonConjunctionPredicates(right) => | ||
|
||
val leftEntries = left.collect { | ||
case e @ EqualTo(left: AttributeReference, right: Literal) => ((left, right), e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about EqualNullSafe
? Normally, we use Equality
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did this change
private val columnB = 'b.int | ||
|
||
/** | ||
* Unit tests for constant propagation in expressions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @tejasapatil . Nit. It looks like test suite comment. Can we move this comment to line 27?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did this change
cc026da
to
b8c4147
Compare
}.isDefined | ||
|
||
def apply(plan: LogicalPlan): LogicalPlan = plan transform { | ||
case f: Filter => f transformExpressionsUp { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was initially doing this for the entire logical plan but now switched to do only for filter operator.
Reason: Doing this for the entire logical plan will mess up with JOIN predicates. eg.
SELECT * FROM a JOIN b ON a.i = 1 AND b.i = a.i
=>
SELECT * FROM a JOIN b ON a.i = 1 AND b.i = 1
.. the result is a cartesian product and Spark fails (asking to set a config). In case of OUTER JOINs, changing the join predicates might cause regression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I am being myopic here but the result should be the same right? The only way this regresses is when we plan a CartesianProduct
instead of an BroadcastNestedLoopJoin
... I am fine with not optimizing this for now, it would be nice if these constraints are at least generated here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes the result should be the same. I don't have any theoretical proof if doing this over joins will be safe so want to be cautious here ... any bad rules might lead to correctness bugs which is super bad for end users.
it would be nice if these constraints are at least generated here
Sorry I am not able to get you here and want to make sure if I am not ignoring your comment. Are you suggesting any changes over the existing version ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We currently infer is not null
constraints up and down the plan. This could be easily extended to other constraints. Your PR has some overlap with this. However, lets focus on getting this merged first, and then we might take a stab at extending this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also cc @sameeragarwal
Test build #77133 has finished for PR 17993 at commit
|
Test build #77134 has finished for PR 17993 at commit
|
Jenkins test this please |
Test build #77160 has started for PR 17993 at commit |
Jenkins test this please |
Test build #77193 has finished for PR 17993 at commit
|
Test build #77392 has finished for PR 17993 at commit
|
* in the AND node. | ||
*/ | ||
object ConstantPropagation extends Rule[LogicalPlan] with PredicateHelper { | ||
def containsNonConjunctionPredicates(expression: Expression): Boolean = expression.find { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private def
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
val constantsMap = AttributeMap(equalityPredicates.map(_._1)) | ||
val predicates = equalityPredicates.map(_._2).toSet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if it's safe when we have both a = 1
and a = 2
at the same time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current impl will pick the last one (ie. a = 2
) and propagate it. Given that its one of the equality predicates user provided, there is nothing wrong in propagating it. When the query is evaluated, it would return empty result given that a = 1
and a = 2
cannot be true at the same time.
scala> hc.sql(" SELECT * FROM table1 a WHERE a.j = 1 AND a.j = 2 AND a.k = (a.j + 3)").explain(true)
== Physical Plan ==
*Project [i#51, j#52, k#53]
+- *Filter ((((isnotnull(k#53) && isnotnull(j#52)) && (j#52 = 1)) && (j#52 = 2)) && (cast(k#53 as int) = 5))
+- *FileScan orc default.table1[i#51,j#52,k#53] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/Users/tejasp/warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j), EqualTo(j,1), EqualTo(j,2)], ReadSchema: struct<i:int,j:int,k:string>
.where(columnA === Literal(11) && columnB === Literal(10)).analyze | ||
|
||
comparePlans(Optimize.execute(query.analyze), correctAnswer) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a negative test case like SELECT * FROM t WHERE a=1 and a=2 and b=a+3
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
- FileSourceStrategySuite.partitioned table - FileSourceStrategySuite.partitioned table - case insensitive - FileSourceStrategySuite.partitioned table - after scan filters
399f348
to
731f796
Compare
Test build #77432 has finished for PR 17993 at commit
|
} | ||
|
||
and transform { | ||
case e @ EqualTo(_, _) if !predicates.contains(e) => replaceConstants(e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we check for identity instead of equality? I think you are doing the latter. What will happen in the following example: select * from bla where (a = 1 or b = 2) and a = 1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the behavior with this PR. Seems reasonable because a = 1
has to be true so (a = 1 or b = 2)
would always be true
and can be eliminated.
scala> hc.sql(" select * from bla where (a = 1 or b = 2) and a = 1 ").explain(true)
== Physical Plan ==
*Project [a#34, b#35]
+- *Filter (isnotnull(a#34) && (a#34 = 1))
+- *FileScan ....
.where( | ||
columnA === Literal(11) && | ||
columnB === Literal(10) && | ||
(columnA === Add(columnC, Literal(3)) || Literal(10) === columnC)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be able to infer that columnA == Literal(11)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps if you increase the number of iterations on ConstantPropagation
batch...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had set a higher value for iterations in previous version of this PR but somehow the unit tests kept failing for me over terminal (surprisingly they worked fine over Intellij). This seems unrelated to the change done in the PR. If you have any advice here, let me know
sbt.ForkMain$ForkError: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Max iterations (2) reached for batch ConstantPropagation, tree:
!Filter ((a#82440 = 11) && (b#82441 = 10))
+- !Project [a#82440]
+- LocalRelation <empty>, [a#82437, b#82438, c#82439]
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:105)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.catalyst.optimizer.ConstantPropagationSuite$$anonfun$1.apply$mcV$sp(ConstantPropagationSuite.scala:60)
at org.apache.spark.sql.catalyst.optimizer.ConstantPropagationSuite$$anonfun$1.apply(ConstantPropagationSuite.scala:50)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm... that means the optimizer is not converging to a fixed point. Could you try to increase the number of iterations? You can also check if the optimizer reaches 100 iterations during regular execution; it should log a warning. If it does something is wrong with the rule, and it might cause the optimizer to run prohibitively long...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That did it !! updated the change
Test build #77493 has finished for PR 17993 at commit
|
LGTM - merging to master. Thanks! |
What changes were proposed in this pull request?
See class doc of
ConstantPropagation
for the approach used.How was this patch tested?