HOTFIX Revert "[SPARK-20732][CORE] Decommission cache blocks
HOTFIX for a test issue introduced in SPARK-20732.

Closes #28337 from holdenk/revert-SPARK-20732.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
holdenk committed Apr 25, 2020
1 parent f92652d commit 9faad07
Showing 9 changed files with 12 additions and 394 deletions.
28 changes: 0 additions & 28 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -413,34 +413,6 @@ package object config {
.intConf
.createWithDefault(1)

private[spark] val STORAGE_DECOMMISSION_ENABLED =
ConfigBuilder("spark.storage.decommission.enabled")
.doc("Whether to decommission the block manager when decommissioning executor")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
.internal()
.doc("Maximum number of failures which can be handled for the replication of " +
"one RDD block when block manager is decommissioning and trying to move its " +
"existing blocks.")
.version("3.1.0")
.intConf
.createWithDefault(3)

private[spark] val STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL =
ConfigBuilder("spark.storage.decommission.replicationReattemptInterval")
.internal()
.doc("The interval of time between consecutive cache block replication reattempts " +
"happening on each decommissioning executor (due to storage decommissioning).")
.version("3.1.0")
.timeConf(TimeUnit.MILLISECONDS)
.checkValue(_ > 0, "Time interval between two consecutive attempts of " +
"cache block replication should be positive.")
.createWithDefaultString("30s")

private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
ConfigBuilder("spark.storage.replication.topologyFile")
.version("2.1.0")
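For context, the three configuration keys deleted above were the user-facing surface of the cache-block decommissioning feature. The snippet below is illustrative only and not part of this commit: a hedged sketch of how a job could have set those keys through SparkConf before the revert, with the values taken from the removed defaults.

```scala
import org.apache.spark.SparkConf

// Illustrative sketch only: the keys removed by this revert and their former defaults.
val conf = new SparkConf()
  .set("spark.storage.decommission.enabled", "true")                     // default was false
  .set("spark.storage.decommission.maxReplicationFailuresPerBlock", "3") // internal, default 3
  .set("spark.storage.decommission.replicationReattemptInterval", "30s") // internal, default 30s
```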
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -438,19 +438,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logError(s"Unexpected error during decommissioning ${e.toString}", e)
}
logInfo(s"Finished decommissioning executor $executorId.")

if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
try {
logInfo("Starting decommissioning block manager corresponding to " +
s"executor $executorId.")
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
} catch {
case e: Exception =>
logError("Unexpected error during block manager " +
s"decommissioning for executor $executorId: ${e.toString}", e)
}
logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
}
} else {
logInfo(s"Skipping decommissioning of executor $executorId.")
}
@@ -587,7 +574,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
private[spark] def decommissionExecutor(executorId: String): Unit = {
if (driverEndpoint != null) {
logInfo("Propagating executor decommission to driver.")
logInfo("Propegating executor decommission to driver.")
driverEndpoint.send(DecommissionExecutor(executorId))
}
}
@@ -671,7 +658,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
* @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per
* @param resourceProfileToNumExecutors The total number of executors we'd like to have per
* ResourceProfile. The cluster manager shouldn't kill any
* running executor to reach this number, but, if all
* existing executors were to die, this is the number
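The hunk above drops the block-manager decommissioning step that ran right after an executor finished decommissioning. As a hedged, standalone sketch (hypothetical helper name, println in place of Spark's logging, and the real call path passed in as a function), the removed logic followed a guard-and-isolate pattern: only act when the now-removed flag is enabled, and catch failures so executor decommissioning itself still completes.

```scala
import org.apache.spark.SparkConf

object DecommissionGuardSketch {
  // Hypothetical helper, not the actual CoarseGrainedSchedulerBackend code.
  def maybeDecommissionBlockManager(
      conf: SparkConf,
      executorId: String)(decommissionBlockManagers: Seq[String] => Unit): Unit = {
    // "spark.storage.decommission.enabled" is the flag removed by this revert.
    if (conf.getBoolean("spark.storage.decommission.enabled", false)) {
      try {
        println(s"Starting decommissioning block manager corresponding to executor $executorId.")
        decommissionBlockManagers(Seq(executorId))
      } catch {
        case e: Exception =>
          println(s"Unexpected error during block manager decommissioning for executor $executorId: $e")
      }
      println(s"Acknowledged decommissioning block manager corresponding to $executorId.")
    }
  }
}
```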
129 changes: 9 additions & 120 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -54,7 +54,6 @@ import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter}
import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
import org.apache.spark.storage.memory._
import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
@@ -242,9 +241,6 @@ private[spark] class BlockManager(

private var blockReplicationPolicy: BlockReplicationPolicy = _

private var blockManagerDecommissioning: Boolean = false
private var decommissionManager: Option[BlockManagerDecommissionManager] = None

// A DownloadFileManager used to track all the files of remote blocks which are above the
// specified memory threshold. Files will be deleted automatically based on weak reference.
// Exposed for test
@@ -1555,36 +1551,30 @@ private[spark] class BlockManager(
}

/**
* Replicates a block to peer block managers based on existingReplicas and maxReplicas
* Called for pro-active replenishment of blocks lost due to executor failures
*
* @param blockId blockId being replicate
* @param existingReplicas existing block managers that have a replica
* @param maxReplicas maximum replicas needed
* @param maxReplicationFailures number of replication failures to tolerate before
* giving up.
* @return whether block was successfully replicated or not
*/
def replicateBlock(
blockId: BlockId,
existingReplicas: Set[BlockManagerId],
maxReplicas: Int,
maxReplicationFailures: Option[Int] = None): Boolean = {
maxReplicas: Int): Unit = {
logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
blockInfoManager.lockForReading(blockId).forall { info =>
blockInfoManager.lockForReading(blockId).foreach { info =>
val data = doGetLocalBytes(blockId, info)
val storageLevel = StorageLevel(
useDisk = info.level.useDisk,
useMemory = info.level.useMemory,
useOffHeap = info.level.useOffHeap,
deserialized = info.level.deserialized,
replication = maxReplicas)
// we know we are called as a result of an executor removal or because the current executor
// is getting decommissioned. so we refresh peer cache before trying replication, we won't
// try to replicate to a missing executor/another decommissioning executor
// we know we are called as a result of an executor removal, so we refresh peer cache
// this way, we won't try to replicate to a missing executor with a stale reference
getPeers(forceFetch = true)
try {
replicate(
blockId, data, storageLevel, info.classTag, existingReplicas, maxReplicationFailures)
replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
} finally {
logDebug(s"Releasing lock for $blockId")
releaseLockAndDispose(blockId, data)
@@ -1601,11 +1591,9 @@ private[spark] class BlockManager(
data: BlockData,
level: StorageLevel,
classTag: ClassTag[_],
existingReplicas: Set[BlockManagerId] = Set.empty,
maxReplicationFailures: Option[Int] = None): Boolean = {
existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {

val maxReplicationFailureCount = maxReplicationFailures.getOrElse(
conf.get(config.STORAGE_MAX_REPLICATION_FAILURE))
val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)
val tLevel = StorageLevel(
useDisk = level.useDisk,
useMemory = level.useMemory,
@@ -1629,7 +1617,7 @@ private[spark] class BlockManager(
blockId,
numPeersToReplicateTo)

while(numFailures <= maxReplicationFailureCount &&
while(numFailures <= maxReplicationFailures &&
!peersForReplication.isEmpty &&
peersReplicatedTo.size < numPeersToReplicateTo) {
val peer = peersForReplication.head
@@ -1677,11 +1665,9 @@ private[spark] class BlockManager(
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
return false
}

logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
return true
}

/**
@@ -1775,58 +1761,6 @@ private[spark] class BlockManager(
blocksToRemove.size
}

def decommissionBlockManager(): Unit = {
if (!blockManagerDecommissioning) {
logInfo("Starting block manager decommissioning process")
blockManagerDecommissioning = true
decommissionManager = Some(new BlockManagerDecommissionManager(conf))
decommissionManager.foreach(_.start())
} else {
logDebug("Block manager already in decommissioning state")
}
}

/**
* Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
* Visible for testing
*/
def decommissionRddCacheBlocks(): Unit = {
val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)

if (replicateBlocksInfo.nonEmpty) {
logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
"for block manager decommissioning")
}

// Maximum number of storage replication failure which replicateBlock can handle
val maxReplicationFailures = conf.get(
config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)

// TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
// so that we end up prioritize them over each other
val blocksFailedReplication = ThreadUtils.parmap(
replicateBlocksInfo, "decommissionRddCacheBlocks", 4) {
case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
val replicatedSuccessfully = replicateBlock(
blockId,
existingReplicas.toSet,
maxReplicas,
maxReplicationFailures = Some(maxReplicationFailures))
if (replicatedSuccessfully) {
logInfo(s"Block $blockId offloaded successfully, Removing block now")
removeBlock(blockId)
logInfo(s"Block $blockId removed")
} else {
logWarning(s"Failed to offload block $blockId")
}
(blockId, replicatedSuccessfully)
}.filterNot(_._2).map(_._1)
if (blocksFailedReplication.nonEmpty) {
logWarning("Blocks failed replication in cache decommissioning " +
s"process: ${blocksFailedReplication.mkString(",")}")
}
}

/**
* Remove all blocks belonging to the given broadcast.
*/
@@ -1895,52 +1829,7 @@ private[spark] class BlockManager(
data.dispose()
}

/**
* Class to handle block manager decommissioning retries
* It creates a Thread to retry offloading all RDD cache blocks
*/
private class BlockManagerDecommissionManager(conf: SparkConf) {
@volatile private var stopped = false
private val blockReplicationThread = new Thread {
override def run(): Unit = {
while (blockManagerDecommissioning && !stopped) {
try {
logDebug("Attempting to replicate all cached RDD blocks")
decommissionRddCacheBlocks()
logInfo("Attempt to replicate all cached blocks done")
val sleepInterval = conf.get(
config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
Thread.sleep(sleepInterval)
} catch {
case _: InterruptedException =>
// no-op
case NonFatal(e) =>
logError("Error occurred while trying to " +
"replicate cached RDD blocks for block manager decommissioning", e)
}
}
}
}
blockReplicationThread.setDaemon(true)
blockReplicationThread.setName("block-replication-thread")

def start(): Unit = {
logInfo("Starting block replication thread")
blockReplicationThread.start()
}

def stop(): Unit = {
if (!stopped) {
stopped = true
logInfo("Stopping block replication thread")
blockReplicationThread.interrupt()
blockReplicationThread.join()
}
}
}

def stop(): Unit = {
decommissionManager.foreach(_.stop())
blockTransferService.close()
if (blockStoreClient ne blockTransferService) {
// Closing should be idempotent, but maybe not for the NioBlockTransferService.
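The largest removal above is the BlockManagerDecommissionManager, a daemon thread that kept retrying the RDD cache-block offload until stopped. The class below is a hedged, self-contained sketch of that retry-loop structure only (hypothetical name, the offload step passed in as a function, println in place of Spark's logging); the removed code instead called decommissionRddCacheBlocks() and read its sleep interval from spark.storage.decommission.replicationReattemptInterval.

```scala
import scala.util.control.NonFatal

// Sketch of the removed retry loop: offload once, sleep, repeat until stop().
class DecommissionRetrySketch(offloadOnce: () => Unit, sleepIntervalMs: Long = 30000L) {
  @volatile private var stopped = false

  private val blockReplicationThread = new Thread("block-replication-thread") {
    override def run(): Unit = {
      while (!stopped) {
        try {
          offloadOnce()                 // one attempt to replicate all cached RDD blocks
          Thread.sleep(sleepIntervalMs) // wait before the next attempt
        } catch {
          case _: InterruptedException => // no-op: stop() interrupts the sleep
          case NonFatal(e) => println(s"Error while replicating cached RDD blocks: $e")
        }
      }
    }
  }
  blockReplicationThread.setDaemon(true)

  def start(): Unit = blockReplicationThread.start()

  def stop(): Unit = {
    if (!stopped) {
      stopped = true
      blockReplicationThread.interrupt()
      blockReplicationThread.join()
    }
  }
}
```

A caller would construct it with the offload function and bracket the decommissioning window with start() and stop(), mirroring how the removed BlockManager.stop() change stopped the manager.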
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -43,16 +43,6 @@ class BlockManagerMaster(
logInfo("Removed " + execId + " successfully in removeExecutor")
}

/** Decommission block managers corresponding to given set of executors */
def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds))
}

/** Get Replication Info for all the RDD blocks stored in given blockManagerId */
def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
driverEndpoint.askSync[Seq[ReplicateBlock]](GetReplicateInfoForRDDBlocks(blockManagerId))
}

/** Request removal of a dead executor from the driver endpoint.
* This is only called on the driver side. Non-blocking
*/
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -65,9 +65,6 @@ class BlockManagerMasterEndpoint(
// Mapping from executor ID to block manager ID.
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

// Set of block managers which are decommissioning
private val decommissioningBlockManagerSet = new mutable.HashSet[BlockManagerId]

// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

@@ -156,13 +153,6 @@ class BlockManagerMasterEndpoint(
removeExecutor(execId)
context.reply(true)

case DecommissionBlockManagers(executorIds) =>
decommissionBlockManagers(executorIds.flatMap(blockManagerIdByExecutor.get))
context.reply(true)

case GetReplicateInfoForRDDBlocks(blockManagerId) =>
context.reply(getReplicateInfoForRDDBlocks(blockManagerId))

case StopBlockManagerMaster =>
context.reply(true)
stop()
@@ -267,7 +257,6 @@ class BlockManagerMasterEndpoint(

// Remove the block manager from blockManagerIdByExecutor.
blockManagerIdByExecutor -= blockManagerId.executorId
decommissioningBlockManagerSet.remove(blockManagerId)

// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)
@@ -310,39 +299,6 @@ class BlockManagerMasterEndpoint(
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
}

/**
* Decommission the given Seq of blockmanagers
* - Adds these block managers to decommissioningBlockManagerSet Set
* - Sends the DecommissionBlockManager message to each of the [[BlockManagerSlaveEndpoint]]
*/
def decommissionBlockManagers(blockManagerIds: Seq[BlockManagerId]): Future[Seq[Unit]] = {
val newBlockManagersToDecommission = blockManagerIds.toSet.diff(decommissioningBlockManagerSet)
val futures = newBlockManagersToDecommission.map { blockManagerId =>
decommissioningBlockManagerSet.add(blockManagerId)
val info = blockManagerInfo(blockManagerId)
info.slaveEndpoint.ask[Unit](DecommissionBlockManager)
}
Future.sequence{ futures.toSeq }
}

/**
* Returns a Seq of ReplicateBlock for each RDD block stored by given blockManagerId
* @param blockManagerId - block manager id for which ReplicateBlock info is needed
* @return Seq of ReplicateBlock
*/
private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
val info = blockManagerInfo(blockManagerId)

val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
rddBlocks.map { blockId =>
val currentBlockLocations = blockLocations.get(blockId)
val maxReplicas = currentBlockLocations.size + 1
val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
replicateMsg
}.toSeq
}

// Remove a block from the slaves that have it. This can only be used to remove
// blocks that the master knows about.
private def removeBlockFromWorkers(blockId: BlockId): Unit = {
@@ -580,11 +536,7 @@ class BlockManagerMasterEndpoint(
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = blockManagerInfo.keySet
if (blockManagerIds.contains(blockManagerId)) {
blockManagerIds
.filterNot { _.isDriver }
.filterNot { _ == blockManagerId }
.diff(decommissioningBlockManagerSet)
.toSeq
blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
} else {
Seq.empty
}
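The final hunk collapses getPeers back to a single filter chain; before this revert it additionally excluded block managers that were already decommissioning, so they would not be chosen as replication targets. Below is a hedged, standalone illustration of the two variants over plain executor IDs (not the actual BlockManagerMasterEndpoint code).

```scala
object PeerSelectionSketch {
  // Pre-revert behaviour: also skip peers that are decommissioning.
  def peersBeforeRevert(
      all: Set[String], self: String, driver: String, decommissioning: Set[String]): Seq[String] =
    all.filterNot(_ == driver).filterNot(_ == self).diff(decommissioning).toSeq

  // Post-revert behaviour: only skip the driver and the requesting block manager.
  def peersAfterRevert(all: Set[String], self: String, driver: String): Seq[String] =
    all.filterNot(_ == driver).filterNot(_ == self).toSeq
}
```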