[SPARK-26758][CORE] Idle executors are not getting killed after the spark.dynamicAllocation.executorIdleTimeout value elapses

## What changes were proposed in this pull request?

The **updateAndSyncNumExecutorsTarget** API should be called after the **initializing** flag is unset; otherwise the executor target is not recalculated in the same `schedule()` pass in which an executor idles out.
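As context for the diff below, the `removeTimes.retain` scan in `schedule()` is where an idle timeout clears **initializing**, and `updateAndSyncNumExecutorsTarget` returns without changing the target while that flag is still set, so the call has to come after the scan. A condensed sketch of the fixed ordering, paraphrased from the diff rather than copied verbatim from `ExecutorAllocationManager`:

```scala
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis

  val executorIdsToBeRemoved = ArrayBuffer[String]()
  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false              // an executor idled out, so initialization is over
      executorIdsToBeRemoved += executorId
    }
    !expired
  }
  // Update executor target number only after initializing flag is unset
  updateAndSyncNumExecutorsTarget(now)
  if (executorIdsToBeRemoved.nonEmpty) {
    removeExecutors(executorIdsToBeRemoved)
  }
}
```
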
## How was this patch tested?
Added a unit test and also verified manually.
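For the manual check, a dynamic-allocation configuration along these lines exercises the scenario (illustrative values only; the exact setup used for the manual test is not recorded in this commit):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical reproduction config: with no tasks running, idle executors
// should be released once spark.dynamicAllocation.executorIdleTimeout elapses.
val conf = new SparkConf()
  .setAppName("SPARK-26758-repro")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // normally required for dynamic allocation
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.initialExecutors", "3")
  .set("spark.dynamicAllocation.maxExecutors", "5")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")

val sc = new SparkContext(conf)
// Before this fix the executor target was not recalculated in the same scheduling
// pass in which executors timed out; after it, idle executors are released and the
// target ramps down to minExecutors.
```
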

After the fix:
![afterfix](https://user-images.githubusercontent.com/35216143/51983136-ed4a5000-24bd-11e9-90c8-c4a562c17a4b.png)

Closes #23697 from sandeep-katta/executorIssue.

Authored-by: sandeep-katta <sandeep.katta2007@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
sandeep-katta authored and srowen committed Feb 5, 2019
1 parent 755f9c2 commit 1dd7419
Showing 2 changed files with 22 additions and 8 deletions.
@@ -312,8 +312,6 @@ private[spark] class ExecutorAllocationManager(
   private def schedule(): Unit = synchronized {
     val now = clock.getTimeMillis
 
-    updateAndSyncNumExecutorsTarget(now)
-
     val executorIdsToBeRemoved = ArrayBuffer[String]()
     removeTimes.retain { case (executorId, expireTime) =>
       val expired = now >= expireTime
@@ -323,6 +321,8 @@ private[spark] class ExecutorAllocationManager(
       }
       !expired
     }
+    // Update executor target number only after initializing flag is unset
+    updateAndSyncNumExecutorsTarget(now)
     if (executorIdsToBeRemoved.nonEmpty) {
       removeExecutors(executorIdsToBeRemoved)
     }
@@ -936,12 +936,7 @@ class ExecutorAllocationManagerSuite
 
     assert(maxNumExecutorsNeeded(manager) === 0)
     schedule(manager)
-    // Verify executor is timeout but numExecutorsTarget is not recalculated
-    assert(numExecutorsTarget(manager) === 3)
-
-    // Schedule again to recalculate the numExecutorsTarget after executor is timeout
-    schedule(manager)
-    // Verify that current number of executors should be ramp down when executor is timeout
+    // Verify the executor is timed out and numExecutorsTarget is recalculated
     assert(numExecutorsTarget(manager) === 2)
   }

@@ -1148,6 +1143,25 @@ class ExecutorAllocationManagerSuite
     verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false)
   }
 
+  test("SPARK-26758 check executor target number after idle time out ") {
+    sc = createSparkContext(1, 5, 3)
+    val manager = sc.executorAllocationManager.get
+    val clock = new ManualClock(10000L)
+    manager.setClock(clock)
+    assert(numExecutorsTarget(manager) === 3)
+    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+      clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
+    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+      clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 2, Map.empty)))
+    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+      clock.getTimeMillis(), "executor-3", new ExecutorInfo("host1", 3, Map.empty)))
+    // Make all the executors idle so that they will be killed
+    clock.advance(executorIdleTimeout * 1000)
+    schedule(manager)
+    // Once schedule() runs, the target executor number should be 1
+    assert(numExecutorsTarget(manager) === 1)
+  }
+
   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
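For reference, the expected value in the new test follows from the suite helper and the ramp-down logic: `createSparkContext(1, 5, 3)` sets minExecutors = 1, maxExecutors = 5 and initialExecutors = 3, and once all three executors have idled out there are no running or pending tasks, so the target is clamped back to the minimum. A rough sketch of that arithmetic (an approximation of what `updateAndSyncNumExecutorsTarget` computes, not the exact Spark code):

```scala
// Approximate ramp-down arithmetic assumed by the new test (illustrative only).
val minExecutors = 1
val maxExecutors = 5
val maxNumExecutorsNeeded = 0 // no running or pending tasks once every executor is idle

// The target never drops below the configured minimum.
val expectedTarget = math.max(math.min(maxNumExecutorsNeeded, maxExecutors), minExecutors)
assert(expectedTarget == 1)   // matches numExecutorsTarget(manager) === 1 above
```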
