Skip to content

Commit

Permalink
address comments, and fix the test error
Browse files Browse the repository at this point in the history
  • Loading branch information
ConeyLiu committed May 5, 2017
1 parent 1d863b7 commit a3e979f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -617,10 +617,14 @@ private[spark] class BlockManager(
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
val locs = Random.shuffle(master.getLocations(blockId))
val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
val (sameRackLocs, differentRackLocs) = otherLocs.partition {
loc => blockManagerId.topologyInfo == loc.topologyInfo
blockManagerId.topologyInfo match {
case None => preferredLocs ++ otherLocs
case Some(_) =>
val (sameRackLocs, differentRackLocs) = otherLocs.partition {
loc => blockManagerId.topologyInfo == loc.topologyInfo
}
preferredLocs ++ sameRackLocs ++ differentRackLocs
}
preferredLocs ++ sameRackLocs ++ differentRackLocs
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}

test("optimize a location order of blocks without topology information") {
val localHost = Utils.localHostName()
val localHost = "localhost"
val otherHost = "otherHost"
val bmMaster = mock(classOf[BlockManagerMaster])
val bmId1 = BlockManagerId("id1", localHost, 1)
Expand All @@ -512,7 +512,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}

test("optimize a location order of blocks with topology information") {
val localHost = Utils.localHostName()
val localHost = "localhost"
val otherHost = "otherHost"
val localRack = "localRack"
val otherRack = "otherRack"
Expand All @@ -527,6 +527,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
.thenReturn(Seq(bmId1, bmId2, bmId5, bmId3, bmId4))

val blockManager = makeBlockManager(128, "exec", bmMaster)
blockManager.blockManagerId =
BlockManagerId(SparkContext.DRIVER_IDENTIFIER, localHost, 1, Some(localRack))
val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
assert(locations.map(_.host) === Seq(localHost, localHost, otherHost, otherHost, otherHost))
Expand Down

0 comments on commit a3e979f

Please sign in to comment.