[SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size #38333
Conversation
+CC @otterc
Can one of the admins verify this patch?
We have located the cause of the zero-size chunk problem on the shuffle service node, and found the following information in the system logs.
Although this is not caused by the software layer, and the number of bad nodes losing data is very low, I think it makes sense to support a fallback here. cc @otterc
For cases like this, it might actually be better to fail the task (and recompute the parent stage), and leverage the deny list to prevent tasks from running on the problematic node?
It is not necessary to recompute the parent stage. This case is similar to chunk corruption, so we can fall back to the original shuffle blocks. The reasons are:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
If there are hardware issues which are causing failures, it is better to move the nodes to the deny list and prevent them from getting used: we will keep seeing more failures, including for vanilla shuffle. On the other hand, I can also look at this as a data corruption issue. @otterc, what was the plan around how we support shuffle corruption diagnosis for push-based shuffle (SPARK-36206, etc)? Is the expectation that we fall back? Or do we diagnose + fail? Thoughts @Ngone51?
I think it is more efficient to fall back and fetch the map outputs instead of failing the stage and regenerating the data of the partition. When the corrupt blocks are merged shuffle blocks or chunks, we don't retry fetching them anyway and fall back immediately.
Sounds good. |
Thanks for fixing this @gaoyajun02.
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
A couple of changes ...
There is a pending comment, can you take a look at it @gaoyajun02? Thx
Also, can you please update to the latest master @gaoyajun02? Not sure why we are seeing the linter failure in the build.
Force-pushed …huffle chunk is zero-size from d653b6c to b992830 (compare).
done
The test failure looks unrelated, can you retrigger the tests @gaoyajun02 ...
Looks good to me.
+CC @otterc
Looks good to me.
Merged to master.
Thank you, @gaoyajun02 , @mridulm , @otterc .
I was of two minds whether to fix this in 3.3 as well ... But agreed, a backport to branch-3.3 would be helpful.
Thank you, @mridulm ! |
…huffle chunk is zero-size

### What changes were proposed in this pull request?
When push-based shuffle is enabled, a zero-size buf error may occur when fetching shuffle chunks from bad nodes, especially when memory is full. In this case, we can fall back to original shuffle blocks.

### Why are the changes needed?
When the reduce task obtains the shuffle chunk with a zero-size buf, we let it fall back to original shuffle block. After verification, these blocks can be read successfully without shuffle retry.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
UT

Closes apache#38333 from gaoyajun02/SPARK-40872.

Authored-by: gaoyajun02 <gaoyajun02@meituan.com>
Signed-off-by: Mridul <mridul<at>gmail.com>
(cherry picked from commit 72cce5c)
OK, can you take a look @mridulm? cc @otterc @dongjoon-hyun
Since the 3.3 branch does not contain the PR for SPARK-38987, if the mergedChunk is zero-size, throwFetchFailedException actually throws a SparkException, which eventually causes the app to fail after the task fails 4 times.
…ged shuffle chunk is zero-size

### What changes were proposed in this pull request?
This is a backport PR of #38333. When push-based shuffle is enabled, a zero-size buf error may occur when fetching shuffle chunks from bad nodes, especially when memory is full. In this case, we can fall back to original shuffle blocks.

### Why are the changes needed?
When the reduce task obtains the shuffle chunk with a zero-size buf, we let it fall back to original shuffle block. After verification, these blocks can be read successfully without shuffle retry.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT

Closes #38751 from gaoyajun02/SPARK-40872-backport.

Authored-by: gaoyajun02 <gaoyajun02@meituan.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
When push-based shuffle is enabled, a zero-size buf error may occur when fetching shuffle chunks from bad nodes, especially when memory is full. In this case, we can fall back to original shuffle blocks.
Why are the changes needed?
When the reduce task obtains the shuffle chunk with a zero-size buf, we let it fall back to original shuffle block. After verification, these blocks can be read successfully without shuffle retry.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UT
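The fallback behavior the description above refers to can be sketched in a few lines. This is a hedged, self-contained illustration, not the actual Spark code: the real change lives in `ShuffleBlockFetcherIterator`, and names such as `decide`, `isShuffleChunk`, and the `FetchDecision` cases below are illustrative only. The idea is that a zero-size buffer for a push-merged shuffle chunk triggers a fallback to the original (un-merged) shuffle blocks rather than a task failure, while a zero-size original block still surfaces as a fetch failure.

```scala
// Illustrative sketch of the zero-size-chunk fallback decision.
// Not the actual ShuffleBlockFetcherIterator code; names are hypothetical.

sealed trait FetchDecision
case object ProcessBuffer extends FetchDecision            // buffer has data, read it
case object FallbackToOriginalBlocks extends FetchDecision // refetch original shuffle blocks
case object FailTask extends FetchDecision                 // report a fetch failure

object ZeroSizeChunkFallback {
  // isShuffleChunk: true when the fetched block is a push-merged shuffle chunk.
  // bufSize: size of the buffer returned for the block.
  def decide(isShuffleChunk: Boolean, bufSize: Long): FetchDecision = {
    if (bufSize > 0) {
      ProcessBuffer
    } else if (isShuffleChunk) {
      // A zero-size push-merged chunk (e.g. from a bad node) is treated like
      // chunk corruption: fall back instead of failing the task, so the
      // parent stage does not need to be recomputed.
      FallbackToOriginalBlocks
    } else {
      // A zero-size original shuffle block is still a genuine fetch failure.
      FailTask
    }
  }
}
```

As the discussion notes, this mirrors how corrupt merged blocks are already handled: they are never retried as merged data, only refetched as the original map outputs.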