Skip to content

Commit

Permalink
Removed code duplication in ShuffleBlockFetcherIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
eracah committed Aug 28, 2015
1 parent bb7f352 commit 6b3df94
Showing 1 changed file with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,7 @@ final class ShuffleBlockFetcherIterator(
fetchRequests ++= Utils.randomize(remoteRequests)

// Send out initial requests for blocks, up to our maxBytesInFlight
while (fetchRequests.nonEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}
fetchUpToMaxBytes()

val numFetches = remoteRequests.size - fetchRequests.size
logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
Expand Down Expand Up @@ -296,10 +293,7 @@ final class ShuffleBlockFetcherIterator(
case _ =>
}
// Send fetch requests up to maxBytesInFlight
while (fetchRequests.nonEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}
fetchUpToMaxBytes()

result match {
case FailureFetchResult(blockId, address, e) =>
Expand All @@ -315,6 +309,14 @@ final class ShuffleBlockFetcherIterator(
}
}

private def fetchUpToMaxBytes() {
// Send fetch requests up to maxBytesInFlight
while (fetchRequests.nonEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}
}

private def throwFetchFailedException(blockId: BlockId, address: BlockManagerId, e: Throwable) = {
blockId match {
case ShuffleBlockId(shufId, mapId, reduceId) =>
Expand Down

0 comments on commit 6b3df94

Please sign in to comment.