-
Notifications
You must be signed in to change notification settings - Fork 136
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#1045] fix(server): shuffle server may hang when restart worker due to multi require-momery and no require-momery release #1058
Conversation
@zuston Could you help me review this pr? |
@@ -408,6 +408,10 @@ public long getPartitionDataSize(String appId, int shuffleId, int partitionId) { | |||
|
|||
public long requireBuffer( | |||
String appId, int shuffleId, List<Integer> partitionIds, int requireSize) { | |||
boolean isRegistered = shuffleBufferManager.checkIfRegistered(appId, shuffleId, partitionIds); | |||
if (!isRegistered) { | |||
return -4; // Keep the same with StatusCode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could give -1
and -4
a error code? like
enum ERRORCODE {
NO_BUFFER(-1),
NO_REGISTER(-4)
}
Or we can reuse the error code which existed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find there is no error code existed I can use, and I craete RequireBufferStatusCode
@@ -186,6 +187,7 @@ protected FetchResult callInternal() throws Exception { | |||
taskIdBitmap, | |||
new ArrayList<>(serverInfoSet), | |||
readerJobConf, | |||
new TezIdHelper(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this another bug? Will it cause correctness issues?
@@ -189,6 +189,20 @@ private void updateShuffleSize(String appId, int shuffleId, long size) { | |||
shuffleIdToSize.get(shuffleId).addAndGet(size); | |||
} | |||
|
|||
public boolean checkIfRegistered(String appId, int shuffleId, List<Integer> partitionIds) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use ShuffleTaskManager#shuffleTaskInfos
to judge whether is registered?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 I have similar thought. The appId + shuffleId check existence are enough. Right?
Codecov Report
@@ Coverage Diff @@
## master #1058 +/- ##
============================================
+ Coverage 54.18% 55.27% +1.09%
+ Complexity 2549 2546 -3
============================================
Files 386 367 -19
Lines 21861 19537 -2324
Branches 1813 1815 +2
============================================
- Hits 11845 10799 -1046
+ Misses 9315 8113 -1202
+ Partials 701 625 -76
... and 37 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
How about the spark test code, I did not change the spark test. |
But you modify the code shuffle server, Spark integration tests will use shuffle server. |
Ok, may be the spark test not register, and I only changed register logical. |
Spark register the application in the RssShuffleManager. |
In fact, likes not registered from my test and log or maybe other interface |
ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.inc(); | ||
return -1; | ||
} | ||
if (null == shuffleTaskInfo && !"EMPTY".equals(appId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we add EMPTY
here? It's weird.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @lifeSo
if (null == shuffleTaskInfo) { | ||
return RequireBufferStatusCode.NO_REGISTER.statusCode(); | ||
} | ||
for (int partitionId : partitionIds) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zuston Could you take another look at this place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the change is right. But I will introduce new grpc status code like BUFFER_LIMIT_FOR_HUGE_PARTITION
instead of NO_BUFFER
@zuston Could you help me this pr? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm.
@@ -133,7 +133,7 @@ public void copyFromRssServer() throws IOException { | |||
hasPendingData = false; | |||
uncompressedData = null; | |||
} else { | |||
LOG.info("uncompressedData is null"); | |||
LOG.info("UncompressedData is null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is not related with current issue.
Anyway, let's reserve these changes.
if (null == shuffleTaskInfo) { | ||
return RequireBufferStatusCode.NO_REGISTER.statusCode(); | ||
} | ||
for (int partitionId : partitionIds) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the change is right. But I will introduce new grpc status code like BUFFER_LIMIT_FOR_HUGE_PARTITION
instead of NO_BUFFER
What changes were proposed in this pull request?
shuffle server may hang when restart worker due to multi require-momery and no require-momery release.
So, fix the bug
Why are the changes needed?
shuffle server may hang when restart worker due to multi require-momery and no require-momery release.
So, fix the bug
Fix: #1045
Does this PR introduce any user-facing change?
No.
How was this patch tested?
unit test