[SPARK-45592][SPARK-45282][SQL][3.4] Correctness issue in AQE with InMemoryTableScanExec #43729
Conversation
Fixes a correctness issue in 3.5.0. The problem seems to be that when AQEShuffleRead does a coalesced read, it can return a HashPartitioning with the coalesced number of partitions. This causes a correctness bug, as the partitioning is not compatible for joins with another HashPartitioning even though the number of partitions matches. This is resolved in this patch by introducing CoalescedHashPartitioning and making AQEShuffleRead return that instead.

The fix was suggested by @cloud-fan:

> AQEShuffleRead should probably return a different partitioning, e.g. CoalescedHashPartitioning. It still satisfies ClusterDistribution, so Aggregate is fine and there will be no shuffle. For joins, two CoalescedHashPartitionings are compatible if they have the same original partition number and coalesce boundaries, and CoalescedHashPartitioning is not compatible with HashPartitioning.

Why are the changes needed?
Correctness bug.

Does this PR introduce any user-facing change?
Yes, fixed correctness issue.

How was this patch tested?
New and existing unit test.

Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#43435 from eejbyfeldt/SPARK-45592.
Authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 2be03d8)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
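The incompatibility described above can be modeled outside of Spark. The sketch below is a plain-Python thought experiment, not Spark code (all names are hypothetical): it shows why a coalesced read must not report itself as a plain HashPartitioning with the new partition count, since two sides with the same reported partition count can still place the same key in different partitions.

```python
# Conceptual model (not the Spark API): why a coalesced hash read is not
# interchangeable with a plain hash partitioning of the same final size.

def hash_partition(key, num_partitions):
    # Stand-in for a hash partitioner: partition = hash(key) % n.
    return hash(key) % num_partitions

def coalesced_partition(key, original_partitions, boundaries):
    # A coalesced read maps original reducer partitions into contiguous
    # ranges; boundaries is a list of (start, end) half-open ranges over
    # the original partition indexes.
    p = hash_partition(key, original_partitions)
    for i, (start, end) in enumerate(boundaries):
        if start <= p < end:
            return i
    raise ValueError("partition out of range")

# Side A: plain hash partitioning with 2 partitions.
# Side B: 4 original partitions coalesced into 2 -> also reports "2".
boundaries_b = [(0, 1), (1, 4)]

mismatches = [
    k for k in range(100)
    if hash_partition(k, 2) != coalesced_partition(k, 4, boundaries_b)
]
# The two sides disagree on where many keys live, so zipping their
# partitions together for a join (which same-numPartitions
# HashPartitioning would allow) produces wrong results.
print(len(mismatches) > 0)  # True
```

This is the scenario the patch closes: `CoalescedHashPartitioning` refuses compatibility with plain `HashPartitioning`, so Spark inserts a shuffle instead of silently joining mismatched partitions.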
Thank you, @eejbyfeldt.
Hi, @eejbyfeldt.
We can reuse the main title of the PR because the code is the same:
[SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec
Let me revise this PR title.
+1, LGTM from my side. Thank you so much for helping with the Apache Spark 3.4.2 release, @eejbyfeldt.
cc @cloud-fan, too
```scala
 * fewer number of partitions.
 */
case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[CoalescedBoundary])
  extends Expression with Partitioning with Unevaluable {
```
Hmm, why does this need to extend `Expression` and `Unevaluable`? I thought just `Partitioning` is enough.
It was just based on how it was done for `HashPartitioning`; it could be that it's not needed.
```scala
case SinglePartitionShuffleSpec =>
  numPartitions == 1
case CoalescedHashShuffleSpec(otherParent, otherPartitions) =>
  partitions == otherPartitions && from.isCompatibleWith(otherParent)
```
Suppose both `from` and `otherParent` are `HashShuffleSpec`: is it possible that they have different numbers of partitions, but after coalescing the numbers of partitions become the same? In that case we'll consider the two incompatible, but should they actually be compatible?
Even if they are coalesced to the same number of partitions, the coalesce boundaries could be different. I think this is the root of the issue, and why we need to make sure the boundaries are the same when checking compatibility.
Hmm, this is not related to my comment above. The check `partitions == otherPartitions` is OK; my question is about `from.isCompatibleWith(otherParent)`, which checks (assuming both are `HashShuffleSpec`) whether the original numbers of partitions are the same.
I think it must be `HashShuffleSpec`. You mean that their partition numbers can be different but still compatible?
I mean their partition numbers are different, so `from.isCompatibleWith(otherParent)` will return false, which in turn causes this `CoalescedHashShuffleSpec.isCompatibleWith` to return false. But should we return true instead if the partitions after coalescing are the same?
In other words, `from.isCompatibleWith(otherParent)` could be too conservative.
Hmm, if the first hash partitioning has 5 partitions and the second has 4, how can we get the same coalesced partitions? For example:
[[0, 3], [3, 5]] != [[0, 3], [3, 4]]
[[0, 2], [2, 3], [3, 5]] != [[0, 2], [2, 3], [3, 4]]
The end reducer index of the last coalesced partition should always be different, no?
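This argument can be checked mechanically: any coalescing of n original partitions must end at reducer index n, so coalescings of different original partition counts can never produce identical boundary lists. A small illustration (plain Python, with boundaries represented as hypothetical (start, end) pairs, not Spark's actual classes):

```python
from itertools import combinations

def all_coalescings(n):
    # Every way to coalesce n original partitions into contiguous ranges:
    # choose any set of interior cut points between 1 and n-1.
    # The last range always ends at n.
    for k in range(n):
        for chosen in combinations(range(1, n), k):
            points = [0, *chosen, n]
            yield tuple((points[i], points[i + 1]) for i in range(len(points) - 1))

# No coalescing of 5 partitions equals any coalescing of 4 partitions:
# the last range ends at 5 on one side and at 4 on the other.
five = set(all_coalescings(5))
four = set(all_coalescings(4))
print(five.isdisjoint(four))  # True
```

So for plain coalesced reads the `partitions == otherPartitions` check alone would already rule out parents with different original partition counts, which is the point being debated.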
I'm not sure whether it is possible for this case to happen. But irrespective of that, I feel this check is unnecessary here.
Of course this is relatively minor stuff and not related to this backport.
But what about two (nonsensical) `CoalescedHashShuffleSpec`s where each partition is just coalesced from a single partition of some parent `HashShuffleSpec`? Is it not then correct to do the `from.isCompatibleWith(otherParent)` check? Or is it expected that we can make them compatible just by coalescing them in such a way?
To @sunchao, if you don't mind, could you comment on the original PR once more? If we need those changes, we need to start from
I took a look at the JIRA. So the bug only happens on 3.4 if these configurations are enabled? But they are not needed in 3.5?
This backport looks good. I just wonder why in 3.4 these configurations are needed to trigger the bug.
```scala
@@ -2461,6 +2461,19 @@ class DatasetSuite extends QueryTest
   )
   assert(result == expected)
 }

 test("SPARK-45592: Coaleasced shuffle read is not compatible with hash partitioning") {
```
This test looks similar to the reproducer reported in SPARK-45282. Does it need the configurations used in the reproducer?
Or we need to add the reproducer as new test (with the configurations) in 3.4?
Changed the test to actually reproduce the issue on 3.4.
Is it possible to add it as a test?
OK, I cross-posted there.
First of all, the test does NOT repro for us. It's not written in a robust way (certain enough to trigger the bug). Can we close this one for now? We will push a new fix out to OSS soon.
The defaults were changed in 3.5 (SPARK-42768), but the test case from SPARK-45592 also depends on newer features like SPARK-42101. I think there are many different ways to hit the same root bug.
The title might have been slightly misleading, as InMemoryTableScanExec was only enabled in AQE in 3.5.0 (SPARK-42101), which is why it is needed to reproduce using the test case in SPARK-45592. But I guess the root cause of the bug existed before, as shown by the case described in SPARK-45282.
To @eejbyfeldt, feel free to revise back if you think so. To @maryannxue, regarding the following comment: although we can wait for a new fix, I don't think we need to close this PR first, because we don't know what you are going to propose there. I'm looking forward to reviewing your PR. Please let us know when a new fix is ready.
To be clear, I'm reviewing this PR as the Apache Spark 3.4.2 release manager. I'd love to deliver Apache Spark 3.4.2 with the proper fix.
First of all, @maryannxue's comment was about the original PR, not specifically about this PR (branch-3.4).
Second, she already agreed the original PR here.
Third, given the situation on @maryannxue's PR (#43760), it's not written for branch-3.4 at all. To me, it seems that we need to spend more time on her PR. In addition, I'd like to have a consistent status across Apache Spark 4.0/3.5.2/3.4.2. In other words, her patch and test case will land in the same way to Lastly, for now, there is no landed patch not only
The failure is an unrelated memory issue. Merged to branch-3.4.
…MemoryTableScanExec

What changes were proposed in this pull request?
This backports #43435 SPARK-45592 to the 3.4 branch. This is because it was already reported there as SPARK-45282, but it required enabling some extra configuration to hit the bug.

Why are the changes needed?
Fix correctness issue.

Does this PR introduce any user-facing change?
Yes, fixing correctness issue.

How was this patch tested?
New tests based on the reproduction example in SPARK-45282

Was this patch authored or co-authored using generative AI tooling?
No

Closes #43729 from eejbyfeldt/SPARK-45282.
Authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Merged to branch-3.4. Thank you all for helping with the Apache Spark 3.4.2 release. I'll also share the AS-IS status in the dev mailing thread.