diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java index 96fa6f29eb..c465034e24 100644 --- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java +++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java @@ -276,12 +276,6 @@ public void waitSendFinished() { sendBuffersToServers(); } long start = System.currentTimeMillis(); - long commitDuration = 0; - if (!isMemoryShuffleEnabled) { - long s = System.currentTimeMillis(); - sendCommit(); - commitDuration = System.currentTimeMillis() - s; - } while (true) { if (failedBlockIds.size() > 0) { String errorMsg = "Send failed: failed because " + failedBlockIds.size() @@ -302,6 +296,12 @@ public void waitSendFinished() { throw new RssException(errorMsg); } } + long commitDuration = 0; + if (!isMemoryShuffleEnabled) { + long s = System.currentTimeMillis(); + sendCommit(); + commitDuration = System.currentTimeMillis() - s; + } start = System.currentTimeMillis(); TezVertexID tezVertexID = tezTaskAttemptID.getTaskID().getVertexID(); TezDAGID tezDAGID = tezVertexID.getDAGId(); diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java index ce3a8a48ea..2b581656bf 100644 --- a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java +++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java @@ -195,7 +195,6 @@ public void testCommitBlocksWhenMemoryShuffleDisabled() throws IOException, Inte double sendThreshold = 0.2f; int batch = 50; int numMaps = 1; - String storageType = "MEMORY"; RssConf rssConf = new RssConf(); Map> partitionToServers = new HashMap<>(); long sendCheckInterval = 500L; @@ -209,7 +208,7 @@ public void testCommitBlocksWhenMemoryShuffleDisabled() throws IOException, Inte comparator, maxSegmentSize, keySerializer, valSerializer, maxBufferSize, memoryThreshold, sendThreshold, batch, rssConf, partitionToServers, - numMaps, isMemoryShuffleEnabled(storageType), + numMaps, false, sendCheckInterval, sendCheckTimeout, bitmapSplitNum, shuffleId, true); Random random = new Random(); @@ -221,6 +220,7 @@ numMaps, isMemoryShuffleEnabled(storageType), int partitionId = random.nextInt(50); bufferManager.addRecord(partitionId, new BytesWritable(key), new BytesWritable(value)); } + bufferManager.waitSendFinished(); assertTrue(bufferManager.getWaitSendBuffers().isEmpty()); assertEquals(writeClient.mockedShuffleServer.getFinishBlockSize(),