Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Ngone51 committed Mar 4, 2020
1 parent ebfff7a commit fc36eb1
Showing 1 changed file with 4 additions and 4 deletions.
Expand Up @@ -339,14 +339,14 @@ final class ShuffleBlockFetcherIterator(
+ s"with ${blocks.size} blocks")
}

def createFetchRequests(): Unit = {
def createFetchRequests(hasMore: Boolean): Unit = {
val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks)
curBlocks = new ArrayBuffer[FetchBlockInfo]
if (mergedBlocks.length <= maxBlocksInFlightPerAddress) {
createFetchRequest(mergedBlocks)
} else {
mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks =>
if (blocks.length == maxBlocksInFlightPerAddress) {
if (blocks.length == maxBlocksInFlightPerAddress || !hasMore) {
createFetchRequest(blocks)
} else {
// The last group does not exceed `maxBlocksInFlightPerAddress`. Put it back
Expand All @@ -367,12 +367,12 @@ final class ShuffleBlockFetcherIterator(
// For batch fetch, the actual block in flight should count for merged block.
val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
createFetchRequests()
createFetchRequests(true)
}
}
// Add in the final request
if (curBlocks.nonEmpty) {
createFetchRequests()
createFetchRequests(false)
}
}

Expand Down

0 comments on commit fc36eb1

Please sign in to comment.