[WIP][SPARK-31197][CORE] Exit the executor once all tasks and migrations are finished, built on top of spark20629 #28817

Closed
Changes from 4 commits (24 commits in this PR):
67dec3c
Add an option to migrate shuffle blocks as well as the current cache …
holdenk Jun 2, 2020
4f2e7ce
Update core/src/main/scala/org/apache/spark/storage/BlockManager.scala
holdenk Jun 3, 2020
ecd1a14
Update core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlock…
holdenk Jun 3, 2020
9c8836a
First pass at the feedback from @attilapiros, mostly minor re-arrange…
holdenk Jun 3, 2020
ccb8827
Saw a test failure which could come from us not having a reasonable t…
holdenk Jun 4, 2020
c4ed3bd
Improve error logging
holdenk Jun 4, 2020
ac510dc
cleanup
holdenk Jun 4, 2020
e13b070
cleanup
holdenk Jun 4, 2020
d3ecd8e
Add more info to debugging
holdenk Jun 4, 2020
6bdf0c2
logging string interpolation
holdenk Jun 4, 2020
f2eb6eb
logging string interpolation
holdenk Jun 4, 2020
81e29a8
logging string interpolation
holdenk Jun 4, 2020
6f0544a
[SPARK-28624][SQL][TESTS] Run date.sql via Thrift Server
MaxGekk Jun 4, 2020
41d5464
Generalize the decom put to check put as stream and shuffle blocks as…
holdenk Jun 4, 2020
a517d67
spacing
holdenk Jun 4, 2020
bce1613
Fix long line, make our shuffle block threads stop so we don't leak t…
holdenk Jun 4, 2020
6c1b364
Remove un-needed shuffleStatus.invalidateSerializedMapOutputStatusCac…
holdenk Jun 4, 2020
a904030
Always transfer shuffle blocks as put, take out the spark.network.max…
holdenk Jun 4, 2020
5838639
First pass at making the executor exit once no tasks are running and …
holdenk Jun 2, 2020
4b3fb27
Test that we exit the executor on our own.
holdenk Jun 4, 2020
6a940e6
Fix up how we check if we're done with migrations to avoid a race whe…
holdenk Jun 4, 2020
d591507
Also check inside of the CGEB where we know if tasks are running or not
holdenk Jun 4, 2020
ea8efc7
No need to wait, just had that bumped up for debugging
holdenk Jun 4, 2020
a2c0557
CR feedback (clarify comment and log statement)
holdenk Jun 14, 2020

@@ -158,8 +158,6 @@ private[deploy] object DeployMessages {

case object ReregisterWithMaster // used when a worker attempts to reconnect to a master

case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
@@ -668,7 +668,7 @@ private[deploy] class Worker(
finishedApps += id
maybeCleanupApplication(id)

case DecommissionSelf =>
case WorkerDecommission(_, _) =>
decommissionSelf()
}

@@ -210,6 +210,10 @@ private[spark] class CoarseGrainedExecutorBackend(
case UpdateDelegationTokens(tokenBytes) =>
logInfo(s"Received tokens of ${tokenBytes.length} bytes")
SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)

case DecommissionSelf =>
logInfo("Received decommission self")
decommissionSelf()
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -258,26 +262,65 @@
System.exit(code)
}

private def decommissionSelf(): Boolean = {
logInfo("Decommissioning self w/sync")
try {
decommissioned = true
// Tell master we are are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(executorId))
private var previousAllBlocksMigrated = false
Contributor:

Should this variable be marked volatile ?

Contributor Author:

I don't think so, this will only be accessed in one thread.

private def shutdownIfDone(): Unit = {
val numRunningTasks = executor.numRunningTasks
logInfo(s"Checking to see if we can shutdown have ${numRunningTasks} running tasks.")
if (executor.numRunningTasks == 0) {
if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
val allBlocksMigrated = env.blockManager.decommissionManager match {
case Some(m) => m.allBlocksMigrated
case None => false // We haven't started migrations yet.
}
if (allBlocksMigrated && previousAllBlocksMigrated) {
logInfo("No running tasks, all blocks migrated, stopping.")
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
Contributor:

exitExecutor asynchronously sends RemoveExecutor to the driver. Does that actually make it to the driver ? There is also the question of whether we should be using the same Shutdown/StopExecutor codepath for doing the stopping (although it does seem that we do want to intimate to the driver that the executor is being removed).

Interestingly, the driver does indeed respond back with a StopExecutor and does trigger the clean shutdown path in the executor, but again I wonder if it is too late for that. Perhaps we shouldn't be calling System.exit here ?

Also, as currently written, this exitExecutor could cause job failures, since the TaskSchedulerImpl will treat the ExecutorLossReason sent by the executor to the driver as an exitCausedByApp and thus penalize the task. Instead, I think we shouldn't penalize the running job on a planned executor decommission. One workaround might be to actually respond back to the driver with ExecutorDecommission (which is not used elsewhere currently) and then handle that specifically in the TaskSchedulerImpl's determination of exitCausedByApp.
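
For illustration, a minimal sketch of that workaround with hypothetical stand-in types (not the actual ExecutorLossReason/TaskSchedulerImpl API): a planned decommission gets its own loss reason so the resulting executor loss is not attributed to the application.

// Hypothetical sketch only: these types stand in for Spark's loss-reason
// hierarchy; the real classes and call sites differ.
object ExitCausedByAppSketch {
  sealed trait LossReason
  case object Decommissioned extends LossReason              // planned removal
  case class Exited(causedByApp: Boolean) extends LossReason
  case object HeartbeatTimeout extends LossReason

  // Decide whether a lost executor should count against the running job.
  def exitCausedByApp(reason: LossReason): Boolean = reason match {
    case Decommissioned   => false // don't penalize tasks for a planned decommission
    case Exited(byApp)    => byApp
    case HeartbeatTimeout => true  // default today: the loss is blamed on the app
  }

  def main(args: Array[String]): Unit = {
    println(exitCausedByApp(Decommissioned))   // false
    println(exitCausedByApp(HeartbeatTimeout)) // true
  }
}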

Contributor Author:

So it's my understanding that the TaskSchedulerImpl shouldn't have any job failures, because we've waited for all the tasks on the executor to finish before calling this code path. Unless there is something I've missed there?

I think swapping out exitExecutor for telling the driver to stop the executor, and avoiding the System.exit, makes sense either way though.

Contributor:

I was talking about the case where we get shot down before we had a chance to cleanly exit on line 276. Say, for example, some timeout expires and the executor/node is brought down.

Are decom.sh and decommission-slave.sh expected to wait until the executor/worker process has properly shut down ? I think they have some timeouts in them to kill the executor ? Or consider a spot-kill scenario where you get some warning (like 2 minutes) and then the machine is yanked out.

In this case, the executor will eventually be marked lost via a heartbeat/timeout. And that loss would be deemed the fault of the task, and could cause job failures. I am wondering if we can fix that scenario of an unclean exit ?

One workaround I suggested above was to send a message to the driver saying that the executor is going to go away soon. When that happens (in a clean or unclean way), that loss shouldn't be attributed to the task.

Perhaps this unclean executor loss/timeout handling is follow-up work ? We (or rather I) can create Jiras for this under the parent ticket :-).

Contributor Author:

Sure, although I think this behaviour is covered by the changes in https://github.com/apache/spark/pull/26440/files (we only increment failures if the executor's previous state was not decommissioning).

Contributor:

Can you please double check that ? I couldn't find this behavior when scouring TaskSchedulerImpl and TaskSetManager. The only place we check for an executor being decommissioned in that PR is when scheduling tasks (in CoarseGrainedSchedulerBackend#isExecutorActive). Thanks !

Contributor Author:

Can you point to where in TaskSchedulerImpl it's going to fail the job? core/src/main/scala/org/apache/spark/deploy/master/Master.scala is where the current code is, but there might be an additional case that needs to be covered.

Contributor:

for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {

In this match block, we will hit the default case which will treat the failure as having been caused by the app and thus penalize it.

This routine is called from TaskScheduler.executorLost

Contributor Author:

Thanks :) If you want to make a PR for that I'd be happy to review/merge, since I think that would not depend on any of the in-flight PRs, just the current code in master.

Contributor:

Absolutely ! Thanks.

}
previousAllBlocksMigrated = allBlocksMigrated
} else {
logError("No driver to message decommissioning.")
logInfo("No running tasks, no block migration configured, stopping.")
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
}
if (executor != null) {
executor.decommission()
} else {
// If there's a running task it could store blocks.
Contributor:

I think this logic of previousAllBlocksMigrated and allBlocksMigrated is a bit confusing. It's not clear why the previous state has to be considered. I wonder if the following code can make this "history" aspect a bit clearer:

val allBlocksMigrated = !env.conf.get(STORAGE_DECOMMISSION_ENABLED) ||
      env.blockManager.decommissionManager.map(_.allBlocksMigrated).getOrElse(false)
val exitCondition = allBlocksMigrated && numRunningTasks == 0
if (exitCondition) { exitExecutor(...) }

Also, should we really be checking for numRunningTasks here ? What if some race condition caused some tasks to be scheduled onto us while we were marked for decom ?

Finally, should there be a timeout for how long the executor will stay alive in the decommissioned state ?

Contributor Author:

If a task is scheduled before we are asked to decom, that case is covered. You can verify this by taking the logic out and watching the tests fail :) (there's an ugly thread sleep in the tests to make this possible).

Since the block migrations are not atomic, I unfortunately do think we need the 2x logic. Think of this situation:

  1. Task launches on executor
  2. Executor asked to decommission
  3. All blocks currently stored on executor are migrated
  4. Task stores a block
  5. We check numTasks & see all blocks are migrated, then exit without migrating the block stored by the task in step 4.

Now that being said, it's probably a corner case, and arguably not super important since we're really only doing best effort, but I think for the overhead of one extra boolean it's worth it to cover this corner case.
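
As a rough illustration of why two consecutive clean checks are needed, here is a standalone sketch with stand-in state (not the actual backend code): a block stored between two polls keeps the executor alive for at least one more migration round.

// Standalone sketch of the "2x" check; numRunningTasks/allBlocksMigrated are
// stand-ins for the executor and block manager state, not the real fields.
object DoubleCheckSketch {
  @volatile var numRunningTasks: Int = 0
  @volatile var allBlocksMigrated: Boolean = false

  private var previousAllBlocksMigrated = false

  // True only when two consecutive polls both saw zero tasks and all blocks migrated.
  def readyToExit(): Boolean = {
    val cleanNow = numRunningTasks == 0 && allBlocksMigrated
    val exit = cleanNow && previousAllBlocksMigrated
    previousAllBlocksMigrated = cleanNow
    exit
  }

  def main(args: Array[String]): Unit = {
    allBlocksMigrated = true
    println(readyToExit()) // false: the first clean poll only records the state
    allBlocksMigrated = false // a still-running task stored a new block between polls
    println(readyToExit()) // false: the late block forces another migration round
    allBlocksMigrated = true
    println(readyToExit()) // false: the previous poll was not clean
    println(readyToExit()) // true: two consecutive clean polls, safe to exit
  }
}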

previousAllBlocksMigrated = false
}
}

private def decommissionSelf(): Boolean = {
if (!decommissioned) {
logInfo("Decommissioning self w/sync")
Contributor:

Perhaps we should expand what 'w/sync' stands for in the log message ?

Contributor Author:

Sure

try {
decommissioned = true
// Tell master we are are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(executorId))
} else {
logError("No driver to message decommissioning.")
}
if (executor != null) {
executor.decommission()
}
// Shutdown the executor once all tasks are gone :)
val shutdownThread = new Thread() {
while (true) {
shutdownIfDone()
Thread.sleep(1000) // 1s
}
}
shutdownThread.setDaemon(true)
shutdownThread.setName("decommission-shutdown-thread")
shutdownThread.start()
logInfo("Done decommissioning self.")
// Return true since we are handling a signal
true
} catch {
case e: Exception =>
logError(s"Error ${e} during attempt to decommission self")
false
}
logInfo("Done decommissioning self.")
// Return true since we are handling a signal
} else {
true
} catch {
case e: Exception =>
logError(s"Error ${e} during attempt to decommission self")
false
}
}
}
@@ -233,6 +233,7 @@ private[spark] class Executor(
* Mark an executor for decommissioning and avoid launching new tasks.
*/
private[spark] def decommission(): Unit = {
logInfo("Executor asked to decommission. Starting shutdown thread.")
Contributor:

I think this comment looks stale. It should probably be moved to the CoarseGrainedExecutorBackend. It's also not clear to me what the decommission flag does in the Executor besides just logging.

Contributor Author:

Just logging for now. The reason I propagate the message to the executor is that if we end up in a state where the executor believes it is decommissioned (say, from a local SIGPWR) but the driver doesn't, it could be weird, so having some logging is useful.

decommissioned = true
}

@@ -52,6 +52,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class UpdateDelegationTokens(tokens: Array[Byte])
extends CoarseGrainedClusterMessage

case object DecommissionSelf extends CoarseGrainedClusterMessage // Mark as decommissioned.
Contributor:

IMHO, the DecommissionSelf naming is a bit ambiguous: who is "self" here ? The sender or the receiver ?

This message is now sent from the driver to the executor, so perhaps we should just repurpose DecommissionExecutor with a check for the executorId ?

Not a big deal, but I'm trying to reduce the number of message types introduced by this feature ;)

Contributor Author:

I think DecommissionSelf is pretty clearly telling the receiver to decommission itself. That being said, I'm open to renaming.

Contributor:

Sounds good ;-) I checked that DecommissionSelf is indeed not used anywhere else, so it should be unambiguous. Let's keep the name.


// Executors to driver
case class RegisterExecutor(
executorId: String,
@@ -27,7 +27,7 @@ import scala.concurrent.Future
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.{DeployMessage, SparkHadoopUtil}
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.executor.ExecutorLogUrlHandler
import org.apache.spark.internal.Logging
@@ -432,7 +432,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (shouldDisable) {
logInfo(s"Starting decommissioning executor $executorId.")
try {
// Stop making offers on this executor
scheduler.executorDecommission(executorId)
// Send decommission message to the executor (it could have originated there but not
// necessarily).
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.executorEndpoint.send(
DecommissionSelf)
case None =>
// Ignoring the executor since it is not registered.
logWarning(s"Attempted to decommission unknown executor $executorId.")
}
} catch {
case e: Exception =>
logError(s"Unexpected error during decommissioning ${e.toString}", e)
42 changes: 31 additions & 11 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -245,7 +245,7 @@ private[spark] class BlockManager(
private var blockReplicationPolicy: BlockReplicationPolicy = _

private var blockManagerDecommissioning: Boolean = false
private var decommissionManager: Option[BlockManagerDecommissionManager] = None
private[spark] var decommissionManager: Option[BlockManagerDecommissionManager] = None

// A DownloadFileManager used to track all the files of remote blocks which are above the
// specified memory threshold. Files will be deleted automatically based on weak reference.
@@ -262,6 +262,8 @@

// Shuffles which are either in queue for migrations or migrated
private val migratingShuffles = mutable.HashSet[(Int, Long)]()
// Shuffles which have migrated
private val migratedShuffles = mutable.HashSet[(Int, Long)]()
// Shuffles which are queued for migration
private val shufflesToMigrate = new java.util.concurrent.ConcurrentLinkedQueue[(Int, Long)]()

@@ -1822,6 +1824,7 @@ private[spark] class BlockManager(
}
}


private class ShuffleMigrationRunnable(peer: BlockManagerId) extends Runnable {
@volatile var running = true
override def run(): Unit = {
@@ -1862,6 +1865,7 @@
logInfo(s"Migrated sub block ${blockId}")
}
logInfo(s"Migrated ${shuffleId},${mapId} to ${peer}")
migratedShuffles += ((shuffleId, mapId))
}
}
// This catch is intentionally outside of the while running block.
@@ -1887,7 +1891,7 @@
* but rather shadows them.
* Requires an Indexed based shuffle resolver.
Contributor:

I think that the comment needs to be updated to reflect what the return Boolean indicates.

Contributor Author:

Good catch

*/
def offloadShuffleBlocks(): Unit = {
def offloadShuffleBlocks(): Boolean = {
// Update the queue of shuffles to be migrated
logInfo("Offloading shuffle blocks")
val localShuffles = migratableResolver.getStoredShuffles()
@@ -1914,29 +1918,32 @@
deadPeers.foreach { peer =>
migrationPeers.get(peer).foreach(_.running = false)
}
// If we found any new shuffles to migrate or otherwise have not migrated everything.
return newShufflesToMigrate.nonEmpty || (migratingShuffles.&~(migratedShuffles)).nonEmpty
}


/**
* Stop migrating shuffle blocks.
*/
def stopOffloadingShuffleBlocks(): Unit = {
logInfo("Stopping offloading shuffle blocks")
migrationPeers.values.foreach(_.running = false)
}

/**
* Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
* Visible for testing
*/
def decommissionRddCacheBlocks(): Unit = {
private[spark] def decommissionRddCacheBlocks(): Boolean = {
val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)

if (replicateBlocksInfo.nonEmpty) {
logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " +
"for block manager decommissioning")
} else {
logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate")
return
return false
}

// Maximum number of storage replication failure which replicateBlock can handle
Expand Down Expand Up @@ -1965,6 +1972,7 @@ private[spark] class BlockManager(
logWarning("Blocks failed replication in cache decommissioning " +
s"process: ${blocksFailedReplication.mkString(",")}")
}
return true
}

/**
@@ -2039,8 +2047,11 @@
* Class to handle block manager decommissioning retries
* It creates a Thread to retry offloading all RDD cache blocks
*/
private class BlockManagerDecommissionManager(conf: SparkConf) {
private[spark] class BlockManagerDecommissionManager(conf: SparkConf) {
@volatile private var stopped = false
// Since running tasks can add more blocks this can change.
Contributor:

Just to make sure I am totally understanding this: you mean the tasks that were already running when the decommissioning was started on the executor ? Because I think we refuse to launch new tasks once decommissioning has started, so the new blocks being written must be written by already-running tasks. Did I get this right ?

Also, just to confirm I am still following along: I don't see this case handled in the existing BlockManagerSuite; I believe we are not testing writing new blocks while the decom/offload is in progress.

Contributor Author:

It is covered; you can verify this by disabling this logic and seeing the test fail (albeit you'll have to run the test a few times, because it becomes a race condition). Look at the "migrateDuring" flag for details.
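
Roughly, the shape of the race the test relies on, as an illustrative local sketch (not the suite's actual code; the decommission trigger itself is elided): tasks are still materializing cached blocks while the offload loop would already be running.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: a slow job keeps writing cache blocks while a decommission
// (not triggered in this sketch) would be migrating blocks in the background.
object MigrateDuringSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("migrate-during-sketch"))
    try {
      val rdd = sc.parallelize(1 to 100, 10)
        .map { x => Thread.sleep(50); x } // slow tasks keep producing new blocks
        .cache()
      val job = new Thread(new Runnable {
        override def run(): Unit = { rdd.count() } // materializes the cached blocks
      })
      job.start()
      Thread.sleep(200) // in the real test, decommissioning is triggered around here
      job.join()
    } finally {
      sc.stop()
    }
  }
}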

@volatile var allBlocksMigrated = false
var previousBlocksLeft = true
private val blockMigrationThread = new Thread {
val sleepInterval = conf.get(
config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
@@ -2053,22 +2064,30 @@
&& failures < 20) {
logInfo("Iterating on migrating from the block manager.")
try {
var blocksLeft = false
// If enabled we migrate shuffle blocks first as they are more expensive.
if (conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED)) {
logDebug(s"Attempting to replicate all shuffle blocks")
offloadShuffleBlocks()
logInfo(s"Done starting workers to migrate shuffle blocks")
logDebug("Attempting to replicate all shuffle blocks")
blocksLeft = blocksLeft || offloadShuffleBlocks()
logInfo("Done starting workers to migrate shuffle blocks")
}
if (conf.get(config.STORAGE_RDD_DECOMMISSION_ENABLED)) {
logDebug(s"Attempting to replicate all cached RDD blocks")
decommissionRddCacheBlocks()
logInfo(s"Attempt to replicate all cached blocks done")
logDebug("Attempting to replicate all cached RDD blocks")
blocksLeft = blocksLeft || decommissionRddCacheBlocks()
logInfo("Attempt to replicate all cached blocks done")
blocksLeft
}
logInfo(s"We have blocksLeft: ${blocksLeft}")
// Avoid the situation where block was added during the loop
allBlocksMigrated = (! blocksLeft ) && ( ! previousBlocksLeft )
previousBlocksLeft = blocksLeft
if (!conf.get(config.STORAGE_RDD_DECOMMISSION_ENABLED) &&
!conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED)) {
logWarning("Decommissioning, but no task configured set one or both:\n" +
"spark.storage.decommission.shuffle_blocks\n" +
"spark.storage.decommission.rdd_blocks")
allBlocksMigrated = true
stopped = true
}
logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.")
Thread.sleep(sleepInterval)
@@ -2103,6 +2122,7 @@
}

def stop(): Unit = {
logInfo("Stopping decommission manager")
decommissionManager.foreach(_.stop())
blockTransferService.close()
if (blockStoreClient ne blockTransferService) {
@@ -59,7 +59,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
.set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, shuffle)
// Just replicate blocks as fast as we can during testing, there isn't another
// workload we need to worry about.
.set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
.set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1000L)

sc = new SparkContext(master, "test", conf)

@@ -223,10 +223,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts)
}

// Make the executor we decommissioned exit
sched.client.killExecutors(List(execToDecommission))

// Wait for the executor to be removed
// Wait for the executor to be removed after blocks are migrated.
executorRemovedSem.acquire(1)

// Since the RDD is cached or shuffled so further usage of same RDD should use the