
[SPARK-38987][shuffle] Handle fallback when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true #36305

Closed
wants to merge 2 commits

Conversation

zhouyejoe
Contributor

What changes were proposed in this pull request?

Adds corruption exception handling for merged shuffle chunks when spark.shuffle.detectCorrupt is set to true (the default value is true).

Why are the changes needed?

Prior to Spark 3.0, spark.shuffle.detectCorrupt was set to true by default, and this configuration was one of the knobs for early corruption detection, so the fallback could be triggered as expected.

Since Spark 3.0, spark.shuffle.detectCorrupt is still set to true by default, but early corruption detection is now controlled by a new configuration, spark.shuffle.detectCorrupt.useExtraMemory, which defaults to false. Thus the default behavior, with only Magnet enabled after Spark 3.2.0 (internal li-3.1.1), disables early corruption detection, so no fallback is triggered; instead, an exception is thrown when the task starts to read the corrupted blocks.
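For illustration, early detection can be re-enabled alongside the existing knob. A minimal SparkConf sketch using the two configuration keys discussed above:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // On by default: allows corruption to be detected and handled at all.
      .set("spark.shuffle.detectCorrupt", "true")
      // Off by default since Spark 3.0: buffers a small portion of each block
      // in memory up front so corruption can be detected early, before the
      // task starts consuming the stream.
      .set("spark.shuffle.detectCorrupt.useExtraMemory", "true")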

We need to handle the corrupted stream for merged blocks, with or without fallback, in different scenarios (see the sketch after this list):

1. If the user sets spark.shuffle.detectCorrupt.useExtraMemory to true, this triggers the fallback. However, this check only buffers a small portion of the shuffle block to evaluate whether it has been corrupted; there is still a possibility that corruption appears in later parts of the block, which is then handled by spark.shuffle.detectCorrupt.
2. If spark.shuffle.detectCorrupt.useExtraMemory is set to false but spark.shuffle.detectCorrupt is set to true, it shouldn't throw an exception saying a ShuffleChunk is not a shuffle block; it should trigger the retry when the shuffle block is a shuffle chunk.
3. If both spark.shuffle.detectCorrupt.useExtraMemory and spark.shuffle.detectCorrupt are set to false, it should just throw an exception on the client side and fail the task.
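A minimal, self-contained sketch of the three scenarios above; the types and the detectCorrupt flag are simplified stand-ins, not Spark's actual classes or implementation:

    sealed trait FetchedBlockId
    case class ShuffleChunkId(shuffleId: Int, reduceId: Int) extends FetchedBlockId
    case class RegularBlockId(shuffleId: Int, mapId: Long) extends FetchedBlockId

    def onCorruptStream(id: FetchedBlockId,
                        detectCorrupt: Boolean,
                        fallbackToOriginalBlocks: ShuffleChunkId => Unit): Unit = id match {
      case chunk: ShuffleChunkId if detectCorrupt =>
        // Scenarios 1 and 2: a corrupted merged chunk triggers fallback to the
        // original, un-merged shuffle blocks instead of failing the task.
        fallbackToOriginalBlocks(chunk)
      case _ =>
        // Scenario 3 (or a corrupted regular block): fail the task client-side.
        throw new RuntimeException(s"Corrupted shuffle data while reading $id")
    }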

Does this PR introduce any user-facing change?

No

How was this patch tested?

Tests are WIP; unit tests to be added.

… corrupted and spark.shuffle.detectCorrupt is set to true
@zhouyejoe
Contributor Author

cc @mridulm @otterc Thoughts on the handling here? Should we also add the fallback when the merged shuffle chunk is corrupted and spark.shuffle.detectCorrupt is set to false? The exception handling would need to be outside of ShuffleBlockFetcherIterator though, which would complicate things.

@@ -1166,6 +1166,8 @@ final class ShuffleBlockFetcherIterator(
case ShuffleBlockBatchId(shuffleId, mapId, startReduceId, _) =>
throw SparkCoreErrors.fetchFailedError(address, shuffleId, mapId, mapIndex, startReduceId,
msg, e)
case ShuffleBlockChunkId(_, _, _, _) =>
pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
Contributor
@otterc otterc Apr 21, 2022
Wouldn't this result in duplicate records? Some of the chunk has already been consumed, but we don't know how much, correct? So if we now fall back to the original blocks, we could re-fetch records that were already consumed.

Contributor
@mridulm mridulm Apr 22, 2022
Can you change it to throw a fetch failed exception in this case, @zhouyejoe?

Contributor Author
Sure, will update accordingly

@github-actions github-actions bot added the CORE label Apr 21, 2022
@otterc
Contributor

otterc commented Apr 21, 2022

I don't think we can handle corruption when it happens partway through a shuffle chunk and fall back, because we don't know how much of the chunk has been consumed. We would have to treat this as corruption of a regular shuffle block and propagate it to the driver. So the driver will now have to handle fetch failures related to a shuffle chunk.

@AmplabJenkins

Can one of the admins verify this patch?

Contributor
@mridulm mridulm left a comment

Looks good to me.
I would like to merge this into branch-3.3, given that stage failures occur without it @MaxGekk

+CC @Ngone51, @otterc for taking a look.

@mridulm mridulm changed the title [WIP][SPARK-38987][shuffle] Handle fallback when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true [SPARK-38987][shuffle] Handle fallback when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true Apr 28, 2022
@mridulm
Contributor

mridulm commented Apr 28, 2022

I removed the WIP tag @zhouyejoe.

Contributor
@mridulm mridulm left a comment

Actually, can you add a test for this as well, @zhouyejoe?

@otterc
Contributor

otterc commented Apr 28, 2022

@zhouyejoe @mridulm I don't think this change is sufficient. We have to make changes on the driver to handle the FetchFailure of a shuffle chunk. At the least, we have to unregister the merge results corresponding to the shuffle chunk.

@@ -1166,6 +1166,8 @@ final class ShuffleBlockFetcherIterator(
case ShuffleBlockBatchId(shuffleId, mapId, startReduceId, _) =>
throw SparkCoreErrors.fetchFailedError(address, shuffleId, mapId, mapIndex, startReduceId,
msg, e)
case ShuffleBlockChunkId(shuffleId, _, reduceId, _) =>
throw SparkCoreErrors.fetchFailedError(address, shuffleId, -1L, -1, reduceId, msg, e)
Contributor

This will propagate the FetchFailure with mapId/mapIndex = -1. Currently the driver will not unregister the mergeStatus because mapIndex = -1. Since the mergeStatus is still registered, the next attempt will still try to fetch the corrupted merged block. Eventually the application will fail.
So, this requires a change in the driver as well. Also once we make that change, we should add a UT to DAGScheduler to verify that.
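A self-contained sketch of the driver-side handling being suggested; the tracker below is a simplified stand-in for the driver's merge-status bookkeeping, not DAGScheduler's actual API:

    import scala.collection.mutable

    class MergeResultTracker {
      // (shuffleId, reduceId) -> merge status of the push-merged block.
      private val mergeStatuses = mutable.Map.empty[(Int, Int), AnyRef]

      def registerMergeResult(shuffleId: Int, reduceId: Int, status: AnyRef): Unit =
        mergeStatuses((shuffleId, reduceId)) = status

      // The PR signals a shuffle-chunk fetch failure with mapIndex == -1. The
      // driver must unregister the corresponding merge result so the next task
      // attempt falls back to the original blocks instead of re-fetching the
      // same corrupted merged block.
      def handleFetchFailure(shuffleId: Int, mapIndex: Int, reduceId: Int): Unit =
        if (mapIndex == -1) {
          mergeStatuses.remove((shuffleId, reduceId))
        }
    }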

Contributor Author

Thanks for the detailed description. Will update accordingly then.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 23, 2022
@github-actions github-actions bot closed this Sep 25, 2022