[WIP][SPARK-35275][CORE] Add checksum for shuffle blocks and diagnose corruption #32385
Conversation
I marked the PR as WIP.
cc @mridulm @otterc @attilapiros @tgravescs @cloud-fan Please take a look, thanks!
Test build #138049 has finished for PR 32385 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
+CC @otterc This should be of interest given recent discussions.
Thanks for copying me. I will take a look at it in a few days.
Hi all, to ease the review for everyone, I plan to split this PR into 2 smaller PRs first:

(I have created PR #32401 for this part, so you can start reviewing from there.)

(I'll create the PR later.)
I haven't had time to look in detail; if this is a working prototype, have you done any performance measurements to see what impact this has? I know you said it should be minimal, but some numbers would be nice. Also, at a high level, how does this affect the other shuffle work going on, like merging and pluggable shuffle? Is it independent of that, or would it need to be reimplemented?
@tgravescs Thanks for the good points! I did find some perf regression by benchmarking with the change. I'll double-check it for sure and try to get rid of it if possible.

For merging, it needs an extension to send checksum values along with the block data while merging. The extension is also needed for the decommission feature. For pluggable, my current implementation is added at

An alternative way to support checksum for all plugins, or rather to make it a built-in feature, may be to implement it in
Hi @tgravescs @mridulm @otterc, I have resolved the regression issue (verified by running the TPCDS benchmark with 3TB data internally) and made the checksum a built-in feature of Spark. I have also updated PR #32401 (which adds checksum support at the shuffle writer side only) and I think it's ready for review.
thanks @Ngone51, I'm very busy this week, so will take a look early next week.

Sure, take your time :)
Took one pass through the PR, thanks for working on this @Ngone51!
```java
if (!appId.equals(that.appId)) return false;
if (!execId.equals(that.execId)) return false;
if (!blockId.equals(that.blockId)) return false;
return checksum == that.checksum;
```
super nit: check `checksum` first? It's the cheapest check.
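A minimal sketch of the suggested reordering; it assumes (from the snippet above) that this is the `equals` of the `DiagnoseCorruption` message, and the surrounding boilerplate is illustrative:

```java
@Override
public boolean equals(Object other) {
  if (this == other) return true;
  if (other == null || getClass() != other.getClass()) return false;
  DiagnoseCorruption that = (DiagnoseCorruption) other;
  // Compare the primitive long first: it's the cheapest check and the most
  // likely to differ, so mismatches bail out before any String comparison.
  if (checksum != that.checksum) return false;
  if (!appId.equals(that.appId)) return false;
  if (!execId.equals(that.execId)) return false;
  return blockId.equals(that.blockId);
}
```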
```java
@Override
public void encode(ByteBuf buf) {
  buf.writeInt(cause.ordinal());
```
`int` -> `byte`?
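A hedged sketch of what the suggested change could look like; the enum values and the codec class are illustrative (the PR's actual `Cause` values aren't shown here), and the `writeInt` -> `writeByte` swap is the only point:

```java
import io.netty.buffer.ByteBuf;

public class CorruptionCauseCodec {
  // Illustrative enum; the real Cause values live in the PR.
  public enum Cause { UNKNOWN_ISSUE, DISK_ISSUE, NETWORK_ISSUE, CHECKSUM_VERIFY_PASS }

  public static void encode(ByteBuf buf, Cause cause) {
    buf.writeByte(cause.ordinal());  // a byte is plenty for a small enum; was writeInt
  }

  public static Cause decode(ByteBuf buf) {
    return Cause.values()[buf.readByte()];
  }
}
```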
```diff
@@ -47,6 +48,15 @@
 protected volatile TransportClientFactory clientFactory;
 protected String appId;
```

```java
public Cause diagnoseCorruption(
```
Include javadoc?
```java
int result = appId.hashCode();
result = 31 * result + execId.hashCode();
result = 31 * result + blockId.hashCode();
result = 31 * result + (int) checksum;
```
nit: `checksum` -> `Long.hashCode(checksum)`?
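A sketch of the hash with that suggestion applied (the method wrapper is illustrative, not the PR's actual code):

```java
static int hashDiagnoseCorruption(String appId, String execId, String blockId, long checksum) {
  int result = appId.hashCode();
  result = 31 * result + execId.hashCode();
  result = 31 * result + blockId.hashCode();
  // Long.hashCode mixes both halves: (int) (checksum ^ (checksum >>> 32)),
  // whereas (int) checksum silently drops the high 32 bits.
  result = 31 * result + Long.hashCode(checksum);
  return result;
}
```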
```java
this.partitionLengths = new long[numPartitions];
this.partitionChecksums = checksumEnabled ? new long[numPartitions] : new long[0];
```
We are using `null` in `MapOutputCommitMessage` while an empty array here when checksum is disabled. Unify to a single idiom? Given `writeMetadataFileAndCommit` depends on the empty array (based on how it is written up right now), thoughts on using `new long[0]`? (Btw, use a constant `EMPTY_LONG_ARRAY` if deciding to use `new long[0]`.)
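A sketch of the constant idiom being suggested, which also avoids allocating a fresh empty array per writer; the enclosing class and constructor are illustrative, with field names taken from the snippet above:

```java
public class ShuffleWriterState {
  private static final long[] EMPTY_LONG_ARRAY = new long[0];

  private final long[] partitionLengths;
  private final long[] partitionChecksums;

  public ShuffleWriterState(int numPartitions, boolean checksumEnabled) {
    this.partitionLengths = new long[numPartitions];
    // One shared sentinel instead of a new allocation whenever checksums
    // are disabled.
    this.partitionChecksums = checksumEnabled ? new long[numPartitions] : EMPTY_LONG_ARRAY;
  }
}
```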
```scala
    }
  }
} finally {
  logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",", "]")}")
  if (indexTmp.exists() && !indexTmp.delete()) {
    logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
  }
  checksumTmpOpt.foreach { checksumTmp =>
    if (checksumTmp.exists() && !checksumTmp.delete()) {
      logError(s"Failed to delete temporary checksum file at ${checksumTmp.getAbsolutePath}")
```
`logInfo`?
```scala
val channel = Files.newByteChannel(checksumFile.toPath)
channel.position(reduceId * 8L)
in = new DataInputStream(Channels.newInputStream(channel))
val goldenChecksum = in.readLong()
```
Extract out `readChecksum` and `computeChecksum` methods?

Btw, `tryWithResource { DataInputStream(FileInputStream()).skip(reduceId * 8L).readLong() }` would do the trick for `readChecksum`.
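A minimal sketch of what such a `readChecksum` helper might look like in plain JDK code; the method name follows the suggestion above, and the file layout (one 8-byte long per reduce partition) is taken from the quoted snippet:

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

static long readChecksum(File checksumFile, int reduceId) throws IOException {
  try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) {
    // Each partition's checksum is a long, so its entry starts at reduceId * 8.
    long toSkip = reduceId * 8L;
    while (toSkip > 0) {
      long skipped = in.skip(toSkip);  // skip() may skip fewer bytes than asked
      if (skipped <= 0) {
        throw new IOException("Unexpected EOF while seeking to checksum entry");
      }
      toSkip -= skipped;
    }
    return in.readLong();
  }
}
```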
```scala
val checksumIn = new CheckedInputStream(blockData.createInputStream(), new Adler32)
val buffer = new Array[Byte](8192)
while (checksumIn.read(buffer, 0, 8192) != -1) {}
val recalculatedChecksum = checksumIn.getChecksum.getValue
```
We are not closing `checksumIn`, btw.
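A sketch of the recomputation with the stream closed via try-with-resources, addressing the leak noted above (the helper name is illustrative):

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;

static long recalculateChecksum(InputStream blockData) throws IOException {
  try (CheckedInputStream in = new CheckedInputStream(blockData, new Adler32())) {
    byte[] buffer = new byte[8192];
    // Drain the stream; CheckedInputStream updates the running Adler-32
    // checksum as a side effect of every read.
    while (in.read(buffer, 0, buffer.length) != -1) { }
    return in.getChecksum().getValue();
  }
}
```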
```diff
@@ -76,6 +77,9 @@ private[spark] class DiskBlockObjectWriter(
   private var initialized = false
   private var streamOpen = false
   private var hasBeenClosed = false
+  private var checksumEnabled = false
+  private var checksumCal: Checksum = null
+  private var checksumOutputStream: CheckedOutputStream = null
```
Same comment as above - reduce the number of streams kept as fields?
```scala
override def close(): Unit = sink.close()
}
```
Nice unification!
oh.. @mridulm Sorry if I confused you here. I have planned to split this PR into two separate PRs to ease the review:

So please help review the smaller PR there, and I'll try to resolve your comments in the separate PRs. Thanks!
…checksum file

### What changes were proposed in this pull request?

This is the initial work of adding checksum support for shuffle. This is a piece of #32385, and this PR only adds checksum functionality at the shuffle writer side.

Basically, the idea is to wrap a `MutableCheckedOutputStream`* upon the `FileOutputStream` while the shuffle writer generates the shuffle data. But the specific wrapping places are a bit different among the shuffle writers due to their different implementations:

* `BypassMergeSortShuffleWriter` - wrap on each partition file
* `UnsafeShuffleWriter` - wrap on each spill file directly, since they don't require aggregation or sorting
* `SortShuffleWriter` - wrap on the `ShufflePartitionPairsWriter` after merging spill files, since they might require aggregation or sorting

\* `MutableCheckedOutputStream` is a variant of `java.util.zip.CheckedOutputStream` which can change the checksum calculator at runtime. And we use `Adler32`, which is much faster than CRC-32, to calculate the checksum, the same as `Broadcast`'s checksum.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

Yes, added a new conf: `spark.shuffle.checksum`.

### How was this patch tested?

Added unit tests.

Closes #32401 from Ngone51/add-checksum-files.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…ffle checksum

### What changes were proposed in this pull request?

This PR adds support to diagnose shuffle data corruption. Basically, the diagnosis mechanism works like this: the shuffle reader calculates the checksum (c1) for the corrupted shuffle block and sends it to the server where the block is stored. At the server, it reads back the checksum (c2) that is stored in the checksum file and recalculates the checksum (c3) for the corresponding shuffle block. Then, if c2 != c3, we suspect the corruption is caused by a disk issue. Otherwise, if c1 != c3, we suspect the corruption is caused by a network issue. Otherwise, the checksum verification passes and the cause remains unknown.

After the shuffle reader receives the diagnosis response, it takes action based on the type of cause. Only in the case of a network issue do we retry; otherwise, we throw the fetch failure directly. Also note that if the corruption happens inside BufferReleasingInputStream, the reducer throws the fetch failure immediately no matter what the cause is, since the data has been partially consumed by downstream RDDs. If corruption happens again after a retry, the reducer throws the fetch failure directly this time without the diagnosis.

Please check out #32385 to see the complete proposal of the shuffle checksum project.

### Why are the changes needed?

Shuffle data corruption is a long-standing issue in Spark. For example, in SPARK-18105, people continually report corruption issues. However, data corruption is difficult to reproduce in most cases and even harder to root-cause. We don't know if it's a Spark issue or not. With the diagnosis support for shuffle corruption, Spark itself can at least distinguish between disk and network causes, which is very important for users.

### Does this PR introduce _any_ user-facing change?

Yes, users may learn the cause of shuffle corruption after this change.

### How was this patch tested?

Added tests.

Closes #33451 from Ngone51/SPARK-36206.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
Hi @Ngone51, thanks for providing the checksum feature for shuffle. I have a thought about the following case.

I think the checksum could handle this case. Picking up the idea from #28525, we can consume the input stream inside

To achieve this, we may need to update the shuffle network protocol to support passing checksums when fetching shuffle blocks. Compared to the existing

We encounter this issue in our production every day; for some jobs, the performance overhead is acceptable compared to stability. @Ngone51 WDYT?
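A rough sketch of the eager-verification idea under discussion, assuming the fetch response carries the writer-side checksum; this is not the protocol the PR implements, and the names are hypothetical:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;

/** Drains the fetched block and fails fast if it doesn't match the shipped checksum. */
static void verifyBeforeConsumption(InputStream fetchedBlock, long expectedChecksum)
    throws IOException {
  CheckedInputStream in = new CheckedInputStream(fetchedBlock, new Adler32());
  byte[] buffer = new byte[8192];
  while (in.read(buffer, 0, buffer.length) != -1) { }
  if (in.getChecksum().getValue() != expectedChecksum) {
    // Corruption is detected before any bytes reach downstream RDDs, so a
    // plain re-fetch is still safe at this point.
    throw new IOException("Corrupted shuffle block detected by checksum");
  }
}
```

In practice the bytes would have to be buffered (e.g., in memory or to disk) during the drain so they can still be handed downstream after verification.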
What changes were proposed in this pull request?

This PR proposes to add checksum support for shuffle blocks. The basic idea is:

On the mapper side, we'll wrap a `CheckedOutputStream` upon the `FileOutputStream` to calculate the checksum (using the same checksum calculator, `Adler32`, as broadcast) for each shuffle block (a.k.a. partition) at the same time as we write the map output files. And similar to the index file, we'll have a checksum file to save these checksums.
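As an illustration of the mapper-side wrapping, here is a self-contained JDK sketch; it is not Spark's actual writer code (the PR uses a `MutableCheckedOutputStream` variant), and the helper name is made up:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;

static long writePartitionWithChecksum(File mapOutput, byte[] partitionBytes)
    throws IOException {
  Adler32 checksum = new Adler32();
  try (CheckedOutputStream out =
      new CheckedOutputStream(new FileOutputStream(mapOutput, /* append= */ true), checksum)) {
    // The checksum is updated while the bytes are written, so no extra pass
    // over the data is needed in the normal case.
    out.write(partitionBytes);
  }
  return checksum.getValue();  // persisted to the checksum file, like the index file
}
```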
On the reducer side, we'll also wrap a `CheckedInputStream` upon the `FileInputStream` to read the block. When block corruption is detected, we'll try to diagnose the cause of the corruption:

First, we'll use the `CheckedInputStream` to consume the remaining data of the corrupted block to calculate the checksum (`c1`);

Second, the reducer sends an RPC request called `DiagnoseCorruption` (which contains `c1`) to the server where the block is stored;

Third, the server will read (using very little memory) the corresponding block back from disk and calculate the checksum (`c2`) again for it, and also read back the checksum (`c3`) of the block saved in the checksum file. Then, if `c2 != c3`, we'll suspect the corruption is caused by a disk issue. Otherwise, if `c1 != c3`, we'll suspect the corruption is caused by a network issue. Otherwise, the cause remains unknown. The server then replies to the reducer with a `CorruptionCause` containing the cause;

Fourth, the reducer takes action after it receives the cause. If it's a disk issue or unknown, it throws a fetch failure directly. If it's a network issue, it re-fetches the block later. Also note that if the corruption happens inside `BufferReleasingInputStream`, the reducer throws the fetch failure immediately no matter what the cause is, since the data has been partially consumed by downstream RDDs. If corruption happens again after a retry, the reducer throws the fetch failure directly this time without the diagnosis.

Overall, I think we don't introduce severe overhead with this proposal. In the normal case, the checksum is calculated in a streaming way, just like the other stream wrappers (e.g., encryption, compression). The major overhead is an extra traversal of the data file in the error case, in order to calculate the checksum (`c2`).

And the proposal in this PR is much simpler compared to the previous one, #15894 (abandoned due to complexity), which introduced more overhead since it needed to traverse the data file twice for every block. In that proposal, the checksum was appended to each block's data, so it was also invasive to the existing code.
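To make the three-way comparison concrete, here is a hedged sketch of the decision rule described above; the names are illustrative rather than Spark's actual API, with `c1` = the reducer-side checksum, `c2` = the server-side recomputation from disk, and `c3` = the stored checksum:

```java
enum Cause { DISK_ISSUE, NETWORK_ISSUE, UNKNOWN_ISSUE }

static Cause diagnose(long c1, long c2, long c3) {
  if (c2 != c3) {
    // The block on disk no longer matches what was written: disk corruption.
    return Cause.DISK_ISSUE;
  } else if (c1 != c3) {
    // Disk is consistent but the reducer saw different bytes: corrupted in
    // transit, so a re-fetch is worth trying.
    return Cause.NETWORK_ISSUE;
  } else {
    // All three checksums agree; the corruption has some other, unknown cause,
    // so the reducer throws the fetch failure directly.
    return Cause.UNKNOWN_ISSUE;
  }
}
```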
Why are the changes needed?
Shuffle data corruption is a long-standing issue in Spark. For example, in SPARK-18105, people continually report corruption issues. However, data corruption is difficult to reproduce in most cases and even harder to root-cause. We don't know if it's a Spark issue or not. With checksum support for shuffle, Spark itself can at least distinguish between disk and network causes, which is very important for users.
Does this PR introduce any user-facing change?
Yes. Added `spark.shuffle.checksum` to let users enable/disable the checksum (enabled by default).

How was this patch tested?
Added an end-to-end unit test in `ShuffleSuite`. I'll add more tests if the community accepts the proposal.