Revert "[SPARK-35011][CORE] Avoid Block Manager registrations when St…
Browse files Browse the repository at this point in the history
…opExecutor msg is in-flight"

This reverts commit b9e53f8.
Ngone51 committed Sep 10, 2021
1 parent b52fbee commit 20fce6b
Showing 4 changed files with 40 additions and 109 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -80,7 +80,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new HashMap[String, Long]

private val executorTimeoutMs = Utils.executorTimeoutMs(sc.conf)
private val executorTimeoutMs = sc.conf.get(
config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))

private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)

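Note on the HeartbeatReceiver hunk above: the revert drops the shared Utils.executorTimeoutMs helper (removed further down in Utils.scala) and restores the inline fallback, where an explicit spark.storage.blockManagerHeartbeatTimeoutMs wins and spark.network.timeout (interpreted as seconds) is the default. A minimal standalone sketch of that fallback, assuming plain Options and a hand-rolled seconds-to-millis conversion in place of SparkConf and Utils.timeStringAsMs (not part of the commit):

object ExecutorTimeoutSketch {
  // Mirrors the getOrElse in the restored HeartbeatReceiver code:
  // STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT is already in milliseconds, while
  // NETWORK_TIMEOUT is a number of seconds, hence the conversion in the fallback.
  def executorTimeoutMs(
      blockManagerHeartbeatTimeoutMs: Option[Long],
      networkTimeoutSeconds: Long): Long = {
    blockManagerHeartbeatTimeoutMs.getOrElse(networkTimeoutSeconds * 1000L)
  }

  def main(args: Array[String]): Unit = {
    println(executorTimeoutMs(Some(45000L), 120L)) // 45000: explicit heartbeat timeout wins
    println(executorTimeoutMs(None, 120L))         // 120000: falls back to the network timeout
  }
}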
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -96,15 +96,6 @@ class BlockManagerMasterEndpoint(
mapper
}

private val executorTimeoutMs = Utils.executorTimeoutMs(conf)
private val blockManagerInfoCleaner = {
val cleaningDelay = Math.floorDiv(executorTimeoutMs, 2L)
val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("blockManagerInfo-cleaner")
executor.scheduleWithFixedDelay(() => cleanBlockManagerInfo(), cleaningDelay, cleaningDelay,
TimeUnit.MILLISECONDS)
executor
}

val proactivelyReplicate = conf.get(config.STORAGE_REPLICATION_PROACTIVE)

val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)
@@ -282,12 +273,12 @@ class BlockManagerMasterEndpoint(
}
}
bmIdsExecutor.foreach { bmId =>
aliveBlockManagerInfo(bmId).foreach { bmInfo =>
blockManagerInfo.get(bmId).foreach { bmInfo =>
bmInfo.removeBlock(blockId)
}
}
}
val removeRddFromExecutorsFutures = allAliveBlockManagerInfos.map { bmInfo =>
val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
bmInfo.storageEndpoint.ask[Int](removeMsg).recover {
// use 0 as default value means no blocks were removed
handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
@@ -314,7 +305,7 @@ class BlockManagerMasterEndpoint(
// Nothing to do in the BlockManagerMasterEndpoint data structures
val removeMsg = RemoveShuffle(shuffleId)
Future.sequence(
allAliveBlockManagerInfos.map { bm =>
blockManagerInfo.values.map { bm =>
bm.storageEndpoint.ask[Boolean](removeMsg).recover {
// use false as default value means no shuffle data were removed
handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
@@ -330,7 +321,7 @@ class BlockManagerMasterEndpoint(
*/
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
val requiredBlockManagers = allAliveBlockManagerInfos.filter { info =>
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || !info.blockManagerId.isDriver
}
val futures = requiredBlockManagers.map { bm =>
@@ -346,24 +337,13 @@ class BlockManagerMasterEndpoint(
private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
val info = blockManagerInfo(blockManagerId)

// SPARK-35011: Not removing info from the blockManagerInfo map, but only setting the removal
// timestamp of the executor in BlockManagerInfo. This info will be removed from
// blockManagerInfo map by the blockManagerInfoCleaner once
// now() - info.executorRemovalTs > executorTimeoutMs.
//
// We are delaying the removal of BlockManagerInfo to avoid a BlockManager reregistration
// while a executor is shutting. This unwanted reregistration causes inconsistent bookkeeping
// of executors in Spark.
// Delaying this removal until blockManagerInfoCleaner decides to remove it ensures
// BlockManagerMasterHeartbeatEndpoint does not ask the BlockManager on a recently removed
// executor to reregister on BlockManagerHeartbeat message.
info.setExecutorRemovalTs()

// Remove the block manager from blockManagerIdByExecutor.
blockManagerIdByExecutor -= blockManagerId.executorId
decommissioningBlockManagerSet.remove(blockManagerId)

// remove all the blocks.
// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)

val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
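The comment block removed in the hunk above is the heart of the reverted change: instead of dropping the BlockManagerInfo entry as soon as removeBlockManager runs, SPARK-35011 tombstoned it with a removal timestamp and let the blockManagerInfoCleaner (also removed above) purge it once the executor timeout had elapsed, so a heartbeat from an executor that is still shutting down would not be told to re-register. A compressed sketch of the two bookkeeping strategies this commit switches between, assuming hypothetical names and a plain mutable map in place of the real endpoint state (not part of the commit):

import scala.collection.mutable

final case class BmInfoSketch(executorId: String, var removalTs: Option[Long] = None) {
  def isAlive: Boolean = removalTs.isEmpty
}

final class RegistrySketch(executorTimeoutMs: Long) {
  val infos = mutable.Map.empty[String, BmInfoSketch]

  // Restored (pre-SPARK-35011) behavior: forget the block manager immediately,
  // so the next heartbeat from it is answered with "unknown, please re-register".
  def removeImmediately(id: String): Unit = infos.remove(id)

  // Reverted (SPARK-35011) behavior: keep the entry but stamp a removal time.
  def markRemoved(id: String, now: Long): Unit =
    infos.get(id).foreach(_.removalTs = Some(now))

  // What the removed blockManagerInfoCleaner did on a fixed delay: drop only
  // entries whose removal timestamp is older than the executor timeout.
  def cleanExpired(now: Long): Unit = {
    val expired = infos.collect {
      case (id, info) if !info.isAlive && now - info.removalTs.get > executorTimeoutMs => id
    }
    expired.foreach(infos.remove)
  }

  // With tombstoning, a heartbeat from a just-removed executor still finds an
  // entry here, which is what suppressed the unwanted re-registration.
  def knowsAbout(id: String): Boolean = infos.contains(id)
}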
@@ -384,7 +364,7 @@ class BlockManagerMasterEndpoint(
val i = (new Random(blockId.hashCode)).nextInt(locations.size)
val blockLocations = locations.toSeq
val candidateBMId = blockLocations(i)
aliveBlockManagerInfo(candidateBMId).foreach { bm =>
blockManagerInfo.get(candidateBMId).foreach { bm =>
val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
bm.storageEndpoint.ask[Boolean](replicateMsg)
@@ -420,16 +400,16 @@ class BlockManagerMasterEndpoint(
*/
private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
try {
aliveBlockManagerInfo(blockManagerId).map { info =>
val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
rddBlocks.map { blockId =>
val currentBlockLocations = blockLocations.get(blockId)
val maxReplicas = currentBlockLocations.size + 1
val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
replicateMsg
}.toSeq
}.getOrElse(Seq.empty[ReplicateBlock])
val info = blockManagerInfo(blockManagerId)

val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
rddBlocks.map { blockId =>
val currentBlockLocations = blockLocations.get(blockId)
val maxReplicas = currentBlockLocations.size + 1
val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
replicateMsg
}.toSeq
} catch {
// If the block manager has already exited, nothing to replicate.
case e: java.util.NoSuchElementException =>
@@ -443,7 +423,8 @@ class BlockManagerMasterEndpoint(
val locations = blockLocations.get(blockId)
if (locations != null) {
locations.foreach { blockManagerId: BlockManagerId =>
aliveBlockManagerInfo(blockManagerId).foreach { bm =>
val blockManager = blockManagerInfo.get(blockManagerId)
blockManager.foreach { bm =>
// Remove the block from the BlockManager.
// Doesn't actually wait for a confirmation and the message might get lost.
// If message loss becomes frequent, we should add retry logic here.
@@ -458,14 +439,14 @@ class BlockManagerMasterEndpoint(

// Return a map from the block manager id to max memory and remaining memory.
private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
allAliveBlockManagerInfos.map { info =>
(info.blockManagerId, (info.maxMem, info.remainingMem))
blockManagerInfo.map { case(blockManagerId, info) =>
(blockManagerId, (info.maxMem, info.remainingMem))
}.toMap
}

private def storageStatus: Array[StorageStatus] = {
allAliveBlockManagerInfos.map { info =>
new StorageStatus(info.blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
blockManagerInfo.map { case (blockManagerId, info) =>
new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
Some(info.maxOffHeapMem), info.blocks.asScala)
}.toArray
}
@@ -487,7 +468,7 @@ class BlockManagerMasterEndpoint(
* Futures to avoid potential deadlocks. This can arise if there exists a block manager
* that is also waiting for this master endpoint's response to a previous message.
*/
allAliveBlockManagerInfos.map { info =>
blockManagerInfo.values.map { info =>
val blockStatusFuture =
if (askStorageEndpoints) {
info.storageEndpoint.ask[Option[BlockStatus]](getBlockStatus)
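The comment in the hunk above spells out why this endpoint fans requests out as Futures: a storage endpoint may itself be blocked waiting on this master, so a synchronous ask could deadlock. A small self-contained sketch of the ask-recover-sequence pattern used by removeRdd, removeShuffle and blockStatus, assuming a stub askRemove in place of RpcEndpointRef.ask (not part of the commit):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FanOutSketch {
  // Stand-in for bmInfo.storageEndpoint.ask[Int](removeMsg): completes
  // asynchronously instead of blocking the endpoint's message loop.
  def askRemove(executorId: String): Future[Int] = Future {
    if (executorId == "dead-exec") throw new RuntimeException("endpoint gone")
    2 // pretend two blocks were removed on this executor
  }

  def removeFromAll(executorIds: Seq[String]): Future[Seq[Int]] =
    Future.sequence(executorIds.map { id =>
      // recover mirrors handleBlockRemovalFailure: 0 as the default value means
      // no blocks were removed, and one dead executor does not fail the whole batch.
      askRemove(id).recover { case _ => 0 }
    })

  def main(args: Array[String]): Unit = {
    val counts = Await.result(removeFromAll(Seq("exec-1", "dead-exec", "exec-3")), 5.seconds)
    println(counts) // List(2, 0, 2)
  }
}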
@@ -511,7 +492,7 @@ class BlockManagerMasterEndpoint(
askStorageEndpoints: Boolean): Future[Seq[BlockId]] = {
val getMatchingBlockIds = GetMatchingBlockIds(filter)
Future.sequence(
allAliveBlockManagerInfos.map { info =>
blockManagerInfo.values.map { info =>
val future =
if (askStorageEndpoints) {
info.storageEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
@@ -581,10 +562,9 @@ class BlockManagerMasterEndpoint(
if (pushBasedShuffleEnabled) {
addMergerLocation(id)
}

listenerBus.post(SparkListenerBlockManagerAdded(time, id,
maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
id
}

@@ -672,7 +652,7 @@ class BlockManagerMasterEndpoint(
if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
blockStatusByShuffleService.get(bmId).flatMap(m => m.get(blockId))
} else {
aliveBlockManagerInfo(bmId).flatMap(_.getStatus(blockId))
blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
}
}

@@ -683,7 +663,8 @@ class BlockManagerMasterEndpoint(
// can be used to access this block even when the original executor is already stopped.
loc.host == requesterHost &&
(loc.port == externalShuffleServicePort ||
aliveBlockManagerInfo(loc)
blockManagerInfo
.get(loc)
.flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
.getOrElse(false))
}.flatMap { bmId => Option(executorIdToLocalDirs.getIfPresent(bmId.executorId)) }
@@ -700,7 +681,7 @@ class BlockManagerMasterEndpoint(

/** Get the list of the peers of the given block manager */
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = allAliveBlockManagerInfos.map(_.blockManagerId).toSet
val blockManagerIds = blockManagerInfo.keySet
if (blockManagerIds.contains(blockManagerId)) {
blockManagerIds
.filterNot { _.isDriver }
@@ -753,35 +734,15 @@ class BlockManagerMasterEndpoint(
private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
for (
blockManagerId <- blockManagerIdByExecutor.get(executorId);
info <- aliveBlockManagerInfo(blockManagerId)
info <- blockManagerInfo.get(blockManagerId)
) yield {
info.storageEndpoint
}
}

override def onStop(): Unit = {
askThreadPool.shutdownNow()
blockManagerInfoCleaner.shutdownNow()
}

private def cleanBlockManagerInfo(): Unit = {
logDebug("Cleaning blockManagerInfo")
val now = System.currentTimeMillis()
val expiredBmIds = blockManagerInfo.filter { case (_, bmInfo) =>
// bmInfo.executorRemovalTs.get cannot be None when BM is not alive
!bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
}.keys
expiredBmIds.foreach { bmId =>
logInfo(s"Cleaning expired $bmId from blockManagerInfo")
blockManagerInfo.remove(bmId)
}
}

@inline private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] =
blockManagerInfo.get(bmId).filter(_.isAlive)

@inline private def allAliveBlockManagerInfos: Iterable[BlockManagerInfo] =
blockManagerInfo.values.filter(_.isAlive)
}

@DeveloperApi
@@ -834,7 +795,6 @@ private[spark] class BlockManagerInfo(

private var _lastSeenMs: Long = timeMs
private var _remainingMem: Long = maxMem
private var _executorRemovalTs: Option[Long] = None

// Mapping from block id to its status.
private val _blocks = new JHashMap[BlockId, BlockStatus]
@@ -949,16 +909,4 @@ private[spark] class BlockManagerInfo(
def clear(): Unit = {
_blocks.clear()
}

def executorRemovalTs: Option[Long] = _executorRemovalTs

def isAlive: Boolean = _executorRemovalTs.isEmpty

def setExecutorRemovalTs(): Unit = {
if (!isAlive) {
logWarning(s"executorRemovalTs is already set to ${_executorRemovalTs.get}")
} else {
_executorRemovalTs = Some(System.currentTimeMillis())
}
}
}
7 changes: 0 additions & 7 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -3090,13 +3090,6 @@ private[spark] object Utils extends Logging {
}
}

def executorTimeoutMs(conf: SparkConf): Long = {
// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
// "milliseconds"
conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
.getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
}

/** Returns a string message about delegation token generation failure */
def createFailedToGetTokenMessage(serviceName: String, e: scala.Throwable): String = {
val message = "Failed to get token from service %s due to %s. " +
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Future, TimeoutException}
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
import scala.language.implicitConversions
import scala.reflect.ClassTag

import org.apache.commons.lang3.RandomUtils
@@ -101,7 +101,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
.set(Network.RPC_ASK_TIMEOUT, "5s")
.set(PUSH_BASED_SHUFFLE_ENABLED, true)
.set(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key, "5s")
}

private def makeSortShuffleManager(conf: Option[SparkConf] = None): SortShuffleManager = {
@@ -611,7 +610,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
}

test("no reregistration on heart beat until executor timeout") {
test("reregistration on heart beat") {
val store = makeBlockManager(2000)
val a1 = new Array[Byte](400)

@@ -622,15 +621,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE

master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")

val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
BlockManagerHeartbeat(store.blockManagerId))
assert(reregister == false, "master told to re-register")

eventually(timeout(10 seconds), interval(1 seconds)) {
val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
BlockManagerHeartbeat(store.blockManagerId))
assert(reregister, "master did not tell to re-register")
}
assert(reregister)
}
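The restored test above pins down the pre-SPARK-35011 contract: once the master has removed an executor, the very next BlockManagerHeartbeat gets a negative answer and the block manager is expected to re-register, with no grace period. A hedged sketch of that answer from the driver's side, assuming a simplified stand-in for BlockManagerMasterHeartbeatEndpoint (not part of the commit):

// Sketch only: the driver-side heartbeat handler answers true while it still
// tracks the block manager and false once it has been removed; the test above
// negates that answer, so "reregister" being true means "please re-register".
final class HeartbeatEndpointSketch(private var known: Set[String]) {
  def removeExecutor(bmId: String): Unit = known -= bmId
  def heartbeatReceived(bmId: String): Boolean = known.contains(bmId)
}

object HeartbeatEndpointSketch {
  def main(args: Array[String]): Unit = {
    val endpoint = new HeartbeatEndpointSketch(Set("bm-1"))
    assert(endpoint.heartbeatReceived("bm-1"), "known block manager should not re-register")
    endpoint.removeExecutor("bm-1")
    val reregister = !endpoint.heartbeatReceived("bm-1")
    assert(reregister, "master did not tell to re-register")
  }
}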

test("reregistration on block update") {
@@ -644,12 +638,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")

eventually(timeout(10 seconds), interval(1 seconds)) {
val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
BlockManagerHeartbeat(store.blockManagerId))
assert(reregister, "master did not tell to re-register")
}

store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.waitForAsyncReregister()

