From c23f887b62a75415bab74036e78d03b92b1a5541 Mon Sep 17 00:00:00 2001 From: Grace Date: Wed, 28 Oct 2015 23:04:37 +0800 Subject: [PATCH 01/21] rebase to master branch --- .../spark/ExecutorAllocationClient.scala | 12 +++++++-- .../spark/ExecutorAllocationManager.scala | 9 +++++++ .../scala/org/apache/spark/SparkContext.scala | 11 ++++---- .../spark/scheduler/TaskSchedulerImpl.scala | 25 +++++++++++++------ .../CoarseGrainedSchedulerBackend.scala | 25 ++++++++++++++++--- 5 files changed, 64 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 842bfdbadc948..c0544f95e4a3f 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -52,11 +52,19 @@ private[spark] trait ExecutorAllocationClient { * Request that the cluster manager kill the specified executors. * @return whether the request is acknowledged by the cluster manager. */ - def killExecutors(executorIds: Seq[String]): Boolean + def killExecutors(executorIds: Seq[String], force: Boolean): Boolean /** * Request that the cluster manager kill the specified executor. * @return whether the request is acknowledged by the cluster manager. */ - def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId)) + private[spark] def killExecutor(executorId: String): Boolean = + killExecutors(Seq(executorId), force = false) + + /** + * Request that the cluster manager kill the specified executor. + * @return whether the request is acknowledged by the cluster manager. + */ + def killExecutor(executorId: String, force: Boolean): Boolean = + killExecutors(Seq(executorId), force) } diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index b93536e6536e2..95b13560fcccc 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -403,6 +403,8 @@ private[spark] class ExecutorAllocationManager( // Send a request to the backend to kill this executor val removeRequestAcknowledged = testing || client.killExecutor(executorId) if (removeRequestAcknowledged) { + // even we get removeRequestAcknowledged, the executor may not be killed + // it can be rescued while onTaskStart event happens logInfo(s"Removing executor $executorId because it has been idle for " + s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})") executorsPendingToRemove.add(executorId) @@ -509,6 +511,13 @@ private[spark] class ExecutorAllocationManager( private def onExecutorBusy(executorId: String): Unit = synchronized { logDebug(s"Clearing idle timer for $executorId because it is now running a task") removeTimes.remove(executorId) + + // Executor is added to remove by misjudgment due to async listener making it as idle). 
+ // see SPARK-9552 + if (executorsPendingToRemove.contains(executorId)) { + // Rescue the executor from pending to remove list + executorsPendingToRemove.remove(executorId) + } } /** diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a6857b4c7d882..1ce4c92440370 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1448,10 +1448,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is received. */ @DeveloperApi - override def killExecutors(executorIds: Seq[String]): Boolean = { + override def killExecutors(executorIds: Seq[String], force: Boolean = true): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutors(executorIds) + b.killExecutors(executorIds, force) case _ => logWarning("Killing executors is only supported in coarse-grained mode") false @@ -1470,7 +1470,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is received. */ @DeveloperApi - override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId) + override def killExecutor(executorId: String, force: Boolean = true): Boolean = + super.killExecutor(executorId, force) /** * Request that the cluster manager kill the specified executor without adjusting the @@ -1486,10 +1487,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * @return whether the request is received. */ - private[spark] def killAndReplaceExecutor(executorId: String): Boolean = { + private[spark] def killAndReplaceExecutor(executorId: String, force: Boolean = true): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutors(Seq(executorId), replace = true) + b.killExecutors(Seq(executorId), replace = true, force) case _ => logWarning("Killing executors is only supported in coarse-grained mode") false diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 1c7bfe89c02ac..421ce0b43b7fb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -88,7 +88,8 @@ private[spark] class TaskSchedulerImpl( val nextTaskId = new AtomicLong(0) // Which executor IDs we have executors on - val activeExecutorIds = new HashSet[String] + // each executor will record running or launched task number + val activeExecutorIdsWithLoads = new HashMap[String, Int] // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host @@ -254,6 +255,7 @@ private[spark] class TaskSchedulerImpl( val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId + activeExecutorIdsWithLoads(execId) += 1 executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) @@ -282,7 +284,7 @@ private[spark] class TaskSchedulerImpl( var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host - activeExecutorIds += o.executorId + activeExecutorIdsWithLoads.getOrElseUpdate(o.executorId, 0) if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() 
executorAdded(o.executorId, o.host) @@ -331,7 +333,8 @@ private[spark] class TaskSchedulerImpl( if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) { // We lost this entire executor, so remember that it's gone val execId = taskIdToExecutorId(tid) - if (activeExecutorIds.contains(execId)) { + + if (activeExecutorIdsWithLoads.contains(execId)) { removeExecutor(execId, SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) failedExecutor = Some(execId) @@ -341,7 +344,10 @@ private[spark] class TaskSchedulerImpl( case Some(taskSet) => if (TaskState.isFinished(state)) { taskIdToTaskSetManager.remove(tid) - taskIdToExecutorId.remove(tid) + taskIdToExecutorId.remove(tid) match { + case Some(execId) => activeExecutorIdsWithLoads(execId) -= 1 + case None => + } } if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) @@ -462,7 +468,7 @@ private[spark] class TaskSchedulerImpl( var failedExecutor: Option[String] = None synchronized { - if (activeExecutorIds.contains(executorId)) { + if (activeExecutorIdsWithLoads.contains(executorId)) { val hostPort = executorIdToHost(executorId) logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason)) removeExecutor(executorId, reason) @@ -484,7 +490,8 @@ private[spark] class TaskSchedulerImpl( /** Remove an executor from all our data structures and mark it as lost */ private def removeExecutor(executorId: String, reason: ExecutorLossReason) { - activeExecutorIds -= executorId + activeExecutorIdsWithLoads -= executorId + val host = executorIdToHost(executorId) val execs = executorsByHost.getOrElse(host, new HashSet) execs -= executorId @@ -518,7 +525,11 @@ private[spark] class TaskSchedulerImpl( } def isExecutorAlive(execId: String): Boolean = synchronized { - activeExecutorIds.contains(execId) + activeExecutorIdsWithLoads.contains(execId) + } + + def isExecutorBusy(execId: String): Boolean = synchronized { + activeExecutorIdsWithLoads.getOrElse(execId, -1) > 0 } // By default, rack is unknown diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 55a564b5c8eac..5c423e71554c6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -410,8 +410,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Request that the cluster manager kill the specified executors. * @return whether the kill request is acknowledged. */ - final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized { - killExecutors(executorIds, replace = false) + final override def killExecutors( + executorIds: Seq[String], + force: Boolean): Boolean = synchronized { + killExecutors(executorIds, replace = false, force) } /** @@ -421,15 +423,29 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * @param replace whether to replace the killed executors with new ones * @return whether the kill request is acknowledged. 
*/ - final def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean = synchronized { + final def killExecutors( + executorIds: Seq[String], + replace: Boolean, + force: Boolean): Boolean = synchronized { logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}") val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains) unknownExecutors.foreach { id => logWarning(s"Executor to kill $id does not exist!") } + // force killing all busy and idle executors + // otherwise, only idle executors are valid to be killed + val idleExecutors = + if (force) { + knownExecutors + } else { + knownExecutors.filter { id => + logWarning(s"Busy executor $id is not valid to be killed!") + !scheduler.isExecutorBusy(id)} + } + // If an executor is already pending to be removed, do not kill it again (SPARK-9795) - val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) } + val executorsToKill = idleExecutors.filter { id => !executorsPendingToRemove.contains(id) } executorsPendingToRemove ++= executorsToKill // If we do not wish to replace the executors we kill, sync the target number of executors @@ -442,6 +458,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp numPendingExecutors += knownExecutors.size } + // executorsToKill may be empty doKillExecutors(executorsToKill) } From dc660f63c416c300bd3da48c6a0b9442633313fc Mon Sep 17 00:00:00 2001 From: Grace Date: Tue, 10 Nov 2015 15:10:09 +0800 Subject: [PATCH 02/21] change the task number to count; change the WithLoads to WithTaskCount --- .../spark/scheduler/TaskSchedulerImpl.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 421ce0b43b7fb..515b6482f80ae 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -88,8 +88,8 @@ private[spark] class TaskSchedulerImpl( val nextTaskId = new AtomicLong(0) // Which executor IDs we have executors on - // each executor will record running or launched task number - val activeExecutorIdsWithLoads = new HashMap[String, Int] + // each executor will record running or launched task count + val activeExecutorIdsWithTaskCount = new HashMap[String, Int] // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host @@ -255,7 +255,7 @@ private[spark] class TaskSchedulerImpl( val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId - activeExecutorIdsWithLoads(execId) += 1 + activeExecutorIdsWithTaskCount(execId) += 1 executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) @@ -284,7 +284,7 @@ private[spark] class TaskSchedulerImpl( var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host - activeExecutorIdsWithLoads.getOrElseUpdate(o.executorId, 0) + activeExecutorIdsWithTaskCount.getOrElseUpdate(o.executorId, 0) if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) @@ -334,7 +334,7 @@ private[spark] class TaskSchedulerImpl( // We lost this entire executor, so remember that it's gone val execId = taskIdToExecutorId(tid) - if 
(activeExecutorIdsWithLoads.contains(execId)) { + if (activeExecutorIdsWithTaskCount.contains(execId)) { removeExecutor(execId, SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) failedExecutor = Some(execId) @@ -345,7 +345,7 @@ private[spark] class TaskSchedulerImpl( if (TaskState.isFinished(state)) { taskIdToTaskSetManager.remove(tid) taskIdToExecutorId.remove(tid) match { - case Some(execId) => activeExecutorIdsWithLoads(execId) -= 1 + case Some(execId) => activeExecutorIdsWithTaskCount(execId) -= 1 case None => } } @@ -468,7 +468,7 @@ private[spark] class TaskSchedulerImpl( var failedExecutor: Option[String] = None synchronized { - if (activeExecutorIdsWithLoads.contains(executorId)) { + if (activeExecutorIdsWithTaskCount.contains(executorId)) { val hostPort = executorIdToHost(executorId) logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason)) removeExecutor(executorId, reason) @@ -490,7 +490,7 @@ private[spark] class TaskSchedulerImpl( /** Remove an executor from all our data structures and mark it as lost */ private def removeExecutor(executorId: String, reason: ExecutorLossReason) { - activeExecutorIdsWithLoads -= executorId + activeExecutorIdsWithTaskCount -= executorId val host = executorIdToHost(executorId) val execs = executorsByHost.getOrElse(host, new HashSet) @@ -525,11 +525,11 @@ private[spark] class TaskSchedulerImpl( } def isExecutorAlive(execId: String): Boolean = synchronized { - activeExecutorIdsWithLoads.contains(execId) + activeExecutorIdsWithTaskCount.contains(execId) } def isExecutorBusy(execId: String): Boolean = synchronized { - activeExecutorIdsWithLoads.getOrElse(execId, -1) > 0 + activeExecutorIdsWithTaskCount.getOrElse(execId, -1) > 0 } // By default, rack is unknown From 27faa6b70e8332f6f70359739cb417be5be7e31e Mon Sep 17 00:00:00 2001 From: Grace Date: Tue, 10 Nov 2015 15:14:39 +0800 Subject: [PATCH 03/21] keep sparkcontext public API un-changed --- .../main/scala/org/apache/spark/SparkContext.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1ce4c92440370..603d9249bfce1 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1448,10 +1448,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is received. */ @DeveloperApi - override def killExecutors(executorIds: Seq[String], force: Boolean = true): Boolean = { + override def killExecutors(executorIds: Seq[String]): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutors(executorIds, force) + b.killExecutors(executorIds, true) case _ => logWarning("Killing executors is only supported in coarse-grained mode") false @@ -1470,8 +1470,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is received. */ @DeveloperApi - override def killExecutor(executorId: String, force: Boolean = true): Boolean = - super.killExecutor(executorId, force) + override def killExecutor(executorId: String): Boolean = + super.killExecutor(executorId, true) /** * Request that the cluster manager kill the specified executor without adjusting the @@ -1487,10 +1487,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * @return whether the request is received. 
*/ - private[spark] def killAndReplaceExecutor(executorId: String, force: Boolean = true): Boolean = { + private[spark] def killAndReplaceExecutor(executorId: String): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutors(Seq(executorId), replace = true, force) + b.killExecutors(Seq(executorId), replace = true, true) case _ => logWarning("Killing executors is only supported in coarse-grained mode") false From 8774124e07666d337501d0e79bc27bbb35d78b74 Mon Sep 17 00:00:00 2001 From: Grace Date: Tue, 10 Nov 2015 20:50:46 +0800 Subject: [PATCH 04/21] fix compile issue --- core/src/main/scala/org/apache/spark/SparkContext.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 603d9249bfce1..0dfc9636c1c70 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1448,16 +1448,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is received. */ @DeveloperApi - override def killExecutors(executorIds: Seq[String]): Boolean = { + override def killExecutors(executorIds: Seq[String], force: Boolean = true): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutors(executorIds, true) + b.killExecutors(executorIds, force) case _ => logWarning("Killing executors is only supported in coarse-grained mode") false } } + def killExecutors(executorIds: Seq[String]) = + killExecutors(executorIds, force = true) + /** * :: DeveloperApi :: * Request that the cluster manager kill the specified executor. From 946ed7e0966b10a706372200551345304d2c8089 Mon Sep 17 00:00:00 2001 From: Grace Date: Tue, 10 Nov 2015 21:04:08 +0800 Subject: [PATCH 05/21] fix compile issue --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0dfc9636c1c70..6d65cc43ec547 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1458,7 +1458,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } - def killExecutors(executorIds: Seq[String]) = + def killExecutors(executorIds: Seq[String]): Boolean = killExecutors(executorIds, force = true) /** From feefbfef79e396df0187ae947e1b39c80e810082 Mon Sep 17 00:00:00 2001 From: Grace Date: Wed, 11 Nov 2015 15:10:41 +0800 Subject: [PATCH 06/21] keep public API --- .../org/apache/spark/ExecutorAllocationClient.scala | 13 +++---------- .../apache/spark/ExecutorAllocationManager.scala | 7 ------- .../main/scala/org/apache/spark/SparkContext.scala | 12 +++++------- .../cluster/CoarseGrainedSchedulerBackend.scala | 9 ++++----- 4 files changed, 12 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index c0544f95e4a3f..ccfeb0fca489e 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -52,19 +52,12 @@ private[spark] trait ExecutorAllocationClient { * Request that the cluster manager kill the specified executors. 
* @return whether the request is acknowledged by the cluster manager. */ - def killExecutors(executorIds: Seq[String], force: Boolean): Boolean + def killExecutors(executorIds: Seq[String]): Boolean /** * Request that the cluster manager kill the specified executor. * @return whether the request is acknowledged by the cluster manager. */ - private[spark] def killExecutor(executorId: String): Boolean = - killExecutors(Seq(executorId), force = false) - - /** - * Request that the cluster manager kill the specified executor. - * @return whether the request is acknowledged by the cluster manager. - */ - def killExecutor(executorId: String, force: Boolean): Boolean = - killExecutors(Seq(executorId), force) + def killExecutor(executorId: String): Boolean = + killExecutors(Seq(executorId)) } diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 95b13560fcccc..f44634e2ff449 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -511,13 +511,6 @@ private[spark] class ExecutorAllocationManager( private def onExecutorBusy(executorId: String): Unit = synchronized { logDebug(s"Clearing idle timer for $executorId because it is now running a task") removeTimes.remove(executorId) - - // Executor is added to remove by misjudgment due to async listener making it as idle). - // see SPARK-9552 - if (executorsPendingToRemove.contains(executorId)) { - // Rescue the executor from pending to remove list - executorsPendingToRemove.remove(executorId) - } } /** diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6d65cc43ec547..eb88e4ccf7d30 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1448,19 +1448,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is received. */ @DeveloperApi - override def killExecutors(executorIds: Seq[String], force: Boolean = true): Boolean = { + def killExecutors(executorIds: Seq[String]): Boolean = + { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutors(executorIds, force) + b.killExecutors(executorIds) case _ => logWarning("Killing executors is only supported in coarse-grained mode") false } } - def killExecutors(executorIds: Seq[String]): Boolean = - killExecutors(executorIds, force = true) - /** * :: DeveloperApi :: * Request that the cluster manager kill the specified executor. 
@@ -1474,7 +1472,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def killExecutor(executorId: String): Boolean = - super.killExecutor(executorId, true) + super.killExecutor(executorId) /** * Request that the cluster manager kill the specified executor without adjusting the @@ -1493,7 +1491,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] def killAndReplaceExecutor(executorId: String): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutors(Seq(executorId), replace = true, true) + b.killExecutors(Seq(executorId), replace = true, force = false) case _ => logWarning("Killing executors is only supported in coarse-grained mode") false diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 5c423e71554c6..adb58adc89f9d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -411,9 +411,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * @return whether the kill request is acknowledged. */ final override def killExecutors( - executorIds: Seq[String], - force: Boolean): Boolean = synchronized { - killExecutors(executorIds, replace = false, force) + executorIds: Seq[String]): Boolean = synchronized { + killExecutors(executorIds, replace = false, force = false) } /** @@ -421,6 +420,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * * @param executorIds identifiers of executors to kill * @param replace whether to replace the killed executors with new ones + * @param force whether to force kill busy executors * @return whether the kill request is acknowledged. */ final def killExecutors( @@ -458,8 +458,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp numPendingExecutors += knownExecutors.size } - // executorsToKill may be empty - doKillExecutors(executorsToKill) + !executorsToKill.isEmpty && doKillExecutors(executorsToKill) } /** From fa3c88ea1033b569b853e7a758587bf84323bd6c Mon Sep 17 00:00:00 2001 From: Grace Date: Wed, 11 Nov 2015 15:12:32 +0800 Subject: [PATCH 07/21] keep public API --- .../main/scala/org/apache/spark/ExecutorAllocationClient.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index ccfeb0fca489e..842bfdbadc948 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -58,6 +58,5 @@ private[spark] trait ExecutorAllocationClient { * Request that the cluster manager kill the specified executor. * @return whether the request is acknowledged by the cluster manager. 
*/ - def killExecutor(executorId: String): Boolean = - killExecutors(Seq(executorId)) + def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId)) } From cb78e5605679bad667e5a93ed24a1d280827d121 Mon Sep 17 00:00:00 2001 From: Grace Date: Wed, 11 Nov 2015 15:14:05 +0800 Subject: [PATCH 08/21] remove unnecessary comments --- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index f44634e2ff449..b93536e6536e2 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -403,8 +403,6 @@ private[spark] class ExecutorAllocationManager( // Send a request to the backend to kill this executor val removeRequestAcknowledged = testing || client.killExecutor(executorId) if (removeRequestAcknowledged) { - // even we get removeRequestAcknowledged, the executor may not be killed - // it can be rescued while onTaskStart event happens logInfo(s"Removing executor $executorId because it has been idle for " + s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})") executorsPendingToRemove.add(executorId) From 5bcfd8148af20c43a993dfaa5a90597070a7c343 Mon Sep 17 00:00:00 2001 From: Grace Date: Wed, 11 Nov 2015 15:15:35 +0800 Subject: [PATCH 09/21] clean code --- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index eb88e4ccf7d30..1918797a71333 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1448,8 +1448,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is received. */ @DeveloperApi - def killExecutors(executorIds: Seq[String]): Boolean = - { + def killExecutors(executorIds: Seq[String]): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds) @@ -1471,8 +1470,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is received. */ @DeveloperApi - override def killExecutor(executorId: String): Boolean = - super.killExecutor(executorId) + override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId) /** * Request that the cluster manager kill the specified executor without adjusting the From 01c236ad3cb435c8b63f8be59c3f5d099b797cf3 Mon Sep 17 00:00:00 2001 From: Grace Date: Wed, 11 Nov 2015 15:16:24 +0800 Subject: [PATCH 10/21] clean code --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1918797a71333..c18ecd5e2e9dc 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1448,7 +1448,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is received. 
*/ @DeveloperApi - def killExecutors(executorIds: Seq[String]): Boolean = { + override def killExecutors(executorIds: Seq[String]): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds) From 2108dbfdfa962b40ffabf8874a364e8bf1009f93 Mon Sep 17 00:00:00 2001 From: Grace Date: Thu, 12 Nov 2015 21:53:54 +0800 Subject: [PATCH 11/21] refine code --- .../spark/ExecutorAllocationManager.scala | 7 +++++ .../spark/scheduler/TaskSchedulerImpl.scala | 26 +++++++++---------- .../CoarseGrainedSchedulerBackend.scala | 23 ++++++---------- .../StandaloneDynamicAllocationSuite.scala | 26 +++++++++++++++++++ 4 files changed, 54 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index b93536e6536e2..79aefabbabda4 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -509,6 +509,13 @@ private[spark] class ExecutorAllocationManager( private def onExecutorBusy(executorId: String): Unit = synchronized { logDebug(s"Clearing idle timer for $executorId because it is now running a task") removeTimes.remove(executorId) + + // Executor is added to remove by misjudgment due to async listener making it as idle). + // see SPARK-9552 + if (executorsPendingToRemove.contains(executorId)) { + // Rescue the executor from pending to remove list + executorsPendingToRemove.remove(executorId) + } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 515b6482f80ae..d3ea30f3b0f50 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -87,9 +87,8 @@ private[spark] class TaskSchedulerImpl( // Incrementing task IDs val nextTaskId = new AtomicLong(0) - // Which executor IDs we have executors on - // each executor will record running or launched task count - val activeExecutorIdsWithTaskCount = new HashMap[String, Int] + // Number of tasks runing on each executor + private val executorIdToTaskCount = new HashMap[String, Int] // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host @@ -255,7 +254,7 @@ private[spark] class TaskSchedulerImpl( val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId - activeExecutorIdsWithTaskCount(execId) += 1 + executorIdToTaskCount(execId) += 1 executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) @@ -284,7 +283,7 @@ private[spark] class TaskSchedulerImpl( var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host - activeExecutorIdsWithTaskCount.getOrElseUpdate(o.executorId, 0) + executorIdToTaskCount.getOrElseUpdate(o.executorId, 0) if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) @@ -334,7 +333,7 @@ private[spark] class TaskSchedulerImpl( // We lost this entire executor, so remember that it's gone val execId = taskIdToExecutorId(tid) - if (activeExecutorIdsWithTaskCount.contains(execId)) { + if (executorIdToTaskCount.contains(execId)) { removeExecutor(execId, SlaveLost(s"Task $tid was lost, so marking the 
executor as lost as well.")) failedExecutor = Some(execId) @@ -344,9 +343,10 @@ private[spark] class TaskSchedulerImpl( case Some(taskSet) => if (TaskState.isFinished(state)) { taskIdToTaskSetManager.remove(tid) - taskIdToExecutorId.remove(tid) match { - case Some(execId) => activeExecutorIdsWithTaskCount(execId) -= 1 - case None => + taskIdToExecutorId.remove(tid).foreach { execId => + if (executorIdToTaskCount.contains(execId)) { + executorIdToTaskCount(execId) -= 1 + } } } if (state == TaskState.FINISHED) { @@ -468,7 +468,7 @@ private[spark] class TaskSchedulerImpl( var failedExecutor: Option[String] = None synchronized { - if (activeExecutorIdsWithTaskCount.contains(executorId)) { + if (executorIdToTaskCount.contains(executorId)) { val hostPort = executorIdToHost(executorId) logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason)) removeExecutor(executorId, reason) @@ -490,7 +490,7 @@ private[spark] class TaskSchedulerImpl( /** Remove an executor from all our data structures and mark it as lost */ private def removeExecutor(executorId: String, reason: ExecutorLossReason) { - activeExecutorIdsWithTaskCount -= executorId + executorIdToTaskCount -= executorId val host = executorIdToHost(executorId) val execs = executorsByHost.getOrElse(host, new HashSet) @@ -525,11 +525,11 @@ private[spark] class TaskSchedulerImpl( } def isExecutorAlive(execId: String): Boolean = synchronized { - activeExecutorIdsWithTaskCount.contains(execId) + executorIdToTaskCount.contains(execId) } def isExecutorBusy(execId: String): Boolean = synchronized { - activeExecutorIdsWithTaskCount.getOrElse(execId, -1) > 0 + executorIdToTaskCount.getOrElse(execId, -1) > 0 } // By default, rack is unknown diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index adb58adc89f9d..e27c13555e363 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -410,8 +410,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Request that the cluster manager kill the specified executors. * @return whether the kill request is acknowledged. 
*/ - final override def killExecutors( - executorIds: Seq[String]): Boolean = synchronized { + final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized { killExecutors(executorIds, replace = false, force = false) } @@ -433,19 +432,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logWarning(s"Executor to kill $id does not exist!") } - // force killing all busy and idle executors - // otherwise, only idle executors are valid to be killed - val idleExecutors = - if (force) { - knownExecutors - } else { - knownExecutors.filter { id => - logWarning(s"Busy executor $id is not valid to be killed!") - !scheduler.isExecutorBusy(id)} - } - // If an executor is already pending to be removed, do not kill it again (SPARK-9795) - val executorsToKill = idleExecutors.filter { id => !executorsPendingToRemove.contains(id) } + // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552) + val executorsToKill = knownExecutors + .filter { id => !executorsPendingToRemove.contains(id) } + .filter { id => force || !scheduler.isExecutorBusy(id) } + // for test only + .filter { id => force || !scheduler.sc.getConf.getBoolean("spark.dynamicAllocation.testing", false)} executorsPendingToRemove ++= executorsToKill // If we do not wish to replace the executors we kill, sync the target number of executors @@ -458,7 +451,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp numPendingExecutors += knownExecutors.size } - !executorsToKill.isEmpty && doKillExecutors(executorsToKill) + doKillExecutors(executorsToKill) } /** diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index d145e78834b1b..ed27573dc82ac 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -31,6 +31,10 @@ import org.apache.spark.deploy.worker.Worker import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.cluster._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor +import scala.concurrent.Await +import org.apache.spark.deploy.DeployMessages.MasterStateResponse +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor +import java.util.concurrent.TimeUnit /** * End-to-end tests for dynamic allocation in standalone mode. 
@@ -404,6 +408,28 @@ class StandaloneDynamicAllocationSuite assert(apps.head.getExecutorLimit === 1) } + test("disable force kill for busy executors (SPARK-9552)") { + sc = new SparkContext(appConf.set("spark.dynamicAllocation.testing", "true")) + val appId = sc.applicationId + eventually(timeout(10.seconds), interval(10.millis)) { + val apps = getApplications() + assert(apps.size === 1) + assert(apps.head.id === appId) + assert(apps.head.executors.size === 2) + assert(apps.head.getExecutorLimit === Int.MaxValue) + } + // sync executors between the Master and the driver, needed because + // the driver refuses to kill executors it does not know about + syncExecutors(sc) + val executors = getExecutorIds(sc) + assert(executors.size === 2) + // try to kill busy executor + assert(sc.killExecutor(executors.head)) + var apps = getApplications() + // won't kill busy executor + assert(apps.head.executors.size === 2) + } + // =============================== // | Utility methods for testing | // =============================== From c0a1d549e84b110e13e06810a9b64da1f5b3230d Mon Sep 17 00:00:00 2001 From: Grace Date: Thu, 12 Nov 2015 22:00:12 +0800 Subject: [PATCH 12/21] remove unnecessary imports --- .../apache/spark/deploy/StandaloneDynamicAllocationSuite.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index ed27573dc82ac..d94f066e39016 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -30,11 +30,8 @@ import org.apache.spark.deploy.master.Master import org.apache.spark.deploy.worker.Worker import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.cluster._ -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor -import scala.concurrent.Await import org.apache.spark.deploy.DeployMessages.MasterStateResponse import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor -import java.util.concurrent.TimeUnit /** * End-to-end tests for dynamic allocation in standalone mode. 
From 4b1959f6c7d393f34e145aed5eca3ae28e7b7a83 Mon Sep 17 00:00:00 2001 From: Grace Date: Thu, 12 Nov 2015 22:05:24 +0800 Subject: [PATCH 13/21] set sc.killExecutor as force = true --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c18ecd5e2e9dc..3f5c7ba06dcac 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1451,7 +1451,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli override def killExecutors(executorIds: Seq[String]): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutors(executorIds) + b.killExecutors(executorIds, replace = false, force = true) case _ => logWarning("Killing executors is only supported in coarse-grained mode") false diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index e27c13555e363..218de04377db2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -438,7 +438,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp .filter { id => !executorsPendingToRemove.contains(id) } .filter { id => force || !scheduler.isExecutorBusy(id) } // for test only - .filter { id => force || !scheduler.sc.getConf.getBoolean("spark.dynamicAllocation.testing", false)} + .filter { id => !scheduler.sc.getConf.getBoolean("spark.dynamicAllocation.testing", false)} executorsPendingToRemove ++= executorsToKill // If we do not wish to replace the executors we kill, sync the target number of executors From 0293d8241df79b9c60ebefb6e18c89456be30547 Mon Sep 17 00:00:00 2001 From: Grace Date: Thu, 12 Nov 2015 22:20:44 +0800 Subject: [PATCH 14/21] use force = false to do the unittest --- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../apache/spark/deploy/StandaloneDynamicAllocationSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 218de04377db2..e27c13555e363 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -438,7 +438,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp .filter { id => !executorsPendingToRemove.contains(id) } .filter { id => force || !scheduler.isExecutorBusy(id) } // for test only - .filter { id => !scheduler.sc.getConf.getBoolean("spark.dynamicAllocation.testing", false)} + .filter { id => force || !scheduler.sc.getConf.getBoolean("spark.dynamicAllocation.testing", false)} executorsPendingToRemove ++= executorsToKill // If we do not wish to replace the executors we kill, sync the target number of executors diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index d94f066e39016..a92a1ca015a49 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -421,7 +421,7 @@ class StandaloneDynamicAllocationSuite val executors = getExecutorIds(sc) assert(executors.size === 2) // try to kill busy executor - assert(sc.killExecutor(executors.head)) + assert(sc.killAndReplaceExecutor(executors.head)) var apps = getApplications() // won't kill busy executor assert(apps.head.executors.size === 2) From 342a59d34e76c55f2fc306b45d8510c74cf73af8 Mon Sep 17 00:00:00 2001 From: Grace Date: Fri, 13 Nov 2015 11:32:06 +0800 Subject: [PATCH 15/21] refine the unit test & change semantics for force == true only --- .../spark/ExecutorAllocationManager.scala | 7 ------ .../scala/org/apache/spark/SparkContext.scala | 2 +- .../CoarseGrainedSchedulerBackend.scala | 2 +- .../StandaloneDynamicAllocationSuite.scala | 23 +++++++++++++++---- 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 79aefabbabda4..b93536e6536e2 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -509,13 +509,6 @@ private[spark] class ExecutorAllocationManager( private def onExecutorBusy(executorId: String): Unit = synchronized { logDebug(s"Clearing idle timer for $executorId because it is now running a task") removeTimes.remove(executorId) - - // Executor is added to remove by misjudgment due to async listener making it as idle). 
- // see SPARK-9552 - if (executorsPendingToRemove.contains(executorId)) { - // Rescue the executor from pending to remove list - executorsPendingToRemove.remove(executorId) - } } /** diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3f5c7ba06dcac..3178850a47456 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1489,7 +1489,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] def killAndReplaceExecutor(executorId: String): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - b.killExecutors(Seq(executorId), replace = true, force = false) + b.killExecutors(Seq(executorId), replace = true, force = true) case _ => logWarning("Killing executors is only supported in coarse-grained mode") false diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index e27c13555e363..2dd91a4fdc0cd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -451,7 +451,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp numPendingExecutors += knownExecutors.size } - doKillExecutors(executorsToKill) + (force || !executorsToKill.isEmpty) && doKillExecutors(executorsToKill) } /** diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index a92a1ca015a49..dce43b9a0e291 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -418,13 +418,18 @@ class StandaloneDynamicAllocationSuite // sync executors between the Master and the driver, needed because // the driver refuses to kill executors it does not know about syncExecutors(sc) - val executors = getExecutorIds(sc) + var executors = getExecutorIds(sc) assert(executors.size === 2) - // try to kill busy executor - assert(sc.killAndReplaceExecutor(executors.head)) + // force kill busy executor + assert(killExecutorWithForce(sc, executors.head)) var apps = getApplications() + // kill executor successfully + assert(apps.head.executors.size === 1) + // try to kill busy executor but it should be failed + assert(killExecutorWithForce(sc, executors.head, false) === false) + apps = getApplications() // won't kill busy executor - assert(apps.head.executors.size === 2) + assert(apps.head.executors.size === 1) } // =============================== @@ -478,6 +483,16 @@ class StandaloneDynamicAllocationSuite sc.killExecutors(getExecutorIds(sc).take(n)) } + private def killExecutorWithForce(sc: SparkContext, executorId: String, force: Boolean = true): Boolean = { + sc.schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + b.killExecutors(Seq(executorId), replace = false, force) + case _ => + logWarning("Killing executors is only supported in coarse-grained mode") + false + } + } + /** * Return a list of executor IDs belonging to this application. 
* From 806a64d8e360e4589f4ad1569cc8c6f379e5987b Mon Sep 17 00:00:00 2001 From: Grace Date: Fri, 13 Nov 2015 11:43:49 +0800 Subject: [PATCH 16/21] refine the unit test & change semantics for force == true only --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 2dd91a4fdc0cd..bd1826b47e953 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -438,7 +438,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp .filter { id => !executorsPendingToRemove.contains(id) } .filter { id => force || !scheduler.isExecutorBusy(id) } // for test only - .filter { id => force || !scheduler.sc.getConf.getBoolean("spark.dynamicAllocation.testing", false)} + .filter { id => force || + !scheduler.sc.getConf.getBoolean("spark.dynamicAllocation.testing", false)} executorsPendingToRemove ++= executorsToKill // If we do not wish to replace the executors we kill, sync the target number of executors From 4ce0ec06d79d2d6a5c68d0415c25c41d19f34736 Mon Sep 17 00:00:00 2001 From: Grace Date: Fri, 13 Nov 2015 11:51:48 +0800 Subject: [PATCH 17/21] remove unnecessary imports --- .../apache/spark/deploy/StandaloneDynamicAllocationSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index dce43b9a0e291..9b7d2aeb04b00 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -30,7 +30,6 @@ import org.apache.spark.deploy.master.Master import org.apache.spark.deploy.worker.Worker import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.cluster._ -import org.apache.spark.deploy.DeployMessages.MasterStateResponse import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor /** From 0000551cd2860a85a6895b8c0a3b1815a9a7f108 Mon Sep 17 00:00:00 2001 From: Grace Date: Fri, 13 Nov 2015 12:55:42 +0800 Subject: [PATCH 18/21] fix checkstyle issue --- .../spark/deploy/StandaloneDynamicAllocationSuite.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 9b7d2aeb04b00..b9ccc36461165 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -482,7 +482,10 @@ class StandaloneDynamicAllocationSuite sc.killExecutors(getExecutorIds(sc).take(n)) } - private def killExecutorWithForce(sc: SparkContext, executorId: String, force: Boolean = true): Boolean = { + private def killExecutorWithForce( + sc: SparkContext, + executorId: String, + force: Boolean = true): Boolean = { sc.schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(Seq(executorId), replace = false, force) From 
c44ef8714c6cde70404043e065e81330746f7881 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 13 Nov 2015 10:51:48 -0800 Subject: [PATCH 19/21] Suggestions --- .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../CoarseGrainedSchedulerBackend.scala | 5 +-- .../StandaloneDynamicAllocationSuite.scala | 34 +++++++++++-------- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d3ea30f3b0f50..576b3b487abd5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -87,7 +87,7 @@ private[spark] class TaskSchedulerImpl( // Incrementing task IDs val nextTaskId = new AtomicLong(0) - // Number of tasks runing on each executor + // Number of tasks running on each executor private val executorIdToTaskCount = new HashMap[String, Int] // The set of executors we have on each host; this is used to compute hostsAlive, which diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index bd1826b47e953..9dc1d59f5f453 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -437,9 +437,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val executorsToKill = knownExecutors .filter { id => !executorsPendingToRemove.contains(id) } .filter { id => force || !scheduler.isExecutorBusy(id) } - // for test only - .filter { id => force || - !scheduler.sc.getConf.getBoolean("spark.dynamicAllocation.testing", false)} executorsPendingToRemove ++= executorsToKill // If we do not wish to replace the executors we kill, sync the target number of executors @@ -452,7 +449,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp numPendingExecutors += knownExecutors.size } - (force || !executorsToKill.isEmpty) && doKillExecutors(executorsToKill) + doKillExecutors(executorsToKill) } /** diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index b9ccc36461165..9eafb654a477f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -17,10 +17,11 @@ package org.apache.spark.deploy +import scala.collection.mutable import scala.concurrent.duration._ import org.mockito.Mockito.{mock, when} -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.apache.spark._ @@ -29,6 +30,7 @@ import org.apache.spark.deploy.master.ApplicationInfo import org.apache.spark.deploy.master.Master import org.apache.spark.deploy.worker.Worker import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} +import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor @@ -38,7 +40,8 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterE class 
StandaloneDynamicAllocationSuite extends SparkFunSuite with LocalSparkContext - with BeforeAndAfterAll { + with BeforeAndAfterAll + with PrivateMethodTester { private val numWorkers = 2 private val conf = new SparkConf() @@ -405,7 +408,7 @@ class StandaloneDynamicAllocationSuite } test("disable force kill for busy executors (SPARK-9552)") { - sc = new SparkContext(appConf.set("spark.dynamicAllocation.testing", "true")) + sc = new SparkContext(appConf) val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() @@ -417,17 +420,21 @@ class StandaloneDynamicAllocationSuite // sync executors between the Master and the driver, needed because // the driver refuses to kill executors it does not know about syncExecutors(sc) - var executors = getExecutorIds(sc) + val executors = getExecutorIds(sc) assert(executors.size === 2) // force kill busy executor - assert(killExecutorWithForce(sc, executors.head)) + assert(killExecutor(sc, executors.head, force = true)) var apps = getApplications() // kill executor successfully assert(apps.head.executors.size === 1) - // try to kill busy executor but it should be failed - assert(killExecutorWithForce(sc, executors.head, false) === false) + // simulate running a task on the executor + val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount) + val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl] + val executorIdToTaskCount = taskScheduler invokePrivate getMap() + executorIdToTaskCount(executors.head) = 1 + // kill the busy executor without force; this should fail + assert(killExecutor(sc, executors.head, force = false)) apps = getApplications() - // won't kill busy executor assert(apps.head.executors.size === 1) } @@ -482,16 +489,13 @@ class StandaloneDynamicAllocationSuite sc.killExecutors(getExecutorIds(sc).take(n)) } - private def killExecutorWithForce( - sc: SparkContext, - executorId: String, - force: Boolean = true): Boolean = { + /** Kill the given executor, specifying whether to force kill it. 
*/ + private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Boolean = { + syncExecutors(sc) sc.schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(Seq(executorId), replace = false, force) - case _ => - logWarning("Killing executors is only supported in coarse-grained mode") - false + case _ => fail("expected coarse grained scheduler") } } From d3f51dbfa4d0b9e416c56a69383d19da2699d478 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 13 Nov 2015 15:02:51 -0800 Subject: [PATCH 20/21] Clean up state in ExecutorAllocationManager --- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index b93536e6536e2..6419218f47c85 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -509,6 +509,7 @@ private[spark] class ExecutorAllocationManager( private def onExecutorBusy(executorId: String): Unit = synchronized { logDebug(s"Clearing idle timer for $executorId because it is now running a task") removeTimes.remove(executorId) + executorsPendingToRemove.remove(executorId) } /** From 1938e6158838bb1f21884c3d3eba9f0b8ffabe05 Mon Sep 17 00:00:00 2001 From: Grace Date: Tue, 17 Nov 2015 16:29:05 +0800 Subject: [PATCH 21/21] fix unittest issue --- .../deploy/StandaloneDynamicAllocationSuite.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 9eafb654a477f..2fa795f846667 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -417,16 +417,13 @@ class StandaloneDynamicAllocationSuite assert(apps.head.executors.size === 2) assert(apps.head.getExecutorLimit === Int.MaxValue) } + var apps = getApplications() // sync executors between the Master and the driver, needed because // the driver refuses to kill executors it does not know about syncExecutors(sc) val executors = getExecutorIds(sc) assert(executors.size === 2) - // force kill busy executor - assert(killExecutor(sc, executors.head, force = true)) - var apps = getApplications() - // kill executor successfully - assert(apps.head.executors.size === 1) + // simulate running a task on the executor val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount) val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl] @@ -435,7 +432,14 @@ class StandaloneDynamicAllocationSuite // kill the busy executor without force; this should fail assert(killExecutor(sc, executors.head, force = false)) apps = getApplications() + assert(apps.head.executors.size === 2) + + // force kill busy executor + assert(killExecutor(sc, executors.head, force = true)) + apps = getApplications() + // kill executor successfully assert(apps.head.executors.size === 1) + } // ===============================
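As the series finally stands, CoarseGrainedSchedulerBackend.killExecutors(executorIds, replace, force) is the single entry point: SparkContext.killExecutors and killAndReplaceExecutor pass force = true, while the dynamic-allocation idle path (via ExecutorAllocationClient.killExecutor and the parameterless backend override) reaches it with force = false, so busy executors are filtered out rather than killed. Below is a minimal, illustrative sketch of that filtering step only — the executor IDs, the runningTasks map, and the simplified isExecutorBusy are stand-ins for the real Spark internals, not part of this patch.

    // Sketch of the busy/pending filtering introduced by this series.
    // Everything here is a simplified stand-in, not the actual Spark classes.
    object KillFilterSketch {
      import scala.collection.mutable

      // Executors already requested to be removed (analogue of executorsPendingToRemove).
      val executorsPendingToRemove = mutable.HashSet[String]()
      // Hypothetical running-task counts per executor (analogue of executorIdToTaskCount).
      val runningTasks = mutable.HashMap("1" -> 2, "2" -> 0)

      def isExecutorBusy(id: String): Boolean = runningTasks.getOrElse(id, 0) > 0

      // Mirrors the two filters applied in killExecutors(executorIds, replace, force).
      def selectExecutorsToKill(knownExecutors: Seq[String], force: Boolean): Seq[String] = {
        knownExecutors
          .filter(id => !executorsPendingToRemove.contains(id)) // do not kill twice (SPARK-9795)
          .filter(id => force || !isExecutorBusy(id))           // skip busy executors unless forced (SPARK-9552)
      }

      def main(args: Array[String]): Unit = {
        println(selectExecutorsToKill(Seq("1", "2"), force = false)) // List(2): busy "1" is spared
        println(selectExecutorsToKill(Seq("1", "2"), force = true))  // List(1, 2): force overrides the busy check
      }
    }

Under these assumptions, the unit test added in the later commits exercises exactly the two branches above: a non-forced kill of an executor with a simulated running task leaves it alive, while a forced kill removes it.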