Skip to content

Commit

Permalink
[#886] fix(tez): Tez Client may lost data or throw exception when rss…
Browse files Browse the repository at this point in the history
….storage.type without MEMORY. (#976)
  • Loading branch information
zhengchenyu committed Jun 27, 2023
1 parent 431cb78 commit bb61efb
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,6 @@ public void waitSendFinished() {
sendBuffersToServers();
}
long start = System.currentTimeMillis();
long commitDuration = 0;
if (!isMemoryShuffleEnabled) {
long s = System.currentTimeMillis();
sendCommit();
commitDuration = System.currentTimeMillis() - s;
}
while (true) {
if (failedBlockIds.size() > 0) {
String errorMsg = "Send failed: failed because " + failedBlockIds.size()
Expand All @@ -302,6 +296,12 @@ public void waitSendFinished() {
throw new RssException(errorMsg);
}
}
long commitDuration = 0;
if (!isMemoryShuffleEnabled) {
long s = System.currentTimeMillis();
sendCommit();
commitDuration = System.currentTimeMillis() - s;
}
start = System.currentTimeMillis();
TezVertexID tezVertexID = tezTaskAttemptID.getTaskID().getVertexID();
TezDAGID tezDAGID = tezVertexID.getDAGId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ public void testCommitBlocksWhenMemoryShuffleDisabled() throws IOException, Inte
double sendThreshold = 0.2f;
int batch = 50;
int numMaps = 1;
String storageType = "MEMORY";
RssConf rssConf = new RssConf();
Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
long sendCheckInterval = 500L;
Expand All @@ -209,7 +208,7 @@ public void testCommitBlocksWhenMemoryShuffleDisabled() throws IOException, Inte
comparator, maxSegmentSize, keySerializer,
valSerializer, maxBufferSize, memoryThreshold,
sendThreshold, batch, rssConf, partitionToServers,
numMaps, isMemoryShuffleEnabled(storageType),
numMaps, false,
sendCheckInterval, sendCheckTimeout, bitmapSplitNum, shuffleId, true);

Random random = new Random();
Expand All @@ -221,6 +220,7 @@ numMaps, isMemoryShuffleEnabled(storageType),
int partitionId = random.nextInt(50);
bufferManager.addRecord(partitionId, new BytesWritable(key), new BytesWritable(value));
}
bufferManager.waitSendFinished();

assertTrue(bufferManager.getWaitSendBuffers().isEmpty());
assertEquals(writeClient.mockedShuffleServer.getFinishBlockSize(),
Expand Down

0 comments on commit bb61efb

Please sign in to comment.