
[WIP][SPARK-31197][CORE] Exit the executor once all tasks and migrations are finished, built on top of SPARK-20629 #28817

Closed
Changes from 23 commits

Commits (24)
67dec3c
Add an option to migrate shuffle blocks as well as the current cache …
holdenk Jun 2, 2020
4f2e7ce
Update core/src/main/scala/org/apache/spark/storage/BlockManager.scala
holdenk Jun 3, 2020
ecd1a14
Update core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlock…
holdenk Jun 3, 2020
9c8836a
First pass at the feedback from @attilapiros, mostly minor re-arrange…
holdenk Jun 3, 2020
ccb8827
Saw a test failure which could come from us not having a reasonable t…
holdenk Jun 4, 2020
c4ed3bd
Improve error logging
holdenk Jun 4, 2020
ac510dc
cleanup
holdenk Jun 4, 2020
e13b070
cleanup
holdenk Jun 4, 2020
d3ecd8e
Add more info to debugging
holdenk Jun 4, 2020
6bdf0c2
logging string interpolation
holdenk Jun 4, 2020
f2eb6eb
logging string interpolation
holdenk Jun 4, 2020
81e29a8
logging string interpolation
holdenk Jun 4, 2020
6f0544a
[SPARK-28624][SQL][TESTS] Run date.sql via Thrift Server
MaxGekk Jun 4, 2020
41d5464
Generalize the decom put to check put as stream and shuffle blocks as…
holdenk Jun 4, 2020
a517d67
spacing
holdenk Jun 4, 2020
bce1613
Fix long line, make our shuffle block threads stop so we don't leak t…
holdenk Jun 4, 2020
6c1b364
Remove un-needed shuffleStatus.invalidateSerializedMapOutputStatusCac…
holdenk Jun 4, 2020
a904030
Always transfer shuffle blocks as put, take out the spark.network.max…
holdenk Jun 4, 2020
5838639
First pass at making the executor exit once no tasks are running and …
holdenk Jun 2, 2020
4b3fb27
Test that we exit the executor on our own.
holdenk Jun 4, 2020
6a940e6
Fix up how we check if we're done with migrations to avoid a race whe…
holdenk Jun 4, 2020
d591507
Also check inside of the CGEB where we know if tasks are running or not
holdenk Jun 4, 2020
ea8efc7
No need to wait, just had that bumped up for debugging
holdenk Jun 4, 2020
a2c0557
CR feedback (clarify comment and log statement)
holdenk Jun 14, 2020
31 changes: 29 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util._
*
* All public methods of this class are thread-safe.
*/
private class ShuffleStatus(numPartitions: Int) {
private class ShuffleStatus(numPartitions: Int) extends Logging {

private val (readLock, writeLock) = {
val lock = new ReentrantReadWriteLock()
@@ -121,12 +121,28 @@ private class ShuffleStatus(numPartitions: Int) {
mapStatuses(mapIndex) = status
}

/**
* Update the map output location (e.g. during migration).
*/
def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock {
val mapStatusOpt = mapStatuses.find(_.mapId == mapId)
mapStatusOpt match {
case Some(mapStatus) =>
logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
mapStatus.updateLocation(bmAddress)
invalidateSerializedMapOutputStatusCache()
case None =>
logError(s"Asked to update map output ${mapId} for untracked map status.")
}
}

/**
* Remove the map output which was served by the specified block manager.
* This is a no-op if there is no registered map output or if the registered output is from a
* different block manager.
*/
def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = withWriteLock {
logDebug(s"Removing existing map output ${mapIndex} ${bmAddress}")
if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) {
_numAvailableOutputs -= 1
mapStatuses(mapIndex) = null
@@ -139,6 +155,7 @@ private class ShuffleStatus(numPartitions: Int) {
* outputs which are served by an external shuffle server (if one exists).
*/
def removeOutputsOnHost(host: String): Unit = withWriteLock {
logDebug(s"Removing outputs for host ${host}")
removeOutputsByFilter(x => x.host == host)
}

@@ -148,6 +165,7 @@ private class ShuffleStatus(numPartitions: Int) {
* still registered with that execId.
*/
def removeOutputsOnExecutor(execId: String): Unit = withWriteLock {
logDebug(s"Removing outputs for execId ${execId}")
removeOutputsByFilter(x => x.executorId == execId)
}

@@ -265,7 +283,7 @@ private[spark] class MapOutputTrackerMasterEndpoint(
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GetMapOutputStatuses(shuffleId: Int) =>
val hostPort = context.senderAddress.hostPort
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
logInfo(s"Asked to send map output locations for shuffle ${shuffleId} to ${hostPort}")
tracker.post(new GetMapOutputMessage(shuffleId, context))

case StopMapOutputTracker =>
@@ -479,6 +497,15 @@ private[spark] class MapOutputTrackerMaster(
}
}

def updateMapOutput(shuffleId: Int, mapId: Long, bmAddress: BlockManagerId): Unit = {
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
shuffleStatus.updateMapOutput(mapId, bmAddress)
case None =>
logError(s"Asked to update map output for unknown shuffle ${shuffleId}")
}
}

def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus): Unit = {
shuffleStatuses(shuffleId).addMapOutput(mapIndex, status)
}
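
To make the new relocation path above concrete, here is a standalone sketch (simplified stand-in types, not the Spark classes in this diff) of how updateMapOutput finds a map status by mapId and repoints it at the block manager that now holds the migrated output:

// Standalone sketch of the updateMapOutput path added above (hypothetical model types).
object UpdateMapOutputSketch {
  final case class BlockManagerId(executorId: String, host: String, port: Int)

  final class MapStatusModel(val mapId: Long, private var loc: BlockManagerId) {
    def location: BlockManagerId = loc
    def updateLocation(bm: BlockManagerId): Unit = { loc = bm } // mirrors MapStatus.updateLocation
  }

  final class ShuffleStatusModel(statuses: Array[MapStatusModel]) {
    // Look up by mapId rather than partition index: a migrated shuffle block only carries the mapId.
    def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit =
      statuses.find(s => s != null && s.mapId == mapId) match {
        case Some(status) => status.updateLocation(bmAddress)
        case None => println(s"Asked to update map output $mapId for untracked map status.")
      }
  }

  def main(args: Array[String]): Unit = {
    val before = BlockManagerId("exec-1", "host-a", 7337)
    val after = BlockManagerId("exec-2", "host-b", 7337)
    val shuffle = new ShuffleStatusModel(Array(new MapStatusModel(42L, before)))
    shuffle.updateMapOutput(42L, after) // reducers scheduled afterwards fetch from host-b
  }
}

Invalidating the serialized map-output cache after the update, as the diff does, is what forces reducers that request statuses later to see the new location.
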
13 changes: 12 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -57,7 +57,7 @@ import org.apache.spark.resource._
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.shuffle.ShuffleDataIOUtils
import org.apache.spark.shuffle.api.ShuffleDriverComponents
@@ -1725,6 +1725,17 @@ class SparkContext(config: SparkConf) extends Logging {
}
}


private[spark] def decommissionExecutors(executorIds: Seq[String]): Unit = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
executorIds.foreach(b.decommissionExecutor)
case _ =>
logWarning(s"Decommissioning executors is not supported by current scheduler " +
s"${schedulerBackend}")
}
}

/** The version of Spark on which this application is running. */
def version: String = SPARK_VERSION
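
As a rough illustration of what the new helper does (the real method is private[spark], so only Spark-internal code can call it), the sketch below mirrors the backend dispatch with hypothetical stand-in types:

// Standalone sketch of the dispatch in SparkContext.decommissionExecutors above.
object DecommissionDispatchSketch {
  sealed trait BackendModel
  final class CoarseGrainedModel extends BackendModel {
    def decommissionExecutor(executorId: String): Unit =
      println(s"asking executor $executorId to decommission")
  }
  final class LocalModel extends BackendModel

  // Only the coarse-grained backend knows how to decommission; anything else just warns.
  def decommissionExecutors(backend: BackendModel, executorIds: Seq[String]): Unit =
    backend match {
      case b: CoarseGrainedModel => executorIds.foreach(b.decommissionExecutor)
      case other => println(s"Decommissioning executors is not supported by current scheduler $other")
    }

  def main(args: Array[String]): Unit = {
    decommissionExecutors(new CoarseGrainedModel, Seq("1", "2"))
    decommissionExecutors(new LocalModel, Seq("3")) // falls through to the warning
  }
}
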

3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -367,7 +367,8 @@ object SparkEnv extends Logging {
externalShuffleClient
} else {
None
}, blockManagerInfo)),
}, blockManagerInfo,
mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])),
registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),
@@ -158,8 +158,6 @@ private[deploy] object DeployMessages {

case object ReregisterWithMaster // used when a worker attempts to reconnect to a master

case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
@@ -668,7 +668,7 @@ private[deploy] class Worker(
finishedApps += id
maybeCleanupApplication(id)

case DecommissionSelf =>
case WorkerDecommission(_, _) =>
decommissionSelf()
}

@@ -210,6 +210,10 @@ private[spark] class CoarseGrainedExecutorBackend(
case UpdateDelegationTokens(tokenBytes) =>
logInfo(s"Received tokens of ${tokenBytes.length} bytes")
SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)

case DecommissionSelf =>
logInfo("Received decommission self")
decommissionSelf()
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -258,26 +262,65 @@ private[spark] class CoarseGrainedExecutorBackend(
System.exit(code)
}

private def decommissionSelf(): Boolean = {
logInfo("Decommissioning self w/sync")
try {
decommissioned = true
// Tell master we are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(executorId))
private var previousAllBlocksMigrated = false

Reviewer: Should this variable be marked volatile?

Author: I don't think so; this will only be accessed in one thread.

private def shutdownIfDone(): Unit = {
val numRunningTasks = executor.numRunningTasks
logInfo(s"Checking to see if we can shut down; we have ${numRunningTasks} running tasks.")
if (executor.numRunningTasks == 0) {
if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
val allBlocksMigrated = env.blockManager.decommissionManager match {
case Some(m) => m.allBlocksMigrated
case None => false // We haven't started migrations yet.
}
if (allBlocksMigrated && previousAllBlocksMigrated) {
logInfo("No running tasks, all blocks migrated, stopping.")
exitExecutor(0, "Finished decommissioning", notifyDriver = true)

Reviewer: exitExecutor asynchronously sends RemoveExecutor to the driver. Does that actually make it to the driver? There is also the question of whether we should be using the same Shutdown/StopExecutor code path for doing the stopping (although it does seem that we want to tell the driver that the executor is being removed).

Interestingly, the driver does indeed respond with a StopExecutor and does trigger the clean shutdown path in the executor, but again I wonder if it is too late for that. Perhaps we shouldn't be calling System.exit here?

Also, as currently written, this exitExecutor could cause job failures: the TaskSchedulerImpl will treat the ExecutorLossReason sent by the executor to the driver as exitCausedByApp and thus penalize the task. Instead, I think we shouldn't penalize the running job on a planned executor decommission. One workaround might be to respond to the driver with ExecutorDecommission (which is not used elsewhere currently) and then handle that specifically in TaskSchedulerImpl's determination of exitCausedByApp.

Author: It's my understanding that the TaskSchedulerImpl shouldn't see any job failures, because we've waited for all the tasks on the executor to finish before calling this code path. Unless there is something I've missed there?

I think swapping out exitExecutor for telling the driver to stop the executor, and avoiding the System.exit, makes sense either way though.

Reviewer: I was talking about the case where we get shot down before we had a chance to cleanly exit on line 276. Say, for example, some timeout expires and the executor/node is brought down.

Are decom.sh and decommission-slave.sh expected to wait until the executor/worker process has properly shut down? I think they have some timeouts in them to kill the executor? Or consider a spot-kill scenario where you get some warning (like 2 minutes) and then the machine is yanked out.

In that case, the executor will eventually be marked lost via a heartbeat/timeout, and that loss would be deemed the fault of the task and could cause job failures. I am wondering if we can fix that scenario of an unclean exit.

One workaround I suggested above was to send a message to the driver saying that the executor is going to go away soon. When that happens (in a clean or unclean way), the loss shouldn't be attributed to the task.

Perhaps this unclean executor loss/timeout handling is follow-up work? We (or rather I) can create JIRAs for this under the parent ticket :-)

Author: Sure, although I think this behaviour is covered by the changes in https://github.com/apache/spark/pull/26440/files (we only increment failures if the executor's previous state was not decommissioning).

Reviewer: Can you please double-check that? I couldn't find this behavior when scouring TaskSchedulerImpl and TaskSetManager. The only place that PR checks for an executor being decommissioned is when scheduling tasks (in CoarseGrainedSchedulerBackend#isExecutorActive). Thanks!

Author: Can you point to where in TaskSchedulerImpl it's going to fail the job? core/src/main/scala/org/apache/spark/deploy/master/Master.scala is where the current code is, but there might be an additional case that needs to be covered.

Reviewer:

for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {

In this match block, we will hit the default case, which will treat the failure as having been caused by the app and thus penalize it. This routine is called from TaskScheduler.executorLost.

Author: Thanks :) If you want to make a PR for that, I'd be happy to review/merge, since I think it would not depend on any of the in-flight PRs, just the current code in master.

Reviewer: Absolutely! Thanks.

}
previousAllBlocksMigrated = allBlocksMigrated
} else {
logError("No driver to message decommissioning.")
logInfo("No running tasks, no block migration configured, stopping.")
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
}
if (executor != null) {
executor.decommission()
} else {
// If there's a running task it could store blocks.

Reviewer: I think this logic of previousAllBlocksMigrated and allBlocksMigrated is a bit confusing. It's not clear why the previous state has to be considered. I wonder if the following code could make this "history" aspect clearer:

val allBlocksMigrated = !env.conf.get(STORAGE_DECOMMISSION_ENABLED) ||
      env.blockManager.decommissionManager.map(_.allBlocksMigrated).getOrElse(false)
val exitCondition = allBlocksMigrated && numRunningTasks == 0
if (exitCondition) { exitExecutor(...) }

Also, should we really be checking numRunningTasks here? What if some race condition caused some tasks to be scheduled onto us while we were marked for decommissioning?

Finally, should there be a timeout for how long the executor will stay alive in the decommissioned state?

Author: If a task is scheduled before we are asked to decommission, that case is covered; you can verify this by taking the logic out and watching the tests fail :) (there's an ugly thread sleep in the tests to make this possible).

Since the block migrations are not atomic, I do think we need the two-pass logic. Unfortunately, think of this situation:

  1. Task launches on executor
  2. Executor asked to decommission
  3. All blocks currently stored on executor are migrated
  4. Task stores a block
  5. We check numTasks, see all blocks are migrated, then exit without migrating the block stored in step 4

Now that being said, that's probably a corner case, and arguably not super important since we're really only doing best effort, but I think for the overhead of one extra boolean it's worth it to cover this corner case.
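
To make the two-pass check discussed in the thread above concrete, here is a standalone sketch (hypothetical names, not the PR code) showing why a single snapshot can race with a finishing task that stores one last block, and how requiring the migrated flag to hold on two consecutive polls closes that window:

// Standalone sketch of the double-confirmation idea discussed above.
object DoubleCheckSketch {
  var numRunningTasks = 1            // step 1: a task is running
  var allBlocksMigrated = false      // what the (possibly stale) migration manager reports
  private var previousAllBlocksMigrated = false

  // Safe to exit only when there are no tasks and migration was already reported
  // complete on the previous poll too, so nothing new was written in between.
  def safeToExit(): Boolean = {
    val exitNow = numRunningTasks == 0 && allBlocksMigrated && previousAllBlocksMigrated
    previousAllBlocksMigrated = allBlocksMigrated
    exitNow
  }

  def main(args: Array[String]): Unit = {
    // Steps 2-4: existing blocks migrate, then the task writes one more block and finishes,
    // before the migration manager has refreshed its view.
    allBlocksMigrated = true         // stale: the block from step 4 is not covered yet
    numRunningTasks = 0
    println(safeToExit())            // false -- a single-snapshot check would have exited here
    allBlocksMigrated = false        // the migrator notices the late block
    println(safeToExit())            // false
    allBlocksMigrated = true         // the late block has been migrated as well
    println(safeToExit())            // false: first clean poll is only recorded
    println(safeToExit())            // true: two consecutive clean polls
  }
}
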

previousAllBlocksMigrated = false
}
}

private def decommissionSelf(): Boolean = {
if (!decommissioned) {
logInfo("Decommissioning self w/sync")

Reviewer: Perhaps we should expand what 'w/sync' stands for in the log message?

Author: Sure.

try {
decommissioned = true
// Tell master we are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(executorId))
} else {
logError("No driver to message decommissioning.")
}
if (executor != null) {
executor.decommission()
}
// Shutdown the executor once all tasks are gone :)
val shutdownThread = new Thread() {
override def run(): Unit = {
while (true) {
shutdownIfDone()
Thread.sleep(1000) // 1s
}
}
}
shutdownThread.setDaemon(true)
shutdownThread.setName("decommission-shutdown-thread")
shutdownThread.start()
logInfo("Done decommissioning self.")
// Return true since we are handling a signal
true
} catch {
case e: Exception =>
logError(s"Error ${e} during attempt to decommission self")
false
}
logInfo("Done decommissioning self.")
// Return true since we are handling a signal
} else {
true
} catch {
case e: Exception =>
logError(s"Error ${e} during attempt to decommission self")
false
}
}
}
@@ -233,6 +233,7 @@ private[spark] class Executor(
* Mark an executor for decommissioning and avoid launching new tasks.
*/
private[spark] def decommission(): Unit = {
logInfo("Executor asked to decommission. Starting shutdown thread.")

Reviewer: I think this comment looks stale. It should probably be moved to the CoarseGrainedExecutorBackend. It's also not clear to me what the decommissioned flag does in the Executor besides just logging.

Author: Just logging for now. The reason I propagate the message to the executor is that if we end up in a state where the executor believes it is decommissioned (say, from a local SIGPWR) but the driver doesn't, it could be weird, so having some logging is useful.

decommissioned = true
}

15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -420,6 +420,21 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =
ConfigBuilder("spark.storage.decommission.shuffle_blocks")
.doc("Whether to transfer shuffle blocks during block manager decommissioning. Requires " +
"an indexed shuffle resolver (like sort based shuffle)")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_RDD_DECOMMISSION_ENABLED =
ConfigBuilder("spark.storage.decommission.rdd_blocks")
.doc("Whether to transfer RDD blocks during block manager decommissioning.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
.internal()
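
For reference, a minimal sketch of enabling these migration paths from application code; the two new keys are taken from the ConfigBuilder entries above, while spark.storage.decommission.enabled is assumed to be the key behind STORAGE_DECOMMISSION_ENABLED used elsewhere in this PR:

import org.apache.spark.SparkConf

object DecommissionConfExample {
  // Turn on block manager decommissioning plus both migration paths added in this diff.
  val conf: SparkConf = new SparkConf()
    .setAppName("decommission-migration-example")
    .set("spark.storage.decommission.enabled", "true") // assumed key for STORAGE_DECOMMISSION_ENABLED
    .set("spark.storage.decommission.shuffle_blocks", "true")
    .set("spark.storage.decommission.rdd_blocks", "true")
}
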
@@ -168,7 +168,8 @@ private[spark] class NettyBlockTransferService(
// Everything else is encoded using our binary protocol.
val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))

val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
val asStream = (blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) ||
blockId.isInternalShuffle || blockId.isShuffle)
val callback = new RpcResponseCallback {
override def onSuccess(response: ByteBuffer): Unit = {
logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}")
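
The stream-versus-buffer decision above boils down to a small predicate; restated standalone (the threshold argument stands in for spark.network.maxRemoteBlockSizeFetchToMem, whose default is not shown here):

object UploadAsStreamSketch {
  // Mirrors the asStream condition in the diff: stream when the block exceeds the
  // fetch-to-memory limit, or whenever it is shuffle data, so migrated shuffle files
  // are always uploaded as streams regardless of size.
  def uploadAsStream(
      blockSize: Long,
      isShuffle: Boolean,
      isInternalShuffle: Boolean,
      maxRemoteBlockSizeFetchToMem: Long): Boolean =
    blockSize > maxRemoteBlockSizeFetchToMem || isInternalShuffle || isShuffle

  def main(args: Array[String]): Unit = {
    val limit = 64L * 1024 * 1024 // illustrative limit only
    println(uploadAsStream(4096L, isShuffle = true, isInternalShuffle = false, limit))  // true
    println(uploadAsStream(4096L, isShuffle = false, isInternalShuffle = false, limit)) // false
  }
}
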
12 changes: 11 additions & 1 deletion core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -33,9 +33,11 @@ import org.apache.spark.util.Utils
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
*/
private[spark] sealed trait MapStatus {
/** Location where this task was run. */
/** Location where this task output is. */
def location: BlockManagerId

def updateLocation(bm: BlockManagerId): Unit

/**
* Estimated size for the reduce block, in bytes.
*
@@ -126,6 +128,10 @@ private[spark] class CompressedMapStatus(

override def location: BlockManagerId = loc

override def updateLocation(bm: BlockManagerId): Unit = {
loc = bm
}

override def getSizeForBlock(reduceId: Int): Long = {
MapStatus.decompressSize(compressedSizes(reduceId))
}
@@ -178,6 +184,10 @@ private[spark] class HighlyCompressedMapStatus private (

override def location: BlockManagerId = loc

override def updateLocation(bm: BlockManagerId): Unit = {
loc = bm
}

override def getSizeForBlock(reduceId: Int): Long = {
assert(hugeBlockSizes != null)
if (emptyBlocks.contains(reduceId)) {
@@ -52,6 +52,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class UpdateDelegationTokens(tokens: Array[Byte])
extends CoarseGrainedClusterMessage

case object DecommissionSelf extends CoarseGrainedClusterMessage // Mark as decommissioned.

Reviewer: IMHO, the DecommissionSelf naming is a bit ambiguous: who is "self" here, the sender or the receiver? This message is now sent from the driver to the executor, so perhaps we should just repurpose DecommissionExecutor with a check for the executorId? Not a big deal, but trying to reduce the number of message types introduced by this feature ;)

Author: I think DecommissionSelf is pretty clearly telling the receiver to decommission itself. That being said, I'm open to renaming.

Reviewer: Sounds good ;-) I checked that DecommissionSelf is indeed not used anywhere else, so it should be unambiguous. Let's keep the name.

// Executors to driver
case class RegisterExecutor(
executorId: String,
@@ -27,7 +27,7 @@ import scala.concurrent.Future
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.{DeployMessage, SparkHadoopUtil}
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.executor.ExecutorLogUrlHandler
import org.apache.spark.internal.Logging
@@ -432,7 +432,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (shouldDisable) {
logInfo(s"Starting decommissioning executor $executorId.")
try {
// Stop making offers on this executor
scheduler.executorDecommission(executorId)
// Send decommission message to the executor (it could have originated there but not
// necessarily).
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.executorEndpoint.send(
DecommissionSelf)
case None =>
// Ignoring the executor since it is not registered.
logWarning(s"Attempted to decommission unknown executor $executorId.")
}
} catch {
case e: Exception =>
logError(s"Unexpected error during decommissioning ${e.toString}", e)
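
Putting the driver-side pieces above together, here is a standalone sketch (simplified stand-ins for the RPC endpoint and executor registry, not the CoarseGrainedSchedulerBackend types): stop offering the executor, then send DecommissionSelf if it is still registered, otherwise only log a warning:

// Standalone sketch of the decommissionExecutor flow added above.
object DecommissionSendSketch {
  sealed trait ClusterMessage
  case object DecommissionSelf extends ClusterMessage

  final class EndpointRefModel(name: String) {
    def send(msg: ClusterMessage): Unit = println(s"$name <- $msg")
  }

  final class BackendModel {
    private val executorEndpoints = scala.collection.mutable.Map.empty[String, EndpointRefModel]
    private val decommissioning = scala.collection.mutable.Set.empty[String]

    def register(id: String, ref: EndpointRefModel): Unit = executorEndpoints(id) = ref

    def decommissionExecutor(executorId: String): Unit = {
      decommissioning += executorId // stop making offers on this executor
      executorEndpoints.get(executorId) match {
        case Some(ref) => ref.send(DecommissionSelf) // executor starts migrating and self-exits
        case None => println(s"Attempted to decommission unknown executor $executorId.")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val backend = new BackendModel
    backend.register("7", new EndpointRefModel("executor-7"))
    backend.decommissionExecutor("7") // executor-7 <- DecommissionSelf
    backend.decommissionExecutor("9") // unknown executor: warning only
  }
}
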
@@ -44,7 +44,7 @@ private[spark] class StandaloneSchedulerBackend(
with StandaloneAppClientListener
with Logging {

private var client: StandaloneAppClient = null
private[spark] var client: StandaloneAppClient = null
private val stopping = new AtomicBoolean(false)
private val launcherBackend = new LauncherBackend() {
override protected def conf: SparkConf = sc.conf