From 6df4a9f5f09dfa2f600a2bbf82c58589b16e81f9 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 15 May 2015 15:17:05 +0800 Subject: [PATCH 1/5] blacklist mechanism initial commit --- .../scala/org/apache/spark/SparkContext.scala | 16 ++ .../scheduler/ExecutorBlacklistTracker.scala | 159 ++++++++++++++++++ .../spark/scheduler/TaskSetManager.scala | 10 ++ 3 files changed, 185 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a453c9bf4864a..50c99c18c31c5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -225,6 +225,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private var _jars: Seq[String] = _ private var _files: Seq[String] = _ private var _shutdownHookRef: AnyRef = _ + private var _executorBlacklistTracker: Option[ExecutorBlacklistTracker] = None /* ------------------------------------------------------------------------------------- * | Accessors and public fields. These provide access to the internal state of the | @@ -325,6 +326,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] = _executorAllocationManager + private[spark] def executorBlacklistTracker: Option[ExecutorBlacklistTracker] = + _executorBlacklistTracker + private[spark] def cleaner: Option[ContextCleaner] = _cleaner private[spark] var checkpointDir: Option[String] = None @@ -531,6 +535,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } _executorAllocationManager.foreach(_.start()) + val executorBlacklistEnabled = _conf.getBoolean("spark.scheduler.blacklist.enabled", false) + _executorBlacklistTracker = if (executorBlacklistEnabled) { + Some(new ExecutorBlacklistTracker(_conf)) + } else { + None + } + _executorBlacklistTracker.foreach { e => + listenerBus.addListener(e) + e.start() + } + _cleaner = if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) { Some(new ContextCleaner(this)) @@ -1638,6 +1653,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } _cleaner.foreach(_.stop()) _executorAllocationManager.foreach(_.stop()) + _executorBlacklistTracker.foreach(_.stop()) if (_dagScheduler != null) { _dagScheduler.stop() _dagScheduler = null diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala new file mode 100644 index 0000000000000..6979c9272e0e6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.{TimeUnit, ConcurrentLinkedQueue} + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.util.{Utils, ThreadUtils, SystemClock} + +private[spark] class ExecutorBlacklistTracker(conf: SparkConf) extends SparkListener { + import ExecutorBlacklistTracker._ + + private val maxBlacklistFraction = conf.getDouble( + "spark.scheduler.blacklist.maxBlacklistFraction", MAX_BLACKLIST_FRACTION) + private val avgBlacklistThreshold = conf.getDouble( + "spark.scheduler.blacklist.averageBlacklistThreshold", AVERAGE_BLACKLIST_THRESHOLD) + private val executorFaultThreshold = conf.getInt( + "spark.scheduler.blacklist.executorFaultThreshold", EXECUTOR_FAULT_THRESHOLD) + private val executorFaultTimeoutWindowInMinutes = conf.getInt( + "spark.scheduler.blacklist.executorFaultTimeoutWindowInMinutes", EXECUTOR_FAULT_TIMEOUT_WINDOW) + + // Count the number of executors registered + private var numExecutorsRegistered: Int = 0 + + // Track the number of failure tasks to executor id + private val executorIdToTaskFailures = new mutable.HashMap[String, Int]() + + // Track the executor id to host mapping relation + private val executorIdToHosts = new mutable.HashMap[String, String]() + + // Maintain the executor blacklist + private val executorBlacklist = new ConcurrentLinkedQueue[(String, Long)]() + + // Clock used to update and exclude the executors which are out of time window. 
+ private val clock = new SystemClock() + + // Executor that handles the scheduling task + private val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "spark-scheduler-blacklist-expire-timer") + + def start(): Unit = { + val scheduleTask = new Runnable() { + override def run(): Unit = { + Utils.logUncaughtExceptions(expireTimeoutExecutorBlacklist()) + } + } + executor.scheduleAtFixedRate(scheduleTask, 0L, 60, TimeUnit.SECONDS) + } + + def stop(): Unit = { + executor.shutdown() + executor.awaitTermination(10, TimeUnit.SECONDS) + } + + def getExecutorBlacklist: Set[String] = { + val executors = new Array[(String, Long)](executorBlacklist.size()) + executorBlacklist.toArray(executors).map(_._1).toSet + } + + def getHostBlacklist: Set[String] = { + val executorBlacklist = getExecutorBlacklist + executorBlacklist.map(executorIdToHosts.get(_)).flatMap(x => x).toSet + } + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + taskEnd.reason match { + case e: FetchFailed | ExceptionFailure | TaskResultLost | + ExecutorLostFailure | UnknownReason => + val numFailures = executorIdToTaskFailures.getOrElseUpdate( + taskEnd.taskInfo.executorId, 0) + 1 + executorIdToTaskFailures.put(taskEnd.taskInfo.executorId, numFailures) + // Update the executor blacklist + updateExecutorBlacklist() + case _ => Unit + } + } + + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { + numExecutorsRegistered += 1 + executorIdToHosts(executorAdded.executorId) = executorAdded.executorInfo.executorHost + } + + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + numExecutorsRegistered -= 1 + executorIdToHosts -= executorRemoved.executorId + } + + private def updateExecutorBlacklist(): Unit = { + // Filter out the executor Ids where task failure number is larger than executorFaultThreshold + val failedExecutors = executorIdToTaskFailures.filter(_._2 >= executorFaultThreshold) + if (!failedExecutors.isEmpty) { + val avgNumFailed = executorIdToTaskFailures.values.sum.toDouble / executorBlacklist.size + for ( (executorId, numFailed) <- failedExecutors) { + // If the number of failure task is more than average blacklist threshold of average + // failed number and current executor blacklist is less than the max fraction of number + // executors + if ((numFailed.toDouble > avgNumFailed * (1 + avgBlacklistThreshold)) && + (executorBlacklist.size.toDouble < numExecutorsRegistered * maxBlacklistFraction)) { + executorBlacklist.add((executorId, System.currentTimeMillis())) + executorIdToTaskFailures -= executorId + } + } + } + } + + private def expireTimeoutExecutorBlacklist(): Unit = { + val now = clock.getTimeMillis() + var loop = true + + while (loop) { + Option(executorBlacklist.peek()) match { + case (executorId, addedTime) => + if ((now - addedTime) > executorFaultTimeoutWindowInMinutes * 60 * 1000) { + executorBlacklist.poll() + } else { + loop = false + } + case None => loop = false + } + } + } +} + +private[spark] object ExecutorBlacklistTracker { + // The maximum fraction (range [0.0-1.0]) of executors in cluster allowed to be added to the + // blacklist via heuristics. By default, no more than 50% of the executors can be + // heuristically blacklisted. + val MAX_BLACKLIST_FRACTION = 0.5 + + // A executor is blacklisted only if number of faults is more than X% above the average number + // of faults (averaged across all executor in cluster). 
X is the blacklist threshold here; 0.3 + // would correspond to 130% of the average, for example. + val AVERAGE_BLACKLIST_THRESHOLD = 0.5 + + // Fault threshold (number occurring within EXECUTOR_FAULT_TIMEOUT_WINDOW) + // to consider a executor bad enough to blacklist heuristically. + val EXECUTOR_FAULT_THRESHOLD = 4 + + // Width of overall fault-tracking sliding window (in minutes), that was used to forgive a + // single fault if no others occurred in the interval.) + val EXECUTOR_FAULT_TIMEOUT_WINDOW = 180 +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 82455b0426a5d..4679ed545c0e9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -177,6 +177,9 @@ private[spark] class TaskSetManager( var emittedTaskSizeWarning = false + // Executor blacklist tracker for TaskSetManager to decide how to schedule the task + private val executorBlacklistTracker = sched.sc.executorBlacklistTracker + /** * Add a task to all the pending-task lists that it should be on. If readding is set, we are * re-adding the task so only include it in each list if it's not already there. @@ -287,6 +290,13 @@ private[spark] class TaskSetManager( clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT } + if (executorBlacklistTracker.isDefined) { + val blacklist = executorBlacklistTracker.get.getExecutorBlacklist + if (blacklist.contains(execId)) { + return true + } + } + false } From 031cb196ff4b9d5db11856a2724e760713682a6a Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 19 May 2015 17:33:21 +0800 Subject: [PATCH 2/5] Enable blacklist in Yarn --- .../spark/scheduler/ExecutorBlacklistTracker.scala | 10 +++++----- .../org/apache/spark/scheduler/TaskSetManager.scala | 7 +++---- .../cluster/CoarseGrainedClusterMessage.scala | 3 ++- .../scheduler/cluster/YarnSchedulerBackend.scala | 4 +++- .../org/apache/spark/deploy/yarn/YarnAllocator.scala | 11 +++++++++++ 5 files changed, 24 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala index 6979c9272e0e6..7d6a5f4e73903 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala @@ -81,8 +81,8 @@ private[spark] class ExecutorBlacklistTracker(conf: SparkConf) extends SparkList override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { taskEnd.reason match { - case e: FetchFailed | ExceptionFailure | TaskResultLost | - ExecutorLostFailure | UnknownReason => + case _: FetchFailed | _: ExceptionFailure | TaskResultLost | + _: ExecutorLostFailure | UnknownReason => val numFailures = executorIdToTaskFailures.getOrElseUpdate( taskEnd.taskInfo.executorId, 0) + 1 executorIdToTaskFailures.put(taskEnd.taskInfo.executorId, numFailures) @@ -107,13 +107,13 @@ private[spark] class ExecutorBlacklistTracker(conf: SparkConf) extends SparkList val failedExecutors = executorIdToTaskFailures.filter(_._2 >= executorFaultThreshold) if (!failedExecutors.isEmpty) { val avgNumFailed = executorIdToTaskFailures.values.sum.toDouble / executorBlacklist.size - for ( (executorId, numFailed) <- failedExecutors) { + for ((executorId, numFailed) <- failedExecutors) { // If the number 
of failure task is more than average blacklist threshold of average // failed number and current executor blacklist is less than the max fraction of number // executors if ((numFailed.toDouble > avgNumFailed * (1 + avgBlacklistThreshold)) && (executorBlacklist.size.toDouble < numExecutorsRegistered * maxBlacklistFraction)) { - executorBlacklist.add((executorId, System.currentTimeMillis())) + executorBlacklist.add((executorId, clock.getTimeMillis())) executorIdToTaskFailures -= executorId } } @@ -126,7 +126,7 @@ private[spark] class ExecutorBlacklistTracker(conf: SparkConf) extends SparkList while (loop) { Option(executorBlacklist.peek()) match { - case (executorId, addedTime) => + case Some((executorId, addedTime)) => if ((now - addedTime) > executorFaultTimeoutWindowInMinutes * 60 * 1000) { executorBlacklist.poll() } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 4679ed545c0e9..4917dd6b37175 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -177,9 +177,6 @@ private[spark] class TaskSetManager( var emittedTaskSizeWarning = false - // Executor blacklist tracker for TaskSetManager to decide how to schedule the task - private val executorBlacklistTracker = sched.sc.executorBlacklistTracker - /** * Add a task to all the pending-task lists that it should be on. If readding is set, we are * re-adding the task so only include it in each list if it's not already there. @@ -280,7 +277,8 @@ private[spark] class TaskSetManager( /** * Is this re-execution of a failed task on an executor it already failed in before - * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ? + * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed, or is the task which want to execute on the + * specific executor already blacklisted? 
*/ private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = { if (failedExecutors.contains(taskId)) { @@ -290,6 +288,7 @@ private[spark] class TaskSetManager( clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT } + val executorBlacklistTracker = sched.sc.executorBlacklistTracker if (executorBlacklistTracker.isDefined) { val blacklist = executorBlacklistTracker.get.getExecutorBlacklist if (blacklist.contains(execId)) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 4be1eda2e9291..7a78dc097e4ed 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -86,7 +86,8 @@ private[spark] object CoarseGrainedClusterMessages { // Request executors by specifying the new total number of executors desired // This includes executors already pending or running - case class RequestExecutors(requestedTotal: Int) extends CoarseGrainedClusterMessage + case class RequestExecutors(requestedTotal: Int, blacklist: Seq[String]) + extends CoarseGrainedClusterMessage case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 190ff61d689d1..ca71030837328 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -53,7 +53,9 @@ private[spark] abstract class YarnSchedulerBackend( * This includes executors already pending or running. */ override def doRequestTotalExecutors(requestedTotal: Int): Boolean = { - yarnSchedulerEndpoint.askWithRetry[Boolean](RequestExecutors(requestedTotal)) + val blacklist = scheduler.sc.executorBlacklistTracker + .map(_.getHostBlacklist.toSeq).getOrElse(Nil) + yarnSchedulerEndpoint.askWithRetry[Boolean](RequestExecutors(requestedTotal, blacklist)) } /** diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 940873fbd046c..4200d2ed0ca14 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -127,6 +127,9 @@ private[yarn] class YarnAllocator( } } + // Maintain the list of hosts which are blacklisted. + private var hostBlacklist: Seq[String] = Nil + def getNumExecutorsRunning: Int = numExecutorsRunning def getNumExecutorsFailed: Int = numExecutorsFailed @@ -172,6 +175,14 @@ private[yarn] class YarnAllocator( } } + def updateHostBlacklist(blacklist: Seq[String]): Unit = synchronized { + val blacklistAdditions = blacklist.toSet -- hostBlacklist.toSet + val blacklistRemovals = hostBlacklist.toSet -- blacklist.toSet + + amClient.updateBlacklist(blacklistAdditions.toSeq, blacklistRemovals.toSeq) + hostBlacklist = blacklist + } + /** * Request resources such that, if YARN gives us all we ask for, we'll have a number of containers * equal to maxExecutors. 
From 9e613557cf4bd89fabbb8ad4c2426119c588fae1 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 12 Jun 2015 09:21:51 +0800 Subject: [PATCH 3/5] Continue working on this --- .../org/apache/spark/scheduler/ExecutorBlacklistTracker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala index 7d6a5f4e73903..065e2a1bbbc26 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala @@ -22,7 +22,7 @@ import java.util.concurrent.{TimeUnit, ConcurrentLinkedQueue} import scala.collection.mutable import org.apache.spark._ -import org.apache.spark.util.{Utils, ThreadUtils, SystemClock} +import org.apache.spark.util.{SystemClock, ThreadUtils, Utils} private[spark] class ExecutorBlacklistTracker(conf: SparkConf) extends SparkListener { import ExecutorBlacklistTracker._ From b8066f3788fb6458eff8d5afcde266ab77d22fb1 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 18 Jun 2015 09:51:43 +0800 Subject: [PATCH 4/5] code refactor and add unit test --- .../scheduler/ExecutorBlacklistTracker.scala | 110 ++++++------ .../cluster/CoarseGrainedClusterMessage.scala | 3 +- .../cluster/YarnSchedulerBackend.scala | 4 +- .../ExecutorBlacklistTrackerSuite.scala | 156 ++++++++++++++++++ .../spark/deploy/yarn/YarnAllocator.scala | 11 -- 5 files changed, 221 insertions(+), 63 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/ExecutorBlacklistTrackerSuite.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala index 065e2a1bbbc26..ae36870918676 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala @@ -17,13 +17,29 @@ package org.apache.spark.scheduler -import java.util.concurrent.{TimeUnit, ConcurrentLinkedQueue} +import java.util.concurrent.TimeUnit import scala.collection.mutable import org.apache.spark._ -import org.apache.spark.util.{SystemClock, ThreadUtils, Utils} +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} +/** + * ExecutorBlacklistTracker blacklists the executors by tracking the status of running tasks with + * heuristic algorithm. + * + * A executor will be considered bad enough only when: + * 1. The failure task number on this executor is more than + * spark.scheduler.blacklist.executorFaultThreshold. + * 2. The failure task number on this executor is + * spark.scheduler.blacklist.averageBlacklistThreshold more than average failure task number + * of this cluster. + * + * Also max number of blacklisted executors will not exceed the + * spark.scheduler.blacklist.maxBlacklistFraction of whole cluster, and blacklisted executors + * will be forgiven when there is no failure tasks in the + * spark.scheduler.blacklist.executorFaultTimeoutWindowInMinutes. 
+ */ private[spark] class ExecutorBlacklistTracker(conf: SparkConf) extends SparkListener { import ExecutorBlacklistTracker._ @@ -37,19 +53,13 @@ private[spark] class ExecutorBlacklistTracker(conf: SparkConf) extends SparkList "spark.scheduler.blacklist.executorFaultTimeoutWindowInMinutes", EXECUTOR_FAULT_TIMEOUT_WINDOW) // Count the number of executors registered - private var numExecutorsRegistered: Int = 0 + var numExecutorsRegistered: Int = 0 - // Track the number of failure tasks to executor id - private val executorIdToTaskFailures = new mutable.HashMap[String, Int]() - - // Track the executor id to host mapping relation - private val executorIdToHosts = new mutable.HashMap[String, String]() - - // Maintain the executor blacklist - private val executorBlacklist = new ConcurrentLinkedQueue[(String, Long)]() + // Track the number of failure tasks and time of latest failure to executor id + val executorIdToTaskFailures = new mutable.HashMap[String, ExecutorFailureStatus]() // Clock used to update and exclude the executors which are out of time window. - private val clock = new SystemClock() + private var clock: Clock = new SystemClock() // Executor that handles the scheduling task private val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( @@ -69,23 +79,23 @@ private[spark] class ExecutorBlacklistTracker(conf: SparkConf) extends SparkList executor.awaitTermination(10, TimeUnit.SECONDS) } - def getExecutorBlacklist: Set[String] = { - val executors = new Array[(String, Long)](executorBlacklist.size()) - executorBlacklist.toArray(executors).map(_._1).toSet + def setClock(newClock: Clock): Unit = { + clock = newClock } - def getHostBlacklist: Set[String] = { - val executorBlacklist = getExecutorBlacklist - executorBlacklist.map(executorIdToHosts.get(_)).flatMap(x => x).toSet + def getExecutorBlacklist: Set[String] = synchronized { + executorIdToTaskFailures.filter(_._2.isBlackListed).keys.toSet } - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { taskEnd.reason match { case _: FetchFailed | _: ExceptionFailure | TaskResultLost | _: ExecutorLostFailure | UnknownReason => - val numFailures = executorIdToTaskFailures.getOrElseUpdate( - taskEnd.taskInfo.executorId, 0) + 1 - executorIdToTaskFailures.put(taskEnd.taskInfo.executorId, numFailures) + val failureStatus = executorIdToTaskFailures.getOrElseUpdate(taskEnd.taskInfo.executorId, + new ExecutorFailureStatus) + failureStatus.numFailures += 1 + failureStatus.updatedTime = clock.getTimeMillis() + // Update the executor blacklist updateExecutorBlacklist() case _ => Unit @@ -94,45 +104,45 @@ private[spark] class ExecutorBlacklistTracker(conf: SparkConf) extends SparkList override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { numExecutorsRegistered += 1 - executorIdToHosts(executorAdded.executorId) = executorAdded.executorInfo.executorHost } - override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved + ): Unit = synchronized { numExecutorsRegistered -= 1 - executorIdToHosts -= executorRemoved.executorId + executorIdToTaskFailures -= executorRemoved.executorId } private def updateExecutorBlacklist(): Unit = { - // Filter out the executor Ids where task failure number is larger than executorFaultThreshold - val failedExecutors = executorIdToTaskFailures.filter(_._2 >= executorFaultThreshold) - if 
(!failedExecutors.isEmpty) { - val avgNumFailed = executorIdToTaskFailures.values.sum.toDouble / executorBlacklist.size - for ((executorId, numFailed) <- failedExecutors) { + // Filter out the executor Ids where task failure number is larger than + // executorFaultThreshold and not blacklisted + val failedExecutors = executorIdToTaskFailures.filter { case(_, e) => + e.numFailures >= executorFaultThreshold && !e.isBlackListed + } + + val blacklistedExecutorNum = executorIdToTaskFailures.filter(_._2.isBlackListed).size + + if (failedExecutors.nonEmpty) { + val avgNumFailed = executorIdToTaskFailures.values.map(_.numFailures).sum.toDouble / + numExecutorsRegistered + for ((executorId, failureStatus) <- failedExecutors) { // If the number of failure task is more than average blacklist threshold of average // failed number and current executor blacklist is less than the max fraction of number // executors - if ((numFailed.toDouble > avgNumFailed * (1 + avgBlacklistThreshold)) && - (executorBlacklist.size.toDouble < numExecutorsRegistered * maxBlacklistFraction)) { - executorBlacklist.add((executorId, clock.getTimeMillis())) - executorIdToTaskFailures -= executorId + if ((failureStatus.numFailures.toDouble > avgNumFailed * (1 + avgBlacklistThreshold)) && + (blacklistedExecutorNum.toDouble < numExecutorsRegistered * maxBlacklistFraction)) { + failureStatus.isBlackListed = true } } } } - private def expireTimeoutExecutorBlacklist(): Unit = { + private def expireTimeoutExecutorBlacklist(): Unit = synchronized { val now = clock.getTimeMillis() - var loop = true - - while (loop) { - Option(executorBlacklist.peek()) match { - case Some((executorId, addedTime)) => - if ((now - addedTime) > executorFaultTimeoutWindowInMinutes * 60 * 1000) { - executorBlacklist.poll() - } else { - loop = false - } - case None => loop = false + + executorIdToTaskFailures.foreach { case (id, failureStatus) => + if ((now - failureStatus.updatedTime) > executorFaultTimeoutWindowInMinutes * 60 * 1000 + && failureStatus.isBlackListed) { + failureStatus.isBlackListed = false } } } @@ -154,6 +164,12 @@ private[spark] object ExecutorBlacklistTracker { val EXECUTOR_FAULT_THRESHOLD = 4 // Width of overall fault-tracking sliding window (in minutes), that was used to forgive a - // single fault if no others occurred in the interval.) + // single fault if no others occurred in the interval. 
val EXECUTOR_FAULT_TIMEOUT_WINDOW = 180 + + final class ExecutorFailureStatus { + var numFailures: Int = 0 + var updatedTime: Long = 0L + var isBlackListed: Boolean = false + } } \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 7a78dc097e4ed..4be1eda2e9291 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -86,8 +86,7 @@ private[spark] object CoarseGrainedClusterMessages { // Request executors by specifying the new total number of executors desired // This includes executors already pending or running - case class RequestExecutors(requestedTotal: Int, blacklist: Seq[String]) - extends CoarseGrainedClusterMessage + case class RequestExecutors(requestedTotal: Int) extends CoarseGrainedClusterMessage case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index ca71030837328..190ff61d689d1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -53,9 +53,7 @@ private[spark] abstract class YarnSchedulerBackend( * This includes executors already pending or running. */ override def doRequestTotalExecutors(requestedTotal: Int): Boolean = { - val blacklist = scheduler.sc.executorBlacklistTracker - .map(_.getHostBlacklist.toSeq).getOrElse(Nil) - yarnSchedulerEndpoint.askWithRetry[Boolean](RequestExecutors(requestedTotal, blacklist)) + yarnSchedulerEndpoint.askWithRetry[Boolean](RequestExecutors(requestedTotal)) } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorBlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorBlacklistTrackerSuite.scala new file mode 100644 index 0000000000000..da850b9f7907b --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorBlacklistTrackerSuite.scala @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import scala.collection.mutable + +import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester} + +import org.apache.spark._ +import org.apache.spark.scheduler.ExecutorBlacklistTracker.ExecutorFailureStatus +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.util.ManualClock + +class ExecutorBlacklistTrackerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter { + import ExecutorBlacklistTrackerSuite._ + + before { + if (sc == null) { + sc = createSparkContext + } + } + + after { + if (sc != null) { + sc.stop() + sc = null + } + } + + test("add executor to blacklist") { + // Add 5 executors + addExecutors(5) + val tracker = sc.executorBlacklistTracker.get + assert(numExecutorsRegistered(tracker) === 5) + + // Post 5 TaskEnd event to executor-1 to add executor-1 into blacklist + (0 until 5).foreach(_ => postTaskEndEvent(TaskResultLost, "executor-1")) + + assert(tracker.getExecutorBlacklist === Set("executor-1")) + assert(executorIdToTaskFailures(tracker)("executor-1").numFailures === 5) + assert(executorIdToTaskFailures(tracker)("executor-1").isBlackListed === true) + + // Post 10 TaskEnd event to executor-2 to add executor-2 into blacklist + (0 until 10).foreach(_ => postTaskEndEvent(TaskResultLost, "executor-2")) + assert(tracker.getExecutorBlacklist === Set("executor-1", "executor-2")) + assert(executorIdToTaskFailures(tracker)("executor-2").numFailures === 10) + assert(executorIdToTaskFailures(tracker)("executor-2").isBlackListed === true) + + // Post 5 TaskEnd event to executor-3 to verify whether executor-3 is blacklisted + (0 until 5).foreach(_ => postTaskEndEvent(TaskResultLost, "executor-3")) + // Since the failure number of executor-3 is less than the average blacklist threshold, + // though exceed the fault threshold, still should not be added into blacklist + assert(tracker.getExecutorBlacklist === Set("executor-1", "executor-2")) + assert(executorIdToTaskFailures(tracker)("executor-3").numFailures === 5) + assert(executorIdToTaskFailures(tracker)("executor-3").isBlackListed === false) + + // Keep post TaskEnd event to executor-3 to add executor-3 into blacklist + (0 until 2).foreach(_ => postTaskEndEvent(TaskResultLost, "executor-3")) + assert(tracker.getExecutorBlacklist === Set("executor-1", "executor-2", "executor-3")) + assert(executorIdToTaskFailures(tracker)("executor-3").numFailures === 7) + assert(executorIdToTaskFailures(tracker)("executor-3").isBlackListed === true) + + // Post TaskEnd event to executor-4 to verify whether executor-4 could be added into blacklist + (0 until 10).foreach(_ => postTaskEndEvent(TaskResultLost, "executor-4")) + // Event executor-4's failure task number is above than blacklist threshold, + // but the blacklisted executor number is reaching to maximum fraction, + // so executor-4 still cannot be added into blacklist. 
+ assert(tracker.getExecutorBlacklist === Set("executor-1", "executor-2", "executor-3")) + assert(executorIdToTaskFailures(tracker)("executor-4").numFailures === 10) + assert(executorIdToTaskFailures(tracker)("executor-4").isBlackListed === false) + } + + test("remove executor from blacklist") { + // Add 5 executors + addExecutors(5) + val tracker = sc.executorBlacklistTracker.get + val clock = new ManualClock(10000L) + tracker.setClock(clock) + assert(numExecutorsRegistered(tracker) === 5) + + // Post 5 TaskEnd event to executor-1 to add executor-1 into blacklist + (0 until 5).foreach(_ => postTaskEndEvent(TaskResultLost, "executor-1")) + + assert(tracker.getExecutorBlacklist === Set("executor-1")) + assert(executorIdToTaskFailures(tracker)("executor-1").numFailures === 5) + assert(executorIdToTaskFailures(tracker)("executor-1").isBlackListed === true) + assert(executorIdToTaskFailures(tracker)("executor-1").updatedTime === 10000L) + + // Advance the timer + clock.advance(70 * 1000) + expireTimeoutExecutorBlacklist(tracker) + assert(tracker.getExecutorBlacklist === Set.empty) + assert(executorIdToTaskFailures(tracker)("executor-1").numFailures === 5) + assert(executorIdToTaskFailures(tracker)("executor-1").isBlackListed === false) + } + + private def createSparkContext: SparkContext = { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test-executor-blacklist-tracker") + .set("spark.scheduler.blacklist.enabled", "true") + .set("spark.scheduler.blacklist.executorFaultTimeoutWindowInMinutes", "1") + val sc = new SparkContext(conf) + sc + } + + private def addExecutors(numExecutor: Int): Unit = { + for (i <- 1 to numExecutor) { + sc.listenerBus.postToAll(SparkListenerExecutorAdded( + 0L, s"executor-$i", new ExecutorInfo(s"host$i", 1, Map.empty))) + } + } + + private def postTaskEndEvent(taskEndReason: TaskEndReason, executorId: String): Unit = { + val taskInfo = new TaskInfo(0L, 0, 0, 0L, executorId, null, null, false) + val taskEnd = SparkListenerTaskEnd(0, 0, "", taskEndReason, taskInfo, null) + sc.listenerBus.postToAll(taskEnd) + } +} + +private object ExecutorBlacklistTrackerSuite extends PrivateMethodTester { + private val _numExecutorsRegistered = PrivateMethod[Int]('numExecutorsRegistered) + private val _executorIdToTaskFailures = + PrivateMethod[mutable.HashMap[String, ExecutorFailureStatus]]('executorIdToTaskFailures) + private val _expireTimeoutExecutorBlacklist = + PrivateMethod[Unit]('expireTimeoutExecutorBlacklist) + + private def numExecutorsRegistered(tracker: ExecutorBlacklistTracker): Int = { + tracker invokePrivate _numExecutorsRegistered() + } + + private def executorIdToTaskFailures(tracker: ExecutorBlacklistTracker + ): mutable.HashMap[String, ExecutorFailureStatus] = { + tracker invokePrivate _executorIdToTaskFailures() + } + + private def expireTimeoutExecutorBlacklist(tracker: ExecutorBlacklistTracker): Unit = { + tracker invokePrivate _expireTimeoutExecutorBlacklist() + } +} + diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 4200d2ed0ca14..940873fbd046c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -127,9 +127,6 @@ private[yarn] class YarnAllocator( } } - // Maintain the list of hosts which are blacklisted. 
- private var hostBlacklist: Seq[String] = Nil - def getNumExecutorsRunning: Int = numExecutorsRunning def getNumExecutorsFailed: Int = numExecutorsFailed @@ -175,14 +172,6 @@ private[yarn] class YarnAllocator( } } - def updateHostBlacklist(blacklist: Seq[String]): Unit = synchronized { - val blacklistAdditions = blacklist.toSet -- hostBlacklist.toSet - val blacklistRemovals = hostBlacklist.toSet -- blacklist.toSet - - amClient.updateBlacklist(blacklistAdditions.toSeq, blacklistRemovals.toSeq) - hostBlacklist = blacklist - } - /** * Request resources such that, if YARN gives us all we ask for, we'll have a number of containers * equal to maxExecutors. From fb3821f58de7ec95556de5632cb3686177303f34 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 18 Jun 2015 14:19:03 +0800 Subject: [PATCH 5/5] style fix --- .../apache/spark/scheduler/ExecutorBlacklistTracker.scala | 2 +- .../spark/scheduler/ExecutorBlacklistTrackerSuite.scala | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala index ae36870918676..31344adf909f6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala @@ -172,4 +172,4 @@ private[spark] object ExecutorBlacklistTracker { var updatedTime: Long = 0L var isBlackListed: Boolean = false } -} \ No newline at end of file +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorBlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorBlacklistTrackerSuite.scala index da850b9f7907b..33ac963be09aa 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorBlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorBlacklistTrackerSuite.scala @@ -19,14 +19,17 @@ package org.apache.spark.scheduler import scala.collection.mutable -import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester} +import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark._ import org.apache.spark.scheduler.ExecutorBlacklistTracker.ExecutorFailureStatus import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.util.ManualClock -class ExecutorBlacklistTrackerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter { +class ExecutorBlacklistTrackerSuite + extends SparkFunSuite + with LocalSparkContext + with BeforeAndAfter { import ExecutorBlacklistTrackerSuite._ before { @@ -153,4 +156,3 @@ private object ExecutorBlacklistTrackerSuite extends PrivateMethodTester { tracker invokePrivate _expireTimeoutExecutorBlacklist() } } -
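
A minimal usage sketch, assuming the series above applies cleanly: the tracker is only created when spark.scheduler.blacklist.enabled is set to true, and the remaining spark.scheduler.blacklist.* keys tune the heuristic described in the ExecutorBlacklistTracker scaladoc. In the Scala snippet below, the master URL, application name, and the specific threshold values are illustrative assumptions rather than recommended settings.

    import org.apache.spark.{SparkConf, SparkContext}

    object BlacklistExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          // On a real cluster the master would normally come from spark-submit;
          // local[4] is only used here so the sketch runs standalone.
          .setMaster("local[4]")
          .setAppName("blacklist-example")
          // Turn the feature on; without this key the tracker is never created.
          .set("spark.scheduler.blacklist.enabled", "true")
          // Never blacklist more than 25% of the registered executors.
          .set("spark.scheduler.blacklist.maxBlacklistFraction", "0.25")
          // Blacklist an executor only if its failure count is more than 50%
          // above the cluster-wide average failure count.
          .set("spark.scheduler.blacklist.averageBlacklistThreshold", "0.5")
          // Require at least 4 task failures before an executor is considered.
          .set("spark.scheduler.blacklist.executorFaultThreshold", "4")
          // Forgive a blacklisted executor after 60 minutes with no new failures.
          .set("spark.scheduler.blacklist.executorFaultTimeoutWindowInMinutes", "60")

        val sc = new SparkContext(conf)
        // With the patches applied, TaskSetManager.executorIsBlacklisted consults
        // sc.executorBlacklistTracker and refuses to schedule tasks on any
        // executor returned by getExecutorBlacklist.
        sc.stop()
      }
    }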