From 1ab35c040f29c96dc22a6c2398375aedb1e1c2b2 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 24 Nov 2015 17:08:00 +0800 Subject: [PATCH 1/6] Fix AM failure situation for dynamic allocation --- .../spark/ExecutorAllocationManager.scala | 18 +++- .../cluster/CoarseGrainedClusterMessage.scala | 3 + .../CoarseGrainedSchedulerBackend.scala | 19 +++++ .../ExecutorAllocationManagerSuite.scala | 84 +++++++++++++++++++ .../spark/deploy/yarn/ApplicationMaster.scala | 9 +- .../cluster/YarnSchedulerBackend.scala | 18 ++++ 6 files changed, 148 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 6419218f47c85..648b5242772ae 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -89,6 +89,8 @@ private[spark] class ExecutorAllocationManager( private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", Integer.MAX_VALUE) + private val initialNumExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors", + minNumExecutors) // How long there must be backlogged tasks for before an addition is triggered (seconds) private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds( @@ -121,8 +123,7 @@ private[spark] class ExecutorAllocationManager( // The desired number of executors at this moment in time. If all our executors were to die, this // is the number of executors we would immediately want from the cluster manager. - private var numExecutorsTarget = - conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors) + private var numExecutorsTarget = initialNumExecutors // Executors that have been requested to be removed but have not been killed yet private val executorsPendingToRemove = new mutable.HashSet[String] @@ -240,6 +241,19 @@ private[spark] class ExecutorAllocationManager( executor.awaitTermination(10, TimeUnit.SECONDS) } + /** + * Reset the allocation manager to the initial state. Currently this will only be called in + * yarn-client mode where registered again. + */ + def reset(): Unit = synchronized { + initializing = true + numExecutorsTarget = initialNumExecutors + numExecutorsToAdd = 1 + + executorsPendingToRemove.clear() + removeTimes.clear() + } + /** * The maximum number of executors we would need under the current load to satisfy all running * and pending tasks, rounded up. diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index f3d0d85476772..fddfbe40ef366 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -88,6 +88,9 @@ private[spark] object CoarseGrainedClusterMessages { case class RegisterClusterManager(am: RpcEndpointRef) extends CoarseGrainedClusterMessage + // Message when cluster manager re-register itself to driver. 
+ case class ReregisterClusterManager(am: RpcEndpointRef) extends CoarseGrainedClusterMessage + // Request executors by specifying the new total number of executors desired // This includes executors already pending or running case class RequestExecutors( diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 505c161141c88..1185bcee0bbb1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -341,6 +341,25 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } + /** + * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only + * be called in the yarn-client mode where AM is registered again, also when dynamic + * allocation is enabled. + * */ + protected def reset(): Unit = synchronized { + if (Utils.isDynamicAllocationEnabled(conf)) { + numPendingExecutors = 0 + executorsPendingToRemove.clear() + + // Remove all the lingering executors that should be removed but not yet. The reason might be + // because (1) disconnected event is not yet received; (2) executors die silently. + executorDataMap.foreach { case (eid, _) => + driverEndpoint.askWithRetry[Boolean](RemoveExecutor(eid, SlaveLost())) + } + executorDataMap.clear() + } + } + override def reviveOffers() { driverEndpoint.send(ReviveOffers) } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 116f027a0f987..fedfbd547b91b 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -805,6 +805,90 @@ class ExecutorAllocationManagerSuite assert(maxNumExecutorsNeeded(manager) === 1) } + test("reset the state of allocation manager") { + sc = createSparkContext() + val manager = sc.executorAllocationManager.get + assert(numExecutorsTarget(manager) === 1) + assert(numExecutorsToAdd(manager) === 1) + + // Allocation manager is reset when adding executor requests are sent without reporting back + // executor added. + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10))) + + assert(addExecutors(manager) === 1) + assert(numExecutorsTarget(manager) === 2) + assert(addExecutors(manager) === 2) + assert(numExecutorsTarget(manager) === 4) + assert(addExecutors(manager) === 1) + assert(numExecutorsTarget(manager) === 5) + + manager.reset() + assert(numExecutorsTarget(manager) === 1) + assert(numExecutorsToAdd(manager) === 1) + assert(executorIds(manager) === Set.empty) + + // Allocation manager is reset when executors are added. 
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10))) + + addExecutors(manager) + addExecutors(manager) + addExecutors(manager) + assert(numExecutorsTarget(manager) === 5) + + onExecutorAdded(manager, "first") + onExecutorAdded(manager, "second") + onExecutorAdded(manager, "third") + onExecutorAdded(manager, "fourth") + onExecutorAdded(manager, "fifth") + assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth")) + + // Cluster manager lost will make all the live executors lost, so here simulate this behavior + onExecutorRemoved(manager, "first") + onExecutorRemoved(manager, "second") + onExecutorRemoved(manager, "third") + onExecutorRemoved(manager, "fourth") + onExecutorRemoved(manager, "fifth") + + manager.reset() + assert(numExecutorsTarget(manager) === 1) + assert(numExecutorsToAdd(manager) === 1) + assert(executorIds(manager) === Set.empty) + assert(removeTimes(manager) === Map.empty) + + // Allocation manager is reset when executors are pending to remove + addExecutors(manager) + addExecutors(manager) + addExecutors(manager) + assert(numExecutorsTarget(manager) === 5) + + onExecutorAdded(manager, "first") + onExecutorAdded(manager, "second") + onExecutorAdded(manager, "third") + onExecutorAdded(manager, "fourth") + onExecutorAdded(manager, "fifth") + assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth")) + + removeExecutor(manager, "first") + removeExecutor(manager, "second") + assert(executorsPendingToRemove(manager) === Set("first", "second")) + assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth")) + + + // Cluster manager lost will make all the live executors lost, so here simulate this behavior + onExecutorRemoved(manager, "first") + onExecutorRemoved(manager, "second") + onExecutorRemoved(manager, "third") + onExecutorRemoved(manager, "fourth") + onExecutorRemoved(manager, "fifth") + + manager.reset() + + assert(numExecutorsTarget(manager) === 1) + assert(numExecutorsToAdd(manager) === 1) + assert(executorsPendingToRemove(manager) === Set.empty) + assert(removeTimes(manager) === Map.empty) + } + private def createSparkContext( minExecutors: Int = 1, maxExecutors: Int = 5, diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 13ef4dfd64165..af22cab089a33 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -583,7 +583,14 @@ private[spark] class ApplicationMaster( extends RpcEndpoint with Logging { override def onStart(): Unit = { - driver.send(RegisterClusterManager(self)) + val attemptId = client.getAttemptId().getAttemptId + if (attemptId == 1 || isClusterMode) { + driver.send(RegisterClusterManager(self)) + } else { + // only send rereigster message when application master is reattempted to start and + // current running mode is yarn-client mode. 
+ driver.send(ReregisterClusterManager(self)) + } } override def receive: PartialFunction[Any, Unit] = { diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index e3dd87798f018..bc1e9d3a1a992 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -155,6 +155,15 @@ private[spark] abstract class YarnSchedulerBackend( new YarnDriverEndpoint(rpcEnv, properties) } + /** + * Reset the state of SchedulerBackend to the initial state. This is happened when AM is failed + * and re-registered itself to driver. The stale state in driver should be cleaned. + */ + override protected def reset(): Unit = { + super.reset() + sc.executorAllocationManager.foreach(_.reset()) + } + /** * Override the DriverEndpoint to add extra logic for the case when an executor is disconnected. * This endpoint communicates with the executors and queries the AM for an executor's exit @@ -218,6 +227,7 @@ private[spark] abstract class YarnSchedulerBackend( case None => logWarning("Attempted to check for an executor loss reason" + " before the AM has registered!") + driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost())) } } @@ -226,6 +236,13 @@ private[spark] abstract class YarnSchedulerBackend( logInfo(s"ApplicationMaster registered as $am") amEndpoint = Option(am) + case ReregisterClusterManager(am) => + logInfo(s"ApplicationMaster re-registered as $am") + // re-register am endpoint and reset the state of schedulerBackend + // This will only happened in yarn-client mode with AM restarted. + amEndpoint = Option(am) + reset() + case AddWebUIFilter(filterName, filterParams, proxyBase) => addWebUIFilter(filterName, filterParams, proxyBase) @@ -270,6 +287,7 @@ private[spark] abstract class YarnSchedulerBackend( override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (amEndpoint.exists(_.address == remoteAddress)) { logWarning(s"ApplicationMaster has disassociated: $remoteAddress") + amEndpoint = None } } From 1bd306d8286295e381aa4fc9902a2bf58f46ee12 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 25 Nov 2015 14:15:05 +0800 Subject: [PATCH 2/6] Remove unnecessary code --- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 1185bcee0bbb1..4f517b7432112 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -356,7 +356,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorDataMap.foreach { case (eid, _) => driverEndpoint.askWithRetry[Boolean](RemoveExecutor(eid, SlaveLost())) } - executorDataMap.clear() } } From 929cc70ad72ff703ac8ec35d8cf6a5c4022b5e36 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 2 Dec 2015 16:35:16 +0800 Subject: [PATCH 3/6] Address the comments --- .../spark/ExecutorAllocationManager.scala | 2 +- .../cluster/CoarseGrainedClusterMessage.scala | 3 --- .../CoarseGrainedSchedulerBackend.scala | 7 +++--- .../spark/deploy/yarn/ApplicationMaster.scala | 9 +------ 
.../cluster/YarnSchedulerBackend.scala | 24 ++++++++++++------- 5 files changed, 21 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 648b5242772ae..1c05127490a0b 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -243,7 +243,7 @@ private[spark] class ExecutorAllocationManager( /** * Reset the allocation manager to the initial state. Currently this will only be called in - * yarn-client mode where registered again. + * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { initializing = true diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index fddfbe40ef366..f3d0d85476772 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -88,9 +88,6 @@ private[spark] object CoarseGrainedClusterMessages { case class RegisterClusterManager(am: RpcEndpointRef) extends CoarseGrainedClusterMessage - // Message when cluster manager re-register itself to driver. - case class ReregisterClusterManager(am: RpcEndpointRef) extends CoarseGrainedClusterMessage - // Request executors by specifying the new total number of executors desired // This includes executors already pending or running case class RequestExecutors( diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 4f517b7432112..478b44ad300d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -343,7 +343,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp /** * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only - * be called in the yarn-client mode where AM is registered again, also when dynamic + * be called in the yarn-client mode when AM re-registers after a failure, also dynamic * allocation is enabled. * */ protected def reset(): Unit = synchronized { @@ -353,8 +353,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Remove all the lingering executors that should be removed but not yet. The reason might be // because (1) disconnected event is not yet received; (2) executors die silently. 
- executorDataMap.foreach { case (eid, _) => - driverEndpoint.askWithRetry[Boolean](RemoveExecutor(eid, SlaveLost())) + executorDataMap.toMap.foreach { case (eid, _) => + driverEndpoint.askWithRetry[Boolean]( + RemoveExecutor(eid, SlaveLost("Executor stale after cluster manager re-registration"))) } } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index af22cab089a33..13ef4dfd64165 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -583,14 +583,7 @@ private[spark] class ApplicationMaster( extends RpcEndpoint with Logging { override def onStart(): Unit = { - val attemptId = client.getAttemptId().getAttemptId - if (attemptId == 1 || isClusterMode) { - driver.send(RegisterClusterManager(self)) - } else { - // only send rereigster message when application master is reattempted to start and - // current running mode is yarn-client mode. - driver.send(ReregisterClusterManager(self)) - } + driver.send(RegisterClusterManager(self)) } override def receive: PartialFunction[Any, Unit] = { diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index bc1e9d3a1a992..df250dd4522e6 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -51,6 +51,9 @@ private[spark] abstract class YarnSchedulerBackend( private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf) + // Flag to specify whether AM is registered or not. + @volatile private var isAmRegistered = false + /** Application ID. */ protected var appId: Option[ApplicationId] = None @@ -157,7 +160,8 @@ private[spark] abstract class YarnSchedulerBackend( /** * Reset the state of SchedulerBackend to the initial state. This is happened when AM is failed - * and re-registered itself to driver. The stale state in driver should be cleaned. + * and re-registered itself to driver after a failure. The stale state in driver should be + * cleaned. */ override protected def reset(): Unit = { super.reset() @@ -227,7 +231,8 @@ private[spark] abstract class YarnSchedulerBackend( case None => logWarning("Attempted to check for an executor loss reason" + " before the AM has registered!") - driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost())) + driverEndpoint.askWithRetry[Boolean]( + RemoveExecutor(executorId, SlaveLost("AM is not registered yet"))) } } @@ -235,13 +240,14 @@ private[spark] abstract class YarnSchedulerBackend( case RegisterClusterManager(am) => logInfo(s"ApplicationMaster registered as $am") amEndpoint = Option(am) - - case ReregisterClusterManager(am) => - logInfo(s"ApplicationMaster re-registered as $am") - // re-register am endpoint and reset the state of schedulerBackend - // This will only happened in yarn-client mode with AM restarted. - amEndpoint = Option(am) - reset() + if (!isAmRegistered) { + // First time when AM is registered + isAmRegistered = true + } else { + // AM is already registered before, this potentially means that AM is failed and + // re-registered after the failure. This will only be happened in yarn-client mode. 
+ reset() + } case AddWebUIFilter(filterName, filterParams, proxyBase) => addWebUIFilter(filterName, filterParams, proxyBase) From cc7cc5db19348b153ce0a72d75bfefbffedf92ae Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 2 Dec 2015 17:10:51 +0800 Subject: [PATCH 4/6] Some words change --- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 478b44ad300d3..e4f93167633bc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -355,7 +355,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // because (1) disconnected event is not yet received; (2) executors die silently. executorDataMap.toMap.foreach { case (eid, _) => driverEndpoint.askWithRetry[Boolean]( - RemoveExecutor(eid, SlaveLost("Executor stale after cluster manager re-registration"))) + RemoveExecutor(eid, SlaveLost("State executor after cluster manager re-registered"))) } } } diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index df250dd4522e6..20ebc844891d8 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -232,7 +232,7 @@ private[spark] abstract class YarnSchedulerBackend( logWarning("Attempted to check for an executor loss reason" + " before the AM has registered!") driverEndpoint.askWithRetry[Boolean]( - RemoveExecutor(executorId, SlaveLost("AM is not registered yet"))) + RemoveExecutor(executorId, SlaveLost("AM is not yet registered"))) } } From b43296844b1d7bcc4e394bfea77fa372001031ee Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 4 Dec 2015 14:39:54 +0800 Subject: [PATCH 5/6] Style fix --- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index e4f93167633bc..7efe16749e59d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -355,7 +355,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // because (1) disconnected event is not yet received; (2) executors die silently. 
executorDataMap.toMap.foreach { case (eid, _) => driverEndpoint.askWithRetry[Boolean]( - RemoveExecutor(eid, SlaveLost("State executor after cluster manager re-registered"))) + RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))) } } } diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 20ebc844891d8..9e9a42cc730f5 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -232,7 +232,7 @@ private[spark] abstract class YarnSchedulerBackend( logWarning("Attempted to check for an executor loss reason" + " before the AM has registered!") driverEndpoint.askWithRetry[Boolean]( - RemoveExecutor(executorId, SlaveLost("AM is not yet registered"))) + RemoveExecutor(executorId, SlaveLost("AM is not yet registered."))) } } From 0e1d796811503c3a5c63133b653431c170da1d9b Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 7 Dec 2015 15:10:57 +0800 Subject: [PATCH 6/6] Address the comments to change the variable name --- .../scheduler/cluster/YarnSchedulerBackend.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 9e9a42cc730f5..91d61091e5959 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -51,9 +51,6 @@ private[spark] abstract class YarnSchedulerBackend( private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf) - // Flag to specify whether AM is registered or not. - @volatile private var isAmRegistered = false - /** Application ID. */ protected var appId: Option[ApplicationId] = None @@ -63,6 +60,9 @@ private[spark] abstract class YarnSchedulerBackend( /** Scheduler extension services. */ private val services: SchedulerExtensionServices = new SchedulerExtensionServices() + // Flag to specify whether this schedulerBackend should be reset. + private var shouldResetOnAmRegister = false + /** * Bind to YARN. This *must* be done before calling [[start()]]. * @@ -240,9 +240,8 @@ private[spark] abstract class YarnSchedulerBackend( case RegisterClusterManager(am) => logInfo(s"ApplicationMaster registered as $am") amEndpoint = Option(am) - if (!isAmRegistered) { - // First time when AM is registered - isAmRegistered = true + if (!shouldResetOnAmRegister) { + shouldResetOnAmRegister = true } else { // AM is already registered before, this potentially means that AM is failed and // re-registered after the failure. This will only be happened in yarn-client mode.
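
For reference, the sketch below is a self-contained Scala model of the state machine the final revision of this series converges on: a single RegisterClusterManager message, a shouldResetOnAmRegister flag in the YARN backend, and reset() methods that roll the scheduler backend and the allocation manager back to their initial state when the AM re-registers after a failure (yarn-client mode). It is illustrative only, not Spark code: the class and method names (ExecutorAllocationManagerModel, SchedulerBackendModel, onRegisterClusterManager, the removeExecutor stub) are placeholders, and the RPC plumbing (driverEndpoint, SlaveLost, RemoveExecutor) is replaced by a simple println; only the fields and the reset logic mirror the hunks above.

// Illustrative model of the reset flow introduced by this series (simplified; not Spark code).
import scala.collection.mutable

class ExecutorAllocationManagerModel(initialNumExecutors: Int) {
  private var initializing = true
  private var numExecutorsTarget = initialNumExecutors
  private var numExecutorsToAdd = 1
  private val executorsPendingToRemove = mutable.HashSet.empty[String]
  private val removeTimes = mutable.HashMap.empty[String, Long]

  // Mirrors ExecutorAllocationManager.reset(): return to the initial state after the
  // cluster manager (YARN AM) re-registers, so stale targets are not carried over.
  def reset(): Unit = synchronized {
    initializing = true
    numExecutorsTarget = initialNumExecutors
    numExecutorsToAdd = 1
    executorsPendingToRemove.clear()
    removeTimes.clear()
  }
}

class SchedulerBackendModel(allocationManager: Option[ExecutorAllocationManagerModel]) {
  private val executorDataMap = mutable.HashMap.empty[String, AnyRef]
  private var numPendingExecutors = 0
  // Set on the first RegisterClusterManager; a second registration implies the AM failed
  // and came back, so the driver-side state must be cleared (yarn-client mode only).
  private var shouldResetOnAmRegister = false

  def onRegisterClusterManager(): Unit = {
    if (!shouldResetOnAmRegister) {
      shouldResetOnAmRegister = true
    } else {
      reset()
    }
  }

  // Mirrors CoarseGrainedSchedulerBackend.reset() plus YarnSchedulerBackend.reset(): drop
  // lingering executors (their lost events may never arrive) and reset the allocation manager.
  def reset(): Unit = synchronized {
    numPendingExecutors = 0
    executorDataMap.keys.toSeq.foreach { eid =>
      removeExecutor(eid, reason = "Stale executor after cluster manager re-registered.")
    }
    allocationManager.foreach(_.reset())
  }

  // Placeholder for the driverEndpoint.askWithRetry[Boolean](RemoveExecutor(...)) call.
  private def removeExecutor(executorId: String, reason: String): Unit = {
    executorDataMap.remove(executorId)
    println(s"Removing executor $executorId: $reason")
  }
}

// Example: the first registration records the AM; a second one (after an AM failure) triggers reset().
object ReRegistrationDemo extends App {
  val backend = new SchedulerBackendModel(
    Some(new ExecutorAllocationManagerModel(initialNumExecutors = 1)))
  backend.onRegisterClusterManager()  // initial AM registration, no reset
  backend.onRegisterClusterManager()  // AM re-registered after failure, driver state is reset
}

As the later revisions in the series show, re-registration is folded into the existing RegisterClusterManager handler rather than handled by a separate ReregisterClusterManager message: the AM-side code stays unchanged, and the driver infers an AM restart simply from seeing a second registration.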