[SPARK-32635][SQL] Fix foldable propagation #29771
Test build #128764 has finished for PR 29771 at commit
cc @cloud-fan, @maropu, @viirya
sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
good catch!
@@ -2555,6 +2555,18 @@ class DataFrameSuite extends QueryTest
    val df = Seq(0.0 -> -0.0).toDF("pos", "neg")
    checkAnswer(df.select($"pos" > $"neg"), Row(false))
  }

  test("SPARK-32635: Fix foldable propagation") {
Could you make the title clearer? e.g., Replace references with foldables coming only from the node's children
Sure, fixed.
Looks ok otherwise. Nice fix.
0c0a115 to 5f7bbfb
last commit is just a minor improvement
LGTM (not a Reviewer). Eliminating I'd like to ask about a few things though:
Thanks!
Test build #128812 has finished for PR 29771 at commit
Good point.
This is exactly what happens without this PR. Basically
I think this is a very good point actually.
Thanks @rednaxelafx, latest commit contains the fix.
Test build #128808 has finished for PR 29771 at commit
Test build #128810 has finished for PR 29771 at commit
    val b = Seq("2").toDF("col1").withColumn("col2", lit("2"))
    val aub = a.union(b)
    val c = aub.filter($"col1" === "2").cache()
    val d = Seq("2").toDF( "col4")
nit: extra space before "col4"
Thanks, fixed.
good catch!
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -624,67 +624,89 @@ object NullPropagation extends Rule[LogicalPlan] {
 */
object FoldablePropagation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    var foldableMap = AttributeMap(plan.flatMap {
      case Project(projectList, _) => projectList.collect {
How about Aggregate? Why are we not handling Aggregate in the original rule?
This ticket and PR just focus on the correctness issue found. But I think we can extend the rule to handle Aggregate in another ticket if that is ok with you.
I had the same question when I first looked at the PR, but since this PR might need to be hotfixed to released branches, it's a good idea to keep it simple and not add any new features. So I'm +1 for handling Aggregate in a separate PR.
+1 for @peter-toth and @rednaxelafx 's opinion.
@@ -2555,6 +2555,18 @@ class DataFrameSuite extends QueryTest
    val df = Seq(0.0 -> -0.0).toDF("pos", "neg")
    checkAnswer(df.select($"pos" > $"neg"), Row(false))
  }

  test("SPARK-32635: Replace references with foldables coming only from the node's children") {
Although this end-to-end test is also useful, we had better have narrowed-down test coverage in FoldablePropagationSuite in the catalyst module, too. Could you add one, please? Then we can easily detect regressions in the catalyst module tests instead of running the full sql tests.
Ok, I will try to add a test case to FoldablePropagationSuite tomorrow.
Thanks!
+1, LGTM. Thanks, @peter-toth and all.
I guess most comments are already addressed. I have only one minor test related comment.
Ur, @peter-toth. Could you check the PR description once more?

scala> spark.version
res1: String = 3.0.1
scala> :paste
// Entering paste mode (ctrl-D to finish)
val a = Seq("1").toDF("col1").withColumn("col2", lit("1"))
val b = Seq("2").toDF("col1").withColumn("col2", lit("2"))
val aub = a.union(b)
val c = aub.filter($"col1" === "2").cache()
val d = Seq("2").toDF( "col4")
val r = d.join(aub, $"col2" === $"col4").select("col4")
val l = c.select("col2")
val df = l.join(r, $"col2" === $"col4", "LeftOuter")
df.show()
// Exiting paste mode, now interpreting.
+----+----+
|col2|col4|
+----+----+
|   2|   2|
+----+----+

scala> spark.version
res0: String = 2.4.7
scala> :paste
// Entering paste mode (ctrl-D to finish)
val a = Seq("1").toDF("col1").withColumn("col2", lit("1"))
val b = Seq("2").toDF("col1").withColumn("col2", lit("2"))
val aub = a.union(b)
val c = aub.filter($"col1" === "2").cache()
val d = Seq("2").toDF( "col4")
val r = d.join(aub, $"col2" === $"col4").select("col4")
val l = c.select("col2")
val df = l.join(r, $"col2" === $"col4", "LeftOuter")
df.show()
// Exiting paste mode, now interpreting.
+----+----+
|col2|col4|
+----+----+
|   2|   2|
+----+----+
@dongjoon-hyun, please run the shell with
Got it. Thanks, @peter-toth! Please add that conf to the PR description and the JIRA description.
Test build #128825 has finished for PR 29771 at commit
Test build #128835 has finished for PR 29771 at commit
Test build #128837 has finished for PR 29771 at commit
Nice suggestion, @rednaxelafx! I checked the latest commits again and they look okay.
### What changes were proposed in this pull request?

This PR rewrites `FoldablePropagation` rule to replace attribute references in a node with foldables coming only from the node's children.

Before this PR in the case of this example (with setting `spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation`):
```scala
val a = Seq("1").toDF("col1").withColumn("col2", lit("1"))
val b = Seq("2").toDF("col1").withColumn("col2", lit("2"))
val aub = a.union(b)
val c = aub.filter($"col1" === "2").cache()
val d = Seq("2").toDF( "col4")
val r = d.join(aub, $"col2" === $"col4").select("col4")
val l = c.select("col2")
val df = l.join(r, $"col2" === $"col4", "LeftOuter")
df.show()
```
foldable propagation happens incorrectly:
```
 Join LeftOuter, (col2#6 = col4#34)                                                              Join LeftOuter, (col2#6 = col4#34)
!:- Project [col2#6]                                                                             :- Project [1 AS col2#6]
 :  +- InMemoryRelation [col1#4, col2#6], StorageLevel(disk, memory, deserialized, 1 replicas)   :  +- InMemoryRelation [col1#4, col2#6], StorageLevel(disk, memory, deserialized, 1 replicas)
 :        +- Union                                                                               :        +- Union
 :           :- *(1) Project [value#1 AS col1#4, 1 AS col2#6]                                    :           :- *(1) Project [value#1 AS col1#4, 1 AS col2#6]
 :           :  +- *(1) Filter (isnotnull(value#1) AND (value#1 = 2))                            :           :  +- *(1) Filter (isnotnull(value#1) AND (value#1 = 2))
 :           :     +- *(1) LocalTableScan [value#1]                                              :           :     +- *(1) LocalTableScan [value#1]
 :           +- *(2) Project [value#10 AS col1#13, 2 AS col2#15]                                 :           +- *(2) Project [value#10 AS col1#13, 2 AS col2#15]
 :              +- *(2) Filter (isnotnull(value#10) AND (value#10 = 2))                          :              +- *(2) Filter (isnotnull(value#10) AND (value#10 = 2))
 :                 +- *(2) LocalTableScan [value#10]                                             :                 +- *(2) LocalTableScan [value#10]
 +- Project [col4#34]                                                                            +- Project [col4#34]
    +- Join Inner, (col2#6 = col4#34)                                                               +- Join Inner, (col2#6 = col4#34)
       :- Project [value#31 AS col4#34]                                                                :- Project [value#31 AS col4#34]
       :  +- LocalRelation [value#31]                                                                  :  +- LocalRelation [value#31]
       +- Project [col2#6]                                                                             +- Project [col2#6]
          +- Union false, false                                                                           +- Union false, false
             :- Project [1 AS col2#6]                                                                        :- Project [1 AS col2#6]
             :  +- LocalRelation [value#1]                                                                   :  +- LocalRelation [value#1]
             +- Project [2 AS col2#15]                                                                       +- Project [2 AS col2#15]
                +- LocalRelation [value#10]                                                                     +- LocalRelation [value#10]
```
and so the result is wrong:
```
+----+----+
|col2|col4|
+----+----+
|   1|null|
+----+----+
```

After this PR foldable propagation will not happen incorrectly and the result is correct:
```
+----+----+
|col2|col4|
+----+----+
|   2|   2|
+----+----+
```

### Why are the changes needed?

To fix a correctness issue.

### Does this PR introduce _any_ user-facing change?

Yes, fixes a correctness issue.

### How was this patch tested?

Existing and new UTs.

Closes #29771 from peter-toth/SPARK-32635-fix-foldable-propagation.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
(cherry picked from commit 4ced588)
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
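The difference between the two strategies can be illustrated with a minimal, self-contained sketch. This is a toy model under stated assumptions, not Spark's Catalyst code: `Expr`, `Plan`, `propagateGlobally`, and `propagateFromChildren` are hypothetical names invented for illustration, attributes are identified by a plain integer id, every join is treated as an inner join, and the "global" variant is only a caricature of collecting foldables from the whole plan.

```scala
object FoldablePropagationSketch {
  // Toy expressions (hypothetical, not Catalyst classes); `id` plays the role of an ExprId.
  sealed trait Expr
  final case class Attr(name: String, id: Int) extends Expr
  final case class Lit(value: String) extends Expr
  final case class Alias(child: Expr, name: String, id: Int) extends Expr

  // Toy logical plan nodes.
  sealed trait Plan
  final case class Relation(output: Seq[Attr]) extends Plan
  final case class Project(exprs: Seq[Expr], child: Plan) extends Plan
  final case class Union(children: Seq[Plan]) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan

  type Foldables = Map[Int, Lit]

  // Literal aliases defined by a projection list, keyed by attribute id.
  def foldablesOf(exprs: Seq[Expr]): Foldables =
    exprs.collect { case Alias(l: Lit, _, id) => id -> l }.toMap

  // Replace top-level attribute references whose id has a known literal value.
  def replace(exprs: Seq[Expr], known: Foldables): Seq[Expr] = exprs.map {
    case a @ Attr(name, id) =>
      known.get(id) match {
        case Some(lit) => Alias(lit, name, id)
        case None      => a
      }
    case other => other
  }

  private def allProjectLists(p: Plan): Seq[Seq[Expr]] = p match {
    case Relation(_)    => Nil
    case Project(es, c) => es +: allProjectLists(c)
    case Union(cs)      => cs.flatMap(allProjectLists)
    case Join(l, r)     => allProjectLists(l) ++ allProjectLists(r)
  }

  // Problematic strategy: one map built from the whole plan, applied to every Project.
  def propagateGlobally(plan: Plan): Plan = {
    val global = foldablesOf(allProjectLists(plan).flatten)
    def rewrite(p: Plan): Plan = p match {
      case r: Relation    => r
      case Project(es, c) => Project(replace(es, global), rewrite(c))
      case Union(cs)      => Union(cs.map(rewrite))
      case Join(l, r)     => Join(rewrite(l), rewrite(r))
    }
    rewrite(plan)
  }

  // Strategy in the spirit of the fix: a node only sees foldables produced by its own children.
  def propagateFromChildren(plan: Plan): (Plan, Foldables) = plan match {
    case r: Relation => (r, Map.empty)
    case Project(es, c) =>
      val (newChild, fromChild) = propagateFromChildren(c)
      val newExprs = replace(es, fromChild)
      (Project(newExprs, newChild), foldablesOf(newExprs))
    case Union(cs) =>
      // Branches may produce different values, so nothing is foldable above a Union.
      (Union(cs.map(c => propagateFromChildren(c)._1)), Map.empty)
    case Join(l, r) =>
      // Toy simplification: treated as an inner join, so both sides' foldables survive.
      val (newLeft, fromLeft) = propagateFromChildren(l)
      val (newRight, fromRight) = propagateFromChildren(r)
      (Join(newLeft, newRight), fromLeft ++ fromRight)
  }

  // Shape loosely modelled on the example: a leaf that outputs col2#6 on the left,
  // and a Union whose first branch defines `1 AS col2#6` on the right.
  val cached: Plan = Relation(Seq(Attr("col1", 4), Attr("col2", 6)))
  val left: Plan = Project(Seq(Attr("col2", 6)), cached)
  val right: Plan = Union(Seq(
    Project(Seq(Alias(Lit("1"), "col2", 6)), Relation(Seq(Attr("value", 1)))),
    Project(Seq(Alias(Lit("2"), "col2", 15)), Relation(Seq(Attr("value", 10))))))
  val plan: Plan = Join(left, right)

  def main(args: Array[String]): Unit = {
    println(propagateGlobally(plan))
    println(propagateFromChildren(plan)._1)
  }
}
```

In this toy plan the global variant rewrites the left `Project` to `1 AS col2#6`, because the id 6 is also defined inside the unrelated `Union` branch, while the per-child variant leaves the plan unchanged since the leaf under the left `Project` reports no foldables.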
Thanks! Merged to master/3.0. Could you backport it into branch-2.4, @peter-toth?
Also, do you want to make a PR for the @gatorsmile comment above #29771 (comment)?
@maropu and @peter-toth. Please don't forget #29771 (comment). That should be backported together.
Oh, I missed it. Could you fix it first in a follow-up, @peter-toth?
Thanks all for the review! I've opened a follow-up PR: #29802 to add a new test case to I will open a 2.4 backport PR soon and I'm happy to extend propagation to
### What changes were proposed in this pull request?

This is a follow-up PR to #29771 and just adds a new test case.

### Why are the changes needed?

To have better test coverage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT.

Closes #29802 from peter-toth/SPARK-32635-fix-foldable-propagation-followup.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 3309a2b)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Nice @peter-toth, LGTM too! cc @vinodkc FYI
### What changes were proposed in this pull request?

This PR adds foldable propagation from `Aggregate` as per: #29771 (comment)

### Why are the changes needed?

This is an improvement as `Aggregate`'s `aggregateExpressions` can contain foldables that can be propagated up.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT.

Closes #29816 from peter-toth/SPARK-32951-foldable-propagation-from-aggregate.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>