From a006b74829a857086d705a25c4bda866b47a1a2a Mon Sep 17 00:00:00 2001 From: Shahid Date: Wed, 26 Sep 2018 00:54:59 +0530 Subject: [PATCH] [SPARK-25533]AppSummary should hold the information about completed Jobs and Succeeded Task only --- .../apache/spark/status/AppStatusListener.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 5ea161cd0d151..3c426d94c00e2 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -365,10 +365,11 @@ private[spark] class AppStatusListener( job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None update(job, now, last = true) + if(job.status.equals(JobExecutionStatus.SUCCEEDED)) { + appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages) + kvstore.write(appSummary) + } } - - appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages) - kvstore.write(appSummary) } override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { @@ -616,10 +617,11 @@ private[spark] class AppStatusListener( removeBlackListedStageFrom(exec, event.stageInfo.stageId, now) } } + if(stage.status.equals(v1.StageStatus.COMPLETE)) { + appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1) + kvstore.write(appSummary) + } } - - appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1) - kvstore.write(appSummary) } private def removeBlackListedStageFrom(exec: LiveExecutor, stageId: Int, now: Long) = {