[SPARK-23121][CORE] Fix for ui becoming unaccessible for long running…

… streaming apps

## What changes were proposed in this pull request?

The allJobs and the job pages attempt to use stage attempt and DAG visualization from the store, but for long running jobs they are not guaranteed to be retained, leading to exceptions when these pages are rendered.

To fix it `store.lastStageAttempt(stageId)` and `store.operationGraphForJob(jobId)` are wrapped in `store.asOption` and default values are used if the info is missing.

## How was this patch tested?

Manual testing of the UI, also using the test command reported in SPARK-23121:

./bin/spark-submit --class org.apache.spark.examples.streaming.HdfsWordCount ./examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar /spark

Closes #20287

Author: Sandor Murakozi <>

Closes #20330 from smurakozi/SPARK-23121.

(cherry picked from commit 446948a)
Signed-off-by: Marcelo Vanzin <>
smurakozi authored and vanzin committed Jan 22, 2018
1 parent d963ba0 commit 4e75b0cb4b575d4799c02455eed286fe971c6c50
@@ -36,6 +36,9 @@ import org.apache.spark.util.Utils

/** Page showing list of all ongoing and recently finished jobs */
private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends WebUIPage("") {

import ApiHelper._

private val JOBS_LEGEND =
<div class="legend-area"><svg width="150px" height="85px">
<rect class="succeeded-job-legend"
@@ -65,10 +68,9 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
}.map { job =>
val jobId = job.jobId
val status = job.status
val jobDescription = store.lastStageAttempt(job.stageIds.max).description
val displayJobDescription = jobDescription
.map(UIUtils.makeDescription(_, "", plainText = true).text)
val (_, lastStageDescription) = lastStageNameAndDescription(store, job)
val jobDescription = UIUtils.makeDescription(lastStageDescription, "", plainText = true).text

val submissionTime = job.submissionTime.get.getTime()
val completionTime =
val classNameByStatus = status match {
@@ -80,7 +82,7 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We

// The timeline library treats contents as HTML, so we have to escape them. We need to add
// extra layers of escaping in order to embed this in a Javascript string literal.
val escapedDesc = Utility.escape(displayJobDescription)
val escapedDesc = Utility.escape(jobDescription)
val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc)
val jobEventJsonAsStr =
@@ -403,6 +405,8 @@ private[ui] class JobDataSource(
sortColumn: String,
desc: Boolean) extends PagedDataSource[JobTableRowData](pageSize) {

import ApiHelper._

// Convert JobUIData to JobTableRowData which contains the final contents to show in the table
// so that we can avoid creating duplicate contents during sorting the data
private val data =, desc))
@@ -427,23 +431,21 @@ private[ui] class JobDataSource(
val formattedDuration = => UIUtils.formatDuration(d)).getOrElse("Unknown")
val submissionTime = jobData.submissionTime
val formattedSubmissionTime ="Unknown")
val lastStageAttempt = store.lastStageAttempt(jobData.stageIds.max)
val lastStageDescription = lastStageAttempt.description.getOrElse("")
val (lastStageName, lastStageDescription) = lastStageNameAndDescription(store, jobData)

val formattedJobDescription =
UIUtils.makeDescription(lastStageDescription, basePath, plainText = false)
val jobDescription = UIUtils.makeDescription(lastStageDescription, basePath, plainText = false)

val detailUrl = "%s/jobs/job?id=%s".format(basePath, jobData.jobId)

new JobTableRowData(
@@ -336,8 +336,14 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
store.executorList(false), appStartTime)

content ++= UIUtils.showDagVizForJob(
jobId, store.operationGraphForJob(jobId))
val operationGraphContent = store.asOption(store.operationGraphForJob(jobId)) match {
case Some(operationGraph) => UIUtils.showDagVizForJob(jobId, operationGraph)
case None =>
<div id="no-info">
<p>No DAG visualization information to display for job {jobId}</p>
content ++= operationGraphContent

if (shouldShowActiveStages) {
content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
@@ -23,12 +23,10 @@ import java.util.concurrent.TimeUnit
import javax.servlet.http.HttpServletRequest

import scala.collection.mutable.{HashMap, HashSet}
import scala.xml.{Elem, Node, Unparsed}
import scala.xml.{Node, Unparsed}

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.TaskLocality
import org.apache.spark.status._
import org.apache.spark.status.api.v1._
@@ -1012,4 +1010,9 @@ private object ApiHelper {

def lastStageNameAndDescription(store: AppStatusStore, job: JobData): (String, String) = {
val stage = store.asOption(store.lastStageAttempt(job.stageIds.max))
(""), stage.flatMap(_.description).getOrElse(


