From 2dadea95c8e2c727e97fca91b0060f666fc0c65b Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Thu, 22 Jun 2017 20:48:12 +0800 Subject: [PATCH] [SPARK-20832][CORE] Standalone master should explicitly inform drivers of worker deaths and invalidate external shuffle service outputs ## What changes were proposed in this pull request? In standalone mode, master should explicitly inform each active driver of any worker deaths, so the invalid external shuffle service outputs on the lost host would be removed from the shuffle mapStatus, thus we can avoid future `FetchFailure`s. ## How was this patch tested? Manually tested by the following steps: 1. Start a standalone Spark cluster with one driver node and two worker nodes; 2. Run a Job with ShuffleMapStage, ensure the outputs distribute on each worker; 3. Run another Job to make all executors exit, but the workers are all alive; 4. Kill one of the workers; 5. Run rdd.collect(), before this change, we should see `FetchFailure`s and failed Stages, while after the change, the job should complete without failure. Before the change: ![image](https://user-images.githubusercontent.com/4784782/27335366-c251c3d6-55fe-11e7-99dd-d1fdcb429210.png) After the change: ![image](https://user-images.githubusercontent.com/4784782/27335393-d1c71640-55fe-11e7-89ed-bd760f1f39af.png) Author: Xingbo Jiang Closes #18362 from jiangxb1987/removeWorker. --- .../apache/spark/deploy/DeployMessage.scala | 2 ++ .../deploy/client/StandaloneAppClient.scala | 4 +++ .../client/StandaloneAppClientListener.scala | 8 +++-- .../apache/spark/deploy/master/Master.scala | 15 ++++++---- .../apache/spark/scheduler/DAGScheduler.scala | 30 +++++++++++++++++++ .../spark/scheduler/DAGSchedulerEvent.scala | 3 ++ .../spark/scheduler/TaskScheduler.scala | 5 ++++ .../spark/scheduler/TaskSchedulerImpl.scala | 5 ++++ .../cluster/CoarseGrainedClusterMessage.scala | 3 ++ .../CoarseGrainedSchedulerBackend.scala | 25 +++++++++++++--- .../cluster/StandaloneSchedulerBackend.scala | 5 ++++ .../spark/deploy/client/AppClientSuite.scala | 2 ++ .../spark/scheduler/DAGSchedulerSuite.scala | 2 ++ .../ExternalClusterManagerSuite.scala | 1 + 14 files changed, 98 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index c1a91c27eef2d..49a319abb3238 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -158,6 +158,8 @@ private[deploy] object DeployMessages { case class ApplicationRemoved(message: String) + case class WorkerRemoved(id: String, host: String, message: String) + // DriverClient <-> Master case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala index 93f58ce63799f..757c930b84eb2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala @@ -182,6 +182,10 @@ private[spark] class StandaloneAppClient( listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost) } + case WorkerRemoved(id, host, message) => + logInfo("Master removed worker %s: %s".format(id, message)) + listener.workerRemoved(id, host, message) + case MasterChanged(masterRef, masterWebUiUrl) => logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL) master = Some(masterRef) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala index 64255ec92b72a..d8bc1a883def1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala @@ -18,9 +18,9 @@ package org.apache.spark.deploy.client /** - * Callbacks invoked by deploy client when various events happen. There are currently four events: - * connecting to the cluster, disconnecting, being given an executor, and having an executor - * removed (either due to failure or due to revocation). + * Callbacks invoked by deploy client when various events happen. There are currently five events: + * connecting to the cluster, disconnecting, being given an executor, having an executor removed + * (either due to failure or due to revocation), and having a worker removed. * * Users of this API should *not* block inside the callback methods. */ @@ -38,4 +38,6 @@ private[spark] trait StandaloneAppClientListener { def executorRemoved( fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit + + def workerRemoved(workerId: String, host: String, message: String): Unit } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f10a41286c52f..c192a0cc82ef6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -498,7 +498,7 @@ private[deploy] class Master( override def onDisconnected(address: RpcAddress): Unit = { // The disconnected client could've been either a worker or an app; remove whichever it was logInfo(s"$address got disassociated, removing it.") - addressToWorker.get(address).foreach(removeWorker) + addressToWorker.get(address).foreach(removeWorker(_, s"${address} got disassociated")) addressToApp.get(address).foreach(finishApplication) if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() } } @@ -544,7 +544,8 @@ private[deploy] class Master( state = RecoveryState.COMPLETING_RECOVERY // Kill off any workers and apps that didn't respond to us. - workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker) + workers.filter(_.state == WorkerState.UNKNOWN).foreach( + removeWorker(_, "Not responding for recovery")) apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication) // Update the state of recovered apps to RUNNING @@ -755,7 +756,7 @@ private[deploy] class Master( if (oldWorker.state == WorkerState.UNKNOWN) { // A worker registering from UNKNOWN implies that the worker was restarted during recovery. // The old worker must thus be dead, so we will remove it and accept the new worker. - removeWorker(oldWorker) + removeWorker(oldWorker, "Worker replaced by a new worker with same address") } else { logInfo("Attempted to re-register worker at same address: " + workerAddress) return false @@ -771,7 +772,7 @@ private[deploy] class Master( true } - private def removeWorker(worker: WorkerInfo) { + private def removeWorker(worker: WorkerInfo, msg: String) { logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port) worker.setState(WorkerState.DEAD) idToWorker -= worker.id @@ -795,6 +796,10 @@ private[deploy] class Master( removeDriver(driver.id, DriverState.ERROR, None) } } + logInfo(s"Telling app of lost worker: " + worker.id) + apps.filterNot(completedApps.contains(_)).foreach { app => + app.driver.send(WorkerRemoved(worker.id, worker.host, msg)) + } persistenceEngine.removeWorker(worker) } @@ -979,7 +984,7 @@ private[deploy] class Master( if (worker.state != WorkerState.DEAD) { logWarning("Removing %s because we got no heartbeat in %d seconds".format( worker.id, WORKER_TIMEOUT_MS / 1000)) - removeWorker(worker) + removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds") } else { if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) { workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index fafe9cafdc18f..3422a5f204b12 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -259,6 +259,13 @@ class DAGScheduler( eventProcessLoop.post(ExecutorLost(execId, reason)) } + /** + * Called by TaskScheduler implementation when a worker is removed. + */ + def workerRemoved(workerId: String, host: String, message: String): Unit = { + eventProcessLoop.post(WorkerRemoved(workerId, host, message)) + } + /** * Called by TaskScheduler implementation when a host is added. */ @@ -1432,6 +1439,26 @@ class DAGScheduler( } } + /** + * Responds to a worker being removed. This is called inside the event loop, so it assumes it can + * modify the scheduler's internal state. Use workerRemoved() to post a loss event from outside. + * + * We will assume that we've lost all shuffle blocks associated with the host if a worker is + * removed, so we will remove them all from MapStatus. + * + * @param workerId identifier of the worker that is removed. + * @param host host of the worker that is removed. + * @param message the reason why the worker is removed. + */ + private[scheduler] def handleWorkerRemoved( + workerId: String, + host: String, + message: String): Unit = { + logInfo("Shuffle files lost for worker %s on host %s".format(workerId, host)) + mapOutputTracker.removeOutputsOnHost(host) + clearCacheLocs() + } + private[scheduler] def handleExecutorAdded(execId: String, host: String) { // remove from failedEpoch(execId) ? if (failedEpoch.contains(execId)) { @@ -1727,6 +1754,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler } dagScheduler.handleExecutorLost(execId, workerLost) + case WorkerRemoved(workerId, host, message) => + dagScheduler.handleWorkerRemoved(workerId, host, message) + case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index cda0585f154a9..3f8d5639a2b90 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -86,6 +86,9 @@ private[scheduler] case class ExecutorAdded(execId: String, host: String) extend private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossReason) extends DAGSchedulerEvent +private[scheduler] case class WorkerRemoved(workerId: String, host: String, message: String) + extends DAGSchedulerEvent + private[scheduler] case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]) extends DAGSchedulerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 3de7d1f7de22b..90644fea23ab1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -89,6 +89,11 @@ private[spark] trait TaskScheduler { */ def executorLost(executorId: String, reason: ExecutorLossReason): Unit + /** + * Process a removed worker + */ + def workerRemoved(workerId: String, host: String, message: String): Unit + /** * Get an application's attempt ID associated with the job. * diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 629cfc7c7a8ce..bba0b294f1afb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -569,6 +569,11 @@ private[spark] class TaskSchedulerImpl private[scheduler]( } } + override def workerRemoved(workerId: String, host: String, message: String): Unit = { + logInfo(s"Handle removed worker $workerId: $message") + dagScheduler.workerRemoved(workerId, host, message) + } + private def logExecutorLoss( executorId: String, hostPort: String, diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 6b49bd699a13a..89a9ad6811e18 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -85,6 +85,9 @@ private[spark] object CoarseGrainedClusterMessages { case class RemoveExecutor(executorId: String, reason: ExecutorLossReason) extends CoarseGrainedClusterMessage + case class RemoveWorker(workerId: String, host: String, message: String) + extends CoarseGrainedClusterMessage + case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage // Exchanged between the driver and the AM in Yarn client mode diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index dc82bb7704727..0b396b794ddce 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -219,6 +219,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp removeExecutor(executorId, reason) context.reply(true) + case RemoveWorker(workerId, host, message) => + removeWorker(workerId, host, message) + context.reply(true) + case RetrieveSparkAppConfig => val reply = SparkAppConfig(sparkProperties, SparkEnv.get.securityManager.getIOEncryptionKey()) @@ -231,8 +235,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized { // Filter out executors under killing val activeExecutors = executorDataMap.filterKeys(executorIsAlive) - val workOffers = activeExecutors.map { case (id, executorData) => - new WorkerOffer(id, executorData.executorHost, executorData.freeCores) + val workOffers = activeExecutors.map { + case (id, executorData) => + new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toIndexedSeq scheduler.resourceOffers(workOffers) } @@ -331,6 +336,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } + // Remove a lost worker from the cluster + private def removeWorker(workerId: String, host: String, message: String): Unit = { + logDebug(s"Asked to remove worker $workerId with reason $message") + scheduler.workerRemoved(workerId, host, message) + } + /** * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending. @@ -449,8 +460,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = { // Only log the failure since we don't care about the result. - driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure { case t => - logError(t.getMessage, t) + driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure { + case t => logError(t.getMessage, t) + }(ThreadUtils.sameThread) + } + + protected def removeWorker(workerId: String, host: String, message: String): Unit = { + driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).onFailure { + case t => logError(t.getMessage, t) }(ThreadUtils.sameThread) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 0529fe9eed4da..fd8e64454bf70 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -161,6 +161,11 @@ private[spark] class StandaloneSchedulerBackend( removeExecutor(fullId.split("/")(1), reason) } + override def workerRemoved(workerId: String, host: String, message: String): Unit = { + logInfo("Worker %s removed: %s".format(workerId, message)) + removeWorker(workerId, host, message) + } + override def sufficientResourcesRegistered(): Boolean = { totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio } diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index 936639b845789..a1707e6540b39 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -214,6 +214,8 @@ class AppClientSuite id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = { execRemovedList.add(id) } + + def workerRemoved(workerId: String, host: String, message: String): Unit = {} } /** Create AppClient and supporting objects */ diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index ddd3281106745..453be26ed8d0c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -131,6 +131,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} + override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None } @@ -632,6 +633,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} + override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None } val noKillScheduler = new DAGScheduler( diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index ba56af8215cd7..a4e4ea7cd2894 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -84,6 +84,7 @@ private class DummyTaskScheduler extends TaskScheduler { override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} override def defaultParallelism(): Int = 2 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} + override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None def executorHeartbeatReceived( execId: String,