From 6369af0792bb3439d061f5a9ac279c87d39b4d06 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Mon, 19 Jun 2017 22:04:52 +0800 Subject: [PATCH 1/4] on WorkerRemoved, invalidate the outputs on the host from MapStatus. --- .../apache/spark/deploy/DeployMessage.scala | 2 + .../deploy/client/StandaloneAppClient.scala | 4 ++ .../client/StandaloneAppClientListener.scala | 8 ++-- .../apache/spark/deploy/master/Master.scala | 15 ++++--- .../apache/spark/scheduler/DAGScheduler.scala | 40 +++++++++++++++++++ .../spark/scheduler/DAGSchedulerEvent.scala | 3 ++ .../spark/scheduler/TaskScheduler.scala | 5 +++ .../spark/scheduler/TaskSchedulerImpl.scala | 6 +++ .../cluster/CoarseGrainedClusterMessage.scala | 2 + .../CoarseGrainedSchedulerBackend.scala | 16 ++++++++ .../cluster/StandaloneSchedulerBackend.scala | 5 +++ .../spark/deploy/client/AppClientSuite.scala | 2 + .../ExternalClusterManagerSuite.scala | 1 + 13 files changed, 101 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index c1a91c27eef2d..49a319abb3238 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -158,6 +158,8 @@ private[deploy] object DeployMessages { case class ApplicationRemoved(message: String) + case class WorkerRemoved(id: String, host: String, message: String) + // DriverClient <-> Master case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala index 93f58ce63799f..757c930b84eb2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala @@ -182,6 +182,10 @@ private[spark] class StandaloneAppClient( listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost) } + case WorkerRemoved(id, host, message) => + logInfo("Master removed worker %s: %s".format(id, message)) + listener.workerRemoved(id, host, message) + case MasterChanged(masterRef, masterWebUiUrl) => logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL) master = Some(masterRef) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala index 64255ec92b72a..d8bc1a883def1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala @@ -18,9 +18,9 @@ package org.apache.spark.deploy.client /** - * Callbacks invoked by deploy client when various events happen. There are currently four events: - * connecting to the cluster, disconnecting, being given an executor, and having an executor - * removed (either due to failure or due to revocation). + * Callbacks invoked by deploy client when various events happen. There are currently five events: + * connecting to the cluster, disconnecting, being given an executor, having an executor removed + * (either due to failure or due to revocation), and having a worker removed. * * Users of this API should *not* block inside the callback methods. 
*/ @@ -38,4 +38,6 @@ private[spark] trait StandaloneAppClientListener { def executorRemoved( fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit + + def workerRemoved(workerId: String, host: String, message: String): Unit } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f10a41286c52f..c192a0cc82ef6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -498,7 +498,7 @@ private[deploy] class Master( override def onDisconnected(address: RpcAddress): Unit = { // The disconnected client could've been either a worker or an app; remove whichever it was logInfo(s"$address got disassociated, removing it.") - addressToWorker.get(address).foreach(removeWorker) + addressToWorker.get(address).foreach(removeWorker(_, s"${address} got disassociated")) addressToApp.get(address).foreach(finishApplication) if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() } } @@ -544,7 +544,8 @@ private[deploy] class Master( state = RecoveryState.COMPLETING_RECOVERY // Kill off any workers and apps that didn't respond to us. - workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker) + workers.filter(_.state == WorkerState.UNKNOWN).foreach( + removeWorker(_, "Not responding for recovery")) apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication) // Update the state of recovered apps to RUNNING @@ -755,7 +756,7 @@ private[deploy] class Master( if (oldWorker.state == WorkerState.UNKNOWN) { // A worker registering from UNKNOWN implies that the worker was restarted during recovery. // The old worker must thus be dead, so we will remove it and accept the new worker. - removeWorker(oldWorker) + removeWorker(oldWorker, "Worker replaced by a new worker with same address") } else { logInfo("Attempted to re-register worker at same address: " + workerAddress) return false @@ -771,7 +772,7 @@ private[deploy] class Master( true } - private def removeWorker(worker: WorkerInfo) { + private def removeWorker(worker: WorkerInfo, msg: String) { logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port) worker.setState(WorkerState.DEAD) idToWorker -= worker.id @@ -795,6 +796,10 @@ private[deploy] class Master( removeDriver(driver.id, DriverState.ERROR, None) } } + logInfo(s"Telling app of lost worker: " + worker.id) + apps.filterNot(completedApps.contains(_)).foreach { app => + app.driver.send(WorkerRemoved(worker.id, worker.host, msg)) + } persistenceEngine.removeWorker(worker) } @@ -979,7 +984,7 @@ private[deploy] class Master( if (worker.state != WorkerState.DEAD) { logWarning("Removing %s because we got no heartbeat in %d seconds".format( worker.id, WORKER_TIMEOUT_MS / 1000)) - removeWorker(worker) + removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds") } else { if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) { workers -= worker // we've seen this DEAD worker in the UI, etc. 
for long enough; cull it diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index fafe9cafdc18f..6d5c271e24290 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -259,6 +259,13 @@ class DAGScheduler( eventProcessLoop.post(ExecutorLost(execId, reason)) } + /** + * Called by TaskScheduler implementation when a worker is removed. + */ + def workerRemoved(workerId: String, host: String, message: String): Unit = { + eventProcessLoop.post(WorkerRemoved(workerId, host, message)) + } + /** * Called by TaskScheduler implementation when a host is added. */ @@ -1432,6 +1439,36 @@ class DAGScheduler( } } + /** + * Responds to a worker being removed. This is called inside the event loop, so it assumes it can + * modify the scheduler's internal state. Use workerRemoved() to post a loss event from outside. + * + * We will assume that we've lost all shuffle blocks associated with the host if a worker is + * removed, so we will remove them all from MapStatus. + * + * @param workerId identifier of the worker that is removed. + * @param host host of the worker that is removed. + * @param message the reason why the worker is removed. + */ + private[scheduler] def handleWorkerRemoved( + workerId: String, + host: String, + message: String): Unit = { + logInfo("Shuffle files lost for worker: %s".format(workerId)) + // TODO: This will be really slow if we keep accumulating shuffle map stages + for ((shuffleId, stage) <- shuffleIdToMapStage) { + stage.removeOutputsOnHost(host) + mapOutputTracker.registerMapOutputs( + shuffleId, + stage.outputLocInMapOutputTrackerFormat(), + changeEpoch = true) + } + if (shuffleIdToMapStage.isEmpty) { + mapOutputTracker.incrementEpoch() + } + clearCacheLocs() + } + private[scheduler] def handleExecutorAdded(execId: String, host: String) { // remove from failedEpoch(execId) ? 
if (failedEpoch.contains(execId)) { @@ -1727,6 +1764,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler } dagScheduler.handleExecutorLost(execId, workerLost) + case WorkerRemoved(workerId, host, message) => + dagScheduler.handleWorkerRemoved(workerId, host, message) + case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index cda0585f154a9..3f8d5639a2b90 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -86,6 +86,9 @@ private[scheduler] case class ExecutorAdded(execId: String, host: String) extend private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossReason) extends DAGSchedulerEvent +private[scheduler] case class WorkerRemoved(workerId: String, host: String, message: String) + extends DAGSchedulerEvent + private[scheduler] case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]) extends DAGSchedulerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 3de7d1f7de22b..4c031b8d93d6a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -89,6 +89,11 @@ private[spark] trait TaskScheduler { */ def executorLost(executorId: String, reason: ExecutorLossReason): Unit + /** + * Process a removed worker + */ + def workerRemoved(workerId: String, message: String): Unit + /** * Get an application's attempt ID associated with the job. 
* diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 629cfc7c7a8ce..63384aeaa4a73 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -569,6 +569,12 @@ private[spark] class TaskSchedulerImpl private[scheduler]( } } + override def workerRemoved(workerId: String, host: String, message: String): Unit = { + logInfo(s"Handle removed worker $workerId: $message") + dagScheduler.workerRemoved(workerId, host, message) + backend.reviveOffers() + } + private def logExecutorLoss( executorId: String, hostPort: String, diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 6b49bd699a13a..3f2e06c4f26ab 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -85,6 +85,8 @@ private[spark] object CoarseGrainedClusterMessages { case class RemoveExecutor(executorId: String, reason: ExecutorLossReason) extends CoarseGrainedClusterMessage + case class RemoveWorker(workerId: String, message: String) extends CoarseGrainedClusterMessage + case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage // Exchanged between the driver and the AM in Yarn client mode diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index dc82bb7704727..b140233385d36 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -219,6 +219,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp removeExecutor(executorId, reason) context.reply(true) + case RemoveWorker(workerId, host, message) => + removeWorker(workerId, host, message) + context.reply(true) + case RetrieveSparkAppConfig => val reply = SparkAppConfig(sparkProperties, SparkEnv.get.securityManager.getIOEncryptionKey()) @@ -331,6 +335,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } + // Remove a lost worker from the cluster + private def removeWorker(workerId: String, host: String, message: String): Unit = { + logDebug(s"Asked to remove worker $workerId with reason $message") + scheduler.workerRemoved(workerId, host, message) + } + /** * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending. 
@@ -454,6 +464,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp }(ThreadUtils.sameThread) } + protected def removeWorker(workerId: String, host: String, message: String): Unit = { + driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).onFailure { case t => + logError(t.getMessage, t) + }(ThreadUtils.sameThread) + } + def sufficientResourcesRegistered(): Boolean = true override def isReady(): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 0529fe9eed4da..fd8e64454bf70 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -161,6 +161,11 @@ private[spark] class StandaloneSchedulerBackend( removeExecutor(fullId.split("/")(1), reason) } + override def workerRemoved(workerId: String, host: String, message: String): Unit = { + logInfo("Worker %s removed: %s".format(workerId, message)) + removeWorker(workerId, host, message) + } + override def sufficientResourcesRegistered(): Boolean = { totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio } diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index 936639b845789..a1707e6540b39 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -214,6 +214,8 @@ class AppClientSuite id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = { execRemovedList.add(id) } + + def workerRemoved(workerId: String, host: String, message: String): Unit = {} } /** Create AppClient and supporting objects */ diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index ba56af8215cd7..5b0afd655b7e3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -84,6 +84,7 @@ private class DummyTaskScheduler extends TaskScheduler { override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} override def defaultParallelism(): Int = 2 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} + override def workerRemoved(workerId: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None def executorHeartbeatReceived( execId: String, From 825ade06bd47fd201ee90fa28b78b951049cf7a4 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Mon, 19 Jun 2017 22:27:56 +0800 Subject: [PATCH 2/4] update --- .../main/scala/org/apache/spark/scheduler/TaskScheduler.scala | 2 +- .../spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 3 ++- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 ++ .../apache/spark/scheduler/ExternalClusterManagerSuite.scala | 2 +- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 4c031b8d93d6a..90644fea23ab1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -92,7 +92,7 @@ private[spark] trait TaskScheduler { /** * Process a removed worker */ - def workerRemoved(workerId: String, message: String): Unit + def workerRemoved(workerId: String, host: String, message: String): Unit /** * Get an application's attempt ID associated with the job. diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 3f2e06c4f26ab..89a9ad6811e18 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -85,7 +85,8 @@ private[spark] object CoarseGrainedClusterMessages { case class RemoveExecutor(executorId: String, reason: ExecutorLossReason) extends CoarseGrainedClusterMessage - case class RemoveWorker(workerId: String, message: String) extends CoarseGrainedClusterMessage + case class RemoveWorker(workerId: String, host: String, message: String) + extends CoarseGrainedClusterMessage case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index ddd3281106745..453be26ed8d0c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -131,6 +131,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} + override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None } @@ -632,6 +633,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} + override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None } val noKillScheduler = new DAGScheduler( diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 5b0afd655b7e3..a4e4ea7cd2894 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -84,7 +84,7 @@ private class DummyTaskScheduler extends TaskScheduler { override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} override def defaultParallelism(): Int = 2 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} - override def workerRemoved(workerId: String, message: String): Unit = {} + override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None def executorHeartbeatReceived( execId: String, From fb72fb6ea8dc02753d8c699f985afc70d1ccd971 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: 
Tue, 20 Jun 2017 18:16:50 +0800 Subject: [PATCH 3/4] remove output from shuffle status --- .../org/apache/spark/scheduler/DAGScheduler.scala | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 6d5c271e24290..3422a5f204b12 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1454,18 +1454,8 @@ class DAGScheduler( workerId: String, host: String, message: String): Unit = { - logInfo("Shuffle files lost for worker: %s".format(workerId)) - // TODO: This will be really slow if we keep accumulating shuffle map stages - for ((shuffleId, stage) <- shuffleIdToMapStage) { - stage.removeOutputsOnHost(host) - mapOutputTracker.registerMapOutputs( - shuffleId, - stage.outputLocInMapOutputTrackerFormat(), - changeEpoch = true) - } - if (shuffleIdToMapStage.isEmpty) { - mapOutputTracker.incrementEpoch() - } + logInfo("Shuffle files lost for worker %s on host %s".format(workerId, host)) + mapOutputTracker.removeOutputsOnHost(host) clearCacheLocs() } From 294c05499eb784b282b5490675da43c5acc7c2e9 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Thu, 22 Jun 2017 15:36:04 +0800 Subject: [PATCH 4/4] refactor --- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 1 - .../cluster/CoarseGrainedSchedulerBackend.scala | 13 +++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 63384aeaa4a73..bba0b294f1afb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -572,7 +572,6 @@ private[spark] class TaskSchedulerImpl private[scheduler]( override def workerRemoved(workerId: String, host: String, message: String): Unit = { logInfo(s"Handle removed worker $workerId: $message") dagScheduler.workerRemoved(workerId, host, message) - backend.reviveOffers() } private def logExecutorLoss( diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index b140233385d36..0b396b794ddce 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -235,8 +235,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized { // Filter out executors under killing val activeExecutors = executorDataMap.filterKeys(executorIsAlive) - val workOffers = activeExecutors.map { case (id, executorData) => - new WorkerOffer(id, executorData.executorHost, executorData.freeCores) + val workOffers = activeExecutors.map { + case (id, executorData) => + new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toIndexedSeq scheduler.resourceOffers(workOffers) } @@ -459,14 +460,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = { // Only log the failure since we don't care about the result. 
- driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure { case t => - logError(t.getMessage, t) + driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure { + case t => logError(t.getMessage, t) }(ThreadUtils.sameThread) } protected def removeWorker(workerId: String, host: String, message: String): Unit = { - driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).onFailure { case t => - logError(t.getMessage, t) + driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).onFailure { + case t => logError(t.getMessage, t) }(ThreadUtils.sameThread) }
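
A minimal, self-contained sketch of the behavior these patches wire up, assuming a toy in-memory tracker in place of Spark's MapOutputTracker: when a WorkerRemoved notification reaches the scheduler, every shuffle output recorded on that worker's host is dropped, mirroring DAGScheduler.handleWorkerRemoved and the mapOutputTracker.removeOutputsOnHost(host) call introduced in patch 3/4. All class and object names below are illustrative stand-ins, not Spark APIs, and the Master -> StandaloneAppClient -> scheduler backend -> TaskSchedulerImpl plumbing from the patches is collapsed into a single direct call.

import scala.collection.mutable

// Message shape mirroring the WorkerRemoved event added in these patches.
final case class WorkerRemoved(workerId: String, host: String, message: String)

// Toy stand-in for the map-output bookkeeping; NOT Spark's MapOutputTracker.
class ToyMapOutputTracker {
  // shuffleId -> (mapId -> host holding that map output)
  val outputs: mutable.Map[Int, mutable.Map[Int, String]] = mutable.Map(
    0 -> mutable.Map(0 -> "host-a", 1 -> "host-b"),
    1 -> mutable.Map(0 -> "host-b"))

  // Drop every recorded output that lives on the given host.
  def removeOutputsOnHost(host: String): Unit =
    outputs.values.foreach { perShuffle =>
      val lost = perShuffle.collect { case (mapId, h) if h == host => mapId }.toList
      lost.foreach(perShuffle.remove)
    }
}

// Toy stand-in for the DAGScheduler side of handleWorkerRemoved.
class ToyDagScheduler(tracker: ToyMapOutputTracker) {
  def handleWorkerRemoved(event: WorkerRemoved): Unit = {
    println(s"Shuffle files lost for worker ${event.workerId} on host ${event.host}")
    tracker.removeOutputsOnHost(event.host)
  }
}

object WorkerRemovedFlowDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new ToyMapOutputTracker
    val dag = new ToyDagScheduler(tracker)
    // In the patches this call is reached via an RPC (RemoveWorker) and the
    // DAGScheduler event loop; here it is invoked directly for brevity.
    dag.handleWorkerRemoved(WorkerRemoved("worker-20170619-0001", "host-b", "heartbeat timeout"))
    println(tracker.outputs) // entries on host-b are gone; host-a entries remain
  }
}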