[SPARK-36206][CORE] Support shuffle data corruption diagnosis via shuffle checksum #33451
Conversation
cc @mridulm @tgravescs @otterc @cloud-fan Please help review, thanks!
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #141382 has finished for PR 33451 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Test build #141385 has finished for PR 33451 at commit
Did one round of review. My general thoughts about this change are:
- Though it avoids a re-fetch of a corrupted block when the cause of corruption is a disk issue, the act of finding the cause, which requires sending another message to the server, costs about as much as simply retrying the corrupt block. The retry of a corrupt block happens just once. So, I don't think this change saves much time in that respect. If there is any data (benchmark results) suggesting otherwise, please do share.
- With respect to diagnostics, most of the time the corruption is due to disk; corruption due to the network (or other issues) would be rare. I feel that this broad classification of corruption may not be that helpful to the user. Again, if there are metrics that suggest otherwise, please share. I looked at the other PR but didn't find additional information there.
Had a few comments, mostly looks good - thanks for working on this @Ngone51 !
File probeFile = ExecutorDiskUtils.getFile(
  executor.localDirs,
  executor.subDirsPerLocalDir,
  fileName);
Btw, while looking at this, an unrelated potential issue: we use intern in ExecutorDiskUtils. Probably should move to using the Guava interner (Utils.weakIntern does this) ... thoughts @Ngone51?
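The weak-interner idea mentioned here can be sketched without Guava (which provides it via `Interners.newWeakInterner()`). The following is a minimal stand-in built on the JDK's `WeakHashMap`, so pooled strings can be garbage-collected once unused, unlike `String.intern()`; the class and method names are illustrative, not Spark's.

```java
import java.lang.ref.WeakReference;
import java.util.WeakHashMap;

// Minimal weak string interner: returns one canonical instance per value,
// but lets unused entries be garbage-collected (unlike String.intern()).
final class WeakInterner {
    private final WeakHashMap<String, WeakReference<String>> pool = new WeakHashMap<>();

    synchronized String intern(String s) {
        WeakReference<String> ref = pool.get(s);
        String cached = (ref == null) ? null : ref.get();
        if (cached != null) {
            return cached; // reuse the canonical copy
        }
        pool.put(s, new WeakReference<>(s));
        return s;
    }
}
```

Interning directory-name strings this way deduplicates the many repeated path components the shuffle service holds, without pinning them in memory forever.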
Unfortunately, Utils can't be referenced in the network-shuffle module.
Yeah, I meant something similar ... we don't need to do this in this PR, btw; just thinking out loud.
The main motivation behind the shuffle checksum project is to give the cause of data corruption to users/developers to help debug the underlying root causes further. It doesn't really try to bring a performance improvement. Please also note that diagnosis only happens on a corruption error (which is a corner case), so it won't have a big impact on performance.
These are the only causes we can give under the current solution, and I think they're actually helpful. Without this change, people can only guess the cause. Even if we all suspect that disk issues are the most common cause, no one can tell for sure.
Test build #141617 has finished for PR 33451 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
    cause = Cause.CHECKSUM_VERIFY_PASS;
  }
} catch (UnsupportedOperationException e) {
  cause = Cause.UNSUPPORTED_CHECKSUM_ALGORITHM;
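The fragment above is the tail of the server-side decision. A self-contained sketch of how the three checksums and the error paths could map to causes is below; the method signature and supplier parameters are hypothetical stand-ins, since the real logic lives in Spark's shuffle checksum helper.

```java
import java.util.function.LongSupplier;

// c1 is recomputed by the reader over the fetched (corrupt) block; the
// suppliers stand in for reading the stored checksum (c2) from the checksum
// file and recomputing the checksum (c3) over the block on the server's disk.
enum Cause { UNKNOWN_ISSUE, CHECKSUM_VERIFY_PASS, DISK_ISSUE, NETWORK_ISSUE, UNSUPPORTED_CHECKSUM_ALGORITHM }

final class DiagnosisSketch {
    static Cause diagnose(long c1, LongSupplier stored, LongSupplier recomputed) {
        try {
            long c2 = stored.getAsLong();
            long c3 = recomputed.getAsLong();
            if (c2 != c3) return Cause.DISK_ISSUE;    // disk no longer matches write-time checksum
            if (c1 != c3) return Cause.NETWORK_ISSUE; // disk is fine, bytes changed in transit
            return Cause.CHECKSUM_VERIFY_PASS;        // all three agree
        } catch (UnsupportedOperationException e) {
            return Cause.UNSUPPORTED_CHECKSUM_ALGORITHM; // algorithm not available server-side
        } catch (Exception e) {
            return Cause.UNKNOWN_ISSUE;               // any other diagnosis failure
        }
    }
}
```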
cc @mridulm @otterc @tgravescs who discussed the upgrade issue of ESS before.
FYI, there's a major change after addressing #33451 (comment): previously, we'd diagnose corruption when the first corruption of a block was detected. Now, we only diagnose corruption at the second corruption of the same block. Thus, we resolve the concern in #33451 (comment).
Test build #141647 has finished for PR 33451 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
Mostly looks good, just a few minor comments.
Thanks for the fixes @Ngone51 !
There don't seem to be any UTs added to ShuffleBlockFetcherIteratorSuite. I don't have any other comments.
Test build #141710 has finished for PR 33451 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Sure. I'll add it there.
Kubernetes integration test starting
Kubernetes integration test starting
Kubernetes integration test status success
Test build #141944 has finished for PR 33451 at commit
Merge commit: Closes #33451 from Ngone51/SPARK-36206. Authored-by: yi.wu <yi.wu@databricks.com>. Signed-off-by: Mridul Muralidharan. (cherry picked from commit a98d919)
Tests passed, merged to master and branch-3.2. Thanks for working on this @Ngone51!
Thank you, everybody!
What changes were proposed in this pull request?
This PR adds support to diagnose shuffle data corruption. Basically, the diagnosis mechanism works like this:
The shuffle reader calculates the checksum (c1) for the corrupted shuffle block and sends it to the server where the block is stored. The server reads back the checksum (c2) stored in the checksum file and recalculates the checksum (c3) for the corresponding shuffle block. Then, if c2 != c3, we suspect the corruption is caused by a disk issue; otherwise, if c1 != c3, we suspect it is caused by a network issue; otherwise, the checksum verification passes. If any error occurs during the diagnosis, the cause is reported as unknown.
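As an illustration of the checksums involved, the JDK's CRC32 can checksum a block's bytes; the algorithm here is illustrative, since Spark's actual checksum algorithm is configurable.

```java
import java.util.zip.CRC32;

// Illustrative checksum over a shuffle block's bytes. Equal bytes yield
// equal checksums, so comparing c1/c2/c3 localizes where the bytes diverged.
final class BlockChecksum {
    static long checksumOf(byte[] blockBytes) {
        CRC32 crc = new CRC32();
        crc.update(blockBytes, 0, blockBytes.length);
        return crc.getValue();
    }
}
```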
After the shuffle reader receives the diagnosis response, it takes an action based on the type of the cause: only in the case of a network issue do we retry; otherwise, we throw the fetch failure directly. Also note that if the corruption happens inside BufferReleasingInputStream, the reducer throws the fetch failure immediately regardless of the cause, since the data has been partially consumed by downstream RDDs. If corruption happens again after the retry, the reducer throws the fetch failure directly this time, without another diagnosis.
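The reader-side reaction described above can be sketched as a small predicate; the enum, method, and parameter names are hypothetical, not Spark's.

```java
// Hypothetical sketch of the reducer's decision after receiving a diagnosis.
enum DiagnosedCause { DISK_ISSUE, NETWORK_ISSUE, CHECKSUM_VERIFY_PASS, UNKNOWN_ISSUE }

final class ReaderPolicy {
    static boolean shouldRetry(DiagnosedCause cause,
                               boolean firstCorruption,
                               boolean streamPartiallyConsumed) {
        if (streamPartiallyConsumed) return false; // downstream RDDs already consumed data: fail fast
        if (!firstCorruption) return false;        // corrupt again after a retry: fetch failure, no diagnosis
        return cause == DiagnosedCause.NETWORK_ISSUE; // only a suspected network issue earns a retry
    }
}
```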
Please check out #32385 to see the complete proposal of the shuffle checksum project.
Why are the changes needed?
Shuffle data corruption is a long-standing issue in Spark. For example, in SPARK-18105, people continually report corruption issues. However, data corruption is difficult to reproduce in most cases and even harder to trace to a root cause. We don't know if it's a Spark issue or not. With diagnosis support for shuffle corruption, Spark itself can at least distinguish between disk and network causes, which is very important for users.
Does this PR introduce any user-facing change?
Yes, users may learn the cause of shuffle corruption after this change.
How was this patch tested?
Added tests.