From 761f1ee1f3ef98a1c9d5f3d7e5c4ecbb71755656 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Thu, 1 Feb 2018 13:59:43 -0800
Subject: [PATCH 1/6] Sort jobs/stages/tasks/queries by the completion timestamp before cleaning them up

---
 .../apache/spark/status/AppStatusListener.scala |  6 +++---
 .../scala/org/apache/spark/status/KVUtils.scala | 15 +++++++++------
 .../sql/execution/ui/SQLAppStatusListener.scala |  2 +-
 3 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 3e34bdc0c7b63..f857d580b17bd 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -878,7 +878,7 @@ private[spark] class AppStatusListener(
     val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
       countToDelete.toInt) { j =>
       j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
-    }
+    } { _.info.completionTime.map(_.getTime).getOrElse(Long.MaxValue) }
     toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
   }
 
@@ -891,7 +891,7 @@ private[spark] class AppStatusListener(
     val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
       countToDelete.toInt) { s =>
       s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
-    }
+    } { _.info.completionTime.map(_.getTime).getOrElse(Long.MaxValue) }
 
     stages.foreach { s =>
       val key = Array(s.info.stageId, s.info.attemptId)
@@ -951,7 +951,7 @@ private[spark] class AppStatusListener(
       // Try to delete finished tasks only.
       val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
         !live || t.status != TaskState.RUNNING.toString()
-      }
+      } { t => t.launchTime + t.duration }
       toDelete.foreach { t => kvstore.delete(t.getClass(), t.taskId) }
 
       stage.savedTasks.addAndGet(-toDelete.size)
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index 99b1843d8e1c0..7f09ec4848aff 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -69,14 +69,17 @@ private[spark] object KVUtils extends Logging {
     db
   }
 
-  /** Turns a KVStoreView into a Scala sequence, applying a filter. */
-  def viewToSeq[T](
-      view: KVStoreView[T],
-      max: Int)
-      (filter: T => Boolean): Seq[T] = {
+  /**
+   * Turns a KVStoreView into a Scala sequence, applying a filter, sorting the sequence and
+   * selecting the first `max` values.
+ */ + def viewToSeq[T, S: Ordering]( + view: KVStoreView[T], + max: Int) + (filter: T => Boolean)(sorter: T => S): Seq[T] = { val iter = view.closeableIterator() try { - iter.asScala.filter(filter).take(max).toList + iter.asScala.filter(filter).toList.sortBy(sorter).take(max) } finally { iter.close() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 73a105266e1c1..5927933e590b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -333,7 +333,7 @@ class SQLAppStatusListener( } val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[SQLExecutionUIData]), - countToDelete.toInt) { e => e.completionTime.isDefined } + countToDelete.toInt) { _.completionTime.isDefined } { _.completionTime.get.getTime } toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) } } From 40024ece97a1f3d2f9b6f6c5fdba29ed02104d2d Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 1 Feb 2018 16:14:50 -0800 Subject: [PATCH 2/6] use index --- .../apache/spark/status/AppStatusListener.scala | 10 +++++----- .../scala/org/apache/spark/status/KVUtils.scala | 15 ++++++--------- .../org/apache/spark/status/storeTypes.scala | 6 ++++++ .../sql/execution/ui/SQLAppStatusListener.scala | 5 +++-- .../sql/execution/ui/SQLAppStatusStore.scala | 9 +++++++-- 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index f857d580b17bd..8f468c3077aff 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -875,10 +875,10 @@ private[spark] class AppStatusListener( return } - val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]), + val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]).index("completionTime"), countToDelete.toInt) { j => j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN - } { _.info.completionTime.map(_.getTime).getOrElse(Long.MaxValue) } + } toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) } } @@ -888,10 +888,10 @@ private[spark] class AppStatusListener( return } - val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]), + val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]).index("completionTime"), countToDelete.toInt) { s => s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING - } { _.info.completionTime.map(_.getTime).getOrElse(Long.MaxValue) } + } stages.foreach { s => val key = Array(s.info.stageId, s.info.attemptId) @@ -951,7 +951,7 @@ private[spark] class AppStatusListener( // Try to delete finished tasks only. 
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t => !live || t.status != TaskState.RUNNING.toString() - } { t => t.launchTime + t.duration } + } toDelete.foreach { t => kvstore.delete(t.getClass(), t.taskId) } stage.savedTasks.addAndGet(-toDelete.size) diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala index 7f09ec4848aff..99b1843d8e1c0 100644 --- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala +++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala @@ -69,17 +69,14 @@ private[spark] object KVUtils extends Logging { db } - /** - * Turns a KVStoreView into a Scala sequence, applying a filter, sorting the sequence and - * selecting the first `max` values. - */ - def viewToSeq[T, S: Ordering]( - view: KVStoreView[T], - max: Int) - (filter: T => Boolean)(sorter: T => S): Seq[T] = { + /** Turns a KVStoreView into a Scala sequence, applying a filter. */ + def viewToSeq[T]( + view: KVStoreView[T], + max: Int) + (filter: T => Boolean): Seq[T] = { val iter = view.closeableIterator() try { - iter.asScala.filter(filter).toList.sortBy(sorter).take(max) + iter.asScala.filter(filter).take(max).toList } finally { iter.close() } diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index c9cb996a55fcc..9d744d8520f86 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -73,6 +73,8 @@ private[spark] class JobDataWrapper( @JsonIgnore @KVIndex private def id: Int = info.jobId + @JsonIgnore @KVIndex("completionTime") + private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(Long.MaxValue) } private[spark] class StageDataWrapper( @@ -90,6 +92,8 @@ private[spark] class StageDataWrapper( @JsonIgnore @KVIndex("active") private def active: Boolean = info.status == StageStatus.ACTIVE + @JsonIgnore @KVIndex("completionTime") + private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(Long.MaxValue) } /** @@ -337,6 +341,8 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.ERROR, parent = TaskIndexNames.STAGE) private def error: String = if (errorMessage.isDefined) errorMessage.get else "" + @JsonIgnore @KVIndex("completionTime") + private def completionTime: Long = launchTime + duration } private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 5927933e590b8..aeeda94232be9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -332,8 +332,9 @@ class SQLAppStatusListener( return } - val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[SQLExecutionUIData]), - countToDelete.toInt) { _.completionTime.isDefined } { _.completionTime.get.getTime } + val toDelete = KVUtils.viewToSeq( + kvstore.view(classOf[SQLExecutionUIData]).index("completionTime"), + countToDelete.toInt)(_.completionTime.isDefined) toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 910f2e52fdbb3..9bd4574d0b902 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -23,11 +23,12 @@ import java.util.Date import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.spark.JobExecutionStatus import org.apache.spark.status.KVUtils.KVIndexParam -import org.apache.spark.util.kvstore.KVStore +import org.apache.spark.util.kvstore.{KVIndex, KVStore} /** * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's @@ -90,7 +91,11 @@ class SQLExecutionUIData( * from the SQL listener instance. */ @JsonDeserialize(keyAs = classOf[JLong]) - val metricValues: Map[Long, String]) + val metricValues: Map[Long, String]) { + + @JsonIgnore @KVIndex("completionTime") + private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(Long.MaxValue) +} class SparkPlanGraphWrapper( @KVIndexParam val executionId: Long, From f0de4befb5c238b310f2c4aeb313c943f42a56b5 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 1 Feb 2018 16:15:47 -0800 Subject: [PATCH 3/6] fix tasks --- core/src/main/scala/org/apache/spark/status/storeTypes.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 9d744d8520f86..efc39149195fc 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -341,8 +341,6 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.ERROR, parent = TaskIndexNames.STAGE) private def error: String = if (errorMessage.isDefined) errorMessage.get else "" - @JsonIgnore @KVIndex("completionTime") - private def completionTime: Long = launchTime + duration } private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { From 0424c1d1f23e21f1f478544a99c7b5902a0f1235 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 1 Feb 2018 17:09:05 -0800 Subject: [PATCH 4/6] add task parent index --- .../org/apache/spark/status/AppStatusListener.scala | 11 +++++------ .../scala/org/apache/spark/status/storeTypes.scala | 7 +++++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 8f468c3077aff..0df49b907c75b 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -875,8 +875,8 @@ private[spark] class AppStatusListener( return } - val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]).index("completionTime"), - countToDelete.toInt) { j => + val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L) + val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j => j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN } toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) } @@ -888,8 +888,8 @@ private[spark] class AppStatusListener( return } - val stages = 
KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]).index("completionTime"), - countToDelete.toInt) { s => + val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime").first(0L) + val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s => s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING } @@ -945,8 +945,7 @@ private[spark] class AppStatusListener( val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt if (countToDelete > 0) { val stageKey = Array(stage.info.stageId, stage.info.attemptNumber) - val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey) - .last(stageKey) + val view = kvstore.view(classOf[TaskDataWrapper]).index("completionTime").parent(stageKey) // Try to delete finished tasks only. val toDelete = KVUtils.viewToSeq(view, countToDelete) { t => diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index efc39149195fc..1b7782f64e81d 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -74,7 +74,7 @@ private[spark] class JobDataWrapper( private def id: Int = info.jobId @JsonIgnore @KVIndex("completionTime") - private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(Long.MaxValue) + private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1) } private[spark] class StageDataWrapper( @@ -93,7 +93,7 @@ private[spark] class StageDataWrapper( private def active: Boolean = info.status == StageStatus.ACTIVE @JsonIgnore @KVIndex("completionTime") - private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(Long.MaxValue) + private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1) } /** @@ -138,6 +138,7 @@ private[spark] object TaskIndexNames { final val STAGE = "stage" final val STATUS = "sta" final val TASK_INDEX = "idx" + final val COMPLETION_TIME = "completionTime" } /** @@ -341,6 +342,8 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.ERROR, parent = TaskIndexNames.STAGE) private def error: String = if (errorMessage.isDefined) errorMessage.get else "" + @JsonIgnore @KVIndex(value = TaskIndexNames.COMPLETION_TIME, parent = TaskIndexNames.STAGE) + private def completionTime: Long = launchTime + duration } private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { From 4c1080ab8d76f60eb76deefe001d6f84291b5ae0 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 1 Feb 2018 17:22:14 -0800 Subject: [PATCH 5/6] update sql as well --- .../apache/spark/sql/execution/ui/SQLAppStatusListener.scala | 5 ++--- .../apache/spark/sql/execution/ui/SQLAppStatusStore.scala | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index aeeda94232be9..53fb9a0cc21cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -332,9 +332,8 @@ class SQLAppStatusListener( return } - val toDelete = KVUtils.viewToSeq( - kvstore.view(classOf[SQLExecutionUIData]).index("completionTime"), - countToDelete.toInt)(_.completionTime.isDefined) + val view = 
kvstore.view(classOf[SQLExecutionUIData]).index("completionTime").first(0L) + val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined) toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 9bd4574d0b902..9a76584717f42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -94,7 +94,7 @@ class SQLExecutionUIData( val metricValues: Map[Long, String]) { @JsonIgnore @KVIndex("completionTime") - private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(Long.MaxValue) + private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L) } class SparkPlanGraphWrapper( From b83b396dcd10fabf9d28ef57d4206fba2980efa5 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 2 Feb 2018 13:22:30 -0800 Subject: [PATCH 6/6] nits; tests --- .../spark/status/AppStatusListener.scala | 4 +- .../org/apache/spark/status/storeTypes.scala | 6 +- .../spark/status/AppStatusListenerSuite.scala | 90 +++++++++++++++++++ .../ui/SQLAppStatusListenerSuite.scala | 45 ++++++++++ 4 files changed, 141 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 0df49b907c75b..ab01cddfca5b0 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -945,7 +945,9 @@ private[spark] class AppStatusListener( val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt if (countToDelete > 0) { val stageKey = Array(stage.info.stageId, stage.info.attemptNumber) - val view = kvstore.view(classOf[TaskDataWrapper]).index("completionTime").parent(stageKey) + val view = kvstore.view(classOf[TaskDataWrapper]) + .index(TaskIndexNames.COMPLETION_TIME) + .parent(stageKey) // Try to delete finished tasks only. 
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t => diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 1b7782f64e81d..412644d3657b5 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -74,7 +74,7 @@ private[spark] class JobDataWrapper( private def id: Int = info.jobId @JsonIgnore @KVIndex("completionTime") - private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1) + private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L) } private[spark] class StageDataWrapper( @@ -93,7 +93,7 @@ private[spark] class StageDataWrapper( private def active: Boolean = info.status == StageStatus.ACTIVE @JsonIgnore @KVIndex("completionTime") - private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1) + private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L) } /** @@ -138,7 +138,7 @@ private[spark] object TaskIndexNames { final val STAGE = "stage" final val STATUS = "sta" final val TASK_INDEX = "idx" - final val COMPLETION_TIME = "completionTime" + final val COMPLETION_TIME = "ct" } /** diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 042bba7f226fd..b74d6ee2ec836 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1010,6 +1010,96 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + test("eviction should respect job completion time") { + val testConf = conf.clone().set(MAX_RETAINED_JOBS, 2) + val listener = new AppStatusListener(store, testConf, true) + + // Start job 1 and job 2 + time += 1 + listener.onJobStart(SparkListenerJobStart(1, time, Nil, null)) + time += 1 + listener.onJobStart(SparkListenerJobStart(2, time, Nil, null)) + + // Stop job 2 before job 1 + time += 1 + listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded)) + time += 1 + listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded)) + + // Start job 3 and job 2 should be evicted. + time += 1 + listener.onJobStart(SparkListenerJobStart(3, time, Nil, null)) + assert(store.count(classOf[JobDataWrapper]) === 2) + intercept[NoSuchElementException] { + store.read(classOf[JobDataWrapper], 2) + } + } + + test("eviction should respect stage completion time") { + val testConf = conf.clone().set(MAX_RETAINED_STAGES, 2) + val listener = new AppStatusListener(store, testConf, true) + + val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1") + val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2") + val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3") + + // Start stage 1 and stage 2 + time += 1 + stage1.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties())) + time += 1 + stage2.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties())) + + // Stop stage 2 before stage 1 + time += 1 + stage2.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stage2)) + time += 1 + stage1.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stage1)) + + // Start stage 3 and stage 2 should be evicted. 
+    stage3.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties()))
+    assert(store.count(classOf[StageDataWrapper]) === 2)
+    intercept[NoSuchElementException] {
+      store.read(classOf[StageDataWrapper], Array(2, 0))
+    }
+  }
+
+  test("eviction should respect task completion time") {
+    val testConf = conf.clone().set(MAX_RETAINED_TASKS_PER_STAGE, 2)
+    val listener = new AppStatusListener(store, testConf, true)
+
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+    stage1.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
+
+    // Start task 1 and task 2
+    val tasks = createTasks(3, Array("1"))
+    tasks.take(2).foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
+    }
+
+    // Stop task 2 before task 1
+    time += 1
+    tasks(1).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(1), null))
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
+
+    // Start task 3 and task 2 should be evicted.
+    listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, tasks(2)))
+    assert(store.count(classOf[TaskDataWrapper]) === 2)
+    intercept[NoSuchElementException] {
+      store.read(classOf[TaskDataWrapper], tasks(1).id)
+    }
+  }
+
   test("driver logs") {
     val listener = new AppStatusListener(store, conf, true)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 7d84f45d36bee..85face3994fd4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.status.config._
@@ -510,6 +511,50 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     }
   }
 
+  test("eviction should respect execution completion time") {
+    val conf = sparkContext.conf.clone().set(UI_RETAINED_EXECUTIONS.key, "2")
+    val store = new ElementTrackingStore(new InMemoryStore, conf)
+    val listener = new SQLAppStatusListener(conf, store, live = true)
+    val statusStore = new SQLAppStatusStore(store, Some(listener))
+
+    var time = 0
+    val df = createTestDataFrame
+    // Start execution 1 and execution 2
+    time += 1
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      1,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      time))
+    time += 1
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      2,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      time))
+
+    // Stop execution 2 before execution 1
+    time += 1
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(2, time))
+    time += 1
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(1, time))
+
+    // Start execution 3 and execution 2 should be evicted.
+    time += 1
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      3,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      time))
+    assert(statusStore.executionsCount === 2)
+    assert(statusStore.execution(2) === None)
+  }
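
A minimal, self-contained sketch of the eviction pattern this series converges on: store a completion-time sort key in a KVStore index, use -1L as the sentinel for entities that are still running, and start the index scan at 0L so running entities are never eligible for deletion. JobRecord, EvictionSketch, and the main() harness below are hypothetical names invented for illustration; only InMemoryStore, KVIndex, and the view methods are the real org.apache.spark.util.kvstore API used by the patches.

import scala.collection.JavaConverters._

import org.apache.spark.util.kvstore.{InMemoryStore, KVIndex}

// Hypothetical record type. The bare @KVIndex marks the natural key;
// "completionTime" is a secondary index whose value stays -1L until the
// job finishes, mirroring the JobDataWrapper change in these patches.
class JobRecord(id: Int, finished: Option[Long]) {
  @KVIndex
  def jobId: Int = id

  @KVIndex("completionTime")
  def completionTime: Long = finished.getOrElse(-1L)
}

object EvictionSketch {
  def main(args: Array[String]): Unit = {
    val store = new InMemoryStore()
    store.write(new JobRecord(1, Some(20L))) // finished second
    store.write(new JobRecord(2, Some(10L))) // finished first
    store.write(new JobRecord(3, None))      // still running

    // Scan the completionTime index in ascending order, starting at 0L so
    // the -1L (running) sentinel is skipped, the same trick as
    // kvstore.view(...).index("completionTime").first(0L) in the patches.
    val iter = store.view(classOf[JobRecord])
      .index("completionTime")
      .first(0L)
      .closeableIterator()
    val toDelete =
      try {
        iter.asScala.take(1).toList
      } finally {
        iter.close()
      }

    // Evict the oldest finished record, as the cleanup methods in
    // AppStatusListener do.
    toDelete.foreach(j => store.delete(j.getClass, j.jobId))

    assert(store.count(classOf[JobRecord]) == 2) // job 2 evicted, job 3 kept
  }
}

Compared with patch 1, which materialized every filtered row and ran toList.sortBy(sorter).take(max) in memory, the index lets a disk-backed store stop after reading only the entries it will actually delete. Tasks need one extra step: their completion-time index is declared with parent = TaskIndexNames.STAGE, so .parent(stageKey) restricts the completion-order scan to a single stage.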