diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 9a0e3b5557892..337631a6f9a34 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -420,7 +420,10 @@ private[spark] class ExecutorAllocationManager( executors.foreach { executorIdToBeRemoved => if (newExecutorTotal - 1 < minNumExecutors) { logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " + - s"$newExecutorTotal executor(s) left (limit $minNumExecutors)") + s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)") + } else if (newExecutorTotal - 1 < numExecutorsTarget) { + logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " + + s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)") } else if (canBeKilled(executorIdToBeRemoved)) { executorIdsToBeRemoved += executorIdToBeRemoved newExecutorTotal -= 1 diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 4ea42fc7d5c22..b9ce71a0c5254 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -314,8 +314,47 @@ class ExecutorAllocationManagerSuite assert(executorsPendingToRemove(manager).isEmpty) } + test ("Removing with various numExecutorsTarget condition") { + sc = createSparkContext(5, 12, 5) + val manager = sc.executorAllocationManager.get + + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 8))) + + // Remove when numExecutorsTarget is the same as the current number of executors + assert(addExecutors(manager) === 1) + assert(addExecutors(manager) === 2) + (1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach { + info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) } + assert(executorIds(manager).size === 8) + assert(numExecutorsTarget(manager) === 8) + assert(maxNumExecutorsNeeded(manager) == 8) + assert(!removeExecutor(manager, "1")) // won't work since numExecutorsTarget == numExecutors + + // Remove executors when numExecutorsTarget is lower than current number of executors + (1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach { + info => sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, Success, info, null)) } + adjustRequestedExecutors(manager) + assert(executorIds(manager).size === 8) + assert(numExecutorsTarget(manager) === 5) + assert(maxNumExecutorsNeeded(manager) == 5) + assert(removeExecutor(manager, "1")) + assert(removeExecutors(manager, Seq("2", "3"))=== Seq("2", "3")) + onExecutorRemoved(manager, "1") + onExecutorRemoved(manager, "2") + onExecutorRemoved(manager, "3") + + // numExecutorsTarget is lower than minNumExecutors + sc.listenerBus.postToAll( + SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"), null)) + assert(executorIds(manager).size === 5) + assert(numExecutorsTarget(manager) === 5) + assert(maxNumExecutorsNeeded(manager) == 4) + assert(!removeExecutor(manager, "4")) // lower limit + assert(addExecutors(manager) === 0) // upper limit + } + test ("interleaving add and remove") { - sc = createSparkContext(5, 10, 5) + sc = createSparkContext(5, 12, 5) val manager = sc.executorAllocationManager.get sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) @@ -331,52 +370,59 @@ class ExecutorAllocationManagerSuite onExecutorAdded(manager, "7") onExecutorAdded(manager, "8") assert(executorIds(manager).size === 8) + assert(numExecutorsTarget(manager) === 8) - // Remove until limit - assert(removeExecutor(manager, "1")) - assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3")) - assert(!removeExecutor(manager, "4")) // lower limit reached - assert(!removeExecutor(manager, "5")) - onExecutorRemoved(manager, "1") - onExecutorRemoved(manager, "2") - onExecutorRemoved(manager, "3") - assert(executorIds(manager).size === 5) - // Add until limit - assert(addExecutors(manager) === 2) // upper limit reached - assert(addExecutors(manager) === 0) - assert(!removeExecutor(manager, "4")) // still at lower limit - assert((manager, Seq("5")) !== Seq("5")) + // Remove when numTargetExecutors is equal to the current number of executors + assert(!removeExecutor(manager, "1")) + assert(removeExecutors(manager, Seq("2", "3")) !== Seq("2", "3")) + + // Remove until limit onExecutorAdded(manager, "9") onExecutorAdded(manager, "10") onExecutorAdded(manager, "11") onExecutorAdded(manager, "12") - onExecutorAdded(manager, "13") - assert(executorIds(manager).size === 10) + assert(executorIds(manager).size === 12) + assert(numExecutorsTarget(manager) === 8) - // Remove succeeds again, now that we are no longer at the lower limit - assert(removeExecutors(manager, Seq("4", "5", "6")) === Seq("4", "5", "6")) - assert(removeExecutor(manager, "7")) - assert(executorIds(manager).size === 10) - assert(addExecutors(manager) === 0) + assert(removeExecutor(manager, "1")) + assert(removeExecutors(manager, Seq("2", "3", "4")) === Seq("2", "3", "4")) + assert(!removeExecutor(manager, "5")) // lower limit reached + assert(!removeExecutor(manager, "6")) + onExecutorRemoved(manager, "1") + onExecutorRemoved(manager, "2") + onExecutorRemoved(manager, "3") onExecutorRemoved(manager, "4") - onExecutorRemoved(manager, "5") assert(executorIds(manager).size === 8) - // Number of executors pending restarts at 1 - assert(numExecutorsToAdd(manager) === 1) - assert(addExecutors(manager) === 0) - assert(executorIds(manager).size === 8) - onExecutorRemoved(manager, "6") - onExecutorRemoved(manager, "7") + // Add until limit + assert(!removeExecutor(manager, "7")) // still at lower limit + assert((manager, Seq("8")) !== Seq("8")) + onExecutorAdded(manager, "13") onExecutorAdded(manager, "14") onExecutorAdded(manager, "15") - assert(executorIds(manager).size === 8) - assert(addExecutors(manager) === 0) // still at upper limit onExecutorAdded(manager, "16") + assert(executorIds(manager).size === 12) + + // Remove succeeds again, now that we are no longer at the lower limit + assert(removeExecutors(manager, Seq("5", "6", "7")) === Seq("5", "6", "7")) + assert(removeExecutor(manager, "8")) + assert(executorIds(manager).size === 12) + onExecutorRemoved(manager, "5") + onExecutorRemoved(manager, "6") + assert(executorIds(manager).size === 10) + assert(numExecutorsToAdd(manager) === 4) + onExecutorRemoved(manager, "9") + onExecutorRemoved(manager, "10") + assert(addExecutors(manager) === 4) // at upper limit onExecutorAdded(manager, "17") + onExecutorAdded(manager, "18") assert(executorIds(manager).size === 10) - assert(numExecutorsTarget(manager) === 10) + assert(addExecutors(manager) === 0) // still at upper limit + onExecutorAdded(manager, "19") + onExecutorAdded(manager, "20") + assert(executorIds(manager).size === 12) + assert(numExecutorsTarget(manager) === 12) } test("starting/canceling add timer") { @@ -915,12 +961,17 @@ class ExecutorAllocationManagerSuite onExecutorAdded(manager, "third") onExecutorAdded(manager, "fourth") onExecutorAdded(manager, "fifth") - assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth")) + onExecutorAdded(manager, "sixth") + onExecutorAdded(manager, "seventh") + onExecutorAdded(manager, "eighth") + assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth", + "sixth", "seventh", "eighth")) removeExecutor(manager, "first") removeExecutors(manager, Seq("second", "third")) assert(executorsPendingToRemove(manager) === Set("first", "second", "third")) - assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth")) + assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth", + "sixth", "seventh", "eighth")) // Cluster manager lost will make all the live executors lost, so here simulate this behavior