From fc36eb10856943fbeb6b0d099d130041d587edc6 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Wed, 4 Mar 2020 15:12:32 +0800
Subject: [PATCH] fix

---
 .../spark/storage/ShuffleBlockFetcherIterator.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index cd4c86006af5a..052ba4434c99f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -339,14 +339,14 @@ final class ShuffleBlockFetcherIterator(
         + s"with ${blocks.size} blocks")
     }
 
-    def createFetchRequests(): Unit = {
+    def createFetchRequests(hasMore: Boolean): Unit = {
       val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks)
       curBlocks = new ArrayBuffer[FetchBlockInfo]
       if (mergedBlocks.length <= maxBlocksInFlightPerAddress) {
         createFetchRequest(mergedBlocks)
       } else {
         mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks =>
-          if (blocks.length == maxBlocksInFlightPerAddress) {
+          if (blocks.length == maxBlocksInFlightPerAddress || !hasMore) {
             createFetchRequest(blocks)
           } else {
             // The last group does not exceed `maxBlocksInFlightPerAddress`. Put it back
@@ -367,12 +367,12 @@ final class ShuffleBlockFetcherIterator(
       // For batch fetch, the actual block in flight should count for merged block.
       val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
       if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
-        createFetchRequests()
+        createFetchRequests(true)
       }
     }
     // Add in the final request
     if (curBlocks.nonEmpty) {
-      createFetchRequests()
+      createFetchRequests(false)
     }
   }
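
The change makes createFetchRequests aware of whether more blocks can still follow for this address: while iterating (hasMore = true), a trailing group smaller than maxBlocksInFlightPerAddress is put back into curBlocks for further merging, but on the final call (hasMore = false) it must be turned into a fetch request immediately so it is not dropped. The following is a minimal, side-effect-free Scala sketch of just that grouping rule; the names splitIntoRequests and maxPerAddress are illustrative assumptions, not identifiers from ShuffleBlockFetcherIterator, and groups are returned rather than passed to createFetchRequest as the real code does.

// Standalone sketch (not Spark code) of the rule changed by this patch.
object FetchGroupingSketch {
  // Split `blocks` into request-sized groups. A trailing group shorter than
  // `maxPerAddress` is deferred (returned separately) only while more blocks
  // may still arrive; on the final call (`hasMore = false`) it is emitted as a
  // request of its own instead of being carried forward.
  def splitIntoRequests(
      blocks: Seq[Int],
      maxPerAddress: Int,
      hasMore: Boolean): (Seq[Seq[Int]], Seq[Int]) = {
    val groups = blocks.grouped(maxPerAddress).toSeq
    val (requests, deferred) =
      groups.partition(g => g.length == maxPerAddress || !hasMore)
    (requests, deferred.flatten)
  }

  def main(args: Array[String]): Unit = {
    val blocks = (1 to 7).toSeq
    // Mid-iteration: two full groups become requests, block 7 is deferred.
    println(splitIntoRequests(blocks, maxPerAddress = 3, hasMore = true))
    // Final flush: the short trailing group also becomes a request; nothing is deferred.
    println(splitIntoRequests(blocks, maxPerAddress = 3, hasMore = false))
  }
}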