Revert "[SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight" #33959

Closed · wants to merge 1 commit
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -80,7 +80,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new HashMap[String, Long]

private val executorTimeoutMs = Utils.executorTimeoutMs(sc.conf)
private val executorTimeoutMs = sc.conf.get(
config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))

private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)

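The hunk above restores the pre-SPARK-35011 timeout resolution inside HeartbeatReceiver: prefer spark.storage.blockManagerHeartbeatTimeoutMs when it is set, otherwise fall back to spark.network.timeout, a seconds-based setting converted to milliseconds. A minimal standalone sketch of that fallback, with plain values standing in for SparkConf and Utils.timeStringAsMs (the object and helper names here are illustrative, not Spark API):

object ExecutorTimeoutSketch {
  // Stand-in for conf.get(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT): None means "not configured".
  private val blockManagerHeartbeatTimeoutMs: Option[Long] = None
  // Stand-in for conf.get(Network.NETWORK_TIMEOUT): a value in seconds.
  private val networkTimeoutSeconds: Long = 120L

  // Rough stand-in for Utils.timeStringAsMs, handling only the "<n>s" form used here.
  private def timeStringAsMs(s: String): Long = s.stripSuffix("s").trim.toLong * 1000L

  // Same shape as the restored expression in HeartbeatReceiver.
  val executorTimeoutMs: Long =
    blockManagerHeartbeatTimeoutMs.getOrElse(timeStringAsMs(s"${networkTimeoutSeconds}s"))

  def main(args: Array[String]): Unit =
    println(s"executorTimeoutMs = $executorTimeoutMs") // 120000 with the values above
}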
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -96,15 +96,6 @@ class BlockManagerMasterEndpoint(
mapper
}

private val executorTimeoutMs = Utils.executorTimeoutMs(conf)
private val blockManagerInfoCleaner = {
val cleaningDelay = Math.floorDiv(executorTimeoutMs, 2L)
val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("blockManagerInfo-cleaner")
executor.scheduleWithFixedDelay(() => cleanBlockManagerInfo(), cleaningDelay, cleaningDelay,
TimeUnit.MILLISECONDS)
executor
}

val proactivelyReplicate = conf.get(config.STORAGE_REPLICATION_PROACTIVE)

val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)
@@ -282,12 +273,12 @@ class BlockManagerMasterEndpoint(
}
}
bmIdsExecutor.foreach { bmId =>
aliveBlockManagerInfo(bmId).foreach { bmInfo =>
blockManagerInfo.get(bmId).foreach { bmInfo =>
bmInfo.removeBlock(blockId)
}
}
}
val removeRddFromExecutorsFutures = allAliveBlockManagerInfos.map { bmInfo =>
val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
bmInfo.storageEndpoint.ask[Int](removeMsg).recover {
// use 0 as default value means no blocks were removed
handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
@@ -314,7 +305,7 @@
// Nothing to do in the BlockManagerMasterEndpoint data structures
val removeMsg = RemoveShuffle(shuffleId)
Future.sequence(
allAliveBlockManagerInfos.map { bm =>
blockManagerInfo.values.map { bm =>
bm.storageEndpoint.ask[Boolean](removeMsg).recover {
// use false as default value means no shuffle data were removed
handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
@@ -330,7 +321,7 @@
*/
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
val requiredBlockManagers = allAliveBlockManagerInfos.filter { info =>
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || !info.blockManagerId.isDriver
}
val futures = requiredBlockManagers.map { bm =>
@@ -346,24 +337,13 @@
private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
val info = blockManagerInfo(blockManagerId)

// SPARK-35011: Not removing info from the blockManagerInfo map, but only setting the removal
// timestamp of the executor in BlockManagerInfo. This info will be removed from
// blockManagerInfo map by the blockManagerInfoCleaner once
// now() - info.executorRemovalTs > executorTimeoutMs.
//
// We are delaying the removal of BlockManagerInfo to avoid a BlockManager reregistration
// while a executor is shutting. This unwanted reregistration causes inconsistent bookkeeping
// of executors in Spark.
// Delaying this removal until blockManagerInfoCleaner decides to remove it ensures
// BlockManagerMasterHeartbeatEndpoint does not ask the BlockManager on a recently removed
// executor to reregister on BlockManagerHeartbeat message.
info.setExecutorRemovalTs()

// Remove the block manager from blockManagerIdByExecutor.
blockManagerIdByExecutor -= blockManagerId.executorId
decommissioningBlockManagerSet.remove(blockManagerId)

// remove all the blocks.
// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)

val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
@@ -384,7 +364,7 @@
val i = (new Random(blockId.hashCode)).nextInt(locations.size)
val blockLocations = locations.toSeq
val candidateBMId = blockLocations(i)
aliveBlockManagerInfo(candidateBMId).foreach { bm =>
blockManagerInfo.get(candidateBMId).foreach { bm =>
val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
bm.storageEndpoint.ask[Boolean](replicateMsg)
@@ -420,16 +400,16 @@
*/
private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
try {
aliveBlockManagerInfo(blockManagerId).map { info =>
val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
rddBlocks.map { blockId =>
val currentBlockLocations = blockLocations.get(blockId)
val maxReplicas = currentBlockLocations.size + 1
val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
replicateMsg
}.toSeq
}.getOrElse(Seq.empty[ReplicateBlock])
val info = blockManagerInfo(blockManagerId)

val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
rddBlocks.map { blockId =>
val currentBlockLocations = blockLocations.get(blockId)
val maxReplicas = currentBlockLocations.size + 1
val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
replicateMsg
}.toSeq
} catch {
// If the block manager has already exited, nothing to replicate.
case e: java.util.NoSuchElementException =>
@@ -443,7 +423,8 @@
val locations = blockLocations.get(blockId)
if (locations != null) {
locations.foreach { blockManagerId: BlockManagerId =>
aliveBlockManagerInfo(blockManagerId).foreach { bm =>
val blockManager = blockManagerInfo.get(blockManagerId)
blockManager.foreach { bm =>
// Remove the block from the BlockManager.
// Doesn't actually wait for a confirmation and the message might get lost.
// If message loss becomes frequent, we should add retry logic here.
@@ -458,14 +439,14 @@

// Return a map from the block manager id to max memory and remaining memory.
private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
allAliveBlockManagerInfos.map { info =>
(info.blockManagerId, (info.maxMem, info.remainingMem))
blockManagerInfo.map { case(blockManagerId, info) =>
(blockManagerId, (info.maxMem, info.remainingMem))
}.toMap
}

private def storageStatus: Array[StorageStatus] = {
allAliveBlockManagerInfos.map { info =>
new StorageStatus(info.blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
blockManagerInfo.map { case (blockManagerId, info) =>
new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
Some(info.maxOffHeapMem), info.blocks.asScala)
}.toArray
}
@@ -487,7 +468,7 @@
* Futures to avoid potential deadlocks. This can arise if there exists a block manager
* that is also waiting for this master endpoint's response to a previous message.
*/
allAliveBlockManagerInfos.map { info =>
blockManagerInfo.values.map { info =>
val blockStatusFuture =
if (askStorageEndpoints) {
info.storageEndpoint.ask[Option[BlockStatus]](getBlockStatus)
@@ -511,7 +492,7 @@
askStorageEndpoints: Boolean): Future[Seq[BlockId]] = {
val getMatchingBlockIds = GetMatchingBlockIds(filter)
Future.sequence(
allAliveBlockManagerInfos.map { info =>
blockManagerInfo.values.map { info =>
val future =
if (askStorageEndpoints) {
info.storageEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
@@ -581,10 +562,9 @@
if (pushBasedShuffleEnabled) {
addMergerLocation(id)
}

listenerBus.post(SparkListenerBlockManagerAdded(time, id,
maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
id
}

@@ -672,7 +652,7 @@
if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
blockStatusByShuffleService.get(bmId).flatMap(m => m.get(blockId))
} else {
aliveBlockManagerInfo(bmId).flatMap(_.getStatus(blockId))
blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
}
}

@@ -683,7 +663,8 @@
// can be used to access this block even when the original executor is already stopped.
loc.host == requesterHost &&
(loc.port == externalShuffleServicePort ||
aliveBlockManagerInfo(loc)
blockManagerInfo
.get(loc)
.flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
.getOrElse(false))
}.flatMap { bmId => Option(executorIdToLocalDirs.getIfPresent(bmId.executorId)) }
@@ -700,7 +681,7 @@

/** Get the list of the peers of the given block manager */
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = allAliveBlockManagerInfos.map(_.blockManagerId).toSet
val blockManagerIds = blockManagerInfo.keySet
if (blockManagerIds.contains(blockManagerId)) {
blockManagerIds
.filterNot { _.isDriver }
@@ -753,35 +734,15 @@
private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
for (
blockManagerId <- blockManagerIdByExecutor.get(executorId);
info <- aliveBlockManagerInfo(blockManagerId)
info <- blockManagerInfo.get(blockManagerId)
) yield {
info.storageEndpoint
}
}

override def onStop(): Unit = {
askThreadPool.shutdownNow()
blockManagerInfoCleaner.shutdownNow()
}

private def cleanBlockManagerInfo(): Unit = {
logDebug("Cleaning blockManagerInfo")
val now = System.currentTimeMillis()
val expiredBmIds = blockManagerInfo.filter { case (_, bmInfo) =>
// bmInfo.executorRemovalTs.get cannot be None when BM is not alive
!bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
}.keys
expiredBmIds.foreach { bmId =>
logInfo(s"Cleaning expired $bmId from blockManagerInfo")
blockManagerInfo.remove(bmId)
}
}

@inline private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] =
blockManagerInfo.get(bmId).filter(_.isAlive)

@inline private def allAliveBlockManagerInfos: Iterable[BlockManagerInfo] =
blockManagerInfo.values.filter(_.isAlive)
}

@DeveloperApi
@@ -834,7 +795,6 @@ private[spark] class BlockManagerInfo(

private var _lastSeenMs: Long = timeMs
private var _remainingMem: Long = maxMem
private var _executorRemovalTs: Option[Long] = None

// Mapping from block id to its status.
private val _blocks = new JHashMap[BlockId, BlockStatus]
@@ -949,16 +909,4 @@
def clear(): Unit = {
_blocks.clear()
}

def executorRemovalTs: Option[Long] = _executorRemovalTs

def isAlive: Boolean = _executorRemovalTs.isEmpty

def setExecutorRemovalTs(): Unit = {
if (!isAlive) {
logWarning(s"executorRemovalTs is already set to ${_executorRemovalTs.get}")
} else {
_executorRemovalTs = Some(System.currentTimeMillis())
}
}
}
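For context on what this file's revert undoes: SPARK-35011 stopped deleting a removed executor's BlockManagerInfo right away. It only stamped the entry with a removal timestamp, and a scheduled cleaner dropped it once the executor timeout had passed, so heartbeats from a shutting-down executor would not trigger a fresh registration. A rough, self-contained sketch of that pattern under simplified names (none of these types are the actual Spark classes):

import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

// Simplified stand-in for BlockManagerInfo: alive until a removal timestamp is set.
final class ManagerInfo(val id: String) {
  private var removalTs: Option[Long] = None
  def isAlive: Boolean = removalTs.isEmpty
  def removedAt: Option[Long] = removalTs
  def markRemoved(): Unit = if (isAlive) removalTs = Some(System.currentTimeMillis())
}

// Simplified stand-in for the master endpoint's bookkeeping under SPARK-35011.
final class DelayedRemovalRegistry(executorTimeoutMs: Long) {
  private val infos = mutable.HashMap.empty[String, ManagerInfo]

  // Mirrors the scheduleWithFixedDelay cleaner removed by this revert.
  private val cleaner = Executors.newSingleThreadScheduledExecutor()
  private val cleaningDelay = math.max(executorTimeoutMs / 2, 1L)
  cleaner.scheduleWithFixedDelay(() => cleanExpired(), cleaningDelay, cleaningDelay,
    TimeUnit.MILLISECONDS)

  def register(id: String): Unit = infos.getOrElseUpdate(id, new ManagerInfo(id))

  // removeBlockManager under SPARK-35011: mark the entry, do not delete it yet.
  def remove(id: String): Unit = infos.get(id).foreach(_.markRemoved())

  // Heartbeat handling: ask the executor to re-register only once the entry is truly gone.
  def shouldReregister(id: String): Boolean = !infos.contains(id)

  private def cleanExpired(): Unit = {
    val now = System.currentTimeMillis()
    val expired = infos.collect {
      case (id, info) if !info.isAlive && info.removedAt.exists(now - _ > executorTimeoutMs) => id
    }
    expired.foreach(infos.remove)
  }

  def shutdown(): Unit = cleaner.shutdownNow()
}

The revert returns to the simpler bookkeeping visible in the diff: removeBlockManager deletes the entry from blockManagerInfo immediately, the aliveBlockManagerInfo and allAliveBlockManagerInfos filters disappear, and the next BlockManagerHeartbeat from a removed executor prompts it to re-register.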
7 changes: 0 additions & 7 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -3090,13 +3090,6 @@ private[spark] object Utils extends Logging {
}
}

def executorTimeoutMs(conf: SparkConf): Long = {
// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
// "milliseconds"
conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
.getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
}

/** Returns a string message about delegation token generation failure */
def createFailedToGetTokenMessage(serviceName: String, e: scala.Throwable): String = {
val message = "Failed to get token from service %s due to %s. " +
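The deleted Utils.executorTimeoutMs helper existed mainly to reconcile the two units its comment mentions: spark.network.timeout is read in seconds, while spark.storage.blockManagerSlaveTimeoutMs is in milliseconds. A tiny worked check of that conversion, assuming a network timeout of 120 seconds (the value is illustrative):

object TimeoutUnitsCheck {
  def main(args: Array[String]): Unit = {
    val networkTimeoutSeconds = 120L                 // assumed spark.network.timeout value
    val fallbackMs = networkTimeoutSeconds * 1000L   // seconds -> milliseconds
    val heartbeatTimeoutMs: Option[Long] = None      // block manager heartbeat timeout left unset
    val executorTimeoutMs = heartbeatTimeoutMs.getOrElse(fallbackMs)
    assert(executorTimeoutMs == 120000L)             // 120 s == 120000 ms
  }
}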
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Future, TimeoutException}
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
import scala.language.implicitConversions
import scala.reflect.ClassTag

import org.apache.commons.lang3.RandomUtils
@@ -101,7 +101,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
.set(Network.RPC_ASK_TIMEOUT, "5s")
.set(PUSH_BASED_SHUFFLE_ENABLED, true)
.set(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key, "5s")
}

private def makeSortShuffleManager(conf: Option[SparkConf] = None): SortShuffleManager = {
Expand Down Expand Up @@ -611,7 +610,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
}

test("no reregistration on heart beat until executor timeout") {
test("reregistration on heart beat") {
val store = makeBlockManager(2000)
val a1 = new Array[Byte](400)

@@ -622,15 +621,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE

master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")

val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
BlockManagerHeartbeat(store.blockManagerId))
assert(reregister == false, "master told to re-register")

eventually(timeout(10 seconds), interval(1 seconds)) {
val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
BlockManagerHeartbeat(store.blockManagerId))
assert(reregister, "master did not tell to re-register")
}
assert(reregister)
}

test("reregistration on block update") {
Expand All @@ -644,12 +638,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")

eventually(timeout(10 seconds), interval(1 seconds)) {
val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
BlockManagerHeartbeat(store.blockManagerId))
assert(reregister, "master did not tell to re-register")
}

store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.waitForAsyncReregister()

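The suite changes above restore the original expectation: once the executor has been removed, a BlockManagerHeartbeat askSync returns false straight away, and the test reads that as an instruction to re-register without waiting inside an eventually block. A hedged sketch of that check with a stubbed heartbeat endpoint in place of the real RPC machinery (HeartbeatStub is an illustrative name, not a Spark class):

object ReregistrationCheckSketch {
  // The stub answers true only while it still tracks the block manager,
  // false once the executor has been removed.
  final class HeartbeatStub {
    private val known = scala.collection.mutable.Set.empty[String]
    def register(executorId: String): Unit = known += executorId
    def removeExecutor(executorId: String): Unit = known -= executorId
    // Plays the role of askSync[Boolean](BlockManagerHeartbeat(id)) in the suite.
    def heartbeat(executorId: String): Boolean = known.contains(executorId)
  }

  def main(args: Array[String]): Unit = {
    val master = new HeartbeatStub
    master.register("exec-1")
    master.removeExecutor("exec-1")
    val reregister = !master.heartbeat("exec-1") // same negation the suite applies
    assert(reregister, "master did not tell to re-register")
  }
}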