[SPARK-26089][CORE] Handle corruption in large shuffle blocks #23453
Conversation
Test build #100746 has finished for PR 23453 at commit
just a partial review so far
there should be a test that a large block is read correctly, with the partial buffering & stream concatenation. I think most unit tests will not end up creating a large block.
if (closeStreams) {
  try {
    if (count < maxSize) {
      in.close()
maxSize might be exactly the same length as the stream, in which case you'll read to the end, but not close it.
Right, I left that edge case, as it will be correctly handled by the concatenated stream later.
isStreamCopied = true
streamCompressedOrEncrypted = true
I wonder if we should do the inputStream checks even if detectCorrupt is false, and perhaps even change that default. Or add another flag just for doing this memory copy, as it's more expensive.
Also I think the comments around here need to be updated to explain what is happening now.
Sure, I can change it to always detect corruption. Which default did you mean, though? The default value of detectCorrupt is true.
Test build #100750 has finished for PR 23453 at commit
Test build #100895 has finished for PR 23453 at commit
Test build #100898 has finished for PR 23453 at commit
Thanks for the work here. This is just an initial review to better understand your approach. I think it will be great to ping the original authors of SPARK-4105 and related changes to get their feedback as well.
cc @davies since you worked on SPARK-4105
> I wonder if we should do the inputStream checks even if detectCorrupt is false, and perhaps even change that default. Or add another flag just for doing this memory copy, as it's more expensive.

> Which default did you mean, though? The default value of detectCorrupt is true.
Sorry, I didn't explain that very well. What I was wondering is whether we should change the behavior to:
- have the copy-into-memory check turned off by default
- but always do the on-the-fly check, even if detectCorrupt is false. You could add another option for this also, and have it default to true, if we really want to allow users to turn off all detection.

I'm saying this because I think I'd want to run my Spark apps with the new default I'm suggesting -- don't do the copy into memory, but do the on-the-fly check all the time, over the entire block.
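For concreteness, a rough sketch of how those suggested defaults might look from a user's perspective. `spark.shuffle.detectCorrupt` is the existing flag; the `spark.shuffle.detectCorrupt.useExtraMemory` key is assumed here as the name of the new copy-into-memory switch (the PR adds such a config, but the exact key name is an assumption in this sketch):

```scala
import org.apache.spark.SparkConf

// Sketch of the suggested defaults, with an assumed config key name for the new flag:
// the lightweight on-the-fly corruption check stays on, while the memory-hungry eager
// copy of the first maxBytesInFlight/3 bytes is opt-in.
val conf = new SparkConf()
  .set("spark.shuffle.detectCorrupt", "true")                  // existing flag: on-the-fly check
  .set("spark.shuffle.detectCorrupt.useExtraMemory", "false")  // assumed key: eager in-memory check
```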
Force-pushed from d40d396 to abbec63
Test build #101777 has finished for PR 23453 at commit
Test build #101776 has finished for PR 23453 at commit
retest this please
Test build #101832 has finished for PR 23453 at commit
real build error:
Thanks, I realized it after this morning's build failure. The previous test failure confused me and I did not see the build failure before that.
Test build #101837 has finished for PR 23453 at commit
just some test updates.
@tgravescs how do you feel about my suggested updates to the defaults?
I think you are talking about:

> so the only place detectCorrupt is used after this change is passed into BufferReleasingInputStream
Sorry, my earlier comment wasn't clear -- Ankur was confused too, but I think this version of the patch implements what I think is the right option:
OK, that makes sense to me and I agree with you.
Force-pushed from 17e36c8 to c8d3569
Test build #101996 has finished for PR 23453 at commit
Test build #101997 has finished for PR 23453 at commit
Test build #101999 has finished for PR 23453 at commit
Jenkins, retest this please
Test build #102008 has finished for PR 23453 at commit
Jenkins, retest this please
Test build #102037 has finished for PR 23453 at commit
Test build #102065 has finished for PR 23453 at commit
Test build #102223 has finished for PR 23453 at commit
a few very minor style things, otherwise lgtm
@@ -571,7 +582,8 @@ final class ShuffleBlockFetcherIterator(
     }
   }

-  private def throwFetchFailedException(blockId: BlockId, address: BlockManagerId, e: Throwable) = {
+  private[storage] def throwFetchFailedException(
+      blockId: BlockId, address: BlockManagerId, e: Throwable) = {
nit: if the method declaration is multi-line, each arg on its own line. And this method should have had a return type in the first place (probably my fault, oops)
private[storage] def throwFetchFailedException(
    blockId: BlockId,
    address: BlockManagerId,
    e: Throwable): Unit = {
    private val iterator: ShuffleBlockFetcherIterator,
    private val blockId: BlockId,
    private val address: BlockManagerId,
    private val streamCompressedOrEncrypted: Boolean)
I would rename streamCompressedOrEncrypted to detectCorruption or something like that, as the condition is a bit more complex now (you also check the detectCorrupt config when passing this in).
def copyStreamUpTo(in: InputStream, maxSize: Long): (Boolean, InputStream) = {
  var count = 0L
  val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
  val streamCopied = tryWithSafeFinally {
nit: I think a better name for streamCopied would be fullyCopied ... every time I come back to this code I always get a bit confused that maybe no copying is happening in some situations.
// Only one block should be returned which has corruption after maxBytesInFlight/3 because the
// other block will detect corruption on first fetch, and then get added to the queue again for
// a retry
I'd reword this -- you get one block because you call next() once. You really want to explain why you get a certain block back.
We'll get back the block which has corruption after maxBytesInFlight/3 because ...
(assuming I understand this correctly...)
}

// Following will succeed as it reads part of the stream which is not corrupt. This will read
// maxBytesInFlight/3 bytes from first stream and remaining from the second stream
... will read maxBytesInFlight/3 bytes from the portion copied into memory, and the remaining from the underlying stream.
val limit = 1000
// testing for inputLength less than, equal to and greater than limit
List(998, 999, 1000, 1001, 1002).foreach { inputLength =>
(limit - 2 to limit + 2).foreach
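For illustration, a minimal sketch of such a boundary test, assuming the Utils.copyStreamUpTo(in, maxSize) helper with the (Boolean, InputStream) return shown in the diff above (the final signature may differ), and checking the concatenated stream's contents, as a later commit message here also mentions:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.util.Random

import org.apache.spark.util.Utils

val limit = 1000
// Exercise lengths just below, at, and just above the in-memory copy limit.
(limit - 2 to limit + 2).foreach { inputLength =>
  val bytes = new Array[Byte](inputLength)
  Random.nextBytes(bytes)

  val (_, mergedStream) = Utils.copyStreamUpTo(new ByteArrayInputStream(bytes), limit)

  // Reading the merged stream back should yield exactly the original bytes,
  // whether they came from the in-memory prefix, the original stream, or both.
  val readBack = new ByteArrayOutputStream()
  val buf = new Array[Byte](128)
  var n = mergedStream.read(buf)
  while (n != -1) {
    readBack.write(buf, 0, n)
    n = mergedStream.read(buf)
  }
  assert(readBack.toByteArray.toSeq == bytes.toSeq, s"mismatch for inputLength=$inputLength")
}
```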
SPARK-4105 added corruption detection in shuffle blocks, but that was limited to blocks which are smaller than maxBytesInFlight/3. This commit builds upon that by adding a corruption check for large blocks. There are two changes/improvements made in this commit:
1. Large blocks are checked up to maxBytesInFlight/3 size in a similar way as smaller blocks, so if a large block is corrupt at the start, that block will be re-fetched, and if that also fails, FetchFailureException will be thrown.
2. If large blocks are corrupt after maxBytesInFlight/3, then any IOException thrown while reading the stream will be converted to FetchFailureException. This is slightly more aggressive than was originally intended, but since the consumer of the stream may have already read some records and processed them, we can't just re-fetch the block; we need to fail the whole task. Additionally, we also thought about adding a new type of TaskEndReason, which would retry the task a couple of times before failing the previous stage, but given the complexity involved in that solution we decided not to proceed in that direction.
Thanks to @squito for direction and support.
Testing Done: Changed the junit test for big blocks to check for corruption.
1. Updated comments in the code 2. If IOException is thrown while reading from a stream, it will always be converted to a FetchFailureException, even when detectCorruption is false 3. Added a junit test which verifies that data can be read from concatenated stream
…n Mockito api across versions
1. Minor changes 2. Added a new config for detecting corruption by using extra memory with default set to false 3. Added test cases for copyStreamUpTo
1. Changed test to also compare the contents of stream 2. Other minor refactoring
Changes to unit test case
Minor changes to variable names and comments
Test build #103217 has finished for PR 23453 at commit
Force-pushed from b178f65 to bd1a813
Test build #103218 has finished for PR 23453 at commit
Thanks for the update @ankuriitg. Sorry I am being very particular about this; it's just a really core piece. I have a couple of minor updates to comments -- but really I was doing one final pass and I got pretty confused about the existing in.close(). I'd like us to clean that up now, while we're looking at this, if possible. Left a longer comment inline, please check my reasoning.
 * Copy all data from an InputStream to an OutputStream upto maxSize and
 * close the input stream if all data is read.
 * @return A tuple of boolean, which is whether the stream was fully copied, and an InputStream,
 *         which is a combined stream of read data and any remaining data
this doc needs updating now. Something like
Copy the first `maxSize` bytes of data from the InputStream to an in-memory
buffer, while still exposing the entire original input stream, primarily to check
for corruption.

This returns a new InputStream which contains the same data as the original
input stream. It may be entirely in an in-memory buffer, or it may be a combination
of in-memory data, continuing to read from the original stream. The only real
use of this is if the original input stream will potentially detect corruption while the data
is being read (e.g. from compression). This allows for an eager check of corruption in
the first maxSize bytes of data.
@return A tuple of boolean, which is whether the stream was fully copied, and an
InputStream which includes all data from the original stream (combining buffered data
and remaining data in the original stream)
-      // Decompress the whole block at once to detect any corruption, which could increase
-      // the memory usage tne potential increase the chance of OOM.
+      // Decompress the block upto maxBytesInFlight/3 at once to detect any corruption which
+      // could increase the memory usage and potentially increase the chance of OOM.
this comment & the one just above the if are redundant and a little bit wrong -- I think you only need one comment (don't care whether it's above or below the if) and it should be something more like:

We optionally decompress the first maxBytesInFlight/3 bytes into memory, to check for corruption in that portion of the data. But even if that configuration is off, or if the corruption is later, we'll still try to detect the corruption later in the stream.
-        Utils.copyStream(input, out, closeStreams = true)
-        input = out.toChunkedByteBuffer.toInputStream(dispose = true)
+        val (fullyCopied: Boolean, mergedStream: InputStream) = Utils.copyStreamUpTo(
+          input, maxBytesInFlight / 3)
I'm trying to understand why the
finally {
  // TODO: release the buf here to free memory earlier
  if (isStreamCopied) {
    in.close()
  }
is needed down below. To be honest, I don't think it was needed in the old code. The old Utils.copyStream was always called with closeStreams=true, and that would always close the input in a finally itself:
spark/core/src/main/scala/org/apache/spark/util/Utils.scala (lines 302 to 307 in d9978fb):

  def copyStream(
      in: InputStream,
      out: OutputStream,
      closeStreams: Boolean = false,
      transferToEnabled: Boolean = false): Long = {
    tryWithSafeFinally {
spark/core/src/main/scala/org/apache/spark/util/Utils.scala (lines 330 to 332 in d9978fb):

      if (closeStreams) {
        try {
          in.close()
It doesn't hurt, but it also makes things unnecessarily confusing. If you didn't need to do that in.close() below, you wouldn't need to track isStreamCopied, and wouldn't need to even return fullyCopied from Utils.copyStreamUpTo. That's really the part of this which is bugging me -- something seems off that we need to know whether or not the stream is fully copied; it seems like it shouldn't matter. If it does matter, aren't we getting something wrong in the case where the stream is exactly maxBytesInFlight / 3, but we haven't realized it's fully copied because we haven't read past the end yet?
I think in.close() is needed if there is an exception while creating a wrapped stream. The first time I saw isStreamCopied, I was also confused, and by looking at it more closely now, I realize that it is not doing what it is supposed to do.

I have removed isStreamCopied and instead used another condition to close the stream. Please check and let me know if it makes sense.
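A minimal sketch of the pattern being described, under the assumption that the raw stream should be closed when constructing its decompressing/decrypting wrapper throws; the names here are illustrative, not the PR's actual helper:

```scala
import java.io.InputStream

// Sketch: if building the wrapped (decompressing/decrypting) stream fails,
// close the raw input ourselves, because no wrapper ever took ownership of it.
def wrapSafely(raw: InputStream)(wrap: InputStream => InputStream): InputStream = {
  try {
    wrap(raw)
  } catch {
    case e: Throwable =>
      try raw.close() catch { case _: Throwable => () } // best-effort cleanup
      throw e
  }
}
```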
val (fullyCopied: Boolean, mergedStream: InputStream) = Utils.copyStreamUpTo(
  input, maxBytesInFlight / 3)
isStreamCopied = fullyCopied
input = mergedStream
not related to your changes, but while you're touching this file, can you add 2 more spaces of indentation to `|| corruptedBlocks.contains(blockId)) {` a couple of lines below this?
1. Ensured input stream is closed on exception 2. Minor comments changes
Test build #103338 has finished for PR 23453 at commit
merged to master, thanks @ankuriitg!
What changes were proposed in this pull request?
SPARK-4105 added corruption detection in shuffle blocks, but that was limited to blocks which are smaller than maxBytesInFlight/3. This commit builds upon that by adding a corruption check for large blocks. There are two changes/improvements made in this commit:
1. Large blocks are checked up to maxBytesInFlight/3 size in a similar way as smaller blocks, so if a large block is corrupt at the start, that block will be re-fetched, and if that also fails, FetchFailureException will be thrown.
2. If large blocks are corrupt after maxBytesInFlight/3, then any IOException thrown while reading the stream will be converted to FetchFailureException. This is slightly more aggressive than was originally intended, but since the consumer of the stream may have already read some records and processed them, we can't just re-fetch the block; we need to fail the whole task. Additionally, we also thought about adding a new type of TaskEndReason, which would retry the task a couple of times before failing the previous stage, but given the complexity involved in that solution we decided not to proceed in that direction.
Thanks to @squito for direction and support.
How was this patch tested?
Changed the junit test for big blocks to check for corruption.
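To illustrate change 2 from the description, here is a hedged sketch of the kind of stream wrapping involved. The class and callback names are illustrative only; in the PR the equivalent logic lives around BufferReleasingInputStream and throwFetchFailedException(blockId, address, e):

```scala
import java.io.{IOException, InputStream}

// Illustrative sketch: once the consumer is reading the block, an IOException can no
// longer be handled by silently re-fetching (records may already have been processed),
// so it is reported as a fetch failure, which fails the whole task.
class FetchFailureReportingStream(in: InputStream, reportFetchFailure: Throwable => Nothing)
  extends InputStream {

  override def read(): Int =
    try in.read() catch { case e: IOException => reportFetchFailure(e) }

  override def read(b: Array[Byte], off: Int, len: Int): Int =
    try in.read(b, off, len) catch { case e: IOException => reportFetchFailure(e) }

  override def close(): Unit = in.close()
}
```

In the PR itself, the reporting callback corresponds to throwing a FetchFailedException via the iterator's throwFetchFailedException helper.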