diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 971cbd2aeba1f..71419d5aea0b4 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -80,7 +80,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) // executor ID -> timestamp of when the last heartbeat from this executor was received private val executorLastSeen = new HashMap[String, Long] - private val executorTimeoutMs = Utils.executorTimeoutMs(sc.conf) + private val executorTimeoutMs = sc.conf.get( + config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT + ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")) private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 7d068fd695299..61591b020c600 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -96,15 +96,6 @@ class BlockManagerMasterEndpoint( mapper } - private val executorTimeoutMs = Utils.executorTimeoutMs(conf) - private val blockManagerInfoCleaner = { - val cleaningDelay = Math.floorDiv(executorTimeoutMs, 2L) - val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("blockManagerInfo-cleaner") - executor.scheduleWithFixedDelay(() => cleanBlockManagerInfo(), cleaningDelay, cleaningDelay, - TimeUnit.MILLISECONDS) - executor - } - val proactivelyReplicate = conf.get(config.STORAGE_REPLICATION_PROACTIVE) val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf) @@ -282,12 +273,12 @@ class BlockManagerMasterEndpoint( } } bmIdsExecutor.foreach { bmId => - aliveBlockManagerInfo(bmId).foreach { bmInfo => + blockManagerInfo.get(bmId).foreach { bmInfo => bmInfo.removeBlock(blockId) } } } - val removeRddFromExecutorsFutures = allAliveBlockManagerInfos.map { bmInfo => + val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo => bmInfo.storageEndpoint.ask[Int](removeMsg).recover { // use 0 as default value means no blocks were removed handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0) @@ -313,7 +304,7 @@ class BlockManagerMasterEndpoint( // Nothing to do in the BlockManagerMasterEndpoint data structures val removeMsg = RemoveShuffle(shuffleId) Future.sequence( - allAliveBlockManagerInfos.map { bm => + blockManagerInfo.values.map { bm => bm.storageEndpoint.ask[Boolean](removeMsg).recover { // use false as default value means no shuffle data were removed handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) @@ -329,7 +320,7 @@ class BlockManagerMasterEndpoint( */ private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = { val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver) - val requiredBlockManagers = allAliveBlockManagerInfos.filter { info => + val requiredBlockManagers = blockManagerInfo.values.filter { info => removeFromDriver || !info.blockManagerId.isDriver } val futures = requiredBlockManagers.map { bm => @@ -345,24 +336,13 @@ class BlockManagerMasterEndpoint( private def removeBlockManager(blockManagerId: BlockManagerId): Unit = { val info = blockManagerInfo(blockManagerId) - // SPARK-35011: Not removing info from the blockManagerInfo map, but only setting the removal - // timestamp of the executor in BlockManagerInfo. This info will be removed from - // blockManagerInfo map by the blockManagerInfoCleaner once - // now() - info.executorRemovalTs > executorTimeoutMs. - // - // We are delaying the removal of BlockManagerInfo to avoid a BlockManager reregistration - // while a executor is shutting. This unwanted reregistration causes inconsistent bookkeeping - // of executors in Spark. - // Delaying this removal until blockManagerInfoCleaner decides to remove it ensures - // BlockManagerMasterHeartbeatEndpoint does not ask the BlockManager on a recently removed - // executor to reregister on BlockManagerHeartbeat message. - info.setExecutorRemovalTs() - // Remove the block manager from blockManagerIdByExecutor. blockManagerIdByExecutor -= blockManagerId.executorId decommissioningBlockManagerSet.remove(blockManagerId) - // remove all the blocks. + // Remove it from blockManagerInfo and remove all the blocks. + blockManagerInfo.remove(blockManagerId) + val iterator = info.blocks.keySet.iterator while (iterator.hasNext) { val blockId = iterator.next @@ -383,7 +363,7 @@ class BlockManagerMasterEndpoint( val i = (new Random(blockId.hashCode)).nextInt(locations.size) val blockLocations = locations.toSeq val candidateBMId = blockLocations(i) - aliveBlockManagerInfo(candidateBMId).foreach { bm => + blockManagerInfo.get(candidateBMId).foreach { bm => val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId) val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas) bm.storageEndpoint.ask[Boolean](replicateMsg) @@ -419,16 +399,16 @@ class BlockManagerMasterEndpoint( */ private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = { try { - aliveBlockManagerInfo(blockManagerId).map { info => - val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD) - rddBlocks.map { blockId => - val currentBlockLocations = blockLocations.get(blockId) - val maxReplicas = currentBlockLocations.size + 1 - val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId) - val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas) - replicateMsg - }.toSeq - }.getOrElse(Seq.empty[ReplicateBlock]) + val info = blockManagerInfo(blockManagerId) + + val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD) + rddBlocks.map { blockId => + val currentBlockLocations = blockLocations.get(blockId) + val maxReplicas = currentBlockLocations.size + 1 + val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId) + val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas) + replicateMsg + }.toSeq } catch { // If the block manager has already exited, nothing to replicate. case e: java.util.NoSuchElementException => @@ -442,7 +422,8 @@ class BlockManagerMasterEndpoint( val locations = blockLocations.get(blockId) if (locations != null) { locations.foreach { blockManagerId: BlockManagerId => - aliveBlockManagerInfo(blockManagerId).foreach { bm => + val blockManager = blockManagerInfo.get(blockManagerId) + blockManager.foreach { bm => // Remove the block from the BlockManager. // Doesn't actually wait for a confirmation and the message might get lost. // If message loss becomes frequent, we should add retry logic here. @@ -457,14 +438,14 @@ class BlockManagerMasterEndpoint( // Return a map from the block manager id to max memory and remaining memory. private def memoryStatus: Map[BlockManagerId, (Long, Long)] = { - allAliveBlockManagerInfos.map { info => - (info.blockManagerId, (info.maxMem, info.remainingMem)) + blockManagerInfo.map { case(blockManagerId, info) => + (blockManagerId, (info.maxMem, info.remainingMem)) }.toMap } private def storageStatus: Array[StorageStatus] = { - allAliveBlockManagerInfos.map { info => - new StorageStatus(info.blockManagerId, info.maxMem, Some(info.maxOnHeapMem), + blockManagerInfo.map { case (blockManagerId, info) => + new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem), Some(info.maxOffHeapMem), info.blocks.asScala) }.toArray } @@ -486,7 +467,7 @@ class BlockManagerMasterEndpoint( * Futures to avoid potential deadlocks. This can arise if there exists a block manager * that is also waiting for this master endpoint's response to a previous message. */ - allAliveBlockManagerInfos.map { info => + blockManagerInfo.values.map { info => val blockStatusFuture = if (askStorageEndpoints) { info.storageEndpoint.ask[Option[BlockStatus]](getBlockStatus) @@ -510,7 +491,7 @@ class BlockManagerMasterEndpoint( askStorageEndpoints: Boolean): Future[Seq[BlockId]] = { val getMatchingBlockIds = GetMatchingBlockIds(filter) Future.sequence( - allAliveBlockManagerInfos.map { info => + blockManagerInfo.values.map { info => val future = if (askStorageEndpoints) { info.storageEndpoint.ask[Seq[BlockId]](getMatchingBlockIds) @@ -576,10 +557,9 @@ class BlockManagerMasterEndpoint( if (pushBasedShuffleEnabled) { addMergerLocation(id) } - - listenerBus.post(SparkListenerBlockManagerAdded(time, id, - maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize))) } + listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize, + Some(maxOnHeapMemSize), Some(maxOffHeapMemSize))) id } @@ -667,7 +647,7 @@ class BlockManagerMasterEndpoint( if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) { Option(blockStatusByShuffleService(bmId).get(blockId)) } else { - aliveBlockManagerInfo(bmId).flatMap(_.getStatus(blockId)) + blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId)) } } @@ -678,7 +658,8 @@ class BlockManagerMasterEndpoint( // can be used to access this block even when the original executor is already stopped. loc.host == requesterHost && (loc.port == externalShuffleServicePort || - aliveBlockManagerInfo(loc) + blockManagerInfo + .get(loc) .flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk)) .getOrElse(false)) }.flatMap { bmId => Option(executorIdToLocalDirs.getIfPresent(bmId.executorId)) } @@ -695,7 +676,7 @@ class BlockManagerMasterEndpoint( /** Get the list of the peers of the given block manager */ private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { - val blockManagerIds = allAliveBlockManagerInfos.map(_.blockManagerId).toSet + val blockManagerIds = blockManagerInfo.keySet if (blockManagerIds.contains(blockManagerId)) { blockManagerIds .filterNot { _.isDriver } @@ -747,7 +728,7 @@ class BlockManagerMasterEndpoint( private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = { for ( blockManagerId <- blockManagerIdByExecutor.get(executorId); - info <- aliveBlockManagerInfo(blockManagerId) + info <- blockManagerInfo.get(blockManagerId) ) yield { info.storageEndpoint } @@ -755,27 +736,7 @@ class BlockManagerMasterEndpoint( override def onStop(): Unit = { askThreadPool.shutdownNow() - blockManagerInfoCleaner.shutdownNow() - } - - private def cleanBlockManagerInfo(): Unit = { - logDebug("Cleaning blockManagerInfo") - val now = System.currentTimeMillis() - val expiredBmIds = blockManagerInfo.filter { case (_, bmInfo) => - // bmInfo.executorRemovalTs.get cannot be None when BM is not alive - !bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs - }.keys - expiredBmIds.foreach { bmId => - logInfo(s"Cleaning expired $bmId from blockManagerInfo") - blockManagerInfo.remove(bmId) - } } - - @inline private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] = - blockManagerInfo.get(bmId).filter(_.isAlive) - - @inline private def allAliveBlockManagerInfos: Iterable[BlockManagerInfo] = - blockManagerInfo.values.filter(_.isAlive) } @DeveloperApi @@ -803,7 +764,6 @@ private[spark] class BlockManagerInfo( private var _lastSeenMs: Long = timeMs private var _remainingMem: Long = maxMem - private var _executorRemovalTs: Option[Long] = None // Mapping from block id to its status. private val _blocks = new JHashMap[BlockId, BlockStatus] @@ -918,16 +878,4 @@ private[spark] class BlockManagerInfo( def clear(): Unit = { _blocks.clear() } - - def executorRemovalTs: Option[Long] = _executorRemovalTs - - def isAlive: Boolean = _executorRemovalTs.isEmpty - - def setExecutorRemovalTs(): Unit = { - if (!isAlive) { - logWarning(s"executorRemovalTs is already set to ${_executorRemovalTs.get}") - } else { - _executorRemovalTs = Some(System.currentTimeMillis()) - } - } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 06241dd2a221b..1643aa68cdb5a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -3057,13 +3057,6 @@ private[spark] object Utils extends Logging { 0 } } - - def executorTimeoutMs(conf: SparkConf): Long = { - // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses - // "milliseconds" - conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT) - .getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s")) - } } private[util] object CallerContext extends Logging { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 5a8933ebb3c24..cd319daccc0db 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -26,7 +26,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.concurrent.{Future, TimeoutException} import scala.concurrent.duration._ -import scala.language.{implicitConversions, postfixOps} +import scala.language.implicitConversions import scala.reflect.ClassTag import org.apache.commons.lang3.RandomUtils @@ -101,7 +101,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L) .set(Network.RPC_ASK_TIMEOUT, "5s") .set(PUSH_BASED_SHUFFLE_ENABLED, true) - .set(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key, "5s") } private def makeSortShuffleManager(): SortShuffleManager = { @@ -611,7 +610,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt()) } - test("no reregistration on heart beat until executor timeout") { + test("reregistration on heart beat") { val store = makeBlockManager(2000) val a1 = new Array[Byte](400) @@ -622,15 +621,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE master.removeExecutor(store.blockManagerId.executorId) assert(master.getLocations("a1").size == 0, "a1 was not removed from master") + val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean]( BlockManagerHeartbeat(store.blockManagerId)) - assert(reregister == false, "master told to re-register") - - eventually(timeout(10 seconds), interval(1 seconds)) { - val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean]( - BlockManagerHeartbeat(store.blockManagerId)) - assert(reregister, "master did not tell to re-register") - } + assert(reregister) } test("reregistration on block update") { @@ -644,12 +638,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE master.removeExecutor(store.blockManagerId.executorId) assert(master.getLocations("a1").size == 0, "a1 was not removed from master") - eventually(timeout(10 seconds), interval(1 seconds)) { - val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean]( - BlockManagerHeartbeat(store.blockManagerId)) - assert(reregister, "master did not tell to re-register") - } - store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) store.waitForAsyncReregister()