From fe0392991716b040d9f585bbeee19119d2a1ddb7 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Tue, 15 Dec 2015 16:15:08 +0900 Subject: [PATCH 1/2] Optimize a location order of blocks by considering preferred local host --- .../apache/spark/storage/BlockManager.scala | 11 ++++++++++- .../spark/storage/BlockManagerSuite.scala | 19 ++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6074fc58d70db..e23e4cc9941fe 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -578,9 +578,18 @@ private[spark] class BlockManager( doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] } + private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { + // Since block managers can share an identical host, we put these preferred + // locations first. + val locs = Random.shuffle(master.getLocations(blockId)) + val preferredLocs = locs.filter(loc => blockManagerId.host == loc.host) + val otherLocs = locs.filter(loc => blockManagerId.host != loc.host) + preferredLocs ++ otherLocs + } + private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") - val locations = Random.shuffle(master.getLocations(blockId)) + val locations = getLocations(blockId) var numFetchFailures = 0 for (loc <- locations) { logDebug(s"Getting remote block $blockId from $loc") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 53991d8a1aede..bf49be3d4c4fd 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -26,6 +26,7 @@ import scala.language.implicitConversions import scala.language.postfixOps import org.mockito.Mockito.{mock, when} +import org.mockito.{Matchers => mc} import org.scalatest._ import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts._ @@ -66,7 +67,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE private def makeBlockManager( maxMem: Long, - name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { + name: String = SparkContext.DRIVER_IDENTIFIER, + master: BlockManagerMaster = this.master): BlockManager = { val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf, @@ -451,6 +453,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(list2DiskGet.get.readMethod === DataReadMethod.Disk) } + test("optimize a location order of blocks") { + val localHost = Utils.localHostName() + val otherHost = "otherHost" + val bmMaster = mock(classOf[BlockManagerMaster]) + val bmId1 = BlockManagerId("id1", localHost, 1) + val bmId2 = BlockManagerId("id2", localHost, 2) + val bmId3 = BlockManagerId("id3", otherHost, 3) + when(bmMaster.getLocations(mc.any[BlockId])).thenReturn(Seq(bmId1, bmId2, bmId3)) + + val blockManager = makeBlockManager(128, "exec", bmMaster) + val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations) + val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0)) + assert(locations.map(_.host) === Seq(localHost, localHost, otherHost)) + } + test("SPARK-9591: getRemoteBytes from another location when Exception throw") { val origTimeoutOpt = conf.getOption("spark.network.timeout") try { From d962f15e186bfe77d3fb3e5e4ec44d10b5523c0f Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Mon, 21 Dec 2015 11:17:22 +0900 Subject: [PATCH 2/2] Apply comments --- .../scala/org/apache/spark/storage/BlockManager.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e23e4cc9941fe..74c4dd109987b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -579,11 +579,12 @@ private[spark] class BlockManager( } private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { - // Since block managers can share an identical host, we put these preferred - // locations first. + /** + * Return a list of locations for the given block, prioritizing the local machine since + * multiple block managers can share the same host. + */ val locs = Random.shuffle(master.getLocations(blockId)) - val preferredLocs = locs.filter(loc => blockManagerId.host == loc.host) - val otherLocs = locs.filter(loc => blockManagerId.host != loc.host) + val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host } preferredLocs ++ otherLocs }