diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 14895b39b24bc..33a78cd2dfa55 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -337,19 +337,24 @@ object SparkEnv extends Logging {
       None
     }
 
-    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
-      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
-      new BlockManagerMasterEndpoint(
-        rpcEnv,
-        isLocal,
-        conf,
-        listenerBus,
-        if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) {
-          externalShuffleClient
-        } else {
-          None
-        })),
-      conf, isDriver)
+    val blockManagerMaster = new BlockManagerMaster(
+      registerOrLookupEndpoint(
+        BlockManagerMaster.DRIVER_ENDPOINT_NAME,
+        new BlockManagerMasterEndpoint(
+          rpcEnv,
+          isLocal,
+          conf,
+          listenerBus,
+          if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) {
+            externalShuffleClient
+          } else {
+            None
+          })),
+      registerOrLookupEndpoint(
+        BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
+        new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal)),
+      conf,
+      isDriver)
 
     val blockTransferService =
       new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ee3e9cc03b966..c8fffead9941b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -267,7 +267,7 @@ private[spark] class DAGScheduler(
       executorUpdates: mutable.Map[(Int, Int), ExecutorMetrics]): Boolean = {
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
       executorUpdates))
-    blockManagerMaster.driverEndpoint.askSync[Boolean](
+    blockManagerMaster.driverHeartbeatEndPoint.askSync[Boolean](
       BlockManagerHeartbeat(blockManagerId), new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 9d13fedfb0c58..d96b9afd665a6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils}
 private[spark]
 class BlockManagerMaster(
     var driverEndpoint: RpcEndpointRef,
+    var driverHeartbeatEndPoint: RpcEndpointRef,
     conf: SparkConf,
     isDriver: Boolean)
   extends Logging {
@@ -38,6 +39,7 @@ class BlockManagerMaster(
 
   /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
   def removeExecutor(execId: String) {
+    driverHeartbeatEndPoint.askSync[Boolean](RemoveExecutor(execId))
     tell(RemoveExecutor(execId))
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
@@ -46,6 +48,7 @@ class BlockManagerMaster(
    * This is only called on the driver side. Non-blocking
    */
   def removeExecutorAsync(execId: String) {
+    driverHeartbeatEndPoint.ask[Boolean](RemoveExecutor(execId))
     driverEndpoint.ask[Boolean](RemoveExecutor(execId))
     logInfo("Removal of executor " + execId + " requested")
   }
@@ -62,8 +65,10 @@ class BlockManagerMaster(
       maxOffHeapMemSize: Long,
       slaveEndpoint: RpcEndpointRef): BlockManagerId = {
     logInfo(s"Registering BlockManager $id")
-    val updatedId = driverEndpoint.askSync[BlockManagerId](
-      RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
+    val register =
+      RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
+    driverHeartbeatEndPoint.ask[Boolean](register)
+    val updatedId = driverEndpoint.askSync[BlockManagerId](register)
     logInfo(s"Registered BlockManager $updatedId")
     updatedId
   }
@@ -74,8 +79,9 @@ class BlockManagerMaster(
       storageLevel: StorageLevel,
       memSize: Long,
       diskSize: Long): Boolean = {
-    val res = driverEndpoint.askSync[Boolean](
-      UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
+    val update = UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
+    driverHeartbeatEndPoint.ask[Boolean](update)
+    val res = driverEndpoint.askSync[Boolean](update)
     logDebug(s"Updated info of block $blockId")
     res
   }
@@ -230,6 +236,7 @@ class BlockManagerMaster(
     if (driverEndpoint != null && isDriver) {
       tell(StopBlockManagerMaster)
       driverEndpoint = null
+      driverHeartbeatEndPoint = null
       logInfo("BlockManagerMaster stopped")
     }
   }
@@ -245,4 +252,5 @@ class BlockManagerMaster(
 
 private[spark] object BlockManagerMaster {
   val DRIVER_ENDPOINT_NAME = "BlockManagerMaster"
+  val DRIVER_HEARTBEAT_ENDPOINT_NAME = "BlockManagerMasterHeartbeat"
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 5e021b334fd2b..a6e02f26985b0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -144,9 +144,6 @@ class BlockManagerMasterEndpoint(
     case StopBlockManagerMaster =>
       context.reply(true)
       stop()
-
-    case BlockManagerHeartbeat(blockManagerId) =>
-      context.reply(heartbeatReceived(blockManagerId))
   }
 
   private def removeRdd(rddId: Int): Future[Seq[Int]] = {
@@ -290,19 +287,6 @@ class BlockManagerMasterEndpoint(
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
-  /**
-   * Return true if the driver knows about the given block manager. Otherwise, return false,
-   * indicating that the block manager should re-register.
-   */
-  private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
-    if (!blockManagerInfo.contains(blockManagerId)) {
-      blockManagerId.isDriver && !isLocal
-    } else {
-      blockManagerInfo(blockManagerId).updateLastSeenMs()
-      true
-    }
-  }
-
   // Remove a block from the slaves that have it. This can only be used to remove
   // blocks that the master knows about.
   private def removeBlockFromWorkers(blockId: BlockId) {
@@ -460,7 +444,6 @@ class BlockManagerMasterEndpoint(
     }
 
     if (blockId == null) {
-      blockManagerInfo(blockManagerId).updateLastSeenMs()
       return true
     }
 
@@ -585,7 +568,6 @@ private[spark] class BlockManagerInfo(
 
   val externalShuffleServiceEnabled = externalShuffleServiceBlockStatus.isDefined
 
-  private var _lastSeenMs: Long = timeMs
   private var _remainingMem: Long = maxMem
 
   // Mapping from block id to its status.
@@ -593,18 +575,12 @@ private[spark] class BlockManagerInfo(
 
   def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks.get(blockId))
 
-  def updateLastSeenMs() {
-    _lastSeenMs = System.currentTimeMillis()
-  }
-
   def updateBlockInfo(
       blockId: BlockId,
       storageLevel: StorageLevel,
       memSize: Long,
       diskSize: Long) {
 
-    updateLastSeenMs()
-
     val blockExists = _blocks.containsKey(blockId)
     var originalMemSize: Long = 0
     var originalDiskSize: Long = 0
@@ -693,8 +669,6 @@ private[spark] class BlockManagerInfo(
 
   def remainingMem: Long = _remainingMem
 
-  def lastSeenMs: Long = _lastSeenMs
-
   def blocks: JHashMap[BlockId, BlockStatus] = _blocks
 
   override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterHeartbeatEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterHeartbeatEndpoint.scala
new file mode 100644
index 0000000000000..3ec44ba31c049
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterHeartbeatEndpoint.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.storage.BlockManagerMessages.{BlockManagerHeartbeat, RegisterBlockManager, RemoveExecutor, UpdateBlockInfo}
+
+/**
+ * Separate heartbeat out of BlockManagerMasterEndpoint due to performance consideration.
+ */
+private[spark] class BlockManagerMasterHeartbeatEndpoint(
+    override val rpcEnv: RpcEnv,
+    isLocal: Boolean)
+  extends ThreadSafeRpcEndpoint with Logging {
+
+  // Mapping from block manager id to the block manager's information.
+  private val blockManagerLastSeen = new mutable.HashMap[BlockManagerId, Long]
+
+  // Mapping from executor ID to block manager ID.
+  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
+
+  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case RegisterBlockManager(blockManagerId, _, _, _, _) =>
+      updateLastSeenMs(blockManagerId)
+      blockManagerIdByExecutor(blockManagerId.executorId) = blockManagerId
+      context.reply(true)
+
+    case UpdateBlockInfo(blockManagerId, _, _, _, _) =>
+      updateLastSeenMs(blockManagerId)
+      context.reply(true)
+
+    case RemoveExecutor(execId) =>
+      blockManagerIdByExecutor.get(execId).foreach(blockManagerLastSeen.remove)
+      blockManagerIdByExecutor -= execId
+      context.reply(true)
+
+    case BlockManagerHeartbeat(blockManagerId) =>
+      context.reply(heartbeatReceived(blockManagerId))
+  }
+
+  /**
+   * Return true if the driver knows about the given block manager. Otherwise, return false,
+   * indicating that the block manager should re-register.
+   */
+  private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
+    if (!blockManagerLastSeen.contains(blockManagerId)) {
+      blockManagerId.isDriver && !isLocal
+    } else {
+      updateLastSeenMs(blockManagerId)
+      true
+    }
+  }
+
+  def updateLastSeenMs(blockManagerId: BlockManagerId) {
+    blockManagerLastSeen(blockManagerId) = System.currentTimeMillis()
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e41d10709b0f4..a205b26d965c2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -245,7 +245,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
    */
   val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
   // stub out BlockManagerMaster.getLocations to use our cacheLocations
-  val blockManagerMaster = new BlockManagerMaster(null, conf, true) {
+  val blockManagerMaster = new BlockManagerMaster(null, null, conf, true) {
     override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
       blockIds.map {
         _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 05a9ac685e5e7..9dd98fd45fa3f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -99,7 +99,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-      new LiveListenerBus(conf), None)), conf, true)
+      new LiveListenerBus(conf), None)), rpcEnv.setupEndpoint("blockmanagerHeartbeat",
+      new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true)), conf, true)
     allStores.clear()
   }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 509d4efcab67a..064f47b6bc835 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -145,7 +145,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     when(sc.conf).thenReturn(conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-      new LiveListenerBus(conf), None)), conf, true)
+      new LiveListenerBus(conf), None)), rpcEnv.setupEndpoint("blockmanagerHeartbeat",
+      new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true)), conf, true)
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
@@ -419,7 +420,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    val reregister = !master.driverEndpoint.askSync[Boolean](
+    val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
       BlockManagerHeartbeat(store.blockManagerId))
     assert(reregister)
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index c8f424af9af01..391cbb4fa9837 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -89,7 +89,8 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
 
     blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-      new LiveListenerBus(conf), None)), conf, true)
+      new LiveListenerBus(conf), None)), rpcEnv.setupEndpoint("blockmanagerHeartbeat",
+      new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true)), conf, true)
 
     storageLevel = StorageLevel.MEMORY_ONLY_SER
     blockManager = createBlockManager(blockManagerSize, conf)
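
Reviewer note (appended after the patch, not part of it): the contract the new
BlockManagerMasterHeartbeatEndpoint preserves is small enough to model in
isolation. The standalone Scala sketch below has no Spark dependencies; BmId,
HeartbeatRegistry, markSeen, and the demo object are hypothetical stand-ins for
illustration, not Spark classes. It shows the re-registration protocol the
tests above exercise: a heartbeat from an unknown block manager returns false
(telling it to re-register), while a known one refreshes its last-seen
timestamp and returns true.

import scala.collection.mutable

// Hypothetical stand-in for Spark's BlockManagerId: an executor id plus a driver flag.
final case class BmId(executorId: String, isDriver: Boolean = false)

// Minimal model of the bookkeeping in BlockManagerMasterHeartbeatEndpoint.
final class HeartbeatRegistry(isLocal: Boolean) {
  private val lastSeen = mutable.HashMap.empty[BmId, Long]

  // Mirrors the RegisterBlockManager / UpdateBlockInfo cases: record the sender.
  def markSeen(id: BmId): Unit = lastSeen(id) = System.currentTimeMillis()

  // Mirrors the RemoveExecutor case: forget every block manager of that executor.
  def removeExecutor(execId: String): Unit =
    lastSeen.keys.filter(_.executorId == execId).toSeq.foreach(lastSeen.remove)

  // Mirrors heartbeatReceived: true means "known, carry on";
  // false means "unknown, please re-register".
  def heartbeat(id: BmId): Boolean =
    if (!lastSeen.contains(id)) {
      id.isDriver && !isLocal // the driver's own block manager is tolerated in non-local mode
    } else {
      markSeen(id)
      true
    }
}

object HeartbeatRegistryDemo extends App {
  val registry = new HeartbeatRegistry(isLocal = false)
  val bm = BmId("exec-1")
  registry.markSeen(bm)
  assert(registry.heartbeat(bm))  // known: timestamp refreshed, no re-register
  registry.removeExecutor("exec-1")
  assert(!registry.heartbeat(bm)) // unknown now: caller should re-register
}

Design note, as reflected in the diff itself: BlockManagerMaster mirrors
RegisterBlockManager, UpdateBlockInfo, and RemoveExecutor to the heartbeat
endpoint (fire-and-forget ask, plus askSync in removeExecutor) so its last-seen
map stays in sync with the main endpoint. That lets DAGScheduler's
BlockManagerHeartbeat askSync run on a dedicated lightweight endpoint instead
of queuing behind expensive block-management RPCs on BlockManagerMasterEndpoint.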