From 76f180181261a2d7adcce27c40bfb9126c094bc5 Mon Sep 17 00:00:00 2001
From: ankurgupta
Date: Thu, 23 Aug 2018 11:48:40 -0700
Subject: [PATCH 1/3] [SPARK-24415][Core] Fixed the aggregated stage metrics by retaining stage objects in liveStages until all tasks are complete

The problem occurs because the stage object is removed from liveStages in
AppStatusListener.onStageCompleted. Because of this, any onTaskEnd event
received after the stage-completion event does not update the stage metrics.
The fix is to retain stage objects in liveStages until all tasks are complete.

Testing Done:
1. Verified the fix with the reproducible example posted in the JIRA
2. Added a unit test
3. Fixed the flaky test in UISeleniumSuite
---
 .../spark/status/AppStatusListener.scala      | 66 ++++++++++++++-----
 .../spark/status/AppStatusListenerSuite.scala | 55 ++++++++++++++++
 .../spark/streaming/UISeleniumSuite.scala     | 11 +++-
 3 files changed, 115 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 5ea161cd0d151..a41293ba4a8d9 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -350,11 +350,25 @@ private[spark] class AppStatusListener(
       val e = it.next()
       if (job.stageIds.contains(e.getKey()._1)) {
         val stage = e.getValue()
-        stage.status = v1.StageStatus.SKIPPED
-        job.skippedStages += stage.info.stageId
-        job.skippedTasks += stage.info.numTasks
-        it.remove()
-        update(stage, now)
+        // Only update the stage if it has not finished already
+        if (v1.StageStatus.ACTIVE.equals(stage.status) ||
+          v1.StageStatus.PENDING.equals(stage.status)) {
+          // Mark the stage as skipped if in Pending status
+          if (v1.StageStatus.PENDING.equals(stage.status)) {
+            stage.status = v1.StageStatus.SKIPPED
+            job.skippedStages += stage.info.stageId
+            job.skippedTasks += stage.info.numTasks
+            job.activeStages -= 1
+          }
+
+          pools.get(stage.schedulingPool).foreach { pool =>
+            pool.stageIds = pool.stageIds - stage.info.stageId
+            update(pool, now)
+          }
+
+          it.remove()
+          update(stage, now, last = true)
+        }
       }
     }
 
@@ -506,7 +520,16 @@ private[spark] class AppStatusListener(
       if (killedDelta > 0) {
         stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
       }
-      maybeUpdate(stage, now)
+      // Remove stage if there are no active tasks left and stage is already finished
+      val removeStage =
+        stage.activeTasks == 0 &&
+          (v1.StageStatus.COMPLETE.equals(stage.status) ||
+            v1.StageStatus.FAILED.equals(stage.status))
+      if (removeStage) {
+        update(stage, now, last = true)
+      } else {
+        maybeUpdate(stage, now)
+      }
 
       // Store both stage ID and task index in a single long variable for tracking at job level.
       val taskIndex = (event.stageId.toLong << Integer.SIZE) | event.taskInfo.index
@@ -521,7 +544,7 @@ private[spark] class AppStatusListener(
         if (killedDelta > 0) {
           job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
         }
-        maybeUpdate(job, now)
+        conditionalLiveUpdate(job, now, removeStage)
       }
 
       val esummary = stage.executorSummary(event.taskInfo.executorId)
@@ -532,7 +555,7 @@ private[spark] class AppStatusListener(
       if (metricsDelta != null) {
         esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
       }
-      maybeUpdate(esummary, now)
+      conditionalLiveUpdate(esummary, now, removeStage)
 
       if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
         stage.cleaning = true
@@ -540,6 +563,9 @@ private[spark] class AppStatusListener(
           cleanupTasks(stage)
         }
       }
+      if (removeStage) {
+        liveStages.remove((event.stageId, event.stageAttemptId))
+      }
     }
 
     liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -564,17 +590,13 @@ private[spark] class AppStatusListener(
 
       // Force an update on live applications when the number of active tasks reaches 0. This is
       // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
-      if (exec.activeTasks == 0) {
-        liveUpdate(exec, now)
-      } else {
-        maybeUpdate(exec, now)
-      }
+      conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
     }
   }
 
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
     val maybeStage =
-      Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
+      Option(liveStages.get((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
     maybeStage.foreach { stage =>
       val now = System.nanoTime()
       stage.info = event.stageInfo
@@ -608,7 +630,6 @@ private[spark] class AppStatusListener(
       }
 
       stage.executorSummaries.values.foreach(update(_, now))
-      update(stage, now, last = true)
 
       val executorIdsForStage = stage.blackListedExecutors
       executorIdsForStage.foreach { executorId =>
@@ -616,6 +637,13 @@ private[spark] class AppStatusListener(
           removeBlackListedStageFrom(exec, event.stageInfo.stageId, now)
         }
       }
+
+      // Remove stage only if there are no active tasks remaining
+      val removeStage = stage.activeTasks == 0
+      update(stage, now, last = removeStage)
+      if (removeStage) {
+        liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
+      }
     }
 
     appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
@@ -882,6 +910,14 @@ private[spark] class AppStatusListener(
     }
   }
 
+  private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
+    if (condition) {
+      liveUpdate(entity, now)
+    } else {
+      maybeUpdate(entity, now)
+    }
+  }
+
   private def cleanupExecutors(count: Long): Unit = {
     // Because the limit is on the number of *dead* executors, we need to calculate whether
     // there are actually enough dead executors to be deleted.
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 1b3639ad64a73..ea80fea905340 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
   }
 
+  test("SPARK-24415: update metrics for tasks that finish late") {
+    val listener = new AppStatusListener(store, conf, true)
+
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+
+    // Start job
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
+
+    // Start 2 stages
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
+
+    // Start 2 Tasks
+    val tasks = createTasks(2, Array("1"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
+    }
+
+    // Task 1 Finished
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
+
+    // Stage 1 Completed
+    stage1.failureReason = Some("Failed")
+    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+    // Stop job 1
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+    // Task 2 Killed
+    time += 1
+    tasks(1).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
+        TaskKilled(reason = "Killed"), tasks(1), null))
+
+    // Ensure killed task metrics are updated
+    val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
+    val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
+    assert(failedStages.size == 1)
+    assert(failedStages.head.numKilledTasks == 1)
+    assert(failedStages.head.numCompleteTasks == 1)
+
+    val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
+    assert(allJobs.size == 1)
+    assert(allJobs.head.numKilledTasks == 1)
+    assert(allJobs.head.numCompletedTasks == 1)
+    assert(allJobs.head.numActiveStages == 1)
+    assert(allJobs.head.numFailedStages == 1)
+  }
+
   test("driver logs") {
     val listener = new AppStatusListener(store, conf, true)
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index f2204a1870933..32fe0e732b9f1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -77,7 +77,14 @@ class UISeleniumSuite
     inputStream.foreachRDD { rdd =>
       rdd.foreach(_ => {})
       try {
-        rdd.foreach(_ => throw new RuntimeException("Oops"))
+        rdd.foreach { _ =>
+          import org.apache.spark.TaskContext
+          val tc: TaskContext = TaskContext.get
+          // Failing the task with id 15 to ensure only one task fails
+          if (tc.taskAttemptId() % 15 == 0) {
+            throw new RuntimeException("Oops")
+          }
+        }
       } catch {
         case e: SparkException if e.getMessage.contains("Oops") =>
       }
@@ -166,7 +173,7 @@ class UISeleniumSuite
 
         // Check job progress
         findAll(cssSelector(""".progress-cell""")).map(_.text).toList should be (
-          List("4/4", "4/4", "4/4", "0/4 (1 failed)"))
+          List("4/4", "4/4", "4/4", "3/4 (1 failed)"))
 
         // Check stacktrace
         val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.underlying).toSeq

From dccbf36a2041052da7489f301abce3fda3a845ef Mon Sep 17 00:00:00 2001
From: ankurgupta
Date: Tue, 28 Aug 2018 15:39:57 -0700
Subject: [PATCH 2/3] Updated onJobEnd to not update stages in Active status

---
 .../apache/spark/status/AppStatusListener.scala | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index a41293ba4a8d9..32f419f2e3f4a 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -350,16 +350,12 @@ private[spark] class AppStatusListener(
       val e = it.next()
       if (job.stageIds.contains(e.getKey()._1)) {
         val stage = e.getValue()
-        // Only update the stage if it has not finished already
-        if (v1.StageStatus.ACTIVE.equals(stage.status) ||
-          v1.StageStatus.PENDING.equals(stage.status)) {
-          // Mark the stage as skipped if in Pending status
-          if (v1.StageStatus.PENDING.equals(stage.status)) {
-            stage.status = v1.StageStatus.SKIPPED
-            job.skippedStages += stage.info.stageId
-            job.skippedTasks += stage.info.numTasks
-            job.activeStages -= 1
-          }
+        // Mark the stage as skipped if in Pending status
+        if (v1.StageStatus.PENDING.equals(stage.status)) {
+          stage.status = v1.StageStatus.SKIPPED
+          job.skippedStages += stage.info.stageId
+          job.skippedTasks += stage.info.numTasks
+          job.activeStages -= 1
 
           pools.get(stage.schedulingPool).foreach { pool =>
             pool.stageIds = pool.stageIds - stage.info.stageId

From b164bb1fee5435269161d7e83f0cf3901ff24402 Mon Sep 17 00:00:00 2001
From: ankurgupta
Date: Wed, 29 Aug 2018 15:54:58 -0700
Subject: [PATCH 3/3] Updated code comments

---
 .../main/scala/org/apache/spark/status/AppStatusListener.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 32f419f2e3f4a..91b75e4852999 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -350,7 +350,6 @@ private[spark] class AppStatusListener(
       val e = it.next()
       if (job.stageIds.contains(e.getKey()._1)) {
         val stage = e.getValue()
-        // Mark the stage as skipped if in Pending status
         if (v1.StageStatus.PENDING.equals(stage.status)) {
           stage.status = v1.StageStatus.SKIPPED
           job.skippedStages += stage.info.stageId
@@ -516,7 +515,7 @@ private[spark] class AppStatusListener(
       if (killedDelta > 0) {
         stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
      }
-      // Remove stage if there are no active tasks left and stage is already finished
+      // [SPARK-24415] Wait for all tasks to finish before removing stage from live list
       val removeStage =
         stage.activeTasks == 0 &&
           (v1.StageStatus.COMPLETE.equals(stage.status) ||
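
The patches above contain the complete change. As a reading aid, the stand-alone Scala sketch below models the core idea of SPARK-24415 outside the diff context: a stage entry stays in a liveStages-style map after the stage-completed event and is flushed and evicted only once its active-task count drops to zero, so a late onTaskEnd still lands in the aggregated metrics. StageKey, StageEntry, LiveStageTracker, and the persisted map are illustrative stand-ins, not Spark's private types; the real change is the AppStatusListener diff in PATCH 1/3.

import scala.collection.concurrent.TrieMap
import scala.collection.mutable

// Hypothetical stand-ins for Spark's private live-entity bookkeeping; the names
// are illustrative only (the real types live in org.apache.spark.status).
final case class StageKey(stageId: Int, attemptId: Int)

final class StageEntry(val key: StageKey) {
  var activeTasks: Int = 0
  var completedTasks: Int = 0
  var finished: Boolean = false // set when the stage-completed event arrives
}

final class LiveStageTracker {
  private val liveStages = new TrieMap[StageKey, StageEntry]()
  // Stands in for the kvstore that the listener writes finished entities to.
  private val persisted = mutable.Map[StageKey, Int]()

  def onTaskStart(key: StageKey): Unit = {
    val stage = liveStages.getOrElseUpdate(key, new StageEntry(key))
    stage.activeTasks += 1
  }

  // Before the fix the entry was removed here unconditionally, so any onTaskEnd
  // arriving afterwards had nothing left to update. The fix keeps the entry
  // alive while tasks are still running.
  def onStageCompleted(key: StageKey): Unit = {
    liveStages.get(key).foreach { stage =>
      stage.finished = true
      if (stage.activeTasks == 0) flushAndRemove(stage)
    }
  }

  // A late task-end still finds the live entry and updates its counters; the
  // entry is flushed and evicted only once the stage is finished and idle.
  def onTaskEnd(key: StageKey): Unit = {
    liveStages.get(key).foreach { stage =>
      stage.activeTasks -= 1
      stage.completedTasks += 1
      if (stage.finished && stage.activeTasks == 0) flushAndRemove(stage)
    }
  }

  private def flushAndRemove(stage: StageEntry): Unit = {
    persisted(stage.key) = stage.completedTasks // final write, like update(stage, now, last = true)
    liveStages.remove(stage.key)
  }

  def persistedCompletedTasks(key: StageKey): Option[Int] = persisted.get(key)
}

object LateTaskEndDemo extends App {
  val tracker = new LiveStageTracker
  val key = StageKey(stageId = 1, attemptId = 0)
  tracker.onTaskStart(key)
  tracker.onTaskStart(key)
  tracker.onTaskEnd(key)        // first task finishes
  tracker.onStageCompleted(key) // stage completes while one task is still running
  tracker.onTaskEnd(key)        // the late task-end is still counted before eviction
  assert(tracker.persistedCompletedTasks(key).contains(2))
}

Running LateTaskEndDemo persists completedTasks = 2 even though the second task ends after the stage completes, which is exactly the count that was lost before the fix.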