[SPARK-20832][CORE] Standalone master should explicitly inform drivers of worker deaths and invalidate external shuffle service outputs

## What changes were proposed in this pull request?

In standalone mode, the master should explicitly inform each active driver of any worker death, so that the now-invalid external shuffle service outputs on the lost host are removed from the shuffle `MapStatus` and future `FetchFailure`s are avoided.
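
The event path this patch adds runs Master -> StandaloneAppClient -> StandaloneSchedulerBackend/CoarseGrainedSchedulerBackend -> TaskSchedulerImpl -> DAGScheduler -> MapOutputTracker. The sketch below is a minimal, self-contained model of that flow, not an excerpt from the patch; every name in it is a simplified stand-in for the corresponding Spark class in the diff.

```scala
// Minimal model of the WorkerRemoved event path added by this patch.
// All types here are stand-ins; the real ones appear in the diff below.
object WorkerRemovedFlow extends App {
  final case class WorkerRemoved(workerId: String, host: String, message: String)

  // Stand-in for MapOutputTrackerMaster: shuffle map outputs keyed by host.
  var shuffleOutputsByHost = Map("host-1" -> Set(0, 1), "host-2" -> Set(2, 3))

  // Stand-in for DAGScheduler.handleWorkerRemoved: drop all map outputs on the dead
  // host so later stages recompute them instead of hitting a FetchFailure.
  def handleWorkerRemoved(event: WorkerRemoved): Unit = {
    println(s"Shuffle files lost for worker ${event.workerId} on host ${event.host}")
    shuffleOutputsByHost -= event.host
  }

  // Stand-in for the Master: on worker death it now sends WorkerRemoved to each live
  // application driver, which forwards it (app client -> scheduler backend -> task
  // scheduler) until it reaches the DAGScheduler.
  def masterNoticesWorkerDeath(workerId: String, host: String, reason: String): Unit =
    handleWorkerRemoved(WorkerRemoved(workerId, host, reason))

  masterNoticesWorkerDeath("worker-1", "host-1", "Not receiving heartbeat")
  assert(shuffleOutputsByHost.keySet == Set("host-2")) // host-1's outputs are gone
}
```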

## How was this patch tested?
Manually tested by the following steps (a rough spark-shell sketch of steps 2, 3 and 5 follows the list):
1. Start a standalone Spark cluster with one driver node and two worker nodes.
2. Run a job with a `ShuffleMapStage`, ensuring its outputs are distributed across both workers.
3. Run another job that makes all executors exit while the workers stay alive.
4. Kill one of the workers.
5. Run `rdd.collect()`. Before this change we see `FetchFailure`s and failed stages; after the change the job completes without failure.
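
For reference, a hypothetical spark-shell session covering steps 2, 3 and 5 might look like the sketch below. It is not taken from the PR; it assumes a standalone cluster with the external shuffle service enabled and the usual `sc` binding, and the `System.exit` trick is just one possible way to force executor exits.

```scala
// Step 2: a job with a ShuffleMapStage whose outputs land on both workers.
val pairs  = sc.parallelize(1 to 100000, 8).map(i => (i % 100, 1))
val counts = pairs.reduceByKey(_ + _)
counts.collect()

// Step 3 (one possible approach): crash every executor while the workers stay alive.
// The job itself is expected to fail, so swallow the resulting exception.
try {
  sc.parallelize(1 to 100, 100).foreach(_ => System.exit(1))
} catch { case _: Throwable => () }

// Step 4 happens outside the shell: kill one worker process on its host.

// Step 5: re-read the shuffled data. Before this patch the collect hits FetchFailure;
// after it, the lost host's map outputs have been invalidated and are recomputed.
counts.collect()
```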

Before the change:
![image](https://user-images.githubusercontent.com/4784782/27335366-c251c3d6-55fe-11e7-99dd-d1fdcb429210.png)

After the change:
![image](https://user-images.githubusercontent.com/4784782/27335393-d1c71640-55fe-11e7-89ed-bd760f1f39af.png)

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #18362 from jiangxb1987/removeWorker.
jiangxb1987 authored and cloud-fan committed Jun 22, 2017
1 parent 97b307c commit 2dadea9
Showing 14 changed files with 98 additions and 12 deletions.
@@ -158,6 +158,8 @@ private[deploy] object DeployMessages {

case class ApplicationRemoved(message: String)

case class WorkerRemoved(id: String, host: String, message: String)

// DriverClient <-> Master

case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
@@ -182,6 +182,10 @@ private[spark] class StandaloneAppClient(
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
}

case WorkerRemoved(id, host, message) =>
logInfo("Master removed worker %s: %s".format(id, message))
listener.workerRemoved(id, host, message)

case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
master = Some(masterRef)
@@ -18,9 +18,9 @@
package org.apache.spark.deploy.client

/**
* Callbacks invoked by deploy client when various events happen. There are currently four events:
* connecting to the cluster, disconnecting, being given an executor, and having an executor
* removed (either due to failure or due to revocation).
* Callbacks invoked by deploy client when various events happen. There are currently five events:
* connecting to the cluster, disconnecting, being given an executor, having an executor removed
* (either due to failure or due to revocation), and having a worker removed.
*
* Users of this API should *not* block inside the callback methods.
*/
@@ -38,4 +38,6 @@ private[spark] trait StandaloneAppClientListener {

def executorRemoved(
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit

def workerRemoved(workerId: String, host: String, message: String): Unit
}
core/src/main/scala/org/apache/spark/deploy/master/Master.scala (15 changes: 10 additions & 5 deletions)
@@ -498,7 +498,7 @@ private[deploy] class Master(
override def onDisconnected(address: RpcAddress): Unit = {
// The disconnected client could've been either a worker or an app; remove whichever it was
logInfo(s"$address got disassociated, removing it.")
addressToWorker.get(address).foreach(removeWorker)
addressToWorker.get(address).foreach(removeWorker(_, s"${address} got disassociated"))
addressToApp.get(address).foreach(finishApplication)
if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
@@ -544,7 +544,8 @@ private[deploy] class Master(
state = RecoveryState.COMPLETING_RECOVERY

// Kill off any workers and apps that didn't respond to us.
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
workers.filter(_.state == WorkerState.UNKNOWN).foreach(
removeWorker(_, "Not responding for recovery"))
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

// Update the state of recovered apps to RUNNING
@@ -755,7 +756,7 @@ private[deploy] class Master(
if (oldWorker.state == WorkerState.UNKNOWN) {
// A worker registering from UNKNOWN implies that the worker was restarted during recovery.
// The old worker must thus be dead, so we will remove it and accept the new worker.
removeWorker(oldWorker)
removeWorker(oldWorker, "Worker replaced by a new worker with same address")
} else {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
@@ -771,7 +772,7 @@ private[deploy] class Master(
true
}

private def removeWorker(worker: WorkerInfo) {
private def removeWorker(worker: WorkerInfo, msg: String) {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
worker.setState(WorkerState.DEAD)
idToWorker -= worker.id
@@ -795,6 +796,10 @@ private[deploy] class Master(
removeDriver(driver.id, DriverState.ERROR, None)
}
}
logInfo(s"Telling app of lost worker: " + worker.id)
apps.filterNot(completedApps.contains(_)).foreach { app =>
app.driver.send(WorkerRemoved(worker.id, worker.host, msg))
}
persistenceEngine.removeWorker(worker)
}

@@ -979,7 +984,7 @@ private[deploy] class Master(
if (worker.state != WorkerState.DEAD) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
worker.id, WORKER_TIMEOUT_MS / 1000))
removeWorker(worker)
removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds")
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (30 changes: 30 additions & 0 deletions)
@@ -259,6 +259,13 @@ class DAGScheduler(
eventProcessLoop.post(ExecutorLost(execId, reason))
}

/**
* Called by TaskScheduler implementation when a worker is removed.
*/
def workerRemoved(workerId: String, host: String, message: String): Unit = {
eventProcessLoop.post(WorkerRemoved(workerId, host, message))
}

/**
* Called by TaskScheduler implementation when a host is added.
*/
@@ -1432,6 +1439,26 @@
}
}

/**
* Responds to a worker being removed. This is called inside the event loop, so it assumes it can
* modify the scheduler's internal state. Use workerRemoved() to post a loss event from outside.
*
* We will assume that we've lost all shuffle blocks associated with the host if a worker is
* removed, so we will remove them all from MapStatus.
*
* @param workerId identifier of the worker that is removed.
* @param host host of the worker that is removed.
* @param message the reason why the worker is removed.
*/
private[scheduler] def handleWorkerRemoved(
workerId: String,
host: String,
message: String): Unit = {
logInfo("Shuffle files lost for worker %s on host %s".format(workerId, host))
mapOutputTracker.removeOutputsOnHost(host)
clearCacheLocs()
}

private[scheduler] def handleExecutorAdded(execId: String, host: String) {
// remove from failedEpoch(execId) ?
if (failedEpoch.contains(execId)) {
@@ -1727,6 +1754,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
}
dagScheduler.handleExecutorLost(execId, workerLost)

case WorkerRemoved(workerId, host, message) =>
dagScheduler.handleWorkerRemoved(workerId, host, message)

case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)

@@ -86,6 +86,9 @@ private[scheduler] case class ExecutorAdded(execId: String, host: String) extend
private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossReason)
extends DAGSchedulerEvent

private[scheduler] case class WorkerRemoved(workerId: String, host: String, message: String)
extends DAGSchedulerEvent

private[scheduler]
case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])
extends DAGSchedulerEvent
@@ -89,6 +89,11 @@ private[spark] trait TaskScheduler {
*/
def executorLost(executorId: String, reason: ExecutorLossReason): Unit

/**
* Process a removed worker
*/
def workerRemoved(workerId: String, host: String, message: String): Unit

/**
* Get an application's attempt ID associated with the job.
*
@@ -569,6 +569,11 @@ private[spark] class TaskSchedulerImpl private[scheduler](
}
}

override def workerRemoved(workerId: String, host: String, message: String): Unit = {
logInfo(s"Handle removed worker $workerId: $message")
dagScheduler.workerRemoved(workerId, host, message)
}

private def logExecutorLoss(
executorId: String,
hostPort: String,
@@ -85,6 +85,9 @@ private[spark] object CoarseGrainedClusterMessages {
case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
extends CoarseGrainedClusterMessage

case class RemoveWorker(workerId: String, host: String, message: String)
extends CoarseGrainedClusterMessage

case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage

// Exchanged between the driver and the AM in Yarn client mode
@@ -219,6 +219,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
removeExecutor(executorId, reason)
context.reply(true)

case RemoveWorker(workerId, host, message) =>
removeWorker(workerId, host, message)
context.reply(true)

case RetrieveSparkAppConfig =>
val reply = SparkAppConfig(sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey())
@@ -231,8 +235,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
val workOffers = activeExecutors.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
scheduler.resourceOffers(workOffers)
}
@@ -331,6 +336,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
}

// Remove a lost worker from the cluster
private def removeWorker(workerId: String, host: String, message: String): Unit = {
logDebug(s"Asked to remove worker $workerId with reason $message")
scheduler.workerRemoved(workerId, host, message)
}

/**
* Stop making resource offers for the given executor. The executor is marked as lost with
* the loss reason still pending.
@@ -449,8 +460,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
// Only log the failure since we don't care about the result.
driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure { case t =>
logError(t.getMessage, t)
driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure {
case t => logError(t.getMessage, t)
}(ThreadUtils.sameThread)
}

protected def removeWorker(workerId: String, host: String, message: String): Unit = {
driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).onFailure {
case t => logError(t.getMessage, t)
}(ThreadUtils.sameThread)
}

@@ -161,6 +161,11 @@ private[spark] class StandaloneSchedulerBackend(
removeExecutor(fullId.split("/")(1), reason)
}

override def workerRemoved(workerId: String, host: String, message: String): Unit = {
logInfo("Worker %s removed: %s".format(workerId, message))
removeWorker(workerId, host, message)
}

override def sufficientResourcesRegistered(): Boolean = {
totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
}
@@ -214,6 +214,8 @@ class AppClientSuite
id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
execRemovedList.add(id)
}

def workerRemoved(workerId: String, host: String, message: String): Unit = {}
}

/** Create AppClient and supporting objects */
@@ -131,6 +131,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
}

@@ -632,6 +633,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
}
val noKillScheduler = new DAGScheduler(
@@ -84,6 +84,7 @@ private class DummyTaskScheduler extends TaskScheduler {
override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
override def defaultParallelism(): Int = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
def executorHeartbeatReceived(
execId: String,