
[SPARK-29298][CORE] Separate block manager heartbeat endpoint from driver endpoint
LantaoJin committed Sep 30, 2019
1 parent 8167714 commit c348176
Showing 9 changed files with 114 additions and 49 deletions.
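At a glance: the commit moves the BlockManagerHeartbeat RPC off the busy BlockManagerMasterEndpoint and onto a new, dedicated BlockManagerMasterHeartbeatEndpoint, so heartbeats no longer queue behind block management messages on the driver. A minimal sketch of the resulting flow, condensed from the hunks below (assumes an rpcEnv and a blockManagerId are in scope; an illustration, not the committed code):

    import scala.concurrent.duration._
    import org.apache.spark.rpc.RpcTimeout
    import org.apache.spark.storage.{BlockManagerMaster, BlockManagerMasterHeartbeatEndpoint}
    import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

    // Driver side: register the dedicated heartbeat endpoint under its well-known name.
    val heartbeatRef = rpcEnv.setupEndpoint(
      BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
      new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal = false))

    // Caller side: a false reply means the block manager is unknown and should re-register.
    val known = heartbeatRef.askSync[Boolean](
      BlockManagerHeartbeat(blockManagerId),
      new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))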
31 changes: 18 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -337,19 +337,24 @@ object SparkEnv extends Logging {
       None
     }
 
-    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
-      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
-      new BlockManagerMasterEndpoint(
-        rpcEnv,
-        isLocal,
-        conf,
-        listenerBus,
-        if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) {
-          externalShuffleClient
-        } else {
-          None
-        })),
-      conf, isDriver)
+    val blockManagerMaster = new BlockManagerMaster(
+      registerOrLookupEndpoint(
+        BlockManagerMaster.DRIVER_ENDPOINT_NAME,
+        new BlockManagerMasterEndpoint(
+          rpcEnv,
+          isLocal,
+          conf,
+          listenerBus,
+          if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) {
+            externalShuffleClient
+          } else {
+            None
+          })),
+      registerOrLookupEndpoint(
+        BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
+        new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal)),
+      conf,
+      isDriver)
 
     val blockTransferService =
       new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
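SparkEnv.registerOrLookupEndpoint registers the endpoint when running on the driver and otherwise resolves it by name, so executors reach the new endpoint the same way they reach the existing BlockManagerMaster endpoint. A hedged sketch of the lookup half (RpcUtils.makeDriverRef is what SparkEnv uses for such lookups; conf and rpcEnv are assumed to be in scope):

    import org.apache.spark.storage.BlockManagerMaster
    import org.apache.spark.util.RpcUtils

    // Executor side: resolve the driver's heartbeat endpoint by its well-known name.
    val heartbeatRef = RpcUtils.makeDriverRef(
      BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME, conf, rpcEnv)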
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -267,7 +267,7 @@ private[spark] class DAGScheduler(
       executorUpdates: mutable.Map[(Int, Int), ExecutorMetrics]): Boolean = {
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
       executorUpdates))
-    blockManagerMaster.driverEndpoint.askSync[Boolean](
+    blockManagerMaster.driverHeartbeatEndPoint.askSync[Boolean](
      BlockManagerHeartbeat(blockManagerId), new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))
   }

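The Boolean returned above preserves the existing contract: false tells the caller the block manager is unknown and should re-register. A sketch of how a caller might consume the reply (the actual reaction lives in HeartbeatReceiver and the executor-side BlockManager, outside this diff):

    import scala.concurrent.duration._
    import org.apache.spark.rpc.RpcTimeout
    import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

    // Sketch only: `known == false` should trigger a block manager re-registration.
    val known = blockManagerMaster.driverHeartbeatEndPoint.askSync[Boolean](
      BlockManagerHeartbeat(blockManagerId),
      new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))
    if (!known) {
      // e.g. reply to the executor with reregisterBlockManager = true
    }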
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils}
 private[spark]
 class BlockManagerMaster(
     var driverEndpoint: RpcEndpointRef,
+    var driverHeartbeatEndPoint: RpcEndpointRef,
     conf: SparkConf,
     isDriver: Boolean)
   extends Logging {
@@ -38,6 +39,7 @@ class BlockManagerMaster(
 
   /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
   def removeExecutor(execId: String) {
+    driverHeartbeatEndPoint.askSync[Boolean](RemoveExecutor(execId))
     tell(RemoveExecutor(execId))
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
@@ -46,6 +48,7 @@
    * This is only called on the driver side. Non-blocking
    */
   def removeExecutorAsync(execId: String) {
+    driverHeartbeatEndPoint.ask[Boolean](RemoveExecutor(execId))
     driverEndpoint.ask[Boolean](RemoveExecutor(execId))
     logInfo("Removal of executor " + execId + " requested")
   }
@@ -62,8 +65,10 @@ class BlockManagerMaster(
       maxOffHeapMemSize: Long,
       slaveEndpoint: RpcEndpointRef): BlockManagerId = {
     logInfo(s"Registering BlockManager $id")
-    val updatedId = driverEndpoint.askSync[BlockManagerId](
-      RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
+    val register =
+      RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
+    driverHeartbeatEndPoint.ask[Boolean](register)
+    val updatedId = driverEndpoint.askSync[BlockManagerId](register)
     logInfo(s"Registered BlockManager $updatedId")
     updatedId
   }
@@ -74,8 +79,9 @@
       storageLevel: StorageLevel,
       memSize: Long,
       diskSize: Long): Boolean = {
-    val res = driverEndpoint.askSync[Boolean](
-      UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
+    val update = UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
+    driverHeartbeatEndPoint.ask[Boolean](update)
+    val res = driverEndpoint.askSync[Boolean](update)
     logDebug(s"Updated info of block $blockId")
     res
   }
@@ -230,6 +236,7 @@ class BlockManagerMaster(
     if (driverEndpoint != null && isDriver) {
       tell(StopBlockManagerMaster)
       driverEndpoint = null
+      driverHeartbeatEndPoint = null
       logInfo("BlockManagerMaster stopped")
     }
   }
@@ -245,4 +252,5 @@
 
 private[spark] object BlockManagerMaster {
   val DRIVER_ENDPOINT_NAME = "BlockManagerMaster"
+  val DRIVER_HEARTBEAT_ENDPOINT_NAME = "BlockManagerMasterHeartbeat"
 }
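Note the asymmetry above: registerBlockManager and updateBlockInfo notify the heartbeat endpoint with a non-blocking ask and reserve askSync for the authoritative main endpoint, while removeExecutor waits on both, presumably so the removal is observed before the method returns. A condensed sketch of the pattern (abridged from the methods above; the Future import is added only for clarity):

    import scala.concurrent.Future

    // Fire-and-forget bookkeeping for the heartbeat endpoint's last-seen map;
    // the caller never awaits this reply.
    val bookkeeping: Future[Boolean] = driverHeartbeatEndPoint.ask[Boolean](update)
    // The main endpoint remains the blocking source of truth.
    val res = driverEndpoint.askSync[Boolean](update)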
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -144,9 +144,6 @@ class BlockManagerMasterEndpoint(
     case StopBlockManagerMaster =>
       context.reply(true)
       stop()
-
-    case BlockManagerHeartbeat(blockManagerId) =>
-      context.reply(heartbeatReceived(blockManagerId))
   }
 
   private def removeRdd(rddId: Int): Future[Seq[Int]] = {
@@ -290,19 +287,6 @@ class BlockManagerMasterEndpoint(
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
-  /**
-   * Return true if the driver knows about the given block manager. Otherwise, return false,
-   * indicating that the block manager should re-register.
-   */
-  private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
-    if (!blockManagerInfo.contains(blockManagerId)) {
-      blockManagerId.isDriver && !isLocal
-    } else {
-      blockManagerInfo(blockManagerId).updateLastSeenMs()
-      true
-    }
-  }
-
   // Remove a block from the slaves that have it. This can only be used to remove
   // blocks that the master knows about.
   private def removeBlockFromWorkers(blockId: BlockId) {
@@ -460,7 +444,6 @@ class BlockManagerMasterEndpoint(
     }
 
     if (blockId == null) {
-      blockManagerInfo(blockManagerId).updateLastSeenMs()
       return true
     }
 
@@ -585,26 +568,19 @@ private[spark] class BlockManagerInfo(
 
   val externalShuffleServiceEnabled = externalShuffleServiceBlockStatus.isDefined
 
-  private var _lastSeenMs: Long = timeMs
   private var _remainingMem: Long = maxMem
 
   // Mapping from block id to its status.
   private val _blocks = new JHashMap[BlockId, BlockStatus]
 
   def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks.get(blockId))
 
-  def updateLastSeenMs() {
-    _lastSeenMs = System.currentTimeMillis()
-  }
-
   def updateBlockInfo(
       blockId: BlockId,
       storageLevel: StorageLevel,
       memSize: Long,
       diskSize: Long) {
 
-    updateLastSeenMs()
-
     val blockExists = _blocks.containsKey(blockId)
     var originalMemSize: Long = 0
     var originalDiskSize: Long = 0
@@ -693,8 +669,6 @@
 
   def remainingMem: Long = _remainingMem
 
-  def lastSeenMs: Long = _lastSeenMs
-
   def blocks: JHashMap[BlockId, BlockStatus] = _blocks
 
   override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterHeartbeatEndpoint.scala (new file)
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.storage.BlockManagerMessages.{BlockManagerHeartbeat, RegisterBlockManager, RemoveExecutor, UpdateBlockInfo}

/**
 * Separate heartbeat out of BlockManagerMasterEndpoint due to performance considerations.
 */
private[spark] class BlockManagerMasterHeartbeatEndpoint(
    override val rpcEnv: RpcEnv,
    isLocal: Boolean)
  extends ThreadSafeRpcEndpoint with Logging {

  // Mapping from block manager id to the last time it was seen (in ms).
  private val blockManagerLastSeen = new mutable.HashMap[BlockManagerId, Long]

  // Mapping from executor ID to block manager ID.
  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RegisterBlockManager(blockManagerId, _, _, _, _) =>
      updateLastSeenMs(blockManagerId)
      blockManagerIdByExecutor(blockManagerId.executorId) = blockManagerId
      context.reply(true)

    case UpdateBlockInfo(blockManagerId, _, _, _, _) =>
      updateLastSeenMs(blockManagerId)
      context.reply(true)

    case RemoveExecutor(execId) =>
      blockManagerIdByExecutor.get(execId).foreach(blockManagerLastSeen.remove)
      blockManagerIdByExecutor -= execId
      context.reply(true)

    case BlockManagerHeartbeat(blockManagerId) =>
      context.reply(heartbeatReceived(blockManagerId))
  }

  /**
   * Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
  private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
    if (!blockManagerLastSeen.contains(blockManagerId)) {
      blockManagerId.isDriver && !isLocal
    } else {
      updateLastSeenMs(blockManagerId)
      true
    }
  }

  def updateLastSeenMs(blockManagerId: BlockManagerId) {
    blockManagerLastSeen(blockManagerId) = System.currentTimeMillis()
  }
}
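A hedged usage sketch of the new endpoint, mirroring the suite changes below (endpoint name copied from the tests; an initialized rpcEnv and a sample blockManagerId are assumed; null stands in for the slave endpoint ref, which heartbeat bookkeeping ignores):

    import org.apache.spark.storage.BlockManagerMessages.{BlockManagerHeartbeat, RegisterBlockManager}

    val heartbeatRef = rpcEnv.setupEndpoint("blockmanagerHeartbeat",
      new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true))

    // Never registered and isLocal = true => heartbeatReceived returns false,
    // i.e. the block manager should re-register.
    assert(!heartbeatRef.askSync[Boolean](BlockManagerHeartbeat(blockManagerId)))

    // After a RegisterBlockManager message, the same heartbeat returns true.
    heartbeatRef.askSync[Boolean](
      RegisterBlockManager(blockManagerId, Array.empty, 0L, 0L, null))
    assert(heartbeatRef.askSync[Boolean](BlockManagerHeartbeat(blockManagerId)))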
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -245,7 +245,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
    */
   val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
   // stub out BlockManagerMaster.getLocations to use our cacheLocations
-  val blockManagerMaster = new BlockManagerMaster(null, conf, true) {
+  val blockManagerMaster = new BlockManagerMaster(null, null, conf, true) {
     override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
       blockIds.map {
         _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -99,7 +99,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-      new LiveListenerBus(conf), None)), conf, true)
+      new LiveListenerBus(conf), None)), rpcEnv.setupEndpoint("blockmanagerHeartbeat",
+      new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true)), conf, true)
     allStores.clear()
   }

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -145,7 +145,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
     when(sc.conf).thenReturn(conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-      new LiveListenerBus(conf), None)), conf, true)
+      new LiveListenerBus(conf), None)), rpcEnv.setupEndpoint("blockmanagerHeartbeat",
+      new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true)), conf, true)
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
@@ -419,7 +420,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    val reregister = !master.driverEndpoint.askSync[Boolean](
+    val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
       BlockManagerHeartbeat(store.blockManagerId))
     assert(reregister)
   }
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -89,7 +89,8 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
 
     blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-      new LiveListenerBus(conf), None)), conf, true)
+      new LiveListenerBus(conf), None)), rpcEnv.setupEndpoint("blockmanagerHeartbeat",
+      new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true)), conf, true)
 
     storageLevel = StorageLevel.MEMORY_ONLY_SER
     blockManager = createBlockManager(blockManagerSize, conf)
