[SPARK-21656][CORE] spark dynamic allocation should not idle timeout executors when tasks still to run

## What changes were proposed in this pull request?

Right now Spark releases executors once they have been idle for 60s (or a configurable timeout). I have seen Spark release executors that were idle but still needed, for example when the scheduler was waiting for node locality and that wait exceeded the default idle timeout. In those jobs the number of executors drops very low (fewer than 10) while something like 80,000 tasks are still waiting to run.
We should consider not allowing executors to idle-timeout while they are still needed according to the number of tasks remaining to run.
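
Conceptually the fix adds a second guard next to the existing minimum-executor check: an idle executor is released only if the total would still stay at or above the current executor target derived from outstanding tasks. A minimal sketch of that condition (the standalone method name and signature here are illustrative, not the actual Spark code):

```
// Sketch only: an idle executor may be released only if doing so keeps the
// total at or above both the configured minimum and the current target
// derived from the number of tasks still to run.
def canReleaseIdleExecutor(
    currentTotal: Int,
    minNumExecutors: Int,
    numExecutorsTarget: Int): Boolean = {
  currentTotal - 1 >= minNumExecutors && currentTotal - 1 >= numExecutorsTarget
}
```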

## How was this patch tested?

Tested by manually adding executors to the `executorIdsToBeRemoved` list and checking whether those executors were removed when there were many tasks still to run and a high `numExecutorsTarget` value.

Code used:

In `ExecutorAllocationManager.start()`:

```
    // record when the manager started; start_time is assumed to be a mutable field
    start_time = clock.getTimeMillis()
```

In `ExecutorAllocationManager.schedule()`:

```
    val executorIdsToBeRemoved = ArrayBuffer[String]()
    // Two minutes after start, mark every second known executor for removal so the
    // new guard against removing still-needed executors can be observed.
    if (now > start_time + 1000 * 60 * 2) {
      logInfo("--- REMOVING 1/2 of the EXECUTORS ---")
      // push start_time far into the future so this block fires only once
      start_time += 1000 * 60 * 100
      var counter = 0
      for (x <- executorIds) {
        counter += 1
        if (counter == 2) {
          counter = 0
          executorIdsToBeRemoved += x
        }
      }
    }
```

Author: John Lee <jlee2@yahoo-inc.com>

Closes apache#18874 from yoonlee95/SPARK-21656.

(cherry picked from commit adf005d)
Signed-off-by: Tom Graves <tgraves@yahoo-inc.com>
John Lee authored and MatthewRBruce committed Jul 31, 2018
1 parent be00ef0 commit fd2f153
Showing 2 changed files with 89 additions and 35 deletions.
@@ -410,7 +410,10 @@ private[spark] class ExecutorAllocationManager(
executors.foreach { executorIdToBeRemoved =>
if (newExecutorTotal - 1 < minNumExecutors) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
s"$newExecutorTotal executor(s) left (limit $minNumExecutors)")
s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)")
} else if (newExecutorTotal - 1 < numExecutorsTarget) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)")
} else if (canBeKilled(executorIdToBeRemoved)) {
executorIdsToBeRemoved += executorIdToBeRemoved
newExecutorTotal -= 1
@@ -314,8 +314,47 @@ class ExecutorAllocationManagerSuite
assert(executorsPendingToRemove(manager).isEmpty)
}

test ("Removing with various numExecutorsTarget condition") {
sc = createSparkContext(5, 12, 5)
val manager = sc.executorAllocationManager.get

sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 8)))

// Remove when numExecutorsTarget is the same as the current number of executors
assert(addExecutors(manager) === 1)
assert(addExecutors(manager) === 2)
(1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach {
info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) }
assert(executorIds(manager).size === 8)
assert(numExecutorsTarget(manager) === 8)
assert(maxNumExecutorsNeeded(manager) == 8)
assert(!removeExecutor(manager, "1")) // won't work since numExecutorsTarget == numExecutors

// Remove executors when numExecutorsTarget is lower than current number of executors
(1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach {
info => sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, Success, info, null)) }
adjustRequestedExecutors(manager)
assert(executorIds(manager).size === 8)
assert(numExecutorsTarget(manager) === 5)
assert(maxNumExecutorsNeeded(manager) == 5)
assert(removeExecutor(manager, "1"))
assert(removeExecutors(manager, Seq("2", "3"))=== Seq("2", "3"))
onExecutorRemoved(manager, "1")
onExecutorRemoved(manager, "2")
onExecutorRemoved(manager, "3")

// numExecutorsTarget is lower than minNumExecutors
sc.listenerBus.postToAll(
SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"), null))
assert(executorIds(manager).size === 5)
assert(numExecutorsTarget(manager) === 5)
assert(maxNumExecutorsNeeded(manager) == 4)
assert(!removeExecutor(manager, "4")) // lower limit
assert(addExecutors(manager) === 0) // upper limit
}

test ("interleaving add and remove") {
sc = createSparkContext(5, 10, 5)
sc = createSparkContext(5, 12, 5)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))

@@ -331,52 +370,59 @@ class ExecutorAllocationManagerSuite
onExecutorAdded(manager, "7")
onExecutorAdded(manager, "8")
assert(executorIds(manager).size === 8)
assert(numExecutorsTarget(manager) === 8)

// Remove until limit
assert(removeExecutor(manager, "1"))
assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3"))
assert(!removeExecutor(manager, "4")) // lower limit reached
assert(!removeExecutor(manager, "5"))
onExecutorRemoved(manager, "1")
onExecutorRemoved(manager, "2")
onExecutorRemoved(manager, "3")
assert(executorIds(manager).size === 5)

// Add until limit
assert(addExecutors(manager) === 2) // upper limit reached
assert(addExecutors(manager) === 0)
assert(!removeExecutor(manager, "4")) // still at lower limit
assert((manager, Seq("5")) !== Seq("5"))
// Remove when numTargetExecutors is equal to the current number of executors
assert(!removeExecutor(manager, "1"))
assert(removeExecutors(manager, Seq("2", "3")) !== Seq("2", "3"))

// Remove until limit
onExecutorAdded(manager, "9")
onExecutorAdded(manager, "10")
onExecutorAdded(manager, "11")
onExecutorAdded(manager, "12")
onExecutorAdded(manager, "13")
assert(executorIds(manager).size === 10)
assert(executorIds(manager).size === 12)
assert(numExecutorsTarget(manager) === 8)

// Remove succeeds again, now that we are no longer at the lower limit
assert(removeExecutors(manager, Seq("4", "5", "6")) === Seq("4", "5", "6"))
assert(removeExecutor(manager, "7"))
assert(executorIds(manager).size === 10)
assert(addExecutors(manager) === 0)
assert(removeExecutor(manager, "1"))
assert(removeExecutors(manager, Seq("2", "3", "4")) === Seq("2", "3", "4"))
assert(!removeExecutor(manager, "5")) // lower limit reached
assert(!removeExecutor(manager, "6"))
onExecutorRemoved(manager, "1")
onExecutorRemoved(manager, "2")
onExecutorRemoved(manager, "3")
onExecutorRemoved(manager, "4")
onExecutorRemoved(manager, "5")
assert(executorIds(manager).size === 8)

// Number of executors pending restarts at 1
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 0)
assert(executorIds(manager).size === 8)
onExecutorRemoved(manager, "6")
onExecutorRemoved(manager, "7")
// Add until limit
assert(!removeExecutor(manager, "7")) // still at lower limit
assert((manager, Seq("8")) !== Seq("8"))
onExecutorAdded(manager, "13")
onExecutorAdded(manager, "14")
onExecutorAdded(manager, "15")
assert(executorIds(manager).size === 8)
assert(addExecutors(manager) === 0) // still at upper limit
onExecutorAdded(manager, "16")
assert(executorIds(manager).size === 12)

// Remove succeeds again, now that we are no longer at the lower limit
assert(removeExecutors(manager, Seq("5", "6", "7")) === Seq("5", "6", "7"))
assert(removeExecutor(manager, "8"))
assert(executorIds(manager).size === 12)
onExecutorRemoved(manager, "5")
onExecutorRemoved(manager, "6")
assert(executorIds(manager).size === 10)
assert(numExecutorsToAdd(manager) === 4)
onExecutorRemoved(manager, "9")
onExecutorRemoved(manager, "10")
assert(addExecutors(manager) === 4) // at upper limit
onExecutorAdded(manager, "17")
onExecutorAdded(manager, "18")
assert(executorIds(manager).size === 10)
assert(numExecutorsTarget(manager) === 10)
assert(addExecutors(manager) === 0) // still at upper limit
onExecutorAdded(manager, "19")
onExecutorAdded(manager, "20")
assert(executorIds(manager).size === 12)
assert(numExecutorsTarget(manager) === 12)
}

test("starting/canceling add timer") {
@@ -915,12 +961,17 @@ class ExecutorAllocationManagerSuite
onExecutorAdded(manager, "third")
onExecutorAdded(manager, "fourth")
onExecutorAdded(manager, "fifth")
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
onExecutorAdded(manager, "sixth")
onExecutorAdded(manager, "seventh")
onExecutorAdded(manager, "eighth")
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth",
"sixth", "seventh", "eighth"))

removeExecutor(manager, "first")
removeExecutors(manager, Seq("second", "third"))
assert(executorsPendingToRemove(manager) === Set("first", "second", "third"))
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth",
"sixth", "seventh", "eighth"))


// Cluster manager lost will make all the live executors lost, so here simulate this behavior
