From 03a33a96c3cae0e4272a7bae2230f3a8c2c4589a Mon Sep 17 00:00:00 2001 From: zhoukang Date: Tue, 5 Sep 2017 18:28:06 +0800 Subject: [PATCH 1/8] Fix duration always updating when task failed but status is still RUNNING --- .../org/apache/spark/ui/jobs/StagePage.scala | 45 +++++++++++++++++-- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 633e740b9c9bd..1b45b1494065a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -23,10 +23,10 @@ import javax.servlet.http.HttpServletRequest import scala.collection.mutable.HashSet import scala.xml.{Elem, Node, Unparsed} - import org.apache.commons.lang3.StringEscapeUtils - +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality} import org.apache.spark.ui._ @@ -41,6 +41,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { private val progressListener = parent.progressListener private val operationGraphListener = parent.operationGraphListener private val executorsListener = parent.executorsListener + private val logDir = parent.conf.getOption("spark.history.fs.logDirectory") + .getOrElse("") + private var fs: FileSystem = null private val TIMELINE_LEGEND = {
@@ -286,6 +289,36 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { 1 } } + + def getApplicationId = { + var applicationId = "" + val applicationIdArray = parent.basePath.split("/").filter(_.startsWith("application")) + if (applicationIdArray.length != 0) { + applicationId = applicationIdArray(0) + } + applicationId + } + + def getFileSystem = { + if (!logDir.isEmpty && fs == null) { + val hadoopConf = SparkHadoopUtil.get.newConfiguration(parent.conf) + fs = new Path(logDir).getFileSystem(hadoopConf) + } + fs + } + + def getLatestUpdateTime = { + var latestUpdateTime = System.currentTimeMillis() + if (!logDir.isEmpty) { + val statuses = getFileSystem.globStatus(new Path(logDir + "/" + getApplicationId + "*")) + if (statuses.length != 0) { + val status = statuses(0) + latestUpdateTime = status.getModificationTime + } + } + latestUpdateTime + } + val currentTime = System.currentTimeMillis() val (taskTable, taskTableHTML) = try { val _taskTable = new TaskPagedTable( @@ -300,6 +333,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { stageData.hasShuffleWrite, stageData.hasBytesSpilled, currentTime, + getLatestUpdateTime, pageSize = taskPageSize, sortColumn = taskSortColumn, desc = taskSortDesc, @@ -864,6 +898,7 @@ private[ui] class TaskDataSource( hasShuffleWrite: Boolean, hasBytesSpilled: Boolean, currentTime: Long, + latestUpdateTime: Long, pageSize: Int, sortColumn: String, desc: Boolean, @@ -889,7 +924,9 @@ private[ui] class TaskDataSource( private def taskRow(taskData: TaskUIData): TaskTableRowData = { val info = taskData.taskInfo val metrics = taskData.metrics - val duration = taskData.taskDuration.getOrElse(1L) + val duration = + if (info.status == "RUNNING") info.timeRunning(latestUpdateTime) + else metrics.map(_.executorRunTime).getOrElse(1L) val formatDuration = taskData.taskDuration.map(d => UIUtils.formatDuration(d)).getOrElse("") val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L) val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) @@ -1155,6 +1192,7 @@ private[ui] class TaskPagedTable( hasShuffleWrite: Boolean, hasBytesSpilled: Boolean, currentTime: Long, + latestUpdateTime: Long, pageSize: Int, sortColumn: String, desc: Boolean, @@ -1180,6 +1218,7 @@ private[ui] class TaskPagedTable( hasShuffleWrite, hasBytesSpilled, currentTime, + latestUpdateTime: Long, pageSize, sortColumn, desc, From a442000878bbb7a36b6f4f3dd62f274aea0bddcf Mon Sep 17 00:00:00 2001 From: zhoukang Date: Wed, 6 Sep 2017 11:02:41 +0800 Subject: [PATCH 2/8] Revert "Fix duration always updating when task failed but status is still RUNNING" This reverts commit 03a33a96c3cae0e4272a7bae2230f3a8c2c4589a. --- .../org/apache/spark/ui/jobs/StagePage.scala | 45 ++----------------- 1 file changed, 3 insertions(+), 42 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 1b45b1494065a..633e740b9c9bd 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -23,10 +23,10 @@ import javax.servlet.http.HttpServletRequest import scala.collection.mutable.HashSet import scala.xml.{Elem, Node, Unparsed} + import org.apache.commons.lang3.StringEscapeUtils -import org.apache.hadoop.fs.{FileSystem, Path} + import org.apache.spark.SparkConf -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality} import org.apache.spark.ui._ @@ -41,9 +41,6 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { private val progressListener = parent.progressListener private val operationGraphListener = parent.operationGraphListener private val executorsListener = parent.executorsListener - private val logDir = parent.conf.getOption("spark.history.fs.logDirectory") - .getOrElse("") - private var fs: FileSystem = null private val TIMELINE_LEGEND = {
@@ -289,36 +286,6 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { 1 } } - - def getApplicationId = { - var applicationId = "" - val applicationIdArray = parent.basePath.split("/").filter(_.startsWith("application")) - if (applicationIdArray.length != 0) { - applicationId = applicationIdArray(0) - } - applicationId - } - - def getFileSystem = { - if (!logDir.isEmpty && fs == null) { - val hadoopConf = SparkHadoopUtil.get.newConfiguration(parent.conf) - fs = new Path(logDir).getFileSystem(hadoopConf) - } - fs - } - - def getLatestUpdateTime = { - var latestUpdateTime = System.currentTimeMillis() - if (!logDir.isEmpty) { - val statuses = getFileSystem.globStatus(new Path(logDir + "/" + getApplicationId + "*")) - if (statuses.length != 0) { - val status = statuses(0) - latestUpdateTime = status.getModificationTime - } - } - latestUpdateTime - } - val currentTime = System.currentTimeMillis() val (taskTable, taskTableHTML) = try { val _taskTable = new TaskPagedTable( @@ -333,7 +300,6 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { stageData.hasShuffleWrite, stageData.hasBytesSpilled, currentTime, - getLatestUpdateTime, pageSize = taskPageSize, sortColumn = taskSortColumn, desc = taskSortDesc, @@ -898,7 +864,6 @@ private[ui] class TaskDataSource( hasShuffleWrite: Boolean, hasBytesSpilled: Boolean, currentTime: Long, - latestUpdateTime: Long, pageSize: Int, sortColumn: String, desc: Boolean, @@ -924,9 +889,7 @@ private[ui] class TaskDataSource( private def taskRow(taskData: TaskUIData): TaskTableRowData = { val info = taskData.taskInfo val metrics = taskData.metrics - val duration = - if (info.status == "RUNNING") info.timeRunning(latestUpdateTime) - else metrics.map(_.executorRunTime).getOrElse(1L) + val duration = taskData.taskDuration.getOrElse(1L) val formatDuration = taskData.taskDuration.map(d => UIUtils.formatDuration(d)).getOrElse("") val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L) val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) @@ -1192,7 +1155,6 @@ private[ui] class TaskPagedTable( hasShuffleWrite: Boolean, hasBytesSpilled: Boolean, currentTime: Long, - latestUpdateTime: Long, pageSize: Int, sortColumn: String, desc: Boolean, @@ -1218,7 +1180,6 @@ private[ui] class TaskPagedTable( hasShuffleWrite, hasBytesSpilled, currentTime, - latestUpdateTime: Long, pageSize, sortColumn, desc, From 8b474184dd18fa55df7105f5905658b9f3fc3315 Mon Sep 17 00:00:00 2001 From: zhoukang Date: Wed, 6 Sep 2017 11:34:44 +0800 Subject: [PATCH 3/8] Do not access fs in SPARK-UI pass event log access time from FSHistoryServer --- .../apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- .../apache/spark/status/api/v1/AllStagesResource.scala | 2 +- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 8 ++++++-- .../main/scala/org/apache/spark/ui/jobs/StagePage.scala | 9 +++++++-- .../main/scala/org/apache/spark/ui/jobs/StagesTab.scala | 1 + .../src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 5 +++-- 6 files changed, 19 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 687fd2d3ffe64..d892e2ae74de2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -249,7 +249,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val appSecManager = new SecurityManager(conf) SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, HistoryServer.getAttemptURI(appId, attempt.attemptId), - attempt.startTime) + attempt.lastUpdated, attempt.startTime) // Do not call ui.bind() to avoid creating a new server for each application } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 56028710ecc66..9e5dd8f6b9a24 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -142,7 +142,7 @@ private[v1] object AllStagesResource { index = uiData.taskInfo.index, attempt = uiData.taskInfo.attemptNumber, launchTime = new Date(uiData.taskInfo.launchTime), - duration = uiData.taskDuration, + duration = uiData.taskDuration(), executorId = uiData.taskInfo.executorId, host = uiData.taskInfo.host, status = uiData.taskInfo.status, diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 589f811145519..661bf39964cfa 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -50,6 +50,7 @@ private[spark] class SparkUI private ( val operationGraphListener: RDDOperationGraphListener, var appName: String, val basePath: String, + val lastUpdateTime: Long = -1L, val startTime: Long) extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf), conf, basePath, "SparkUI") @@ -176,9 +177,11 @@ private[spark] object SparkUI { securityManager: SecurityManager, appName: String, basePath: String, + lastUpdateTime: Long, startTime: Long): SparkUI = { val sparkUI = create( - None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime) + None, conf, listenerBus, securityManager, appName, basePath, + lastUpdateTime = lastUpdateTime, startTime = startTime) val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], Utils.getContextOrSparkClassLoader).asScala @@ -204,6 +207,7 @@ private[spark] object SparkUI { appName: String, basePath: String = "", jobProgressListener: Option[JobProgressListener] = None, + lastUpdateTime: Long = -1L, startTime: Long): SparkUI = { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { @@ -226,6 +230,6 @@ private[spark] object SparkUI { new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, executorsListener, _jobProgressListener, storageListener, operationGraphListener, - appName, basePath, startTime) + appName, basePath, lastUpdateTime, startTime) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 633e740b9c9bd..09ac843a36945 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -299,6 +299,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled, + parent.lastUpdateTime, currentTime, pageSize = taskPageSize, sortColumn = taskSortColumn, @@ -863,6 +864,7 @@ private[ui] class TaskDataSource( hasShuffleRead: Boolean, hasShuffleWrite: Boolean, hasBytesSpilled: Boolean, + lastUpdateTime: Long, currentTime: Long, pageSize: Int, sortColumn: String, @@ -889,8 +891,9 @@ private[ui] class TaskDataSource( private def taskRow(taskData: TaskUIData): TaskTableRowData = { val info = taskData.taskInfo val metrics = taskData.metrics - val duration = taskData.taskDuration.getOrElse(1L) - val formatDuration = taskData.taskDuration.map(d => UIUtils.formatDuration(d)).getOrElse("") + val duration = taskData.taskDuration(lastUpdateTime).getOrElse(1L) + val formatDuration = + taskData.taskDuration(lastUpdateTime).map(d => UIUtils.formatDuration(d)).getOrElse("") val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L) val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L) @@ -1154,6 +1157,7 @@ private[ui] class TaskPagedTable( hasShuffleRead: Boolean, hasShuffleWrite: Boolean, hasBytesSpilled: Boolean, + lastUpdateTime: Long, currentTime: Long, pageSize: Int, sortColumn: String, @@ -1179,6 +1183,7 @@ private[ui] class TaskPagedTable( hasShuffleRead, hasShuffleWrite, hasBytesSpilled, + lastUpdateTime, currentTime, pageSize, sortColumn, diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index 799d769626395..0787ea6625903 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -30,6 +30,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages" val progressListener = parent.jobProgressListener val operationGraphListener = parent.operationGraphListener val executorsListener = parent.executorsListener + val lastUpdateTime = parent.lastUpdateTime attachPage(new AllStagesPage(this)) attachPage(new StagePage(this)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 9448baac096dc..cbfc6d6969cee 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -133,9 +133,10 @@ private[spark] object UIData { _metrics = metrics.map(TaskMetricsUIData.fromTaskMetrics) } - def taskDuration: Option[Long] = { + def taskDuration(lastUpdateTime: Long = -1L): Option[Long] = { if (taskInfo.status == "RUNNING") { - Some(_taskInfo.timeRunning(System.currentTimeMillis)) + Some(_taskInfo.timeRunning( + if (lastUpdateTime == -1) System.currentTimeMillis else lastUpdateTime)) } else { _metrics.map(_.executorRunTime) } From 25fe22cddde276f846fd4808de1b575a87b1c059 Mon Sep 17 00:00:00 2001 From: zhoukang Date: Tue, 12 Sep 2017 11:14:22 +0800 Subject: [PATCH 4/8] Update as commented --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 6 +++--- .../src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 4 ++-- core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 5 ++--- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index d892e2ae74de2..20fe911f2d294 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -249,7 +249,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val appSecManager = new SecurityManager(conf) SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, HistoryServer.getAttemptURI(appId, attempt.attemptId), - attempt.lastUpdated, attempt.startTime) + Some(attempt.lastUpdated), attempt.startTime) // Do not call ui.bind() to avoid creating a new server for each application } diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 661bf39964cfa..f3fcf2778d39e 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -50,7 +50,7 @@ private[spark] class SparkUI private ( val operationGraphListener: RDDOperationGraphListener, var appName: String, val basePath: String, - val lastUpdateTime: Long = -1L, + val lastUpdateTime: Option[Long] = None, val startTime: Long) extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf), conf, basePath, "SparkUI") @@ -177,7 +177,7 @@ private[spark] object SparkUI { securityManager: SecurityManager, appName: String, basePath: String, - lastUpdateTime: Long, + lastUpdateTime: Option[Long], startTime: Long): SparkUI = { val sparkUI = create( None, conf, listenerBus, securityManager, appName, basePath, @@ -207,7 +207,7 @@ private[spark] object SparkUI { appName: String, basePath: String = "", jobProgressListener: Option[JobProgressListener] = None, - lastUpdateTime: Long = -1L, + lastUpdateTime: Option[Long] = None, startTime: Long): SparkUI = { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 09ac843a36945..4d80308eb0a6d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -864,7 +864,7 @@ private[ui] class TaskDataSource( hasShuffleRead: Boolean, hasShuffleWrite: Boolean, hasBytesSpilled: Boolean, - lastUpdateTime: Long, + lastUpdateTime: Option[Long], currentTime: Long, pageSize: Int, sortColumn: String, @@ -1157,7 +1157,7 @@ private[ui] class TaskPagedTable( hasShuffleRead: Boolean, hasShuffleWrite: Boolean, hasBytesSpilled: Boolean, - lastUpdateTime: Long, + lastUpdateTime: Option[Long], currentTime: Long, pageSize: Int, sortColumn: String, diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index cbfc6d6969cee..126a96bbb0163 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -133,10 +133,9 @@ private[spark] object UIData { _metrics = metrics.map(TaskMetricsUIData.fromTaskMetrics) } - def taskDuration(lastUpdateTime: Long = -1L): Option[Long] = { + def taskDuration(lastUpdateTime: Option[Long] = None): Option[Long] = { if (taskInfo.status == "RUNNING") { - Some(_taskInfo.timeRunning( - if (lastUpdateTime == -1) System.currentTimeMillis else lastUpdateTime)) + Some(_taskInfo.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis))) } else { _metrics.map(_.executorRunTime) } From 560d442a8d25e37f9d831699663b1c8413ddd6a9 Mon Sep 17 00:00:00 2001 From: zhoukang Date: Wed, 13 Sep 2017 13:48:38 +0800 Subject: [PATCH 5/8] Update as commented --- .../spark/status/api/v1/AllStagesResource.scala | 16 +++++++++++----- .../spark/status/api/v1/OneStageResource.scala | 7 ++++--- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 9e5dd8f6b9a24..a55e49abd8a47 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -47,7 +47,8 @@ private[v1] class AllStagesResource(ui: SparkUI) { listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)) } } yield { - AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false) + AllStagesResource.stageUiToStageData( + status, stageInfo, stageUiData, includeDetails = false, Some(ui)) } } } @@ -57,7 +58,8 @@ private[v1] object AllStagesResource { status: StageStatus, stageInfo: StageInfo, stageUiData: StageUIData, - includeDetails: Boolean): StageData = { + includeDetails: Boolean, + sparkUI: Option[SparkUI] = None): StageData = { val taskLaunchTimes = stageUiData.taskData.values.map(_.taskInfo.launchTime).filter(_ > 0) @@ -69,7 +71,7 @@ private[v1] object AllStagesResource { } val taskData = if (includeDetails) { - Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } ) + Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v, sparkUI) } ) } else { None } @@ -136,13 +138,17 @@ private[v1] object AllStagesResource { } } - def convertTaskData(uiData: TaskUIData): TaskData = { + def convertTaskData(uiData: TaskUIData, sparkUI: Option[SparkUI]): TaskData = { new TaskData( taskId = uiData.taskInfo.taskId, index = uiData.taskInfo.index, attempt = uiData.taskInfo.attemptNumber, launchTime = new Date(uiData.taskInfo.launchTime), - duration = uiData.taskDuration(), + duration = uiData.taskDuration( + sparkUI match { + case Some(ui) => ui.lastUpdateTime + case None => None + }), executorId = uiData.taskInfo.executorId, host = uiData.taskInfo.host, status = uiData.taskInfo.status, diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala index 3e6d2942d0fbb..ff23f03446507 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala @@ -36,7 +36,7 @@ private[v1] class OneStageResource(ui: SparkUI) { withStage(stageId) { stageAttempts => stageAttempts.map { stage => AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, - includeDetails = true) + includeDetails = true, Some(ui)) } } } @@ -48,7 +48,7 @@ private[v1] class OneStageResource(ui: SparkUI) { @PathParam("stageAttemptId") stageAttemptId: Int): StageData = { withStageAttempt(stageId, stageAttemptId) { stage => AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, - includeDetails = true) + includeDetails = true, Some(ui)) } } @@ -81,7 +81,8 @@ private[v1] class OneStageResource(ui: SparkUI) { @DefaultValue("20") @QueryParam("length") length: Int, @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = { withStageAttempt(stageId, stageAttemptId) { stage => - val tasks = stage.ui.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq + val tasks = stage.ui.taskData.values.map{ + AllStagesResource.convertTaskData(_, Some(ui))}.toIndexedSeq .sorted(OneStageResource.ordering(sortBy)) tasks.slice(offset, offset + length) } From f0e1dedf1a2f167f9a4be66050cb8948e9ccbea2 Mon Sep 17 00:00:00 2001 From: zhoukang Date: Wed, 13 Sep 2017 15:38:14 +0800 Subject: [PATCH 6/8] Make api more cleaner --- .../spark/status/api/v1/AllStagesResource.scala | 17 +++++++---------- .../spark/status/api/v1/OneStageResource.scala | 8 +++++--- .../scala/org/apache/spark/ui/jobs/UIData.scala | 1 + 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index a55e49abd8a47..5ad4b50cd13af 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -47,8 +47,9 @@ private[v1] class AllStagesResource(ui: SparkUI) { listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)) } } yield { + stageUiData.jobLastUpdateTime = ui.lastUpdateTime AllStagesResource.stageUiToStageData( - status, stageInfo, stageUiData, includeDetails = false, Some(ui)) + status, stageInfo, stageUiData, includeDetails = false) } } } @@ -58,8 +59,7 @@ private[v1] object AllStagesResource { status: StageStatus, stageInfo: StageInfo, stageUiData: StageUIData, - includeDetails: Boolean, - sparkUI: Option[SparkUI] = None): StageData = { + includeDetails: Boolean): StageData = { val taskLaunchTimes = stageUiData.taskData.values.map(_.taskInfo.launchTime).filter(_ > 0) @@ -71,7 +71,8 @@ private[v1] object AllStagesResource { } val taskData = if (includeDetails) { - Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v, sparkUI) } ) + Some(stageUiData.taskData.map { case (k, v) => + k -> convertTaskData(v, stageUiData.jobLastUpdateTime) } ) } else { None } @@ -138,17 +139,13 @@ private[v1] object AllStagesResource { } } - def convertTaskData(uiData: TaskUIData, sparkUI: Option[SparkUI]): TaskData = { + def convertTaskData(uiData: TaskUIData, jobLastUpdateTime: Option[Long]): TaskData = { new TaskData( taskId = uiData.taskInfo.taskId, index = uiData.taskInfo.index, attempt = uiData.taskInfo.attemptNumber, launchTime = new Date(uiData.taskInfo.launchTime), - duration = uiData.taskDuration( - sparkUI match { - case Some(ui) => ui.lastUpdateTime - case None => None - }), + duration = uiData.taskDuration(jobLastUpdateTime), executorId = uiData.taskInfo.executorId, host = uiData.taskInfo.host, status = uiData.taskInfo.status, diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala index ff23f03446507..207ff6c248fa1 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala @@ -35,8 +35,9 @@ private[v1] class OneStageResource(ui: SparkUI) { def stageData(@PathParam("stageId") stageId: Int): Seq[StageData] = { withStage(stageId) { stageAttempts => stageAttempts.map { stage => + stage.ui.jobLastUpdateTime = ui.lastUpdateTime AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, - includeDetails = true, Some(ui)) + includeDetails = true) } } } @@ -47,8 +48,9 @@ private[v1] class OneStageResource(ui: SparkUI) { @PathParam("stageId") stageId: Int, @PathParam("stageAttemptId") stageAttemptId: Int): StageData = { withStageAttempt(stageId, stageAttemptId) { stage => + stage.ui.jobLastUpdateTime = ui.lastUpdateTime AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, - includeDetails = true, Some(ui)) + includeDetails = true) } } @@ -82,7 +84,7 @@ private[v1] class OneStageResource(ui: SparkUI) { @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = { withStageAttempt(stageId, stageAttemptId) { stage => val tasks = stage.ui.taskData.values.map{ - AllStagesResource.convertTaskData(_, Some(ui))}.toIndexedSeq + AllStagesResource.convertTaskData(_, ui.lastUpdateTime)}.toIndexedSeq .sorted(OneStageResource.ordering(sortBy)) tasks.slice(offset, offset + length) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 126a96bbb0163..49ec54de18032 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -97,6 +97,7 @@ private[spark] object UIData { var memoryBytesSpilled: Long = _ var diskBytesSpilled: Long = _ var isBlacklisted: Int = _ + var jobLastUpdateTime: Option[Long] = None var schedulingPool: String = "" var description: Option[String] = None From 8ec4319dd81cf4e19917fe4a64a016049f3a2e83 Mon Sep 17 00:00:00 2001 From: zhoukang Date: Thu, 14 Sep 2017 13:59:28 +0800 Subject: [PATCH 7/8] Modify field name and update code style --- .../spark/status/api/v1/AllStagesResource.scala | 11 +++++------ .../apache/spark/status/api/v1/OneStageResource.scala | 8 ++++---- .../main/scala/org/apache/spark/ui/jobs/UIData.scala | 2 +- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 5ad4b50cd13af..0f04408c06438 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -47,9 +47,8 @@ private[v1] class AllStagesResource(ui: SparkUI) { listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)) } } yield { - stageUiData.jobLastUpdateTime = ui.lastUpdateTime - AllStagesResource.stageUiToStageData( - status, stageInfo, stageUiData, includeDetails = false) + stageUiData.lastUpdateTime = ui.lastUpdateTime + AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false) } } } @@ -72,7 +71,7 @@ private[v1] object AllStagesResource { val taskData = if (includeDetails) { Some(stageUiData.taskData.map { case (k, v) => - k -> convertTaskData(v, stageUiData.jobLastUpdateTime) } ) + k -> convertTaskData(v, stageUiData.lastUpdateTime) } ) } else { None } @@ -139,13 +138,13 @@ private[v1] object AllStagesResource { } } - def convertTaskData(uiData: TaskUIData, jobLastUpdateTime: Option[Long]): TaskData = { + def convertTaskData(uiData: TaskUIData, lastUpdateTime: Option[Long]): TaskData = { new TaskData( taskId = uiData.taskInfo.taskId, index = uiData.taskInfo.index, attempt = uiData.taskInfo.attemptNumber, launchTime = new Date(uiData.taskInfo.launchTime), - duration = uiData.taskDuration(jobLastUpdateTime), + duration = uiData.taskDuration(lastUpdateTime), executorId = uiData.taskInfo.executorId, host = uiData.taskInfo.host, status = uiData.taskInfo.status, diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala index 207ff6c248fa1..f15073bccced2 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala @@ -35,7 +35,7 @@ private[v1] class OneStageResource(ui: SparkUI) { def stageData(@PathParam("stageId") stageId: Int): Seq[StageData] = { withStage(stageId) { stageAttempts => stageAttempts.map { stage => - stage.ui.jobLastUpdateTime = ui.lastUpdateTime + stage.ui.lastUpdateTime = ui.lastUpdateTime AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, includeDetails = true) } @@ -48,7 +48,7 @@ private[v1] class OneStageResource(ui: SparkUI) { @PathParam("stageId") stageId: Int, @PathParam("stageAttemptId") stageAttemptId: Int): StageData = { withStageAttempt(stageId, stageAttemptId) { stage => - stage.ui.jobLastUpdateTime = ui.lastUpdateTime + stage.ui.lastUpdateTime = ui.lastUpdateTime AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, includeDetails = true) } @@ -83,8 +83,8 @@ private[v1] class OneStageResource(ui: SparkUI) { @DefaultValue("20") @QueryParam("length") length: Int, @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = { withStageAttempt(stageId, stageAttemptId) { stage => - val tasks = stage.ui.taskData.values.map{ - AllStagesResource.convertTaskData(_, ui.lastUpdateTime)}.toIndexedSeq + val tasks = stage.ui.taskData.values + .map{ AllStagesResource.convertTaskData(_, ui.lastUpdateTime)}.toIndexedSeq .sorted(OneStageResource.ordering(sortBy)) tasks.slice(offset, offset + length) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 49ec54de18032..d9c87f69d8a54 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -97,7 +97,7 @@ private[spark] object UIData { var memoryBytesSpilled: Long = _ var diskBytesSpilled: Long = _ var isBlacklisted: Int = _ - var jobLastUpdateTime: Option[Long] = None + var lastUpdateTime: Option[Long] = None var schedulingPool: String = "" var description: Option[String] = None From 06035f7cd637511a01d3e9dc04c1d6f78afc27cc Mon Sep 17 00:00:00 2001 From: zhoukang Date: Thu, 14 Sep 2017 15:03:16 +0800 Subject: [PATCH 8/8] Update code style --- .../org/apache/spark/status/api/v1/AllStagesResource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 0f04408c06438..4a4ed954d689e 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -71,7 +71,7 @@ private[v1] object AllStagesResource { val taskData = if (includeDetails) { Some(stageUiData.taskData.map { case (k, v) => - k -> convertTaskData(v, stageUiData.lastUpdateTime) } ) + k -> convertTaskData(v, stageUiData.lastUpdateTime) }) } else { None }