From e8e7894ab6e7fa9e1ca4ac8b4d692d966cbef361 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 16 Sep 2015 17:15:03 -0700
Subject: [PATCH 01/13] Set job descriptions for all streaming jobs

---
 .../spark/streaming/StreamingContext.scala    |  4 +--
 .../streaming/scheduler/JobScheduler.scala    |  5 +++
 .../streaming/scheduler/ReceiverTracker.scala |  3 ++
 .../apache/spark/streaming/ui/BatchPage.scala | 31 ++++++++++++-------
 4 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b496d1f341a0b..ec0b0720ebf64 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
+import org.apache.spark.util.{CallSite, ShutdownHookManager}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -198,7 +198,7 @@ class StreamingContext private[streaming] (
 
   private var state: StreamingContextState = INITIALIZED
 
-  private val startSite = new AtomicReference[CallSite](null)
+  private[streaming] val startSite = new AtomicReference[CallSite](null)
 
   private var shutdownHookRef: AnyRef = _
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 0cd39594ee923..3619461622a3d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -25,6 +25,7 @@ import scala.util.{Failure, Success}
 import org.apache.spark.Logging
 import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming._
+import org.apache.spark.streaming.ui.UIUtils
 import org.apache.spark.util.{EventLoop, ThreadUtils}
 
 
@@ -193,6 +194,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     def run() {
       ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
       ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
+      val formattedTime = UIUtils.formatBatchTime(
+        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
+      ssc.sc.setJobDescription(
+        s"Streaming job from [output operation ${job.outputOpId}, batch time ${formattedTime}]")
       try {
         // We need to assign `eventLoop` to a temp variable. Otherwise, because
         // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index f86fd44b48719..3941e42e0cfee 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -554,6 +554,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
     }
     receiverRDD.setName(s"Receiver $receiverId")
+    ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
+    ssc.sparkContext.setCallSite(ssc.startSite.get)
+
     val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
       receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
     // We will keep restarting the receiver job until ReceiverTracker is stopped
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 90d1b0fadecfc..92cac93d36122 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -19,14 +19,14 @@ package org.apache.spark.streaming.ui
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{NodeSeq, Node, Text, Unparsed}
+import scala.xml.{Node, NodeSeq, Text, Unparsed}
 
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.streaming.Time
-import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
-import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
+import org.apache.spark.streaming.ui.StreamingJobProgressListener.{OutputOpId, SparkJobId}
 import org.apache.spark.ui.jobs.UIData.JobUIData
+import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
 
 private[ui] case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])
 
@@ -207,16 +207,23 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
           sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
         }
       }
-    val lastStageData = lastStageInfo.flatMap { s =>
-      sparkListener.stageIdToData.get((s.stageId, s.attemptId))
-    }
-
-    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
-    val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+    Text(lastStageInfo.map { _.name }.getOrElse("(Unknown Stage)"))
+    lastStageInfo match {
+      case Some(stageInfo) =>
+        val details = if (stageInfo.details.nonEmpty) {
+          <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+                class="expand-details">
+            +details
+          </span> ++
+          <div class="stage-details collapsed">
+            <pre>{stageInfo.details}</pre>
+          </div>
+        }
-    <span class="description-input" title={lastStageDescription}>
-      {lastStageDescription}
-    </span> ++ Text(lastStageName)
+        <div>{stageInfo.name} {details}</div>
+      case None =>
+        Text("(Unknown)")
+    }
   }
 
   private def failureReasonCell(failureReason: String): Seq[Node] = {

From 130e82e6461dbd5b88e454af6df6a7df84c14cc4 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 16 Sep 2015 19:58:02 -0700
Subject: [PATCH 02/13] Added link to streaming UI in desc of streaming jobs

---
 .../org/apache/spark/ui/jobs/StageTable.scala | 24 +++++++++++++++----
 .../streaming/scheduler/JobScheduler.scala    |  3 ++-
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 99812db4912a3..5d93ff289c100 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -17,11 +17,11 @@
 package org.apache.spark.ui.jobs
 
-import scala.xml.Node
-import scala.xml.Text
-
 import java.util.Date
 
+import scala.util.control.NonFatal
+import scala.xml.{Node, Text}
+
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.scheduler.StageInfo
@@ -116,7 +116,23 @@ private[ui] class StageTableBase(
       stageData <- listener.stageIdToData.get((s.stageId, s.attemptId))
       desc <- stageData.description
     } yield {
-      <div class="description-input">{desc}</div>
+      // If the description can be parsed as HTML and has only relative links, then render
+      // as HTML, otherwise render as escaped string
+      try {
+        val xml = scala.xml.XML.loadString(s"""<div class="description-input">$desc</div>""")
+        val allLinks = xml \\ "_" flatMap { _.attributes } filter { _.key == "href" }
+        val areAllLinksRelative = allLinks.forall { _.value.toString.startsWith ("/") }
+        if (areAllLinksRelative) {
+          xml
+        } else {
+          println("some not relative; all links: " + allLinks.mkString(";"))
+          <div class="description-input">{desc}</div>
+        }
+      } catch {
+        case NonFatal(e) =>
+          println(e)
+          <div class="description-input">{desc}</div>
+      }
     }
     <div>{stageDesc.getOrElse("")} {killLink} {nameLink} {details}</div>
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 3619461622a3d..d0512d559ef84 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -196,8 +196,9 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
       val formattedTime = UIUtils.formatBatchTime(
         job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
+      val batchLink = s"/streaming/batch/?id=${job.time.milliseconds}"
       ssc.sc.setJobDescription(
-        s"Streaming job from [output operation ${job.outputOpId}, batch time ${formattedTime}]")
+        s"""Streaming job from <a href="$batchLink">[output operation ${job.outputOpId}, batch time ${formattedTime}]</a>""")
       try {
         // We need to assign `eventLoop` to a temp variable. Otherwise, because
         // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then

From 7206f2e9d4c42edf5a6a231b5cbde7309a62f041 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 16 Sep 2015 20:04:00 -0700
Subject: [PATCH 03/13] Addressed comments

---
 .../main/scala/org/apache/spark/streaming/ui/BatchPage.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 92cac93d36122..5d8cd57053250 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{Node, NodeSeq, Text, Unparsed}
+import scala.xml._
 
 import org.apache.commons.lang3.StringEscapeUtils
 
@@ -207,7 +207,6 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
           sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
         }
       }
-    Text(lastStageInfo.map { _.name }.getOrElse("(Unknown Stage)"))
     lastStageInfo match {
       case Some(stageInfo) =>
         val details = if (stageInfo.details.nonEmpty) {
@@ -218,6 +217,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
           </span> ++
           <div class="stage-details collapsed">
             <pre>{stageInfo.details}</pre>
           </div>
+        } else {
+          NodeSeq.Empty
         }
         <div>{stageInfo.name} {details}</div>
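
A note on the mechanism patches 01-03 build on: `SparkContext.setJobDescription` and `setLocalProperty` store values in thread-local properties that are captured at job-submission time, which is why `JobHandler.run()` can tag every job submitted by an output operation with its batch time. A minimal, self-contained sketch of that behavior (hypothetical demo code, not part of the patch; the literal property-key string is an assumption, the patches go through `JobScheduler.BATCH_TIME_PROPERTY_KEY`):

import org.apache.spark.{SparkConf, SparkContext}

object JobDescriptionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("JobDescriptionDemo"))

    // Thread-local metadata, captured when a job is submitted from this
    // thread -- the same calls JobHandler.run() makes before running an
    // output operation.
    sc.setJobDescription("Streaming job from [output operation 0, batch time 17:15:03]")
    sc.setLocalProperty("spark.streaming.internal.batchTime", // assumed key value
      System.currentTimeMillis.toString)

    // This job shows up in the Jobs UI under the description set above.
    sc.parallelize(1 to 1000).count()
    sc.stop()
  }
}

Because the properties are per-thread, each output operation of a batch can carry its own description without any synchronization with jobs submitted from other threads.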
From b15012f3a2b4f4205723d6ea37bf22bfe7c300f7 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 16 Sep 2015 20:30:05 -0700
Subject: [PATCH 04/13] More fixes

---
 .../apache/spark/ui/jobs/AllJobsPage.scala    | 29 +++++++++++++++++--
 .../org/apache/spark/ui/jobs/StageTable.scala | 12 ++++----
 .../streaming/scheduler/JobScheduler.scala    | 19 +++++++-----
 .../apache/spark/streaming/ui/BatchPage.scala |  7 +++--
 4 files changed, 50 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index e72547df7254b..dfba40a37303d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -18,11 +18,14 @@ package org.apache.spark.ui.jobs
 
 import scala.collection.mutable.{HashMap, ListBuffer}
-import scala.xml.{Node, NodeSeq, Unparsed, Utility}
+import scala.util.control.NonFatal
+import scala.xml._
 
 import java.util.Date
 import javax.servlet.http.HttpServletRequest
 
+import org.apache.commons.lang3.StringEscapeUtils
+
 import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
 import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData}
 import org.apache.spark.JobExecutionStatus
@@ -224,6 +227,28 @@
     }
     val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
     val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
+    val jobDescription = {
+      val d = lastStageDescription
+      // If the description can be parsed as HTML and has only relative links, then render
+      // as HTML, otherwise render as escaped string
+      try {
+        // Try to load the description as unescaped HTML
+        val xml = XML.loadString(s"""<span class="description-input">$d</span>""")
+        val allLinks = xml \\ "_" flatMap { _.attributes } filter { _.key == "href" }
+        val areAllLinksRelative = allLinks.forall { _.value.toString.startsWith ("/") }
+        if (areAllLinksRelative) {
+          xml
+        } else {
+          <span class="description-input">{d}</span>
+        }
+      } catch {
+        case NonFatal(e) =>
+          <span class="description-input">{d}</span>
+      }
+    }
+
     val detailUrl = "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
@@ -231,7 +256,7 @@
       <td sorttable_customkey={job.jobId.toString}>
         {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
       </td>
       <td>
-        <span class="description-input" title={lastStageDescription}>{lastStageDescription}</span>
+        {jobDescription}
       </td>
       <td>
         {lastStageName}
       </td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 5d93ff289c100..5231b8b552797 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs
 import java.util.Date
 
 import scala.util.control.NonFatal
-import scala.xml.{Node, Text}
+import scala.xml.{Node, Text, XML}
 
 import org.apache.commons.lang3.StringEscapeUtils
 
@@ -119,19 +119,21 @@ private[ui] class StageTableBase(
       // If the description can be parsed as HTML and has only relative links, then render
      // as HTML, otherwise render as escaped string
       try {
-        val xml = scala.xml.XML.loadString(s"""<div class="description-input">$desc</div>""")
+        // Try to load the description as unescaped HTML
+        val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")
         val allLinks = xml \\ "_" flatMap { _.attributes } filter { _.key == "href" }
         val areAllLinksRelative = allLinks.forall { _.value.toString.startsWith ("/") }
         if (areAllLinksRelative) {
           xml
         } else {
-          println("some not relative; all links: " + allLinks.mkString(";"))
-          <div class="description-input">{desc}</div>
+          println("links not relative")
+          <span class="description-input">{desc}</span>
         }
       } catch {
         case NonFatal(e) =>
           println(e)
-          <div class="description-input">{desc}</div>
+          <span class="description-input">{desc}</span>
       }
     }
     <div>{stageDesc.getOrElse("")} {killLink} {nameLink} {details}</div>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index d0512d559ef84..32d995dc42f27 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -191,15 +191,20 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   }
 
   private class JobHandler(job: Job) extends Runnable with Logging {
+    import JobScheduler._
+
     def run() {
-      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
-      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
-      val formattedTime = UIUtils.formatBatchTime(
-        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
-      val batchLink = s"/streaming/batch/?id=${job.time.milliseconds}"
-      ssc.sc.setJobDescription(
-        s"""Streaming job from <a href="$batchLink">[output operation ${job.outputOpId}, batch time ${formattedTime}]</a>""")
       try {
+        val formattedTime = UIUtils.formatBatchTime(
+          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
+        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
+        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
+
+        ssc.sc.setJobDescription(
+          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
+        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
+        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
+
         // We need to assign `eventLoop` to a temp variable. Otherwise, because
         // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
         // it's possible that when `post` is called, `eventLoop` happens to null.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 5d8cd57053250..9129c1f26abd4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -210,9 +210,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     lastStageInfo match {
       case Some(stageInfo) =>
         val details = if (stageInfo.details.nonEmpty) {
-          <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
-                class="expand-details">
-            +details
+
+
+details
           </span> ++
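
The rule the series converges on for rendering job descriptions in the UI (the remaining nine patches of the series are not shown here) is: try to parse the description as XML and render it unescaped only when every `href` it contains is a relative link; otherwise fall back to the escaped string. A standalone sketch of that rule, extracted from the patches above (the object and method names are illustrative, not the final API of this PR):

import scala.util.control.NonFatal
import scala.xml.{Node, XML}

object DescriptionRenderer {
  /** Render `desc` as HTML only if it parses and all of its links are relative. */
  def render(desc: String): Seq[Node] = {
    try {
      // Wrap in a span so plain-text descriptions still parse as one element.
      val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")
      // Collect every href attribute anywhere in the parsed fragment.
      val allLinks = (xml \\ "_").flatMap(_.attributes).filter(_.key == "href")
      if (allLinks.forall(_.value.toString.startsWith("/"))) {
        xml // all links relative: safe to render unescaped
      } else {
        <span class="description-input">{desc}</span> // escaped fallback
      }
    } catch {
      case NonFatal(_) =>
        <span class="description-input">{desc}</span> // not valid XML: escaped fallback
    }
  }
}

Restricting rendered descriptions to relative links keeps the feature limited to in-UI navigation, such as the `/streaming/batch/?id=...` link that `JobHandler` embeds, and prevents arbitrary description strings from injecting external links or markup into the Jobs table.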