Skip to content
Browse files

[SPARK-25837][CORE] Fix potential slowdown in AppStatusListener when …

…cleaning up stages

## What changes were proposed in this pull request?

* Update `AppStatusListener` `cleanupStages` method to remove tasks for those stages in a single pass instead of 1 for each stage.
* This fixes an issue where the cleanupStages method would get backed up, causing a backup in the executor in ElementTrackingStore, resulting in stages and jobs not getting cleaned up properly.

Tasks seem most susceptible to this as there are a lot of them, however a similar issue could arise in other locations the `KVStore` `view` method is used. A broader fix might involve updates to `KVStoreView` and `InMemoryView` as it appears this interface and implementation can lead to multiple and inefficient traversals of the stored data.

## How was this patch tested?

Using existing tests in AppStatusListenerSuite

This is my original work and I license the work to the project under the project’s open source license.

Closes #22883 from patrickbrownsync/cleanup-stages-fix.

Authored-by: Patrick Brown <>
Signed-off-by: Marcelo Vanzin <>
(cherry picked from commit e9d3ca0)
Signed-off-by: Marcelo Vanzin <>
  • Loading branch information...
patrickbrownsync authored and vanzin committed Nov 1, 2018
1 parent 632c0d9 commit 49e1eb8bdeff2ea13e235ed3a82173887c48643e
Showing with 9 additions and 10 deletions.
  1. +9 −10 core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -950,16 +950,6 @@ private[spark] class AppStatusListener(

val tasks = kvstore.view(classOf[TaskDataWrapper])

tasks.foreach { t =>
kvstore.delete(t.getClass(), t.taskId)

// Check whether there are remaining attempts for the same stage. If there aren't, then
// also delete the RDD graph data.
val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
@@ -982,6 +972,15 @@ private[spark] class AppStatusListener(


// Delete tasks for all stages in one pass, as deleting them for each stage individually is slow
val tasks = kvstore.view(classOf[TaskDataWrapper]).asScala
val keys = { s => (, }.toSet
tasks.foreach { t =>
if (keys.contains((t.stageId, t.stageAttemptId))) {
kvstore.delete(t.getClass(), t.taskId)

private def cleanupTasks(stage: LiveStage): Unit = {

0 comments on commit 49e1eb8

Please sign in to comment.
You can’t perform that action at this time.