From 81af45fbcbe9609cd3edaed692cb92520ea3f6e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Wed, 30 Nov 2016 22:01:42 -0600 Subject: [PATCH 01/27] Add test case for killExecutorsOnHost --- .../ExecutorAllocationManagerSuite.scala | 2 ++ .../StandaloneDynamicAllocationSuite.scala | 27 +++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index ec409712b953c..87b022fb37cb8 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1147,6 +1147,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend } } + override def killExecutorsOnHost(host: String): Seq[String] = { Seq.empty[String] } + override def start(): Unit = sb.start() override def stop(): Unit = sb.stop() diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 05dad7a4b86ad..9d92631b95da7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -467,6 +467,23 @@ class StandaloneDynamicAllocationSuite } } + test("kill all executors on localhost") { + sc = new SparkContext(appConf) + val appId = sc.applicationId + eventually(timeout(10.seconds), interval(10.millis)) { + val apps = getApplications() + assert(apps.size === 1) + assert(apps.head.id === appId) + assert(apps.head.executors.size === 2) + assert(apps.head.getExecutorLimit === Int.MaxValue) + } + // kill all executors + assert(killExecutorsOnHost(sc, "localhost").size == 2) + var apps = getApplications() + assert(apps.head.executors.size === 0) + assert(apps.head.getExecutorLimit === 0) + } + // =============================== // | Utility methods for testing | // =============================== @@ -528,6 +545,16 @@ class StandaloneDynamicAllocationSuite } } + /** Kill the executors on a given host. */ + private def killExecutorsOnHost(sc: SparkContext, host: String): Seq[String] = { + syncExecutors(sc) + sc.schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + b.killExecutorsOnHost(host) + case _ => fail("expected coarse grained scheduler") + } + } + /** * Return a list of executor IDs belonging to this application. * From 33ac3643a799eaf3a4b8a48db001eb4aa05c39ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Wed, 30 Nov 2016 22:36:26 -0600 Subject: [PATCH 02/27] BlacklistTracker can ask the SparkContext to kill executors on a host. Still need to wire in configuration. 
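Rough sketch of the call path this change wires up (a simplified, self-contained model with stand-in names, not the real Spark types; the real BlacklistTracker only receives an Option[SparkContext] at this point, and the config gate is still a TODO):

    // Hypothetical model of the delegation chain introduced here.
    trait Backend {
      def killExecutorsOnHost(host: String): Seq[String]
    }

    class Context(backend: Backend) {
      // Mirrors SparkContext.killExecutorsOnHost: true if any executor
      // on the host was submitted for killing.
      def killExecutorsOnHost(host: String): Boolean =
        backend.killExecutorsOnHost(host).nonEmpty
    }

    class Tracker(sc: Option[Context]) {
      // Mirrors the BlacklistTracker side: the context is optional, so
      // killing is silently skipped when no context is available.
      def onNodeBlacklisted(node: String): Unit =
        sc.foreach(_.killExecutorsOnHost(node))
    }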
--- .../scala/org/apache/spark/SparkContext.scala | 22 +++++++++++++++++++ .../spark/scheduler/BlacklistTracker.scala | 3 +++ .../scheduler/BlacklistTrackerSuite.scala | 2 +- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- 4 files changed, 27 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 869c5d7094cd4..b207aceee3170 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1671,6 +1671,28 @@ class SparkContext(config: SparkConf) extends Logging { @DeveloperApi def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId)) + /** + * :: DeveloperApo :: + * Request that the cluster manager kill all executors on the specified host. + * + * Note: This is an indication to the cluster manager that the application wishes to adjust + * its resource usage downwards. If the application wishes to replace the executor it kills + * through this method with a new one, it should follow up explicitly with a call to + * {{SparkContext#requestExecutors}}. + * + * @return whether the request is received. + */ + @DeveloperApi + def killExecutorsOnHost(host: String): Boolean = { + schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + b.killExecutorsOnHost(host).nonEmpty + case _ => + logWarning("Killing executors is only supported in coarse-grained mode") + false + } + } + /** * Request that the cluster manager kill the specified executor without adjusting the * application resource requirements. diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 77d5c97a32711..e3717d69045c4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -50,6 +50,7 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} private[scheduler] class BlacklistTracker ( private val listenerBus: LiveListenerBus, conf: SparkConf, + sc: Option[SparkContext], clock: Clock = new SystemClock()) extends Logging { def this(sc: SparkContext) = { @@ -185,6 +186,8 @@ private[scheduler] class BlacklistTracker ( logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " + s"executors blacklisted: ${blacklistedExecsOnNode}") nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists) + // TODO Only do this if a config value is set. 
+ sc.foreach(context => context.killExecutorsOnHost(node)) listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size)) _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index ead695574d292..d5463fce23012 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -43,7 +43,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M clock.setTime(0) listenerBusMock = mock[LiveListenerBus] - blacklist = new BlacklistTracker(listenerBusMock, conf, clock) + blacklist = new BlacklistTracker(null, conf, None, clock) } override def afterEach(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index ffb9fe461a486..d9b13df3481e2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -429,7 +429,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // We don't directly use the application blacklist, but its presence triggers blacklisting // within the taskset. val mockListenerBus = mock(classOf[LiveListenerBus]) - val blacklistTrackerOpt = Some(new BlacklistTracker(mockListenerBus, conf, clock)) + val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, None, clock)) val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock) { From da1d91df24310bdc0a748466f1bde746c080ea6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Fri, 2 Dec 2016 11:14:12 -0600 Subject: [PATCH 03/27] Respond to review feedback: basic changes --- .../org/apache/spark/ExecutorAllocationClient.scala | 4 +++- .../src/main/scala/org/apache/spark/SparkContext.scala | 7 +------ .../org/apache/spark/scheduler/BlacklistTracker.scala | 10 +++++----- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../cluster/CoarseGrainedSchedulerBackend.scala | 9 +++++++++ 5 files changed, 19 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 5d47f624ac8a3..dc168525fff6d 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -58,7 +58,9 @@ private[spark] trait ExecutorAllocationClient { */ def killExecutors(executorIds: Seq[String]): Seq[String] - /** + def killExecutorsOnHost(host: String): Seq[String] + + /** * Request that the cluster manager kill the specified executor. * @return whether the request is acknowledged by the cluster manager. 
*/ diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b207aceee3170..f38e500019735 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1672,14 +1672,9 @@ class SparkContext(config: SparkConf) extends Logging { def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId)) /** - * :: DeveloperApo :: + * :: DeveloperApi :: * Request that the cluster manager kill all executors on the specified host. * - * Note: This is an indication to the cluster manager that the application wishes to adjust - * its resource usage downwards. If the application wishes to replace the executor it kills - * through this method with a new one, it should follow up explicitly with a call to - * {{SparkContext#requestExecutors}}. - * * @return whether the request is received. */ @DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index e3717d69045c4..a8e253afe9a96 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.util.{Clock, SystemClock, Utils} @@ -50,11 +50,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} private[scheduler] class BlacklistTracker ( private val listenerBus: LiveListenerBus, conf: SparkConf, - sc: Option[SparkContext], + scheduler: ExecutorAllocationClient, clock: Clock = new SystemClock()) extends Logging { - def this(sc: SparkContext) = { - this(sc.listenerBus, sc.conf) + def this(sc: SparkContext, scheduler: ExecutorAllocationClient) = { + this(sc.listenerBus, sc.getConf, scheduler) } BlacklistTracker.validateBlacklistConfs(conf) @@ -187,7 +187,7 @@ private[scheduler] class BlacklistTracker ( s"executors blacklisted: ${blacklistedExecsOnNode}") nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists) // TODO Only do this if a config value is set. 
- sc.foreach(context => context.killExecutorsOnHost(node))
+ scheduler.killExecutorsOnHost(node)
 listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
 _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 72ed55af41f4d..ecb2db9402f9c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -719,7 +719,7 @@ private[spark] object TaskSchedulerImpl {

 private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
 if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
- Some(new BlacklistTracker(sc))
+ Some(new BlacklistTracker(sc, scheduler))
 } else {
 None
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 31575c0ca0d15..644b5e4c07190 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -600,6 +600,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 */
 protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
 Future.successful(false)
+
+ /**
+ * Request that the cluster manager kill all executors on a given host.
+ * @return whether the kill request is acknowledged
+ */
+ final override def killExecutorsOnHost(host: String): Seq[String] = {
+ logInfo(s"Requesting to kill any and all executors on host ${host}")
+ killExecutors(scheduler.getExecutorsAliveOnHost(host).get.toSeq, replace = true, force = true)
+ }
 }

 private[spark] object CoarseGrainedSchedulerBackend {

From 87bb328f13c9c49c9c0d210394236015aa068690 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?=
Date: Fri, 2 Dec 2016 15:13:07 -0600
Subject: [PATCH 04/27] Add documentation for configuration.md

---
 docs/configuration.md | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/docs/configuration.md b/docs/configuration.md
index 7a11a983d5972..dc6b7ff26dfe0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1401,6 +1401,15 @@ Apart from these, the following properties are also available, and may be useful
 on the node may get marked as idle and be reclaimed by the cluster manager.
 </td>
 </tr>
+<tr>
+ <td><code>spark.blacklist.kill</code></td>
+ <td>false</td>
+ <td>
+ (Experimental) If set to "true", allow Spark to automatically kill, and attempt to re-create,
+ executors when they are blacklisted. Note that, when an entire node is added to the blacklist,
+ all of the executors on that node will be killed.
+ </td>
+</tr>
 <tr>
 <td><code>spark.speculation</code></td>
 <td>false</td>

From 974999c314be2b7b96a0643cb1f20de42210d29a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?=
Date: Fri, 2 Dec 2016 16:33:13 -0600
Subject: [PATCH 05/27] First implementation of actual executor killing in BlacklistTracker

---
 .../spark/ExecutorAllocationClient.scala | 11 ++++++
 .../spark/internal/config/package.scala | 5 +++
 .../spark/scheduler/BlacklistTracker.scala | 37 +++++++++++++++++--
 .../spark/scheduler/TaskSchedulerImpl.scala | 6 ++-
 .../CoarseGrainedSchedulerBackend.scala | 2 +-
 .../ExecutorAllocationManagerSuite.scala | 9 ++++-
 6 files changed, 62 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index dc168525fff6d..508d213ecb5fe 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -58,6 +58,17 @@ private[spark] trait ExecutorAllocationClient {
 */
 def killExecutors(executorIds: Seq[String]): Seq[String]

+ /**
+ * Request that the cluster manager try harder to kill the specified executors,
+ * and maybe replace them.
+ * @return whether the request is acknowledged by the cluster manager.
+ */
+ def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean): Seq[String]
+
+ /**
+ * Request that the cluster manager kill every executor on the specified host.
+ * @return the ids of the executors acknowledged by the cluster manager to be removed
+ */
 def killExecutorsOnHost(host: String): Seq[String]

 /**
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index aba429bcdca60..ad08ba4cc1e4d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -139,6 +139,11 @@ package object config {
 .timeConf(TimeUnit.MILLISECONDS)
 .createOptional

+ private[spark] val BLACKLIST_KILL_ENABLED =
+ ConfigBuilder("spark.blacklist.kill")
+ .booleanConf
+ .createOptional
+
 private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
 ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
 .internal()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index a8e253afe9a96..51d4408b3e314 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -50,10 +50,10 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
 private[scheduler] class BlacklistTracker (
 private val listenerBus: LiveListenerBus,
 conf: SparkConf,
- scheduler: ExecutorAllocationClient,
+ scheduler: Option[ExecutorAllocationClient],
 clock: Clock = new SystemClock()) extends Logging {

- def this(sc: SparkContext, scheduler: ExecutorAllocationClient) = {
+ def this(sc: SparkContext, scheduler: Option[ExecutorAllocationClient]) = {
 this(sc.listenerBus, sc.getConf, scheduler)
 }

@@ -169,6 +169,21 @@ private[scheduler] class BlacklistTracker (
 if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) {
 logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
 s" task failures in successful task sets")
+ conf.get(config.BLACKLIST_ENABLED) match {
+ case Some(enabled) =>
+ if (enabled) {
+ scheduler match {
+ case Some(scheduler) =>
+ logInfo(s"Killing blacklisted executor id: $exec" +
+ s"since spark.blacklist.kill is set.")
+ scheduler.killExecutors(Seq(exec), true, true)
+ case None =>
+ logWarning(s"Not attempting to kill blacklisted executor id $exec" +
+ s"since scheduler is not defined.")
+ }
+ }
+ case None =>
+ }
 val node = failuresInTaskSet.node
 executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
 listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
@@ -185,9 +200,23 @@ private[scheduler] class BlacklistTracker (
 !nodeIdToBlacklistExpiryTime.contains(node)) {
 logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
 s"executors blacklisted: ${blacklistedExecsOnNode}")
+ // TODO Prevent the scheduler from offering executors on this host.
+ conf.get(config.BLACKLIST_ENABLED) match {
+ case Some(enabled) =>
+ if (enabled) {
+ scheduler match {
+ case Some(scheduler) =>
+ logInfo(s"Killing blacklisted executor id: $exec" +
+ s"since spark.blacklist.kill is set.")
+ scheduler.killExecutorsOnHost(node)
+ case None =>
+ logWarning(s"Not attempting to kill blacklisted executor id $exec" +
+ s"since scheduler is not defined.")
+ }
+ }
+ case None =>
+ }
 nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
- // TODO Only do this if a config value is set.
- scheduler.killExecutorsOnHost(node)
 listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
 _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ecb2db9402f9c..2245c2223fccd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -719,7 +719,11 @@ private[spark] object TaskSchedulerImpl {

 private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
 if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
- Some(new BlacklistTracker(sc, scheduler))
+ val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match {
+ case b: ExecutorAllocationClient => Some(b.asInstanceOf[ExecutorAllocationClient])
+ case _ => None
+ }
+ Some(new BlacklistTracker(sc, executorAllocClient))
 } else {
 None
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 644b5e4c07190..459026a9fac8c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -544,7 +544,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 * @return whether the kill request is acknowledged. If list to kill is empty, it will return
 * false.
*/ - final def killExecutors( + final override def killExecutors( executorIds: Seq[String], replace: Boolean, force: Boolean): Seq[String] = { diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 87b022fb37cb8..81b5b51f7a0fa 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1147,8 +1147,6 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend } } - override def killExecutorsOnHost(host: String): Seq[String] = { Seq.empty[String] } - override def start(): Unit = sb.start() override def stop(): Unit = sb.stop() @@ -1156,4 +1154,11 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend override def reviveOffers(): Unit = sb.reviveOffers() override def defaultParallelism(): Int = sb.defaultParallelism() + + // Unused. + override def killExecutorsOnHost(host: String): Seq[String] = { Seq.empty[String] } + + // Unused. + override def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean) + : Seq[String] = { Seq.empty[String] } } From ebe35f6fc356acc15edb2a0fa1284ed3976481da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Fri, 2 Dec 2016 17:14:35 -0600 Subject: [PATCH 06/27] Additional updates. Not sure if this killing is thread or race safe. --- .../scala/org/apache/spark/SparkContext.scala | 17 ----------------- .../spark/scheduler/BlacklistTracker.scala | 5 ++++- .../cluster/CoarseGrainedSchedulerBackend.scala | 4 +++- 3 files changed, 7 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f38e500019735..869c5d7094cd4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1671,23 +1671,6 @@ class SparkContext(config: SparkConf) extends Logging { @DeveloperApi def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId)) - /** - * :: DeveloperApi :: - * Request that the cluster manager kill all executors on the specified host. - * - * @return whether the request is received. - */ - @DeveloperApi - def killExecutorsOnHost(host: String): Boolean = { - schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => - b.killExecutorsOnHost(host).nonEmpty - case _ => - logWarning("Killing executors is only supported in coarse-grained mode") - false - } - } - /** * Request that the cluster manager kill the specified executor without adjusting the * application resource requirements. diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 51d4408b3e314..14e1ad6a7cdd0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -176,6 +176,9 @@ private[scheduler] class BlacklistTracker ( case Some(scheduler) => logInfo(s"Killing blacklisted executor id: $exec" + s"since spark.blacklist.kill is set.") + // TODO Do this killing in the driver via an RPC message? + // TODO Update the coarseGrainedSchedulerBackend's list of executors and hosts + // TODO to fail fast and not attempt to allocate this executor? 
scheduler.killExecutors(Seq(exec), true, true) case None => logWarning(s"Not attempting to kill blacklisted executor id $exec" + @@ -200,7 +203,6 @@ private[scheduler] class BlacklistTracker ( !nodeIdToBlacklistExpiryTime.contains(node)) { logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " + s"executors blacklisted: ${blacklistedExecsOnNode}") - // TODO Prevent the scheduler from offering executors on this host. conf.get(config.BLACKLIST_ENABLED) match { case Some(enabled) => if (enabled) { @@ -208,6 +210,7 @@ private[scheduler] class BlacklistTracker ( case Some(scheduler) => logInfo(s"Killing blacklisted executor id: $exec" + s"since spark.blacklist.kill is set.") + // TODO Same as above. scheduler.killExecutorsOnHost(node) case None => logWarning(s"Not attempting to kill blacklisted executor id $exec" + diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 459026a9fac8c..f6f7f201a2b28 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -607,7 +607,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ final override def killExecutorsOnHost(host: String): Seq[String] = { logInfo(s"Requesting to kill any and all executors on host ${host}") - killExecutors(scheduler.getExecutorsAliveOnHost(host).get.toSeq, replace = true, force = true) + scheduler.getExecutorsAliveOnHost(host).foreach(exec => + killExecutors(exec.toSeq, replace = true, force = true) + ) } } From 56b5b96fc65220604495d5c4e817cfc5071efe22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Fri, 2 Dec 2016 17:25:21 -0600 Subject: [PATCH 07/27] Add some implementation thoughts in comments to BlacklistTracker --- .../scala/org/apache/spark/ExecutorAllocationClient.scala | 2 +- .../org/apache/spark/scheduler/BlacklistTracker.scala | 8 ++++++++ .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../org/apache/spark/ExecutorAllocationManagerSuite.scala | 2 +- 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 508d213ecb5fe..90e71fcd773fb 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -63,7 +63,7 @@ private[spark] trait ExecutorAllocationClient { * and maybe replace them. * @return whether the request is acknowledged by the cluster manager. */ - def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean): Seq[String] + def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean): Unit /** * Request that the cluster manager kill every executor on the specified host. 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 14e1ad6a7cdd0..9f6758673080f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -203,6 +203,14 @@ private[scheduler] class BlacklistTracker ( !nodeIdToBlacklistExpiryTime.contains(node)) { logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " + s"executors blacklisted: ${blacklistedExecsOnNode}") + // TODO: + // As soon as this decision has been made, a couple of things need to happen. + // First, as quickly as possible, we need to tell the scheduler backend to: + // not create any additional executors on this host + // (attempt to) fail to create any executors being created. + // not schedule any additional tasks on the executors on this host. + // + // Then, we kill and re-create all the executors on this host. conf.get(config.BLACKLIST_ENABLED) match { case Some(enabled) => if (enabled) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f6f7f201a2b28..2732ed8079326 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -605,7 +605,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Request that the cluster manager kill all executors on a given host. * @return whether the kill request is acknowledged */ - final override def killExecutorsOnHost(host: String): Seq[String] = { + final override def killExecutorsOnHost(host: String): Unit = { logInfo(s"Requesting to kill any and all executors on host ${host}") scheduler.getExecutorsAliveOnHost(host).foreach(exec => killExecutors(exec.toSeq, replace = true, force = true) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 81b5b51f7a0fa..491c5d2897c9d 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1160,5 +1160,5 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend // Unused. 
override def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean) - : Seq[String] = { Seq.empty[String] } + : Unit = {} } From c4556bd6680b393ffb949bc5b321e38209d91d37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Mon, 12 Dec 2016 21:30:11 -0600 Subject: [PATCH 08/27] Update killing of nodes to use an RPC method for synchronization --- .../spark/scheduler/BlacklistTracker.scala | 18 +++--------------- .../cluster/CoarseGrainedClusterMessage.scala | 2 ++ .../CoarseGrainedSchedulerBackend.scala | 9 ++++++--- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 9f6758673080f..abec0f6b0c8da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -174,11 +174,8 @@ private[scheduler] class BlacklistTracker ( if (enabled) { scheduler match { case Some(scheduler) => - logInfo(s"Killing blacklisted executor id: $exec" + + logInfo(s"Killing blacklisted executor id $exec" + s"since spark.blacklist.kill is set.") - // TODO Do this killing in the driver via an RPC message? - // TODO Update the coarseGrainedSchedulerBackend's list of executors and hosts - // TODO to fail fast and not attempt to allocate this executor? scheduler.killExecutors(Seq(exec), true, true) case None => logWarning(s"Not attempting to kill blacklisted executor id $exec" + @@ -203,25 +200,16 @@ private[scheduler] class BlacklistTracker ( !nodeIdToBlacklistExpiryTime.contains(node)) { logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " + s"executors blacklisted: ${blacklistedExecsOnNode}") - // TODO: - // As soon as this decision has been made, a couple of things need to happen. - // First, as quickly as possible, we need to tell the scheduler backend to: - // not create any additional executors on this host - // (attempt to) fail to create any executors being created. - // not schedule any additional tasks on the executors on this host. - // - // Then, we kill and re-create all the executors on this host. conf.get(config.BLACKLIST_ENABLED) match { case Some(enabled) => if (enabled) { scheduler match { case Some(scheduler) => - logInfo(s"Killing blacklisted executor id: $exec" + + logInfo(s"Killing all executors on blacklisted host $node" + s"since spark.blacklist.kill is set.") - // TODO Same as above. 
scheduler.killExecutorsOnHost(node) case None => - logWarning(s"Not attempting to kill blacklisted executor id $exec" + + logWarning(s"Not attempting to kill executors on blacklisted host $node" + s"since scheduler is not defined.") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 02803598097d9..da72b1a5c1f37 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -43,6 +43,8 @@ private[spark] object CoarseGrainedClusterMessages { case class KillTask(taskId: Long, executor: String, interruptThread: Boolean) extends CoarseGrainedClusterMessage + case class KillExecutorsOnHost(host: String) extends CoarseGrainedClusterMessage + sealed trait RegisterExecutorResponse case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 2732ed8079326..407060c753ea5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -140,6 +140,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Ignoring the task kill since the executor is not registered. logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.") } + + case KillExecutorsOnHost(host) => + scheduler.getExecutorsAliveOnHost(host).foreach(exec => + killExecutors(exec.toSeq, replace = true, force = true) + ) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -607,9 +612,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ final override def killExecutorsOnHost(host: String): Unit = { logInfo(s"Requesting to kill any and all executors on host ${host}") - scheduler.getExecutorsAliveOnHost(host).foreach(exec => - killExecutors(exec.toSeq, replace = true, force = true) - ) + driverEndpoint.send(KillExecutorsOnHost(host)) } } From 54236da59bcd992fbb783bc85fee4a1217c57867 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Thu, 19 Jan 2017 14:58:36 -0600 Subject: [PATCH 09/27] Have killExecutorsOnHost return Boolean since it delegates the task elsewhere --- .../scala/org/apache/spark/ExecutorAllocationClient.scala | 6 +++--- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 +++-- .../spark/deploy/StandaloneDynamicAllocationSuite.scala | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 90e71fcd773fb..2807be5958958 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -63,13 +63,13 @@ private[spark] trait ExecutorAllocationClient { * and maybe replace them. * @return whether the request is acknowledged by the cluster manager. 
*/ - def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean): Unit + def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean): Seq[String] /** * Request that the cluster manager kill every executor on the specified host. - * @return the ids of the executors acknowledged by the cluster manager to be removed + * @return whether the request is acknowledged by the cluster manager. */ - def killExecutorsOnHost(host: String): Seq[String] + def killExecutorsOnHost(host: String): Boolean /** * Request that the cluster manager kill the specified executor. diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 407060c753ea5..979771c1e419f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -608,11 +608,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp /** * Request that the cluster manager kill all executors on a given host. - * @return whether the kill request is acknowledged + * @return whether the kill request is acknowledged. */ - final override def killExecutorsOnHost(host: String): Unit = { + final override def killExecutorsOnHost(host: String): Boolean = { logInfo(s"Requesting to kill any and all executors on host ${host}") driverEndpoint.send(KillExecutorsOnHost(host)) + true } } diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 9d92631b95da7..1183202608035 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -478,7 +478,7 @@ class StandaloneDynamicAllocationSuite assert(apps.head.getExecutorLimit === Int.MaxValue) } // kill all executors - assert(killExecutorsOnHost(sc, "localhost").size == 2) + assert(killExecutorsOnHost(sc, "localhost").equals(true)) var apps = getApplications() assert(apps.head.executors.size === 0) assert(apps.head.getExecutorLimit === 0) @@ -546,7 +546,7 @@ class StandaloneDynamicAllocationSuite } /** Kill the executors on a given host. 
*/
- private def killExecutorsOnHost(sc: SparkContext, host: String): Seq[String] = {
+ private def killExecutorsOnHost(sc: SparkContext, host: String): Unit = {
 syncExecutors(sc)
 sc.schedulerBackend match {
 case b: CoarseGrainedSchedulerBackend =>
 b.killExecutorsOnHost(host)
 case _ => fail("expected coarse grained scheduler")
 }
 }

From 967a095951e9e42b2cea159b72ecb0f550ba5e5e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?=
Date: Thu, 19 Jan 2017 16:05:44 -0600
Subject: [PATCH 10/27] Fix NPEs in BlacklistTrackerSuite and other build failures

---
 .../org/apache/spark/ExecutorAllocationManagerSuite.scala | 4 ++--
 .../org/apache/spark/scheduler/BlacklistTrackerSuite.scala | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 491c5d2897c9d..cac0b501f12a3 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -1156,9 +1156,9 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend
 override def defaultParallelism(): Int = sb.defaultParallelism()

 // Unused.
- override def killExecutorsOnHost(host: String): Seq[String] = { Seq.empty[String] }
+ override def killExecutorsOnHost(host: String): Boolean = false

 // Unused.
 override def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean)
- : Unit = {}
+ : Seq[String] = Seq.empty[String]
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index d5463fce23012..aacbbfdcd2ae8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -43,7 +43,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
 clock.setTime(0)
 listenerBusMock = mock[LiveListenerBus]

- blacklist = new BlacklistTracker(null, conf, None, clock)
+ blacklist = new BlacklistTracker(listenerBusMock, conf, None, clock)
 }

 override def afterEach(): Unit = {

From 0e159707c56fa69a85fb822b95547c8578180f0c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?=
Date: Tue, 24 Jan 2017 12:38:39 -0600
Subject: [PATCH 11/27] Add executors to blacklist before trying to kill them.

---
 .../spark/scheduler/BlacklistTracker.scala | 20 +++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index abec0f6b0c8da..9a45ce3a0d7d9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -169,6 +169,14 @@ private[scheduler] class BlacklistTracker (
 if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) {
 logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
 s" task failures in successful task sets")
+ val node = failuresInTaskSet.node
+ executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
+ listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
+ executorIdToFailureList.remove(exec)
+ updateNextExpiryTime()
+ // Add executor to blacklist before attempting to kill it. This allows a scheduler backend
+ // to immediately fail to allocate resources on this executor, since killing could be
+ // asynchronous.
 conf.get(config.BLACKLIST_ENABLED) match {
 case Some(enabled) =>
 if (enabled) {
@@ -184,11 +192,6 @@ private[scheduler] class BlacklistTracker (
 }
 case None =>
 }
- val node = failuresInTaskSet.node
- executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
- listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
- executorIdToFailureList.remove(exec)
- updateNextExpiryTime()

 // In addition to blacklisting the executor, we also update the data for failures on the
 // node, and potentially put the entire node into a blacklist as well.
@@ -200,6 +203,10 @@ private[scheduler] class BlacklistTracker (
 !nodeIdToBlacklistExpiryTime.contains(node)) {
 logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
 s"executors blacklisted: ${blacklistedExecsOnNode}")
+ nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
+ listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
+ _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+ // As before, blacklist this node before killing all its executors.
 conf.get(config.BLACKLIST_ENABLED) match {
 case Some(enabled) =>
 if (enabled) {
@@ -215,9 +222,6 @@ private[scheduler] class BlacklistTracker (
 }
 case None =>
 }
- nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
- listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
- _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
 }
 }
 }

From 73abda4f1302d72ae25bb2b3db8ae4f7174cec4d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?=
Date: Tue, 24 Jan 2017 12:53:00 -0600
Subject: [PATCH 12/27] Handle a marked-for-killing executor returning alive in RegisterExecutor

---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 979771c1e419f..9d27d32ddf322 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -153,6 +153,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 if (executorDataMap.contains(executorId)) {
 executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
 context.reply(true)
+ } else if (scheduler.nodeBlacklist.contains(executorId)) {
+ // Handle a race where the cluster manager finishes creating an executor, just after
+ // the executor is blacklisted and just before it is possibly killed.
+ executorRef.send(RegisterExecutorFailed("Executor is blacklisted: " + executorId))
+ context.reply(true)
 } else {
 // If the executor's rpc env is not listening for incoming connections, `hostPort`
 // will be null, and the client connection should be used to contact the executor.

From f1c9386cde902ace2c34de4fdb64d6201063ce3b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?=
Date: Tue, 24 Jan 2017 12:56:41 -0600
Subject: [PATCH 13/27] Respond to squito's feedback. Minor changes.
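One of the changes drops a redundant cast when extracting the allocation client. A minimal sketch of why the cast is unnecessary (stand-in traits, not the real Spark classes): after a typed pattern match, the bound variable already carries the matched type, so Some(b) type-checks without asInstanceOf.

    trait SchedulerBackend
    trait ExecutorAllocationClient

    object Sketch {
      def allocationClient(backend: SchedulerBackend): Option[ExecutorAllocationClient] =
        backend match {
          // `b` is already typed as ExecutorAllocationClient here, so no
          // asInstanceOf is needed before wrapping it in Some.
          case b: ExecutorAllocationClient => Some(b)
          case _ => None
        }
    }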
--- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 2245c2223fccd..bf45cb93912ab 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -720,7 +720,7 @@ private[spark] object TaskSchedulerImpl { private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = { if (BlacklistTracker.isBlacklistEnabled(sc.conf)) { val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match { - case b: ExecutorAllocationClient => Some(b.asInstanceOf[ExecutorAllocationClient]) + case b: ExecutorAllocationClient => Some(b) case _ => None } Some(new BlacklistTracker(sc, executorAllocClient)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index d9b13df3481e2..d03a0c990a02b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -429,7 +429,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // We don't directly use the application blacklist, but its presence triggers blacklisting // within the taskset. val mockListenerBus = mock(classOf[LiveListenerBus]) - val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, None, clock)) + val blacklistTrackerOpt = Some(new BlacklistTracker(mockListenerBus, conf, None, clock)) val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock) { From bc3d969a7fe72b6ea54fd187b996f11965048367 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Tue, 24 Jan 2017 13:27:42 -0600 Subject: [PATCH 14/27] Respond to tgravescs code review --- .../spark/scheduler/BlacklistTracker.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 9a45ce3a0d7d9..f68727b99d6c4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -50,11 +50,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} private[scheduler] class BlacklistTracker ( private val listenerBus: LiveListenerBus, conf: SparkConf, - scheduler: Option[ExecutorAllocationClient], + allocationClient: Option[ExecutorAllocationClient], clock: Clock = new SystemClock()) extends Logging { - def this(sc: SparkContext, scheduler: Option[ExecutorAllocationClient]) = { - this(sc.listenerBus, sc.getConf, scheduler) + def this(sc: SparkContext, allocationClient: Option[ExecutorAllocationClient]) = { + this(sc.listenerBus, sc.getConf, allocationClient) } BlacklistTracker.validateBlacklistConfs(conf) @@ -177,17 +177,17 @@ private[scheduler] class BlacklistTracker ( // Add executor to blacklist before attempting to kill it. This allows a scheduler backend // to immediately fail to allocate resources on this executor, since killing could be // asynchronous. 
- conf.get(config.BLACKLIST_ENABLED) match { + conf.get(config.BLACKLIST_KILL_ENABLED) match { case Some(enabled) => if (enabled) { - scheduler match { - case Some(scheduler) => + allocationClient match { + case Some(allocationClient) => logInfo(s"Killing blacklisted executor id $exec" + s"since spark.blacklist.kill is set.") - scheduler.killExecutors(Seq(exec), true, true) + allocationClient.killExecutors(Seq(exec), true, true) case None => logWarning(s"Not attempting to kill blacklisted executor id $exec" + - s"since scheduler is not defined.") + s"since allocation client is not defined.") } } case None => @@ -207,17 +207,17 @@ private[scheduler] class BlacklistTracker ( listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size)) _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet) // As before, blacklist this node before killing all its executors. - conf.get(config.BLACKLIST_ENABLED) match { + conf.get(config.BLACKLIST_KILL_ENABLED) match { case Some(enabled) => if (enabled) { - scheduler match { - case Some(scheduler) => + allocationClient match { + case Some(allocationClient) => logInfo(s"Killing all executors on blacklisted host $node" + s"since spark.blacklist.kill is set.") - scheduler.killExecutorsOnHost(node) + allocationClient.killExecutorsOnHost(node) case None => logWarning(s"Not attempting to kill executors on blacklisted host $node" + - s"since scheduler is not defined.") + s"since allocation client is not defined.") } } case None => From b2bf1f78a3287e79b86074653b45710ddb127d98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Tue, 24 Jan 2017 14:01:45 -0600 Subject: [PATCH 15/27] Add null check to get HeartbeatReceiverSuite to pass. Could instead hack HeartbeatReceiverSuite. --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 9d27d32ddf322..fe823c6573fa9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -153,7 +153,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp if (executorDataMap.contains(executorId)) { executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) context.reply(true) - } else if (scheduler.nodeBlacklist.contains(executorId)) { + } else if (scheduler.nodeBlacklist != null && + scheduler.nodeBlacklist.contains(executorId)) { // Handle a race where the cluster manager finishes creating an executor, just after // the executor is blacklisted and just before it is possibly killed. executorRef.send(RegisterExecutorFailed("Executor is blacklisted: " + executorId)) From 0a3e3f126e2bb0e0c2d84f3750da16de4117dcf5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Tue, 24 Jan 2017 16:32:57 -0600 Subject: [PATCH 16/27] Add replace argument to killExecutorsOnHost so that tests can not replace killed executors for simplicity. 
--- .../apache/spark/ExecutorAllocationClient.scala | 2 +-
 .../apache/spark/scheduler/BlacklistTracker.scala | 2 +-
 .../cluster/CoarseGrainedClusterMessage.scala | 3 ++-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 8 ++++----
 .../spark/ExecutorAllocationManagerSuite.scala | 4 +++-
 .../deploy/StandaloneDynamicAllocationSuite.scala | 15 +++++++++------
 6 files changed, 20 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 2807be5958958..e098f8190398f 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -69,7 +69,7 @@ private[spark] trait ExecutorAllocationClient {
 * Request that the cluster manager kill every executor on the specified host.
 * @return whether the request is acknowledged by the cluster manager.
 */
- def killExecutorsOnHost(host: String): Boolean
+ def killExecutorsOnHost(host: String, replace: Boolean): Boolean

 /**
 * Request that the cluster manager kill the specified executor.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index f68727b99d6c4..4b500f0983d7a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -214,7 +214,7 @@ private[scheduler] class BlacklistTracker (
 case Some(allocationClient) =>
 logInfo(s"Killing all executors on blacklisted host $node" +
 s"since spark.blacklist.kill is set.")
- allocationClient.killExecutorsOnHost(node)
+ allocationClient.killExecutorsOnHost(node, true)
 case None =>
 logWarning(s"Not attempting to kill executors on blacklisted host $node" +
 s"since allocation client is not defined.")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index da72b1a5c1f37..0f60ff261f86e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -43,7 +43,8 @@ private[spark] object CoarseGrainedClusterMessages {
 case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
 extends CoarseGrainedClusterMessage

- case class KillExecutorsOnHost(host: String) extends CoarseGrainedClusterMessage
+ case class KillExecutorsOnHost(host: String, replace: Boolean)
+ extends CoarseGrainedClusterMessage

 sealed trait RegisterExecutorResponse

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index fe823c6573fa9..add7f0342331b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -141,9 +141,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
 }

- case KillExecutorsOnHost(host) =>
+ case KillExecutorsOnHost(host, replace) =>
 scheduler.getExecutorsAliveOnHost(host).foreach(exec =>
- killExecutors(exec.toSeq, replace = true, force = true)
+ killExecutors(exec.toSeq, replace, force = true)
 )
 }

@@ -616,9 +616,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 * Request that the cluster manager kill all executors on a given host.
 * @return whether the kill request is acknowledged.
 */
- final override def killExecutorsOnHost(host: String): Boolean = {
+ final override def killExecutorsOnHost(host: String, replace: Boolean): Boolean = {
 logInfo(s"Requesting to kill any and all executors on host ${host}")
- driverEndpoint.send(KillExecutorsOnHost(host))
+ driverEndpoint.send(KillExecutorsOnHost(host, replace))
 true
 }
 }
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index cac0b501f12a3..1bf2b6c27c1d3 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -1156,7 +1156,9 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend
 override def defaultParallelism(): Int = sb.defaultParallelism()

 // Unused.
- override def killExecutorsOnHost(host: String): Boolean = false
+ override def killExecutorsOnHost(host: String, replace: Boolean): Boolean = {
+ false
+ }

 // Unused.
 override def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean)
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 1183202608035..b6ffa08e0a474 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -477,11 +477,14 @@ class StandaloneDynamicAllocationSuite
 assert(apps.head.executors.size === 2)
 assert(apps.head.getExecutorLimit === Int.MaxValue)
 }
- // kill all executors
+ // kill all executors without replacement
 assert(killExecutorsOnHost(sc, "localhost").equals(true))
- var apps = getApplications()
- assert(apps.head.executors.size === 0)
- assert(apps.head.getExecutorLimit === 0)
+
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ val apps = getApplications()
+ assert(apps.head.executors.size === 0)
+ assert(apps.head.getExecutorLimit === 0)
+ }
 }

 // ===============================
@@ -546,11 +549,11 @@ class StandaloneDynamicAllocationSuite
 }

 /** Kill the executors on a given host.
*/ - private def killExecutorsOnHost(sc: SparkContext, host: String): Unit = { + private def killExecutorsOnHost(sc: SparkContext, host: String): Boolean = { syncExecutors(sc) sc.schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutorsOnHost(host) + b.killExecutorsOnHost(host, false) case _ => fail("expected coarse grained scheduler") } } From 708a57bf9de7285fd53d77644a3ada1ef924edf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Thu, 26 Jan 2017 13:13:55 -0600 Subject: [PATCH 17/27] Add unit test to BlacklistTrackerSuite to verify killExecutors is called on ExecutorAllocationClient --- .../scheduler/BlacklistTrackerSuite.scala | 64 ++++++++++++++++++- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index aacbbfdcd2ae8..4dc16ff6166ce 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.scheduler -import org.mockito.Mockito.{verify, when} +import org.mockito.Matchers._ +import org.mockito.Mockito.{times, verify, when} import org.scalatest.BeforeAndAfterEach import org.scalatest.mock.MockitoSugar @@ -272,12 +273,14 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M // if task failures are spaced out by more than the timeout period, the first failure is timed // out, and the executor isn't blacklisted. var stageId = 0 + def failOneTaskInTaskSet(exec: String): Unit = { val taskSetBlacklist = createTaskSetBlacklist(stageId = stageId) taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0) blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures) stageId += 1 } + failOneTaskInTaskSet(exec = "1") // We have one sporadic failure on exec 2, but that's it. Later checks ensure that we never // blacklist executor 2 despite this one failure. @@ -411,7 +414,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M // if you explicitly set the legacy conf to 0, that also would disable blacklisting conf.set(config.BLACKLIST_LEGACY_TIMEOUT_CONF, 0L) assert(!BlacklistTracker.isBlacklistEnabled(conf)) - // but again, the new conf takes precendence + // but again, the new conf takes precedence conf.set(config.BLACKLIST_ENABLED, true) assert(BlacklistTracker.isBlacklistEnabled(conf)) assert(1000 === BlacklistTracker.getBlacklistTimeout(conf)) @@ -456,4 +459,61 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M conf.remove(config) } } + + test("blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") { + val allocationClientMock = mock[ExecutorAllocationClient] + when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called")) + when(allocationClientMock.killExecutorsOnHost(any(), any())).thenReturn(true) + blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock) + + // Disable auto-kill. Blacklist an executor and make sure killExecutors is not called. + conf.set(config.BLACKLIST_KILL_ENABLED, false) + + val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0) + // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole + // application. 
+ (0 until 4).foreach { partition => + taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition) + } + blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures) + + verify(allocationClientMock, times(0)).killExecutor(any()) + + val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1) + // Fail 4 tasks in one task set on executor 2, so that executor gets blacklisted for the whole + // application. Since that's the second executor that is blacklisted on the same node, we also + // blacklist that node. + (0 until 4).foreach { partition => + taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition) + } + + verify(allocationClientMock, times(0)).killExecutors(any(), any(), any()) + verify(allocationClientMock, times(0)).killExecutorsOnHost(any(), any()) + + // Enable auto-kill. Blacklist an executor and make sure killExecutors is called. + conf.set(config.BLACKLIST_KILL_ENABLED, true) + blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock) + + val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 0) + // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole + // application. + (0 until 4).foreach { partition => + taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = partition) + } + blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures) + + verify(allocationClientMock).killExecutors(Seq("1"), true, true) + + val taskSetBlacklist3 = createTaskSetBlacklist(stageId = 1) + // Fail 4 tasks in one task set on executor 2, so that executor gets blacklisted for the whole + // application. Since that's the second executor that is blacklisted on the same node, we also + // blacklist that node. + (0 until 4).foreach { partition => + taskSetBlacklist3.updateBlacklistForFailedTask("hostA", exec = "2", index = partition) + } + blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist3.execToFailures) + + verify(allocationClientMock).killExecutors(Seq("2"), true, true) + verify(allocationClientMock).killExecutorsOnHost("hostA", true) + } } From 4794116dd913d2b610fe2ebe269ae86060552a07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Thu, 26 Jan 2017 14:09:52 -0600 Subject: [PATCH 18/27] respond to first half of squito 1/25 review --- .../spark/ExecutorAllocationClient.scala | 13 +++-- .../spark/internal/config/package.scala | 2 +- .../spark/scheduler/BlacklistTracker.scala | 49 ++++++++----------- .../CoarseGrainedSchedulerBackend.scala | 11 +++-- 4 files changed, 37 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index e098f8190398f..7e01b0edd67a2 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -59,9 +59,16 @@ private[spark] trait ExecutorAllocationClient { def killExecutors(executorIds: Seq[String]): Seq[String] /** - * Request that the cluster manager try harder to kill the specified executors, - * and maybe replace them. - * @return whether the request is acknowledged by the cluster manager. + * Request that the cluster manager kill the specified executors. 
+ * + * When asking the executor to be replaced, the executor loss is considered a failure, and + * killed tasks that are running on the executor will count towards the failure limits. If no + * replacement is being requested, then the tasks will not count towards the limit. + * + * @param executorIds identifiers of executors to kill + * @param replace whether to replace the killed executors with new ones + * @param force whether to force kill busy executors + * @return the ids of the executors acknowledged by the cluster manager to be removed. */ def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean): Seq[String] diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index ad08ba4cc1e4d..2d7d256321629 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -142,7 +142,7 @@ package object config { private[spark] val BLACKLIST_KILL_ENABLED = ConfigBuilder("spark.blacklist.kill") .booleanConf - .createOptional + .createWithDefault(false) private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF = ConfigBuilder("spark.scheduler.executorTaskBlacklistTime") diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 4b500f0983d7a..5f273854ae7b3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -177,20 +177,16 @@ private[scheduler] class BlacklistTracker ( // Add executor to blacklist before attempting to kill it. This allows a scheduler backend // to immediately fail to allocate resources on this executor, since killing could be // asynchronous. - conf.get(config.BLACKLIST_KILL_ENABLED) match { - case Some(enabled) => - if (enabled) { - allocationClient match { - case Some(allocationClient) => - logInfo(s"Killing blacklisted executor id $exec" + - s"since spark.blacklist.kill is set.") - allocationClient.killExecutors(Seq(exec), true, true) - case None => - logWarning(s"Not attempting to kill blacklisted executor id $exec" + - s"since allocation client is not defined.") - } - } - case None => + if (conf.get(config.BLACKLIST_KILL_ENABLED)) { + allocationClient match { + case Some(allocationClient) => + logInfo(s"Killing blacklisted executor id $exec" + + s"since spark.blacklist.kill is set.") + allocationClient.killExecutors(Seq(exec), true, true) + case None => + logWarning(s"Not attempting to kill blacklisted executor id $exec" + + s"since allocation client is not defined.") + } } // In addition to blacklisting the executor, we also update the data for failures on the @@ -206,21 +202,16 @@ private[scheduler] class BlacklistTracker ( nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists) listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size)) _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet) - // As before, blacklist this node before killing all its executors. 
- conf.get(config.BLACKLIST_KILL_ENABLED) match { - case Some(enabled) => - if (enabled) { - allocationClient match { - case Some(allocationClient) => - logInfo(s"Killing all executors on blacklisted host $node" + - s"since spark.blacklist.kill is set.") - allocationClient.killExecutorsOnHost(node, true) - case None => - logWarning(s"Not attempting to kill executors on blacklisted host $node" + - s"since allocation client is not defined.") - } - } - case None => + if (conf.get(config.BLACKLIST_KILL_ENABLED)) { + allocationClient match { + case Some(allocationClient) => + logInfo(s"Killing all executors on blacklisted host $node" + + s"since spark.blacklist.kill is set.") + allocationClient.killExecutorsOnHost(node, true) + case None => + logWarning(s"Not attempting to kill executors on blacklisted host $node" + + s"since allocation client is not defined.") + } } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index add7f0342331b..0cf1f815c1281 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -154,9 +154,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) context.reply(true) } else if (scheduler.nodeBlacklist != null && - scheduler.nodeBlacklist.contains(executorId)) { - // Handle a race where the cluster manager finishes creating an executor, just after - // the executor is blacklisted and just before it is possibly killed. + scheduler.nodeBlacklist.contains(executorId)) { + // If the cluster manager gives us an executor on a blacklisted node (because it + // already started allocating those resources before we informed it of our blacklist, + // or if it ignored our blacklist), then we reject that executor immediately. + logInfo(s"Rejecting $executorId as it has been blacklisted.") executorRef.send(RegisterExecutorFailed("Executor is blacklisted: " + executorId)) context.reply(true) } else { @@ -552,8 +554,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * @param executorIds identifiers of executors to kill * @param replace whether to replace the killed executors with new ones * @param force whether to force kill busy executors - * @return whether the kill request is acknowledged. If list to kill is empty, it will return - * false. + * @return the ids of the executors acknowledged by the cluster manager to be removed. 
*/ final override def killExecutors( executorIds: Seq[String], From a1266d443dcfcb7129c297948be9f332d63cfb3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Thu, 26 Jan 2017 14:20:55 -0600 Subject: [PATCH 19/27] remove unneeded replace argument from killExecutorsOnHost and rework test --- .../org/apache/spark/ExecutorAllocationClient.scala | 2 +- .../org/apache/spark/scheduler/BlacklistTracker.scala | 2 +- .../cluster/CoarseGrainedClusterMessage.scala | 2 +- .../cluster/CoarseGrainedSchedulerBackend.scala | 8 ++++---- .../apache/spark/ExecutorAllocationManagerSuite.scala | 2 +- .../deploy/StandaloneDynamicAllocationSuite.scala | 10 ++++++---- 6 files changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 7e01b0edd67a2..f04bbdf6cbd77 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -76,7 +76,7 @@ private[spark] trait ExecutorAllocationClient { * Request that the cluster manager kill every executor on the specified host. * @return whether the request is acknowledged by the cluster manager. */ - def killExecutorsOnHost(host: String, replace: Boolean): Boolean + def killExecutorsOnHost(host: String): Boolean /** * Request that the cluster manager kill the specified executor. diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 5f273854ae7b3..760d1559e8ac9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -207,7 +207,7 @@ private[scheduler] class BlacklistTracker ( case Some(allocationClient) => logInfo(s"Killing all executors on blacklisted host $node" + s"since spark.blacklist.kill is set.") - allocationClient.killExecutorsOnHost(node, true) + allocationClient.killExecutorsOnHost(node) case None => logWarning(s"Not attempting to kill executors on blacklisted host $node" + s"since allocation client is not defined.") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 0f60ff261f86e..2898cd7d17ca0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -43,7 +43,7 @@ private[spark] object CoarseGrainedClusterMessages { case class KillTask(taskId: Long, executor: String, interruptThread: Boolean) extends CoarseGrainedClusterMessage - case class KillExecutorsOnHost(host: String, replace: Boolean) + case class KillExecutorsOnHost(host: String) extends CoarseGrainedClusterMessage sealed trait RegisterExecutorResponse diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 0cf1f815c1281..682271b471b23 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -141,9 +141,9 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.") } - case KillExecutorsOnHost(host, replace) => + case KillExecutorsOnHost(host) => scheduler.getExecutorsAliveOnHost(host).foreach(exec => - killExecutors(exec.toSeq, replace, force = true) + killExecutors(exec.toSeq, replace = true, force = true) ) } @@ -617,9 +617,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Request that the cluster manager kill all executors on a given host. * @return whether the kill request is acknowledged. */ - final override def killExecutorsOnHost(host: String, replace: Boolean): Boolean = { + final override def killExecutorsOnHost(host: String): Boolean = { logInfo(s"Requesting to kill any and all executors on host ${host}") - driverEndpoint.send(KillExecutorsOnHost(host, replace)) + driverEndpoint.send(KillExecutorsOnHost(host)) true } } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 1bf2b6c27c1d3..01b2d7579b471 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1156,7 +1156,7 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend override def defaultParallelism(): Int = sb.defaultParallelism() // Unused. - override def killExecutorsOnHost(host: String, replace: Boolean): Boolean = { + override def killExecutorsOnHost(host: String): Boolean = { false } diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index b6ffa08e0a474..1e8caa6bcc4d4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -477,13 +477,15 @@ class StandaloneDynamicAllocationSuite assert(apps.head.executors.size === 2) assert(apps.head.getExecutorLimit === Int.MaxValue) } + val beforeList = getApplications().head.executors.keys.toSet // kill all executors without replacement assert(killExecutorsOnHost(sc, "localhost").equals(true)) + syncExecutors(sc) + val afterList = getApplications().head.executors.keys.toSet + eventually(timeout(10.seconds), interval(100.millis)) { - val apps = getApplications() - assert(apps.head.executors.size === 0) - assert(apps.head.getExecutorLimit === 0) + assert(beforeList.intersect(afterList).size == 0) } } @@ -553,7 +555,7 @@ class StandaloneDynamicAllocationSuite syncExecutors(sc) sc.schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutorsOnHost(host, false) + b.killExecutorsOnHost(host, true) case _ => fail("expected coarse grained scheduler") } } From 2885bf47987fff0a647a04a97acc68d0310d41e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Thu, 26 Jan 2017 14:51:01 -0600 Subject: [PATCH 20/27] just one version of killExecutors --- .../apache/spark/ExecutorAllocationClient.scala | 15 ++++++--------- .../apache/spark/internal/config/package.scala | 2 +- .../cluster/CoarseGrainedSchedulerBackend.scala | 13 ++----------- .../spark/ExecutorAllocationManagerSuite.scala | 9 ++++----- .../deploy/StandaloneDynamicAllocationSuite.scala | 2 +- .../spark/scheduler/BlacklistTrackerSuite.scala | 6 +++--- docs/configuration.md | 2 +- 7 files 
changed, 18 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index f04bbdf6cbd77..239231932b949 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -52,12 +52,6 @@ private[spark] trait ExecutorAllocationClient { */ def requestExecutors(numAdditionalExecutors: Int): Boolean - /** - * Request that the cluster manager kill the specified executors. - * @return the ids of the executors acknowledged by the cluster manager to be removed. - */ - def killExecutors(executorIds: Seq[String]): Seq[String] - /** * Request that the cluster manager kill the specified executors. * @@ -66,11 +60,14 @@ private[spark] trait ExecutorAllocationClient { * replacement is being requested, then the tasks will not count towards the limit. * * @param executorIds identifiers of executors to kill - * @param replace whether to replace the killed executors with new ones - * @param force whether to force kill busy executors + * @param replace whether to replace the killed executors with new ones, default false + * @param force whether to force kill busy executors, default false * @return the ids of the executors acknowledged by the cluster manager to be removed. */ - def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean): Seq[String] + def killExecutors( + executorIds: Seq[String], + replace: Boolean = false, + force: Boolean = false): Seq[String] /** * Request that the cluster manager kill every executor on the specified host. diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 2d7d256321629..4849032b1f086 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -140,7 +140,7 @@ package object config { .createOptional private[spark] val BLACKLIST_KILL_ENABLED = - ConfigBuilder("spark.blacklist.kill") + ConfigBuilder("spark.blacklist.killBlacklistedExecutors") .booleanConf .createWithDefault(false) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 682271b471b23..2782a7320c589 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -535,15 +535,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future.successful(false) - /** - * Request that the cluster manager kill the specified executors. - * @return whether the kill request is acknowledged. If list to kill is empty, it will return - * false. - */ - final override def killExecutors(executorIds: Seq[String]): Seq[String] = { - killExecutors(executorIds, replace = false, force = false) - } - /** * Request that the cluster manager kill the specified executors. * @@ -552,8 +543,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * replacement is being requested, then the tasks will not count towards the limit. 
* * @param executorIds identifiers of executors to kill - * @param replace whether to replace the killed executors with new ones - * @param force whether to force kill busy executors + * @param replace whether to replace the killed executors with new ones, default false + * @param force whether to force kill busy executors, default false * @return the ids of the executors acknowledged by the cluster manager to be removed. */ final override def killExecutors( diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 01b2d7579b471..0eb606350458d 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1138,7 +1138,10 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend override def requestExecutors(numAdditionalExecutors: Int): Boolean = sc.requestExecutors(numAdditionalExecutors) - override def killExecutors(executorIds: Seq[String]): Seq[String] = { + override def killExecutors( + executorIds: Seq[String], + replace: Boolean, + force: Boolean): Seq[String] = { val response = sc.killExecutors(executorIds) if (response) { executorIds @@ -1159,8 +1162,4 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend override def killExecutorsOnHost(host: String): Boolean = { false } - - // Unused. - override def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean) - : Seq[String] = Seq.empty[String] } diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 1e8caa6bcc4d4..53b2915dbf1b6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -555,7 +555,7 @@ class StandaloneDynamicAllocationSuite syncExecutors(sc) sc.schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutorsOnHost(host, true) + b.killExecutorsOnHost(host) case _ => fail("expected coarse grained scheduler") } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index 4dc16ff6166ce..dc917804d8edd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -463,7 +463,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M test("blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") { val allocationClientMock = mock[ExecutorAllocationClient] when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called")) - when(allocationClientMock.killExecutorsOnHost(any(), any())).thenReturn(true) + when(allocationClientMock.killExecutorsOnHost(any())).thenReturn(true) blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock) // Disable auto-kill. Blacklist an executor and make sure killExecutors is not called. 
@@ -488,7 +488,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M } verify(allocationClientMock, times(0)).killExecutors(any(), any(), any()) - verify(allocationClientMock, times(0)).killExecutorsOnHost(any(), any()) + verify(allocationClientMock, times(0)).killExecutorsOnHost(any()) // Enable auto-kill. Blacklist an executor and make sure killExecutors is called. conf.set(config.BLACKLIST_KILL_ENABLED, true) @@ -514,6 +514,6 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist3.execToFailures) verify(allocationClientMock).killExecutors(Seq("2"), true, true) - verify(allocationClientMock).killExecutorsOnHost("hostA", true) + verify(allocationClientMock).killExecutorsOnHost("hostA") } } diff --git a/docs/configuration.md b/docs/configuration.md index dc6b7ff26dfe0..56f209ad54997 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1402,7 +1402,7 @@ Apart from these, the following properties are also available, and may be useful - spark.blacklist.kill + spark.blacklist.killBlacklistedExecutors false (Experimental) If set to "true", allow Spark to automatically kill, and attempt to re-create, From 580999e51692824af778fc3e191d735b2badb724 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Thu, 26 Jan 2017 16:15:42 -0600 Subject: [PATCH 21/27] respond to remainder of vanzin 1/25 feedback --- .../scala/org/apache/spark/scheduler/BlacklistTracker.scala | 5 +---- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 6 +++--- .../org/apache/spark/ExecutorAllocationManagerSuite.scala | 1 - 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 760d1559e8ac9..17911e3d46dfe 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -54,7 +54,7 @@ private[scheduler] class BlacklistTracker ( clock: Clock = new SystemClock()) extends Logging { def this(sc: SparkContext, allocationClient: Option[ExecutorAllocationClient]) = { - this(sc.listenerBus, sc.getConf, allocationClient) + this(sc.listenerBus, sc.conf, allocationClient) } BlacklistTracker.validateBlacklistConfs(conf) @@ -174,9 +174,6 @@ private[scheduler] class BlacklistTracker ( listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal)) executorIdToFailureList.remove(exec) updateNextExpiryTime() - // Add executor to blacklist before attempting to kill it. This allows a scheduler backend - // to immediately fail to allocate resources on this executor, since killing could be - // asynchronous. 
if (conf.get(config.BLACKLIST_KILL_ENABLED)) { allocationClient match { case Some(allocationClient) => diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 2782a7320c589..3551b9630004c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -142,9 +142,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } case KillExecutorsOnHost(host) => - scheduler.getExecutorsAliveOnHost(host).foreach(exec => + scheduler.getExecutorsAliveOnHost(host).foreach { exec => killExecutors(exec.toSeq, replace = true, force = true) - ) + } } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -154,7 +154,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) context.reply(true) } else if (scheduler.nodeBlacklist != null && - scheduler.nodeBlacklist.contains(executorId)) { + scheduler.nodeBlacklist.contains(hostname)) { // If the cluster manager gives us an executor on a blacklisted node (because it // already started allocating those resources before we informed it of our blacklist, // or if it ignored our blacklist), then we reject that executor immediately. diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 0eb606350458d..4ea42fc7d5c22 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1158,7 +1158,6 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend override def defaultParallelism(): Int = sb.defaultParallelism() - // Unused. override def killExecutorsOnHost(host: String): Boolean = { false } From eed4112c092d49b4eafab363f3b0a16d83ec7c9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Fri, 27 Jan 2017 16:48:51 -0600 Subject: [PATCH 22/27] minor formatting cleanups from vanzin and squito --- .../org/apache/spark/ExecutorAllocationClient.scala | 2 ++ .../org/apache/spark/internal/config/package.scala | 4 ++-- .../org/apache/spark/scheduler/BlacklistTracker.scala | 9 ++++----- .../cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../apache/spark/scheduler/BlacklistTrackerSuite.scala | 10 +++++----- 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 239231932b949..e4b9f8111efca 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -71,6 +71,8 @@ private[spark] trait ExecutorAllocationClient { /** * Request that the cluster manager kill every executor on the specified host. + * Results in a call to killExecutors for each executor on the host, with the replace + * and force arguments set to true. * @return whether the request is acknowledged by the cluster manager. 
*/ def killExecutorsOnHost(host: String): Boolean diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 4849032b1f086..9f504d7db874d 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -141,8 +141,8 @@ package object config { private[spark] val BLACKLIST_KILL_ENABLED = ConfigBuilder("spark.blacklist.killBlacklistedExecutors") - .booleanConf - .createWithDefault(false) + .booleanConf + .createWithDefault(false) private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF = ConfigBuilder("spark.scheduler.executorTaskBlacklistTime") diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 17911e3d46dfe..22883d7f57c47 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -177,11 +177,10 @@ private[scheduler] class BlacklistTracker ( if (conf.get(config.BLACKLIST_KILL_ENABLED)) { allocationClient match { case Some(allocationClient) => - logInfo(s"Killing blacklisted executor id $exec" + - s"since spark.blacklist.kill is set.") + logInfo(s"Killing blacklisted executor id $exec since spark.blacklist.kill is set.") allocationClient.killExecutors(Seq(exec), true, true) case None => - logWarning(s"Not attempting to kill blacklisted executor id $exec" + + logWarning(s"Not attempting to kill blacklisted executor id $exec " + s"since allocation client is not defined.") } } @@ -202,11 +201,11 @@ private[scheduler] class BlacklistTracker ( if (conf.get(config.BLACKLIST_KILL_ENABLED)) { allocationClient match { case Some(allocationClient) => - logInfo(s"Killing all executors on blacklisted host $node" + + logInfo(s"Killing all executors on blacklisted host $node " + s"since spark.blacklist.kill is set.") allocationClient.killExecutorsOnHost(node) case None => - logWarning(s"Not attempting to kill executors on blacklisted host $node" + + logWarning(s"Not attempting to kill executors on blacklisted host $node " + s"since allocation client is not defined.") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 3551b9630004c..c50296be5f0d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -159,7 +159,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // already started allocating those resources before we informed it of our blacklist, // or if it ignored our blacklist), then we reject that executor immediately. 
logInfo(s"Rejecting $executorId as it has been blacklisted.") - executorRef.send(RegisterExecutorFailed("Executor is blacklisted: " + executorId)) + executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId")) context.reply(true) } else { // If the executor's rpc env is not listening for incoming connections, `hostPort` diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index dc917804d8edd..6a0794355aca8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -17,8 +17,8 @@ package org.apache.spark.scheduler -import org.mockito.Matchers._ -import org.mockito.Mockito.{times, verify, when} +import org.mockito.Matchers.any +import org.mockito.Mockito.{never, verify, when} import org.scalatest.BeforeAndAfterEach import org.scalatest.mock.MockitoSugar @@ -477,7 +477,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M } blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures) - verify(allocationClientMock, times(0)).killExecutor(any()) + verify(allocationClientMock, never).killExecutor(any()) val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1) // Fail 4 tasks in one task set on executor 2, so that executor gets blacklisted for the whole @@ -487,8 +487,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition) } - verify(allocationClientMock, times(0)).killExecutors(any(), any(), any()) - verify(allocationClientMock, times(0)).killExecutorsOnHost(any()) + verify(allocationClientMock, never).killExecutors(any(), any(), any()) + verify(allocationClientMock, never).killExecutorsOnHost(any()) // Enable auto-kill. Blacklist an executor and make sure killExecutors is called. conf.set(config.BLACKLIST_KILL_ENABLED, true) From fe062e5296842dabd62e86d6cc4dbad328a24f16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Sat, 28 Jan 2017 05:54:31 -0600 Subject: [PATCH 23/27] Update blacklist kills executors test to check that hostA is indeed blacklisted. Start on executor rejection test. 
--- .../spark/scheduler/BlacklistTracker.scala | 4 +- .../scheduler/BlacklistTrackerSuite.scala | 38 ++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 22883d7f57c47..62c21baf6bdf2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -203,7 +203,9 @@ private[scheduler] class BlacklistTracker ( case Some(allocationClient) => logInfo(s"Killing all executors on blacklisted host $node " + s"since spark.blacklist.kill is set.") - allocationClient.killExecutorsOnHost(node) + if(allocationClient.killExecutorsOnHost(node) == false) { + logError(s"Killing executors on node $node failed.") + } case None => logWarning(s"Not attempting to kill executors on blacklisted host $node " + s"since allocation client is not defined.") diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index 6a0794355aca8..9a65f023d6673 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -17,8 +17,10 @@ package org.apache.spark.scheduler +import org.mockito.invocation.InvocationOnMock import org.mockito.Matchers.any import org.mockito.Mockito.{never, verify, when} +import org.mockito.stubbing.Answer import org.scalatest.BeforeAndAfterEach import org.scalatest.mock.MockitoSugar @@ -463,7 +465,14 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M test("blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") { val allocationClientMock = mock[ExecutorAllocationClient] when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called")) - when(allocationClientMock.killExecutorsOnHost(any())).thenReturn(true) + when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] { + override def answer(invocation: InvocationOnMock): Boolean = { + if (blacklist.nodeBlacklist.contains("hostA") == false) { + throw new IllegalStateException("hostA should be on the blacklist") + } + true + } + }) blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock) // Disable auto-kill. Blacklist an executor and make sure killExecutors is not called. @@ -516,4 +525,31 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M verify(allocationClientMock).killExecutors(Seq("2"), true, true) verify(allocationClientMock).killExecutorsOnHost("hostA") } + + /* + Also it would be nice to have a test for CoarseGrainedSchedulerBackend, that it rejects executors + that are registered on a blacklisted node -- that one will be a bit more work to setup, but + I think it should be possible. + + These tests won't really be stressing this implementation, but they'll help prevent regressions + with future changes. + */ + + test("reject executors on a node scheduled for killing") { + val allocationClientMock = mock[ExecutorAllocationClient] + when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called")) + // when(allocationClientMock.killExecutorsOnHost(any())).thenReturn(true) + when(allocationClientMock.killExecutorsOnHost("hostA")). 
+ thenReturn(blacklist.nodeBlacklist.contains("hostA")) + blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock) + + // Disable auto-kill. Blacklist an executor and make sure killExecutors is not called. + conf.set(config.BLACKLIST_KILL_ENABLED, false) + + // Construct an environment with a node and three executors. + // Blacklist the first executor. + // Blacklist the second executor. This should trigger node killing. + // Try to allocate a resource on the third executor and verify that you are unable. + + } } From b1fd9c39c5ea73d782b455d1706c21003fdaa229 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Sat, 4 Feb 2017 02:14:45 -0600 Subject: [PATCH 24/27] add test: executor registration on a blacklisted host must fail --- .../StandaloneDynamicAllocationSuite.scala | 29 +++++++++++++++++-- .../scheduler/BlacklistTrackerSuite.scala | 28 +----------------- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 53b2915dbf1b6..b45c152fc5d72 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -20,7 +20,8 @@ package org.apache.spark.deploy import scala.collection.mutable import scala.concurrent.duration._ -import org.mockito.Mockito.{mock, when} +import org.mockito.Matchers.any +import org.mockito.Mockito.{mock, verify, when} import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ @@ -29,10 +30,11 @@ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMaste import org.apache.spark.deploy.master.ApplicationInfo import org.apache.spark.deploy.master.Master import org.apache.spark.deploy.worker.Worker +import org.apache.spark.internal.config import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster._ -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RegisterExecutorFailed} /** * End-to-end tests for dynamic allocation in standalone mode. @@ -489,6 +491,29 @@ class StandaloneDynamicAllocationSuite } } + test("executor registration on a blacklisted host must fail") { + sc = new SparkContext(appConf.set(config.BLACKLIST_ENABLED.key, "true")) + val endpointRef = mock(classOf[RpcEndpointRef]) + val mockAddress = mock(classOf[RpcAddress]) + when(endpointRef.address).thenReturn(mockAddress) + val message = RegisterExecutor("one", endpointRef, "localhost", 10, Map.empty) + + // Get "localhost" on a blacklist. + val taskScheduler = mock(classOf[TaskSchedulerImpl]) + when(taskScheduler.nodeBlacklist()).thenReturn(Set("localhost")) + when(taskScheduler.sc).thenReturn(sc) + sc.taskScheduler = taskScheduler + + // Create a fresh scheduler backend to blacklist "localhost". 
+ sc.schedulerBackend.stop() + val backend = + new StandaloneSchedulerBackend(taskScheduler, sc, Array(masterRpcEnv.address.toSparkURL)) + backend.start() + + backend.driverEndpoint.ask[Boolean](message) + verify(endpointRef).send(RegisterExecutorFailed(any())) + } + // =============================== // | Utility methods for testing | // =============================== diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index 9a65f023d6673..8e8fb28d9b198 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -495,6 +495,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M (0 until 4).foreach { partition => taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition) } + blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures) verify(allocationClientMock, never).killExecutors(any(), any(), any()) verify(allocationClientMock, never).killExecutorsOnHost(any()) @@ -525,31 +526,4 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M verify(allocationClientMock).killExecutors(Seq("2"), true, true) verify(allocationClientMock).killExecutorsOnHost("hostA") } - - /* - Also it would be nice to have a test for CoarseGrainedSchedulerBackend, that it rejects executors - that are registered on a blacklisted node -- that one will be a bit more work to setup, but - I think it should be possible. - - These tests won't really be stressing this implementation, but they'll help prevent regressions - with future changes. - */ - - test("reject executors on a node scheduled for killing") { - val allocationClientMock = mock[ExecutorAllocationClient] - when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called")) - // when(allocationClientMock.killExecutorsOnHost(any())).thenReturn(true) - when(allocationClientMock.killExecutorsOnHost("hostA")). - thenReturn(blacklist.nodeBlacklist.contains("hostA")) - blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock) - - // Disable auto-kill. Blacklist an executor and make sure killExecutors is not called. - conf.set(config.BLACKLIST_KILL_ENABLED, false) - - // Construct an environment with a node and three executors. - // Blacklist the first executor. - // Blacklist the second executor. This should trigger node killing. - // Try to allocate a resource on the third executor and verify that you are unable. 
- - } } From 3daafccf808e998bb8943e12be7675440b1027ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Sat, 4 Feb 2017 02:16:43 -0600 Subject: [PATCH 25/27] tgravescs review feedback --- .../org/apache/spark/scheduler/BlacklistTracker.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 62c21baf6bdf2..e130e609e4f63 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -177,7 +177,8 @@ private[scheduler] class BlacklistTracker ( if (conf.get(config.BLACKLIST_KILL_ENABLED)) { allocationClient match { case Some(allocationClient) => - logInfo(s"Killing blacklisted executor id $exec since spark.blacklist.kill is set.") + logInfo(s"Killing blacklisted executor id $exec " + + s"since spark.blacklist.killBlacklistedExecutors is set.") allocationClient.killExecutors(Seq(exec), true, true) case None => logWarning(s"Not attempting to kill blacklisted executor id $exec " + @@ -202,8 +203,8 @@ private[scheduler] class BlacklistTracker ( allocationClient match { case Some(allocationClient) => logInfo(s"Killing all executors on blacklisted host $node " + - s"since spark.blacklist.kill is set.") + s"since spark.blacklist.killBlacklistedExecutors is set.") - if(allocationClient.killExecutorsOnHost(node) == false) { + if (allocationClient.killExecutorsOnHost(node) == false) { logError(s"Killing executors on node $node failed.") } case None => From 37248a202c15807fffe9e25e5b630a27dda38204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Mon, 6 Feb 2017 13:20:09 -0600 Subject: [PATCH 26/27] Updates for squito code review from morning of 2017-02-06 --- .../cluster/CoarseGrainedSchedulerBackend.scala | 6 ++++++ .../spark/deploy/StandaloneDynamicAllocationSuite.scala | 9 +++++---- .../apache/spark/scheduler/BlacklistTrackerSuite.scala | 3 +++ 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index c50296be5f0d3..e006cc96569af 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -610,6 +610,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ final override def killExecutorsOnHost(host: String): Boolean = { logInfo(s"Requesting to kill any and all executors on host ${host}") + // A potential race exists if a new executor attempts to register on a host + // that is on the blacklist and is no longer valid. To avoid this race, + // all executor registration and killing happens in the event loop. This way, either + // an executor will fail to register, or will be killed when all executors on a host + // are killed. + // Kill all the executors on this host in an event loop to ensure serialization.
driverEndpoint.send(KillExecutorsOnHost(host)) true } diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index b45c152fc5d72..db5b5eeb9f4ed 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -480,7 +480,6 @@ class StandaloneDynamicAllocationSuite assert(apps.head.getExecutorLimit === Int.MaxValue) } val beforeList = getApplications().head.executors.keys.toSet - // kill all executors without replacement assert(killExecutorsOnHost(sc, "localhost").equals(true)) syncExecutors(sc) @@ -496,11 +495,11 @@ class StandaloneDynamicAllocationSuite val endpointRef = mock(classOf[RpcEndpointRef]) val mockAddress = mock(classOf[RpcAddress]) when(endpointRef.address).thenReturn(mockAddress) - val message = RegisterExecutor("one", endpointRef, "localhost", 10, Map.empty) + val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty) // Get "localhost" on a blacklist. val taskScheduler = mock(classOf[TaskSchedulerImpl]) - when(taskScheduler.nodeBlacklist()).thenReturn(Set("localhost")) + when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host")) when(taskScheduler.sc).thenReturn(sc) sc.taskScheduler = taskScheduler @@ -511,7 +510,9 @@ class StandaloneDynamicAllocationSuite backend.start() backend.driverEndpoint.ask[Boolean](message) - verify(endpointRef).send(RegisterExecutorFailed(any())) + eventually(timeout(10.seconds), interval(100.millis)) { + verify(endpointRef).send(RegisterExecutorFailed(any())) + } } // =============================== diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index 8e8fb28d9b198..2b18ebee79a2b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -466,6 +466,9 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M val allocationClientMock = mock[ExecutorAllocationClient] when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called")) when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] { + // To avoid a race between blacklisting and killing, it is important that the nodeBlacklist + // is updated before we ask the executor allocation client to kill all the executors + // on a particular host. 
override def answer(invocation: InvocationOnMock): Boolean = { if (blacklist.nodeBlacklist.contains("hostA") == false) { throw new IllegalStateException("hostA should be on the blacklist") From cb241672692db3e604c18bcd56f441f6863a09e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Mon, 6 Feb 2017 13:33:02 -0600 Subject: [PATCH 27/27] fix indentation --- .../apache/spark/deploy/StandaloneDynamicAllocationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index db5b5eeb9f4ed..54ea72737c5b2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -506,7 +506,7 @@ class StandaloneDynamicAllocationSuite // Create a fresh scheduler backend to blacklist "localhost". sc.schedulerBackend.stop() val backend = - new StandaloneSchedulerBackend(taskScheduler, sc, Array(masterRpcEnv.address.toSparkURL)) + new StandaloneSchedulerBackend(taskScheduler, sc, Array(masterRpcEnv.address.toSparkURL)) backend.start() backend.driverEndpoint.ask[Boolean](message)
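
Taken together, the series makes blacklisting actionable: once an executor or a
whole node crosses the blacklist thresholds, BlacklistTracker asks the
allocation client to kill the offending executors, and host-wide kills are
routed through the driver endpoint's event loop (the KillExecutorsOnHost
message) so that they serialize with executor registration, closing the race
described in PATCH 26. Below is a minimal sketch of how an application opts in,
assuming only the configuration keys as they stand at the end of the series
(spark.blacklist.enabled is the pre-existing blacklist switch; the kill switch
is the key renamed in PATCH 20). The app name and master URL are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}

    object BlacklistKillDemo {
      def main(args: Array[String]): Unit = {
        // Sketch: enable blacklisting and let Spark kill blacklisted executors.
        val conf = new SparkConf()
          .setAppName("blacklist-kill-demo")
          .setMaster("spark://master:7077")
          .set("spark.blacklist.enabled", "true")
          .set("spark.blacklist.killBlacklistedExecutors", "true")
        val sc = new SparkContext(conf)
        try {
          // Any job; executors and hosts that cross the blacklist thresholds
          // are killed and replaced while the job continues on healthy hosts.
          sc.parallelize(1 to 100).count()
        } finally {
          sc.stop()
        }
      }
    }

Note that kills issued on this path request replacements (killExecutors is
called with replace = true and force = true), so the application's resource
demand is unchanged and the cluster manager brings new executors up elsewhere.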