From 0f32845edb58be9b5a935136d0cd3460438bb9f2 Mon Sep 17 00:00:00 2001 From: proflin Date: Sat, 19 Mar 2016 17:35:55 +0800 Subject: [PATCH 1/6] Add job/stage descriptionPlain for event-line --- .../main/scala/org/apache/spark/SparkContext.scala | 6 ++++++ .../org/apache/spark/ui/jobs/AllJobsPage.scala | 14 +++++++++----- .../apache/spark/ui/jobs/JobProgressListener.scala | 3 +++ .../scala/org/apache/spark/ui/jobs/UIData.scala | 5 +++++ .../spark/streaming/scheduler/JobScheduler.scala | 1 + 5 files changed, 24 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d2cf3bfd60ee3..ed2e1371c4813 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -626,6 +626,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value) } + def setJobDescriptionPlain(value: String) { + setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION_PLAIN, value) + } + /** * Assigns a group ID to all the jobs started by this thread until the group ID is set to a * different value or cleared. @@ -663,6 +667,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** Clear the current thread's job group ID and its description. */ def clearJobGroup() { setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null) + setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION_PLAIN, null) setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null) setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null) } @@ -2208,6 +2213,7 @@ object SparkContext extends Logging { } private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description" + private[spark] val SPARK_JOB_DESCRIPTION_PLAIN = "spark.job.descriptionPlain" private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id" private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel" private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope" diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 451cd83b51ae7..30e28d335b2df 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -52,15 +52,18 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { Removed .toString.filter(_ != '\n') - private def getLastStageNameAndDescription(job: JobUIData): (String, String) = { + private def getLastStageNameAndDescription(job: JobUIData, descPlain: Boolean) + : (String, String) = { val lastStageInfo = Option(job.stageIds) .filter(_.nonEmpty) - .flatMap { ids => parent.jobProgresslistener.stageIdToInfo.get(ids.max)} + .flatMap { ids => parent.jobProgresslistener.stageIdToInfo.get(ids.max) } val lastStageData = lastStageInfo.flatMap { s => parent.jobProgresslistener.stageIdToData.get((s.stageId, s.attemptId)) } val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") - val description = lastStageData.flatMap(_.description).getOrElse("") + val description = lastStageData.flatMap { s => + if (descPlain && s.descriptionPlain.isDefined) s.descriptionPlain else s.description + }.getOrElse("") (name, description) } @@ -70,7 +73,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { }.map { jobUIData => val jobId = jobUIData.jobId val status = jobUIData.status - val (jobName, jobDescription) = getLastStageNameAndDescription(jobUIData) + val (jobName, jobDescription) = getLastStageNameAndDescription(jobUIData, descPlain = true) val displayJobDescription = if (jobDescription.isEmpty) jobName else jobDescription val submissionTime = jobUIData.submissionTime.get val completionTimeOpt = jobUIData.completionTime @@ -215,7 +218,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { } def makeRow(job: JobUIData): Seq[Node] = { - val (lastStageName, lastStageDescription) = getLastStageNameAndDescription(job) + val (lastStageName, lastStageDescription) = + getLastStageNameAndDescription(job, descPlain = false) val duration: Option[Long] = { job.submissionTime.map { start => val end = job.completionTime.getOrElse(System.currentTimeMillis()) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index ed3ab66e3b68b..b462e2b87e41d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -307,6 +307,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.description = Option(stageSubmitted.properties).flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) } + stageData.descriptionPlain = Option(stageSubmitted.properties).flatMap { + p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION_PLAIN)) + } val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]) stages(stage.stageId) = stage diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 78165d7b743e2..0553b2bab76b1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -89,7 +89,12 @@ private[spark] object UIData { var diskBytesSpilled: Long = _ var schedulingPool: String = "" + // This will be used when: + // (1) we display text tables + // (2) we display the event line if descriptionPlain is not specifically defined var description: Option[String] = None + // This will be used when we display the event line + var descriptionPlain: Option[String] = None var accumulables = new HashMap[Long, AccumulableInfo] var taskData = new HashMap[Long, TaskUIData] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 61f9e0974ca95..30f9fb7f5fede 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -208,6 +208,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { ssc.sc.setJobDescription( s"""Streaming job from $batchLinkText""") + ssc.sc.setJobDescriptionPlain(s"Streaming job from $batchLinkText") ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString) ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString) // Checkpoint all RDDs marked for checkpointing to ensure their lineages are From 3c4178b48581c2e5cbe00b460979f0caec83b33d Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Tue, 22 Mar 2016 09:59:26 +0800 Subject: [PATCH 2/6] Revert "Add job/stage descriptionPlain for event-line" This reverts commit 0f32845edb58be9b5a935136d0cd3460438bb9f2. --- .../main/scala/org/apache/spark/SparkContext.scala | 6 ------ .../org/apache/spark/ui/jobs/AllJobsPage.scala | 14 +++++--------- .../apache/spark/ui/jobs/JobProgressListener.scala | 3 --- .../scala/org/apache/spark/ui/jobs/UIData.scala | 5 ----- .../spark/streaming/scheduler/JobScheduler.scala | 1 - 5 files changed, 5 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ed2e1371c4813..d2cf3bfd60ee3 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -626,10 +626,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value) } - def setJobDescriptionPlain(value: String) { - setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION_PLAIN, value) - } - /** * Assigns a group ID to all the jobs started by this thread until the group ID is set to a * different value or cleared. @@ -667,7 +663,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** Clear the current thread's job group ID and its description. */ def clearJobGroup() { setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null) - setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION_PLAIN, null) setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null) setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null) } @@ -2213,7 +2208,6 @@ object SparkContext extends Logging { } private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description" - private[spark] val SPARK_JOB_DESCRIPTION_PLAIN = "spark.job.descriptionPlain" private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id" private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel" private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope" diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 30e28d335b2df..451cd83b51ae7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -52,18 +52,15 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { Removed .toString.filter(_ != '\n') - private def getLastStageNameAndDescription(job: JobUIData, descPlain: Boolean) - : (String, String) = { + private def getLastStageNameAndDescription(job: JobUIData): (String, String) = { val lastStageInfo = Option(job.stageIds) .filter(_.nonEmpty) - .flatMap { ids => parent.jobProgresslistener.stageIdToInfo.get(ids.max) } + .flatMap { ids => parent.jobProgresslistener.stageIdToInfo.get(ids.max)} val lastStageData = lastStageInfo.flatMap { s => parent.jobProgresslistener.stageIdToData.get((s.stageId, s.attemptId)) } val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") - val description = lastStageData.flatMap { s => - if (descPlain && s.descriptionPlain.isDefined) s.descriptionPlain else s.description - }.getOrElse("") + val description = lastStageData.flatMap(_.description).getOrElse("") (name, description) } @@ -73,7 +70,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { }.map { jobUIData => val jobId = jobUIData.jobId val status = jobUIData.status - val (jobName, jobDescription) = getLastStageNameAndDescription(jobUIData, descPlain = true) + val (jobName, jobDescription) = getLastStageNameAndDescription(jobUIData) val displayJobDescription = if (jobDescription.isEmpty) jobName else jobDescription val submissionTime = jobUIData.submissionTime.get val completionTimeOpt = jobUIData.completionTime @@ -218,8 +215,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { } def makeRow(job: JobUIData): Seq[Node] = { - val (lastStageName, lastStageDescription) = - getLastStageNameAndDescription(job, descPlain = false) + val (lastStageName, lastStageDescription) = getLastStageNameAndDescription(job) val duration: Option[Long] = { job.submissionTime.map { start => val end = job.completionTime.getOrElse(System.currentTimeMillis()) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b462e2b87e41d..ed3ab66e3b68b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -307,9 +307,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.description = Option(stageSubmitted.properties).flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) } - stageData.descriptionPlain = Option(stageSubmitted.properties).flatMap { - p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION_PLAIN)) - } val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]) stages(stage.stageId) = stage diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 0553b2bab76b1..78165d7b743e2 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -89,12 +89,7 @@ private[spark] object UIData { var diskBytesSpilled: Long = _ var schedulingPool: String = "" - // This will be used when: - // (1) we display text tables - // (2) we display the event line if descriptionPlain is not specifically defined var description: Option[String] = None - // This will be used when we display the event line - var descriptionPlain: Option[String] = None var accumulables = new HashMap[Long, AccumulableInfo] var taskData = new HashMap[Long, TaskUIData] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 30f9fb7f5fede..61f9e0974ca95 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -208,7 +208,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { ssc.sc.setJobDescription( s"""Streaming job from $batchLinkText""") - ssc.sc.setJobDescriptionPlain(s"Streaming job from $batchLinkText") ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString) ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString) // Checkpoint all RDDs marked for checkpointing to ensure their lineages are From 7b08f9595579dfc5c9ed0e7297a024b7577cd2d0 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Tue, 22 Mar 2016 15:28:04 +0800 Subject: [PATCH 3/6] fix job description for the event timeline --- .../scala/org/apache/spark/ui/UIUtils.scala | 37 ++++++--- .../apache/spark/ui/jobs/AllJobsPage.scala | 9 ++- .../org/apache/spark/ui/UIUtilsSuite.scala | 76 ++++++++++++++++--- 3 files changed, 100 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index aa2548a55412f..158163b4aa17a 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -417,7 +417,7 @@ private[spark] object UIUtils extends Logging { * attempts to embed links outside Spark UI, or other tags like