Skip to content

Commit

Permalink
Last style fix
Browse files Browse the repository at this point in the history
  • Loading branch information
kayousterhout committed Jul 15, 2015
1 parent bc69d2b commit 14bfcbb
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,9 @@ private[spark] object MapOutputTracker extends Logging {
* If any of the statuses is null (indicating a missing location due to a failed mapper),
* throws a FetchFailedException.
*
* @param shuffleId Identifier for the shuffle
* @param reduceId Identifier for the reduce task
* @param statuses List of map statuses, indexed by map ID.
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
* and the second item is a sequence of (shuffle block id, shuffle block size) tuples
* describing the shuffle blocks that are stored at that block manager.
Expand All @@ -450,14 +453,14 @@ private[spark] object MapOutputTracker extends Logging {
statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
assert (statuses != null)
val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]]
for ((status, index) <- statuses.zipWithIndex) {
for ((status, mapId) <- statuses.zipWithIndex) {
if (status == null) {
val errorMessage = s"Missing an output location for shuffle $shuffleId"
logError(errorMessage)
throw new MetadataFetchFailedException(shuffleId, reduceId, errorMessage)
} else {
splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
((ShuffleBlockId(shuffleId, index, reduceId), status.getSizeForBlock(reduceId)))
((ShuffleBlockId(shuffleId, mapId, reduceId), status.getSizeForBlock(reduceId)))
}
}

Expand Down

0 comments on commit 14bfcbb

Please sign in to comment.