From 375d348a83e6ffa38dfaece5047633f67aee1da5 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Wed, 5 Aug 2020 16:28:14 -0700
Subject: [PATCH] [SPARK-31197][CORE] Shutdown executor once we are done decommissioning

### What changes were proposed in this pull request?

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

This is a rebase of https://github.com/apache/spark/pull/28817

### Why are the changes needed?

If we want to use decommissioning as part of Spark's own scale-down, we should terminate the executor once it has finished decommissioning. Furthermore, during a graceful shutdown it makes sense to release resources we no longer need once the cluster manager has asked us to shut down, rather than holding on to them for as long as possible.

### Does this PR introduce _any_ user-facing change?

Decommissioned executors will now exit at the end of decommissioning. Strictly speaking this is a user-facing change; however, decommissioning has not been part of any release yet.

### How was this patch tested?

I changed the unit test so that it no longer sends the executor-exit message but still waits for the executor-exited message.

Closes #29211 from holdenk/SPARK-31197-exit-execs-redone.

Authored-by: Holden Karau
Signed-off-by: Holden Karau
---
 .../apache/spark/deploy/DeployMessage.scala   |   2 -
 .../apache/spark/deploy/worker/Worker.scala   |   2 +-
 .../CoarseGrainedExecutorBackend.scala        |  58 +++++++-
 .../cluster/CoarseGrainedClusterMessage.scala |   3 +
 .../CoarseGrainedSchedulerBackend.scala       |  10 ++
 .../apache/spark/storage/BlockManager.scala   |   8 +
 .../storage/BlockManagerDecommissioner.scala  |  96 ++++++++++--
 .../scheduler/WorkerDecommissionSuite.scala   |  19 +--
 ...kManagerDecommissionIntegrationSuite.scala |  17 +--
 .../BlockManagerDecommissionUnitSuite.scala   | 139 +++++++++++++++++-
 10 files changed, 310 insertions(+), 44 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index c8c6e5a192a24..b7a64d75a8d47 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -165,8 +165,6 @@ private[deploy] object DeployMessages {

   case object ReregisterWithMaster // used when a worker attempts to reconnect to a master

-  case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.
- // AppClient to Master case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index aa8c46fc68315..862e685c2dce6 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -668,7 +668,7 @@ private[deploy] class Worker( finishedApps += id maybeCleanupApplication(id) - case DecommissionSelf => + case WorkerDecommission(_, _) => decommissionSelf() } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index def125bb6bfb6..55fb76b3572a3 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -64,7 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend( private[this] val stopping = new AtomicBoolean(false) var executor: Executor = null - @volatile private var decommissioned = false @volatile var driver: Option[RpcEndpointRef] = None // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need @@ -80,6 +79,8 @@ private[spark] class CoarseGrainedExecutorBackend( */ private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]] + @volatile private var decommissioned = false + override def onStart(): Unit = { logInfo("Registering PWR handler.") SignalUtils.register("PWR", "Failed to register SIGPWR handler - " + @@ -214,6 +215,10 @@ private[spark] class CoarseGrainedExecutorBackend( case UpdateDelegationTokens(tokenBytes) => logInfo(s"Received tokens of ${tokenBytes.length} bytes") SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf) + + case DecommissionSelf => + logInfo("Received decommission self") + decommissionSelf() } override def onDisconnected(remoteAddress: RpcAddress): Unit = { @@ -277,12 +282,59 @@ private[spark] class CoarseGrainedExecutorBackend( if (executor != null) { executor.decommission() } - logInfo("Done decommissioning self.") + // Shutdown the executor once all tasks are gone & any configured migrations completed. + // Detecting migrations completion doesn't need to be perfect and we want to minimize the + // overhead for executors that are not in decommissioning state as overall that will be + // more of the executors. For example, this will not catch a block which is already in + // the process of being put from a remote executor before migration starts. This trade-off + // is viewed as acceptable to minimize introduction of any new locking structures in critical + // code paths. + + val shutdownThread = new Thread("wait-for-blocks-to-migrate") { + override def run(): Unit = { + var lastTaskRunningTime = System.nanoTime() + val sleep_time = 1000 // 1s + + while (true) { + logInfo("Checking to see if we can shutdown.") + Thread.sleep(sleep_time) + if (executor == null || executor.numRunningTasks == 0) { + if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { + logInfo("No running tasks, checking migrations") + val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo() + // We can only trust allBlocksMigrated boolean value if there were no tasks running + // since the start of computing it. 
+              if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
+                logInfo("No running tasks, all blocks migrated, stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+              } else {
+                logInfo("All blocks not yet migrated.")
+              }
+            } else {
+              logInfo("No running tasks, no block migration configured, stopping.")
+              exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+            }
+          } else {
+            logInfo(s"Blocked from shutdown by running ${executor.numRunningTasks} tasks")
+            // If there is a running task it could store blocks, so make sure we wait for a
+            // migration loop to complete after the last task is done.
+            // Note: this is only advanced if there is a running task; if there is no running
+            // task but the blocks are not done migrating, this does not move forward.
+            lastTaskRunningTime = System.nanoTime()
+          }
+        }
+      }
+    }
+    shutdownThread.setDaemon(true)
+    shutdownThread.start()
+
+    logInfo("Will exit when finished decommissioning")
     // Return true since we are handling a signal
     true
   } catch {
     case e: Exception =>
-      logError(s"Error ${e} during attempt to decommission self")
+      logError("Unexpected error while decommissioning self", e)
       false
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 91485f01bf007..7242ab7786061 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages {
   // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
   case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage
+
+  // Used to ask an executor to decommission itself. (Can be an internal message)
+  case object DecommissionSelf extends CoarseGrainedClusterMessage
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 8fbefae58af14..d81a617d0ed7d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -442,6 +442,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       case e: Exception =>
         logError(s"Unexpected error during decommissioning ${e.toString}", e)
     }
+    // Send a decommission message to the executor; this may be a duplicate since the executor
+    // could have been the one to notify us. But it's also possible the notification came from
+    // elsewhere and the executor does not yet know.
+    executorDataMap.get(executorId) match {
+      case Some(executorInfo) =>
+        executorInfo.executorEndpoint.send(DecommissionSelf)
+      case None =>
+        // Ignoring the executor since it is not registered.
+ logWarning(s"Attempted to decommission unknown executor $executorId.") + } logInfo(s"Finished decommissioning executor $executorId.") if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 47af854b6e8ff..6ec93df67f7db 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1822,6 +1822,14 @@ private[spark] class BlockManager( } } + /* + * Returns the last migration time and a boolean denoting if all the blocks have been migrated. + * If there are any tasks running since that time the boolean may be incorrect. + */ + private[spark] def lastMigrationInfo(): (Long, Boolean) = { + decommissioner.map(_.lastMigrationInfo()).getOrElse((0, false)) + } + private[storage] def getMigratableRDDBlocks(): Seq[ReplicateBlock] = master.getReplicateInfoForRDDBlocks(blockManagerId) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 1cc7ef6a25f92..f0a8e47aa3200 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -18,6 +18,7 @@ package org.apache.spark.storage import java.util.concurrent.ExecutorService +import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable @@ -41,6 +42,12 @@ private[storage] class BlockManagerDecommissioner( private val maxReplicationFailuresForDecommission = conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK) + // Used for tracking if our migrations are complete. Readable for testing + @volatile private[storage] var lastRDDMigrationTime: Long = 0 + @volatile private[storage] var lastShuffleMigrationTime: Long = 0 + @volatile private[storage] var rddBlocksLeft: Boolean = true + @volatile private[storage] var shuffleBlocksLeft: Boolean = true + /** * This runnable consumes any shuffle blocks in the queue for migration. This part of a * producer/consumer where the main migration loop updates the queue of blocks to be migrated @@ -91,10 +98,11 @@ private[storage] class BlockManagerDecommissioner( null)// class tag, we don't need for shuffle logDebug(s"Migrated sub block ${blockId}") } - logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}") + logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}") } else { logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}") } + numMigratedShuffles.incrementAndGet() } } // This catch is intentionally outside of the while running block. @@ -115,12 +123,21 @@ private[storage] class BlockManagerDecommissioner( // Shuffles which are either in queue for migrations or migrated private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]() + // Shuffles which have migrated. This used to know when we are "done", being done can change + // if a new shuffle file is created by a running task. + private val numMigratedShuffles = new AtomicInteger(0) + // Shuffles which are queued for migration & number of retries so far. + // Visible in storage for testing. private[storage] val shufflesToMigrate = new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]() // Set if we encounter an error attempting to migrate and stop. 
@volatile private var stopped = false + @volatile private var stoppedRDD = + !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED) + @volatile private var stoppedShuffle = + !conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) private val migrationPeers = mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]() @@ -133,22 +150,31 @@ private[storage] class BlockManagerDecommissioner( override def run(): Unit = { assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) - while (!stopped && !Thread.interrupted()) { + while (!stopped && !stoppedRDD && !Thread.interrupted()) { logInfo("Iterating on migrating from the block manager.") + // Validate we have peers to migrate to. + val peers = bm.getPeers(false) + // If we have no peers give up. + if (peers.isEmpty) { + stopped = true + stoppedRDD = true + } try { + val startTime = System.nanoTime() logDebug("Attempting to replicate all cached RDD blocks") - decommissionRddCacheBlocks() + rddBlocksLeft = decommissionRddCacheBlocks() + lastRDDMigrationTime = startTime logInfo("Attempt to replicate all cached blocks done") logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.") Thread.sleep(sleepInterval) } catch { case e: InterruptedException => - logInfo("Interrupted during migration, will not refresh migrations.") - stopped = true + logInfo("Interrupted during RDD migration, stopping") + stoppedRDD = true case NonFatal(e) => - logError("Error occurred while trying to replicate for block manager decommissioning.", + logError("Error occurred replicating RDD for block manager decommissioning.", e) - stopped = true + stoppedRDD = true } } } @@ -162,20 +188,22 @@ private[storage] class BlockManagerDecommissioner( override def run() { assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) - while (!stopped && !Thread.interrupted()) { + while (!stopped && !stoppedShuffle && !Thread.interrupted()) { try { logDebug("Attempting to replicate all shuffle blocks") - refreshOffloadingShuffleBlocks() + val startTime = System.nanoTime() + shuffleBlocksLeft = refreshOffloadingShuffleBlocks() + lastShuffleMigrationTime = startTime logInfo("Done starting workers to migrate shuffle blocks") Thread.sleep(sleepInterval) } catch { case e: InterruptedException => logInfo("Interrupted during migration, will not refresh migrations.") - stopped = true + stoppedShuffle = true case NonFatal(e) => logError("Error occurred while trying to replicate for block manager decommissioning.", e) - stopped = true + stoppedShuffle = true } } } @@ -191,8 +219,9 @@ private[storage] class BlockManagerDecommissioner( * but rather shadows them. * Requires an Indexed based shuffle resolver. * Note: if called in testing please call stopOffloadingShuffleBlocks to avoid thread leakage. + * Returns true if we are not done migrating shuffle blocks. */ - private[storage] def refreshOffloadingShuffleBlocks(): Unit = { + private[storage] def refreshOffloadingShuffleBlocks(): Boolean = { // Update the queue of shuffles to be migrated logInfo("Offloading shuffle blocks") val localShuffles = bm.migratableResolver.getStoredShuffles().toSet @@ -215,6 +244,12 @@ private[storage] class BlockManagerDecommissioner( deadPeers.foreach { peer => migrationPeers.get(peer).foreach(_.running = false) } + // If we don't have anyone to migrate to give up + if (migrationPeers.values.find(_.running == true).isEmpty) { + stoppedShuffle = true + } + // If we found any new shuffles to migrate or otherwise have not migrated everything. 
+    newShufflesToMigrate.nonEmpty || migratingShuffles.size > numMigratedShuffles.get()
   }

   /**
@@ -231,16 +266,18 @@ private[storage] class BlockManagerDecommissioner(
   /**
    * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
    * Visible for testing
+   * Returns true if we have not migrated all of our RDD blocks.
    */
-  private[storage] def decommissionRddCacheBlocks(): Unit = {
+  private[storage] def decommissionRddCacheBlocks(): Boolean = {
     val replicateBlocksInfo = bm.getMigratableRDDBlocks()
+    // Refresh peers and validate we have somewhere to move blocks.
     if (replicateBlocksInfo.nonEmpty) {
       logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " +
         "for block manager decommissioning")
     } else {
       logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate")
-      return
+      return false
     }
     // TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
@@ -252,7 +289,9 @@ private[storage] class BlockManagerDecommissioner(
     if (blocksFailedReplication.nonEmpty) {
       logWarning("Blocks failed replication in cache decommissioning " +
         s"process: ${blocksFailedReplication.mkString(",")}")
+      return true
     }
+    return false
   }

   private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = {
@@ -327,4 +366,33 @@ private[storage] class BlockManagerDecommissioner(
     }
     logInfo("Stopped storage decommissioner")
   }
+
+  /*
+   * Returns the last migration time and a boolean denoting whether all blocks have been migrated.
+   * The last migration time is taken as the minimum of the last migration times of the
+   * migrations (RDD and/or shuffle) that are still running.
+   * This provides a timestamp such that, if no tasks have been running since that time, we know
+   * that every block which could be migrated has been migrated off.
+   */
+  private[storage] def lastMigrationInfo(): (Long, Boolean) = {
+    if (stopped || (stoppedRDD && stoppedShuffle)) {
+      // Since we don't have anything left to migrate ever (since we don't restart once
+      // stopped), return that we're done with a validity timestamp that doesn't expire.
+      (Long.MaxValue, true)
+    } else {
+      // Choose the min of the active times. See the function description for more information.
+      val lastMigrationTime = if (!stoppedRDD && !stoppedShuffle) {
+        Math.min(lastRDDMigrationTime, lastShuffleMigrationTime)
+      } else if (!stoppedShuffle) {
+        lastShuffleMigrationTime
+      } else {
+        lastRDDMigrationTime
+      }
+
+      // Technically we could have blocks left if we encountered an error, but those blocks will
+      // never be migrated, so we don't care about them.
+ val blocksMigrated = (!shuffleBlocksLeft || stoppedShuffle) && (!rddBlocksLeft || stoppedRDD) + (lastMigrationTime, blocksMigrated) + } + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 3c34070e8bb97..bb0c33acc0af5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -47,7 +47,12 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { assert(sleepyRdd.count() === 10) } - test("verify a task with all workers decommissioned succeeds") { + test("verify a running task with all workers decommissioned succeeds") { + // Wait for the executors to come up + TestUtils.waitUntilExecutorsUp(sc = sc, + numExecutors = 2, + timeout = 30000) // 30s + val input = sc.parallelize(1 to 10) // Listen for the job val sem = new Semaphore(0) @@ -56,9 +61,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { sem.release() } }) - TestUtils.waitUntilExecutorsUp(sc = sc, - numExecutors = 2, - timeout = 30000) // 30s + val sleepyRdd = input.mapPartitions{ x => Thread.sleep(5000) // 5s x @@ -76,13 +79,5 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false))) val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds) assert(asyncCountResult === 10) - // Try and launch task after decommissioning, this should fail - val postDecommissioned = input.map(x => x) - val postDecomAsyncCount = postDecommissioned.countAsync() - val thrown = intercept[java.util.concurrent.TimeoutException]{ - val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds) - } - assert(postDecomAsyncCount.isCompleted === false, - "After exec decommission new task could not launch") } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 6a52f72938c6c..25145dac52681 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore} +import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore, TimeUnit} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -69,9 +69,9 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS .set(config.STORAGE_DECOMMISSION_ENABLED, true) .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist) .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle) - // Just replicate blocks as fast as we can during testing, there isn't another + // Just replicate blocks quickly during testing, there isn't another // workload we need to worry about. - .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L) + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) if (whenToDecom == TaskStarted) { // We are using accumulators below, make sure those are reported frequently. 
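For reference, here is a minimal sketch of how an application (rather than the test suite above) could turn on the same storage-decommission migrations that this patch waits for before exiting an executor. The string configuration keys are assumed to correspond to the `STORAGE_DECOMMISSION_*` constants referenced in the diff and should be verified against the target Spark version.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: the keys below are assumed to back the STORAGE_DECOMMISSION_* constants
// used in this patch; verify them against your Spark release before relying on them.
val conf = new SparkConf()
  .setAppName("decommission-migration-example")
  .set("spark.storage.decommission.enabled", "true")               // assumed key for STORAGE_DECOMMISSION_ENABLED
  .set("spark.storage.decommission.rddBlocks.enabled", "true")     // assumed key for RDD block migration
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true") // assumed key for shuffle block migration

val sc = new SparkContext(conf)
// With these settings, a decommissioned executor migrates its RDD and shuffle blocks
// and, with this patch, exits on its own once migration reports completion.
```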
@@ -266,18 +266,17 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS val execIdToBlocksMapping = storageStatus.map( status => (status.blockManagerId.executorId, status.blocks)).toMap // No cached blocks should be present on executor which was decommissioned - assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(), + assert( + !execIdToBlocksMapping.contains(execToDecommission) || + execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(), "Cache blocks should be migrated") if (persist) { // There should still be all the RDD blocks cached assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts) } - // Make the executor we decommissioned exit - sched.client.killExecutors(List(execToDecommission)) - - // Wait for the executor to be removed - executorRemovedSem.acquire(1) + // Wait for the executor to be removed automatically after migration. + assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES)) // Since the RDD is cached or shuffled so further usage of same RDD should use the // cached data. Original RDD partitions should not be recomputed i.e. accum diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala index 41b68d5978d16..74ad8bd2bcf9d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.storage import scala.concurrent.duration._ import org.mockito.{ArgumentMatchers => mc} -import org.mockito.Mockito.{mock, times, verify, when} +import org.mockito.Mockito.{atLeast => least, mock, times, verify, when} import org.scalatest.concurrent.Eventually._ import org.scalatest.matchers.must.Matchers @@ -38,6 +38,9 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { private val sparkConf = new SparkConf(false) .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true) .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true) + // Just replicate blocks quickly during testing, as there isn't another + // workload we need to worry about. + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) private def registerShuffleBlocks( mockMigratableShuffleResolver: MigratableResolver, @@ -54,6 +57,113 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { } } + /** + * Validate a given configuration with the mocks. + * The fail variable controls if we expect migration to fail, in which case we expect + * a constant Long.MaxValue timestamp. + */ + private def validateDecommissionTimestamps(conf: SparkConf, bm: BlockManager, + migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = { + // Verify the decommissioning manager timestamps and status + val bmDecomManager = new BlockManagerDecommissioner(conf, bm) + var previousTime: Option[Long] = None + try { + bmDecomManager.start() + eventually(timeout(100.second), interval(10.milliseconds)) { + val (currentTime, done) = bmDecomManager.lastMigrationInfo() + assert(done) + // Make sure the time stamp starts moving forward. 
+ if (!fail) { + previousTime match { + case None => + previousTime = Some(currentTime) + assert(false) + case Some(t) => + assert(t < currentTime) + } + } else { + // If we expect migration to fail we should get the max value quickly. + assert(currentTime === Long.MaxValue) + } + } + if (!fail) { + // Wait 5 seconds and assert times keep moving forward. + Thread.sleep(5000) + val (currentTime, done) = bmDecomManager.lastMigrationInfo() + assert(done && currentTime > previousTime.get) + } + } finally { + bmDecomManager.stop() + } + } + + test("test that with no blocks we finish migration") { + // Set up the mocks so we return empty + val bm = mock(classOf[BlockManager]) + val migratableShuffleBlockResolver = mock(classOf[MigratableResolver]) + when(migratableShuffleBlockResolver.getStoredShuffles()) + .thenReturn(Seq()) + when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver) + when(bm.getMigratableRDDBlocks()) + .thenReturn(Seq()) + when(bm.getPeers(mc.any())) + .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345))) + + // Verify the decom manager handles this correctly + validateDecommissionTimestamps(sparkConf, bm, migratableShuffleBlockResolver) + } + + test("block decom manager with no migrations configured") { + val bm = mock(classOf[BlockManager]) + val migratableShuffleBlockResolver = mock(classOf[MigratableResolver]) + registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1))) + when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver) + when(bm.getMigratableRDDBlocks()) + .thenReturn(Seq()) + when(bm.getPeers(mc.any())) + .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345))) + + val badConf = new SparkConf(false) + .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, false) + .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, false) + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) + // Verify the decom manager handles this correctly + validateDecommissionTimestamps(badConf, bm, migratableShuffleBlockResolver, + fail = true) + } + + test("block decom manager with no peers") { + // Set up the mocks so we return one shuffle block + val bm = mock(classOf[BlockManager]) + val migratableShuffleBlockResolver = mock(classOf[MigratableResolver]) + registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1))) + when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver) + when(bm.getMigratableRDDBlocks()) + .thenReturn(Seq()) + when(bm.getPeers(mc.any())) + .thenReturn(Seq()) + + // Verify the decom manager handles this correctly + validateDecommissionTimestamps(sparkConf, bm, migratableShuffleBlockResolver, + fail = true) + } + + + test("block decom manager with only shuffle files time moves forward") { + // Set up the mocks so we return one shuffle block + val bm = mock(classOf[BlockManager]) + val migratableShuffleBlockResolver = mock(classOf[MigratableResolver]) + registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1))) + when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver) + when(bm.getMigratableRDDBlocks()) + .thenReturn(Seq()) + when(bm.getPeers(mc.any())) + .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345))) + + // Verify the decom manager handles this correctly + validateDecommissionTimestamps(sparkConf, bm, migratableShuffleBlockResolver) + } + test("test shuffle and cached rdd migration without any error") { val blockTransferService = mock(classOf[BlockTransferService]) val bm = mock(classOf[BlockManager]) @@ -77,13 +187,36 @@ class 
BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { try { bmDecomManager.start() - eventually(timeout(5.second), interval(10.milliseconds)) { + var previousRDDTime: Option[Long] = None + var previousShuffleTime: Option[Long] = None + + // We don't check that all blocks are migrated because out mock is always returning an RDD. + eventually(timeout(100.second), interval(10.milliseconds)) { assert(bmDecomManager.shufflesToMigrate.isEmpty == true) - verify(bm, times(1)).replicateBlock( + verify(bm, least(1)).replicateBlock( mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3))) verify(blockTransferService, times(2)) .uploadBlockSync(mc.eq("host2"), mc.eq(bmPort), mc.eq("exec2"), mc.any(), mc.any(), mc.eq(StorageLevel.DISK_ONLY), mc.isNull()) + // Since we never "finish" the RDD blocks, make sure the time is always moving forward. + assert(bmDecomManager.rddBlocksLeft) + previousRDDTime match { + case None => + previousRDDTime = Some(bmDecomManager.lastRDDMigrationTime) + assert(false) + case Some(t) => + assert(bmDecomManager.lastRDDMigrationTime > t) + } + // Since we do eventually finish the shuffle blocks make sure the shuffle blocks complete + // and that the time keeps moving forward. + assert(!bmDecomManager.shuffleBlocksLeft) + previousShuffleTime match { + case None => + previousShuffleTime = Some(bmDecomManager.lastShuffleMigrationTime) + assert(false) + case Some(t) => + assert(bmDecomManager.lastShuffleMigrationTime > t) + } } } finally { bmDecomManager.stop()
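To tie the unit-test expectations above back to the executor-side change, here is a condensed, illustrative sketch of the exit decision made by the shutdown thread added to `CoarseGrainedExecutorBackend`; the function and parameter names are simplified stand-ins and are not part of the patch itself.

```scala
// Illustrative distillation of the shutdown-thread logic added in this patch.
// `lastMigrationInfo` stands in for env.blockManager.lastMigrationInfo().
def readyToExit(
    runningTasks: Int,
    lastTaskRunningTime: Long,          // System.nanoTime() when a running task was last observed
    migrationEnabled: Boolean,          // value of STORAGE_DECOMMISSION_ENABLED
    lastMigrationInfo: () => (Long, Boolean)): Boolean = {
  if (runningTasks != 0) {
    // A running task may still create new blocks, so keep waiting.
    false
  } else if (!migrationEnabled) {
    // Nothing to migrate; exit as soon as there are no running tasks.
    true
  } else {
    val (migrationTime, allBlocksMigrated) = lastMigrationInfo()
    // Only trust allBlocksMigrated if a full migration pass finished after the last task ran.
    allBlocksMigrated && migrationTime > lastTaskRunningTime
  }
}
```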