-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#477] feat(spark): support getShuffleResult throws FetchFailedException. #1004
Conversation
cc @advancedxy @jerqi PTAL. |
I try to shutdown a ShuffleServer during app running, it will throws FetchFailed, you can see here Fetch Failed image. |
Codecov Report
@@ Coverage Diff @@
## master #1004 +/- ##
============================================
+ Coverage 53.66% 54.76% +1.09%
- Complexity 2523 2526 +3
============================================
Files 382 362 -20
Lines 21672 19333 -2339
Branches 1795 1799 +4
============================================
- Hits 11630 10587 -1043
+ Misses 9338 8115 -1223
+ Partials 704 631 -73
... and 27 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
Seems flaky test. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for submitting this PR, left some minor comments.
However maybe we need some refactor to:
- move
getShuffleResult
into ShuffleReadClient - the get result part should also in the
RssShuffleReader
, that might make the whole code a bit clearer.
But let's do that in follow-up prs.
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf); | ||
if (rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false) | ||
&& RssSparkShuffleUtils.isStageResubmitSupported()) { | ||
String driver = rssConf.getString("driver.host", ""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT); | ||
try (ShuffleManagerClient client = ShuffleManagerClientFactory | ||
.getInstance().createShuffleManagerClient(ClientType.GRPC, driver, port)) { | ||
RssReportShuffleFetchFailureRequest req = new RssReportShuffleFetchFailureRequest( | ||
appId, shuffleId, stageAttemptId, partitionId, e.getMessage()); | ||
RssReportShuffleFetchFailureResponse response = client.reportShuffleFetchFailure(req); | ||
if (response.getReSubmitWholeStage()) { | ||
// since we are going to roll out the whole stage, mapIndex shouldn't matter, hence -1 is provided. | ||
FetchFailedException ffe = | ||
RssSparkShuffleUtils.createFetchFailedException(shuffleId, -1, partitionId, e); | ||
throw new RssException(ffe); | ||
} | ||
} catch (IOException ioe) { | ||
LOG.info("Error closing shuffle manager client with error:", ioe); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we extract this into a utility method and reuse code for both spark 2 and spark 3?
RssReportShuffleFetchFailureRequest req = new RssReportShuffleFetchFailureRequest( | ||
appId, shuffleId, stageAttemptId, partitionId, e.getMessage()); | ||
RssReportShuffleFetchFailureResponse response = client.reportShuffleFetchFailure(req); | ||
if (response.getReSubmitWholeStage()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this part, I think we should try to report all the shuffle fetch failures then throw FetchFailedException if any of response indicates the whole stage should be re-submitted.
P.S: one todo is create a new rpc interface to report failures in batch.
server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
Show resolved
Hide resolved
I will raise sub PRs to address the following problem:
|
Flaky test. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, except one minor comment. Thanks for your work.
cc @smallzhongfeng , Can you help review plz? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for ping me. LTGM!
int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT); | ||
try (ShuffleManagerClient client = ShuffleManagerClientFactory | ||
.getInstance().createShuffleManagerClient(ClientType.GRPC, driver, port)) { | ||
// todo: Create a new rpc interface to report failures in batch. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remember create a new issue for this.
@@ -581,4 +582,15 @@ public int getPartitionNum(int shuffleId) { | |||
public int getNumMaps(int shuffleId) { | |||
return shuffleIdToNumMapTasks.getOrDefault(shuffleId, 0); | |||
} | |||
|
|||
private Roaring64NavigableMap getShuffleResult(String clientType, Set<ShuffleServerInfo> shuffleServerInfoSet, | |||
String appId, int shuffleId, int partitionId, int stageAttemptId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: code indentation
int currentFailedReadRequest = failedReadRequest.getAndIncrement(); | ||
if (currentFailedReadRequest < numOfFailedReadRequest) { | ||
LOG.info("This request is failed as mocked failure, current/firstN: {}/{}", | ||
currentFailedReadRequest, numOfFailedReadRequest); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
int currentFailedReadRequest = failedReadRequest.getAndIncrement(); | ||
if (currentFailedReadRequest < numOfFailedReadRequest) { | ||
LOG.info("This request is failed as mocked failure, current/firstN: {}/{}", | ||
currentFailedReadRequest, numOfFailedReadRequest); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
you can run this command to fix spotless issues.
|
Thank you. Already fixed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @leixm @smallzhongfeng @advancedxy , wait for CI.
Merged to master. |
What changes were proposed in this pull request?
feat #477
Why are the changes needed?
support getShuffleResult throws FetchFailedException.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Exiting UTs, Already added getShuffleResult failed, see
MockedShuffleServerGrpcService
.