[SPARK-24415][CORE] Fixed the aggregated stage metrics by retaining stage objects in liveStages until all tasks are complete

The problem occurs because the stage object is removed from liveStages in
AppStatusListener.onStageCompleted. Because of this, any onTaskEnd events
received after the onStageCompleted event do not update the stage metrics.

The fix is to retain stage objects in liveStages until all tasks are complete.
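As a rough sketch of that idea (hypothetical, simplified names, not the actual AppStatusListener code; the real patch follows in the diff below): the stage entry is dropped from the live map only once its status is terminal and its active task count has reached zero, so a late onTaskEnd still finds the stage and can update its metrics.

// Hypothetical, simplified model of the retention rule (not Spark's actual classes).
import scala.collection.mutable

object StageRetentionSketch {
  sealed trait Status
  case object Active extends Status
  case object Complete extends Status
  case object Failed extends Status

  // Stand-in for a live stage entry keyed by (stageId, attemptId).
  final case class LiveStage(var status: Status, var activeTasks: Int)

  private val liveStages = mutable.Map[(Int, Int), LiveStage]()

  def onStageCompleted(key: (Int, Int), failed: Boolean): Unit = {
    liveStages.get(key).foreach { stage =>
      stage.status = if (failed) Failed else Complete
      // Keep the stage around while tasks are still running so that late
      // onTaskEnd events can still update its metrics.
      if (stage.activeTasks == 0) liveStages.remove(key)
    }
  }

  def onTaskEnd(key: (Int, Int)): Unit = {
    liveStages.get(key).foreach { stage =>
      stage.activeTasks -= 1
      // ... aggregate task metrics into the stage here ...
      val removeStage = stage.activeTasks == 0 &&
        (stage.status == Complete || stage.status == Failed)
      if (removeStage) liveStages.remove(key)
    }
  }

  def main(args: Array[String]): Unit = {
    liveStages((1, 0)) = LiveStage(Active, activeTasks = 2)
    onTaskEnd((1, 0))                        // first task finishes
    onStageCompleted((1, 0), failed = true)  // stage ends with one task still active: retained
    onTaskEnd((1, 0))                        // late task end still updates, then removes the stage
    assert(!liveStages.contains((1, 0)))
  }
}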

1. Verified the fix against the reproducible example posted in the JIRA
2. Added a unit test

Closes #22209 from ankuriitg/ankurgupta/SPARK-24415.

Authored-by: ankurgupta <ankur.gupta@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 39a02d8)
Signed-off-by: Thomas Graves <tgraves@apache.org>
ankuriitg authored and tgravescs committed Sep 5, 2018
1 parent 84922e5 commit 5b8b6b4e9e36228e993a15cab19c80e7fad43786
@@ -310,11 +310,20 @@ private[spark] class AppStatusListener(
         val e = it.next()
         if (job.stageIds.contains(e.getKey()._1)) {
           val stage = e.getValue()
-          stage.status = v1.StageStatus.SKIPPED
-          job.skippedStages += stage.info.stageId
-          job.skippedTasks += stage.info.numTasks
-          it.remove()
-          update(stage, now)
+          if (v1.StageStatus.PENDING.equals(stage.status)) {
+            stage.status = v1.StageStatus.SKIPPED
+            job.skippedStages += stage.info.stageId
+            job.skippedTasks += stage.info.numTasks
+            job.activeStages -= 1
+
+            pools.get(stage.schedulingPool).foreach { pool =>
+              pool.stageIds = pool.stageIds - stage.info.stageId
+              update(pool, now)
+            }
+
+            it.remove()
+            update(stage, now, last = true)
+          }
         }
       }

@@ -466,7 +475,16 @@ private[spark] class AppStatusListener(
       if (killedDelta > 0) {
         stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
       }
-      maybeUpdate(stage, now)
+      // [SPARK-24415] Wait for all tasks to finish before removing stage from live list
+      val removeStage =
+        stage.activeTasks == 0 &&
+          (v1.StageStatus.COMPLETE.equals(stage.status) ||
+            v1.StageStatus.FAILED.equals(stage.status))
+      if (removeStage) {
+        update(stage, now, last = true)
+      } else {
+        maybeUpdate(stage, now)
+      }

       // Store both stage ID and task index in a single long variable for tracking at job level.
       val taskIndex = (event.stageId.toLong << Integer.SIZE) | event.taskInfo.index
@@ -481,7 +499,7 @@ private[spark] class AppStatusListener(
         if (killedDelta > 0) {
           job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
         }
-        maybeUpdate(job, now)
+        conditionalLiveUpdate(job, now, removeStage)
       }

       val esummary = stage.executorSummary(event.taskInfo.executorId)
@@ -492,14 +510,17 @@ private[spark] class AppStatusListener(
       if (metricsDelta != null) {
         esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
       }
-      maybeUpdate(esummary, now)
+      conditionalLiveUpdate(esummary, now, removeStage)

       if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
         stage.cleaning = true
         kvstore.doAsync {
           cleanupTasks(stage)
         }
       }
+      if (removeStage) {
+        liveStages.remove((event.stageId, event.stageAttemptId))
+      }
     }

     liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -524,17 +545,13 @@ private[spark] class AppStatusListener(

       // Force an update on live applications when the number of active tasks reaches 0. This is
       // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
-      if (exec.activeTasks == 0) {
-        liveUpdate(exec, now)
-      } else {
-        maybeUpdate(exec, now)
-      }
+      conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
     }
   }

   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
     val maybeStage =
-      Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
+      Option(liveStages.get((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
     maybeStage.foreach { stage =>
       val now = System.nanoTime()
       stage.info = event.stageInfo
@@ -568,7 +585,13 @@ private[spark] class AppStatusListener(
       }

       stage.executorSummaries.values.foreach(update(_, now))
-      update(stage, now, last = true)
+
+      // Remove stage only if there are no active tasks remaining
+      val removeStage = stage.activeTasks == 0
+      update(stage, now, last = removeStage)
+      if (removeStage) {
+        liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
+      }
     }

     appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
@@ -830,6 +853,14 @@ private[spark] class AppStatusListener(
     }
   }

+  private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
+    if (condition) {
+      liveUpdate(entity, now)
+    } else {
+      maybeUpdate(entity, now)
+    }
+  }
+
   private def cleanupExecutors(count: Long): Unit = {
     // Because the limit is on the number of *dead* executors, we need to calculate whether
     // there are actually enough dead executors to be deleted.
@@ -1126,6 +1126,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
   assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
   }

+  test("SPARK-24415: update metrics for tasks that finish late") {
+    val listener = new AppStatusListener(store, conf, true)
+
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+
+    // Start job
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
+
+    // Start 2 stages
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
+
+    // Start 2 Tasks
+    val tasks = createTasks(2, Array("1"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
+    }
+
+    // Task 1 Finished
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
+
+    // Stage 1 Completed
+    stage1.failureReason = Some("Failed")
+    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+    // Stop job 1
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+    // Task 2 Killed
+    time += 1
+    tasks(1).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
+        TaskKilled(reason = "Killed"), tasks(1), null))
+
+    // Ensure killed task metrics are updated
+    val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
+    val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
+    assert(failedStages.size == 1)
+    assert(failedStages.head.numKilledTasks == 1)
+    assert(failedStages.head.numCompleteTasks == 1)
+
+    val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
+    assert(allJobs.size == 1)
+    assert(allJobs.head.numKilledTasks == 1)
+    assert(allJobs.head.numCompletedTasks == 1)
+    assert(allJobs.head.numActiveStages == 1)
+    assert(allJobs.head.numFailedStages == 1)
+  }
+
   test("driver logs") {
     val listener = new AppStatusListener(store, conf, true)

@@ -77,7 +77,12 @@ class UISeleniumSuite
       inputStream.foreachRDD { rdd =>
         rdd.foreach(_ => {})
         try {
-          rdd.foreach(_ => throw new RuntimeException("Oops"))
+          rdd.foreach { _ =>
+            // Failing the task with id 15 to ensure only one task fails
+            if (TaskContext.get.taskAttemptId() % 15 == 0) {
+              throw new RuntimeException("Oops")
+            }
+          }
         } catch {
           case e: SparkException if e.getMessage.contains("Oops") =>
         }
@@ -166,7 +171,7 @@ class UISeleniumSuite

       // Check job progress
       findAll(cssSelector(""".progress-cell""")).map(_.text).toList should be (
-        List("4/4", "4/4", "4/4", "0/4 (1 failed)"))
+        List("4/4", "4/4", "4/4", "3/4 (1 failed)"))

       // Check stacktrace
       val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.underlying).toSeq
