From a457acf3faeba94bb1473d2496b8a7126e4f8b1d Mon Sep 17 00:00:00 2001 From: Sandor Murakozi Date: Wed, 6 Dec 2017 17:34:28 +0100 Subject: [PATCH 1/4] [SPARK-21672][CORE] Remove SHS-specific application / attempt data structures --- .../history/ApplicationHistoryProvider.scala | 30 ++------------ .../deploy/history/FsHistoryProvider.scala | 32 ++++++--------- .../spark/deploy/history/HistoryServer.scala | 8 ++-- .../spark/status/AppStatusListener.scala | 15 +++---- .../apache/spark/status/AppStatusStore.scala | 5 ++- .../api/v1/ApplicationListResource.scala | 32 --------------- .../org/apache/spark/status/api/v1/api.scala | 39 +++++++++++-------- .../history/FsHistoryProviderSuite.scala | 13 +++++-- 8 files changed, 61 insertions(+), 113 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 38f0d6f2afa5e..f1c06205bf04c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -23,31 +23,9 @@ import java.util.zip.ZipOutputStream import scala.xml.Node import org.apache.spark.SparkException +import org.apache.spark.status.api.v1.ApplicationInfo import org.apache.spark.ui.SparkUI -private[spark] case class ApplicationAttemptInfo( - attemptId: Option[String], - startTime: Long, - endTime: Long, - lastUpdated: Long, - sparkUser: String, - completed: Boolean = false, - appSparkVersion: String) - -private[spark] case class ApplicationHistoryInfo( - id: String, - name: String, - attempts: List[ApplicationAttemptInfo]) { - - /** - * Has this application completed? - * @return true if the most recent attempt has completed - */ - def completed: Boolean = { - attempts.nonEmpty && attempts.head.completed - } -} - /** * A loaded UI for a Spark application. * @@ -119,7 +97,7 @@ private[history] abstract class ApplicationHistoryProvider { * * @return List of all know applications. */ - def getListing(): Iterator[ApplicationHistoryInfo] + def getListing(): Iterator[ApplicationInfo] /** * Returns the Spark UI for a specific application. @@ -152,9 +130,9 @@ private[history] abstract class ApplicationHistoryProvider { def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit /** - * @return the [[ApplicationHistoryInfo]] for the appId if it exists. + * @return the [[ApplicationInfo]] for the appId if it exists. 
*/ - def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo] + def getApplicationInfo(appId: String): Option[ApplicationInfo] /** * @return html text to display when the application list is empty diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 6a83c106f6d84..a8e1becc56ab7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -43,7 +43,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ import org.apache.spark.status._ import org.apache.spark.status.KVUtils._ -import org.apache.spark.status.api.v1 +import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} import org.apache.spark.util.kvstore._ @@ -252,19 +252,19 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - override def getListing(): Iterator[ApplicationHistoryInfo] = { + override def getListing(): Iterator[ApplicationInfo] = { // Return the listing in end time descending order. listing.view(classOf[ApplicationInfoWrapper]) .index("endTime") .reverse() .iterator() .asScala - .map(_.toAppHistoryInfo()) + .map(_.toApplicationInfo()) } - override def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo] = { + override def getApplicationInfo(appId: String): Option[ApplicationInfo] = { try { - Some(load(appId).toAppHistoryInfo()) + Some(load(appId).toApplicationInfo()) } catch { case e: NoSuchElementException => None @@ -795,24 +795,16 @@ private[history] case class LogInfo( fileSize: Long) private[history] class AttemptInfoWrapper( - val info: v1.ApplicationAttemptInfo, + val info: ApplicationAttemptInfo, val logPath: String, val fileSize: Long, val adminAcls: Option[String], val viewAcls: Option[String], val adminAclsGroups: Option[String], - val viewAclsGroups: Option[String]) { - - def toAppAttemptInfo(): ApplicationAttemptInfo = { - ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(), - info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser, - info.completed, info.appSparkVersion) - } - -} + val viewAclsGroups: Option[String]) private[history] class ApplicationInfoWrapper( - val info: v1.ApplicationInfo, + val info: ApplicationInfo, val attempts: List[AttemptInfoWrapper]) { @JsonIgnore @KVIndexParam @@ -824,9 +816,7 @@ private[history] class ApplicationInfoWrapper( @JsonIgnore @KVIndexParam("oldestAttempt") def oldestAttempt(): Long = attempts.map(_.info.lastUpdated.getTime()).min - def toAppHistoryInfo(): ApplicationHistoryInfo = { - ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo())) - } + def toApplicationInfo(): ApplicationInfo = info.copy(attempts = attempts.map(_.info)) } @@ -883,7 +873,7 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends var memoryPerExecutorMB: Option[Int] = None def toView(): ApplicationInfoWrapper = { - val apiInfo = new v1.ApplicationInfo(id, name, coresGranted, maxCores, coresPerExecutor, + val apiInfo = ApplicationInfo(id, name, coresGranted, maxCores, coresPerExecutor, memoryPerExecutorMB, Nil) new ApplicationInfoWrapper(apiInfo, List(attempt.toView())) } @@ -906,7 +896,7 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends var 
viewAclsGroups: Option[String] = None def toView(): AttemptInfoWrapper = { - val apiInfo = new v1.ApplicationAttemptInfo( + val apiInfo = ApplicationAttemptInfo( attemptId, startTime, endTime, diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index b822a48e98e91..5f11dc51fc0df 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -30,7 +30,7 @@ import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, UIRoot} +import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot} import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.{ShutdownHookManager, SystemClock, Utils} @@ -175,7 +175,7 @@ class HistoryServer( * * @return List of all known applications. */ - def getApplicationList(): Iterator[ApplicationHistoryInfo] = { + def getApplicationList(): Iterator[ApplicationInfo] = { provider.getListing() } @@ -188,11 +188,11 @@ class HistoryServer( } def getApplicationInfoList: Iterator[ApplicationInfo] = { - getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo) + getApplicationList() } def getApplicationInfo(appId: String): Option[ApplicationInfo] = { - provider.getApplicationInfo(appId).map(ApplicationsListResource.appHistoryInfoToPublicAppInfo) + provider.getApplicationInfo(appId) } override def writeEventLogs( diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 9c23d9d8c923a..9580b4346d214 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -29,6 +29,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.status.api.v1 +import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationEnvironmentInfo, ApplicationInfo, RuntimeInfo} import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.scope._ @@ -50,7 +51,7 @@ private[spark] class AppStatusListener( import config._ private var sparkVersion = SPARK_VERSION - private var appInfo: v1.ApplicationInfo = null + private var appInfo: ApplicationInfo = null private var coresPerTask: Int = 1 // How often to update live entities. 
-1 means "never update" when replaying applications, @@ -77,7 +78,7 @@ private[spark] class AppStatusListener( override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { assert(event.appId.isDefined, "Application without IDs are not supported.") - val attempt = new v1.ApplicationAttemptInfo( + val attempt = ApplicationAttemptInfo( event.appAttemptId, new Date(event.time), new Date(-1), @@ -87,7 +88,7 @@ private[spark] class AppStatusListener( false, sparkVersion) - appInfo = new v1.ApplicationInfo( + appInfo = ApplicationInfo( event.appId.get, event.appName, None, @@ -103,12 +104,12 @@ private[spark] class AppStatusListener( val details = event.environmentDetails val jvmInfo = Map(details("JVM Information"): _*) - val runtime = new v1.RuntimeInfo( + val runtime = new RuntimeInfo( jvmInfo.get("Java Version").orNull, jvmInfo.get("Java Home").orNull, jvmInfo.get("Scala Version").orNull) - val envInfo = new v1.ApplicationEnvironmentInfo( + val envInfo = new ApplicationEnvironmentInfo( runtime, details.getOrElse("Spark Properties", Nil), details.getOrElse("System Properties", Nil), @@ -122,7 +123,7 @@ private[spark] class AppStatusListener( override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { val old = appInfo.attempts.head - val attempt = new v1.ApplicationAttemptInfo( + val attempt = ApplicationAttemptInfo( old.attemptId, old.startTime, new Date(event.time), @@ -132,7 +133,7 @@ private[spark] class AppStatusListener( true, old.appSparkVersion) - appInfo = new v1.ApplicationInfo( + appInfo = ApplicationInfo( appInfo.id, appInfo.name, None, diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 22d768b3cb990..fa638e0693ab6 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -25,6 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.scheduler.SparkListener import org.apache.spark.status.api.v1 +import org.apache.spark.status.api.v1.{ApplicationEnvironmentInfo, ApplicationInfo} import org.apache.spark.ui.scope._ import org.apache.spark.util.{Distribution, Utils} import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} @@ -36,11 +37,11 @@ private[spark] class AppStatusStore( val store: KVStore, listener: Option[AppStatusListener] = None) { - def applicationInfo(): v1.ApplicationInfo = { + def applicationInfo(): ApplicationInfo = { store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info } - def environmentInfo(): v1.ApplicationEnvironmentInfo = { + def environmentInfo(): ApplicationEnvironmentInfo = { val klass = classOf[ApplicationEnvironmentInfoWrapper] store.read(klass, klass.getName()).info } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index 91660a524ca93..69054f2b771f1 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -20,8 +20,6 @@ import java.util.{Date, List => JList} import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam} import javax.ws.rs.core.MediaType -import org.apache.spark.deploy.history.ApplicationHistoryInfo - @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class 
ApplicationListResource extends ApiRequestContext { @@ -67,33 +65,3 @@ private[v1] class ApplicationListResource extends ApiRequestContext { startTimeOk && endTimeOk } } - -private[spark] object ApplicationsListResource { - def appHistoryInfoToPublicAppInfo(app: ApplicationHistoryInfo): ApplicationInfo = { - new ApplicationInfo( - id = app.id, - name = app.name, - coresGranted = None, - maxCores = None, - coresPerExecutor = None, - memoryPerExecutorMB = None, - attempts = app.attempts.map { internalAttemptInfo => - new ApplicationAttemptInfo( - attemptId = internalAttemptInfo.attemptId, - startTime = new Date(internalAttemptInfo.startTime), - endTime = new Date(internalAttemptInfo.endTime), - duration = - if (internalAttemptInfo.endTime > 0) { - internalAttemptInfo.endTime - internalAttemptInfo.startTime - } else { - 0 - }, - lastUpdated = new Date(internalAttemptInfo.lastUpdated), - sparkUser = internalAttemptInfo.sparkUser, - completed = internalAttemptInfo.completed, - appSparkVersion = internalAttemptInfo.appSparkVersion - ) - } - ) - } -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 14280099f6422..1b174eb6bbb9a 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -24,27 +24,32 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.spark.JobExecutionStatus -class ApplicationInfo private[spark]( - val id: String, - val name: String, - val coresGranted: Option[Int], - val maxCores: Option[Int], - val coresPerExecutor: Option[Int], - val memoryPerExecutorMB: Option[Int], - val attempts: Seq[ApplicationAttemptInfo]) +case class ApplicationInfo private[spark]( + id: String, + name: String, + coresGranted: Option[Int], + maxCores: Option[Int], + coresPerExecutor: Option[Int], + memoryPerExecutorMB: Option[Int], + attempts: Seq[ApplicationAttemptInfo]) { + + def completed: Boolean = { + attempts.nonEmpty && attempts.head.completed + } +} @JsonIgnoreProperties( value = Array("startTimeEpoch", "endTimeEpoch", "lastUpdatedEpoch"), allowGetters = true) -class ApplicationAttemptInfo private[spark]( - val attemptId: Option[String], - val startTime: Date, - val endTime: Date, - val lastUpdated: Date, - val duration: Long, - val sparkUser: String, - val completed: Boolean = false, - val appSparkVersion: String) { +case class ApplicationAttemptInfo private[spark]( + attemptId: Option[String], + startTime: Date, + endTime: Date, + lastUpdated: Date, + duration: Long, + sparkUser: String, + completed: Boolean = false, + appSparkVersion: String) { def getStartTimeEpoch: Long = startTime.getTime diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 86c8cdf43258c..84ee01c7f5aaf 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.history import java.io._ import java.nio.charset.StandardCharsets +import java.util.Date import java.util.concurrent.TimeUnit import java.util.zip.{ZipInputStream, ZipOutputStream} @@ -42,6 +43,7 @@ import org.apache.spark.io._ import org.apache.spark.scheduler._ import org.apache.spark.security.GroupMappingServiceProvider import 
org.apache.spark.status.AppStatusStore +import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging { @@ -114,9 +116,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc end: Long, lastMod: Long, user: String, - completed: Boolean): ApplicationHistoryInfo = { - ApplicationHistoryInfo(id, name, - List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed, ""))) + completed: Boolean): ApplicationInfo = { + + val duration = if (end > 0) end - start else 0 + new ApplicationInfo(id, name, None, None, None, None, + List(ApplicationAttemptInfo(None, new Date(start), + new Date(end), new Date(lastMod), duration, user, completed, ""))) } // For completed files, lastUpdated would be lastModified time. @@ -667,7 +672,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc * } */ private def updateAndCheck(provider: FsHistoryProvider) - (checkFn: Seq[ApplicationHistoryInfo] => Unit): Unit = { + (checkFn: Seq[ApplicationInfo] => Unit): Unit = { provider.checkForLogs() provider.cleanLogs() checkFn(provider.getListing().toSeq) From 29d8735e0272a84726604ab7cc344cc9abc088f8 Mon Sep 17 00:00:00 2001 From: Sandor Murakozi Date: Thu, 7 Dec 2017 21:24:16 +0100 Subject: [PATCH 2/4] Changes requested during PR review --- .../apache/spark/deploy/history/HistoryPage.scala | 9 ++++++++- .../spark/deploy/history/HistoryServer.scala | 2 +- .../apache/spark/status/AppStatusListener.scala | 15 +++++++-------- .../org/apache/spark/status/AppStatusStore.scala | 10 ++++------ .../org/apache/spark/status/api/v1/api.scala | 3 --- .../spark/deploy/history/HistoryServerSuite.scala | 9 +++++++-- 6 files changed, 27 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 6399dccc1676a..67ac430765139 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -21,6 +21,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node +import org.apache.spark.status.api.v1.ApplicationInfo import org.apache.spark.ui.{UIUtils, WebUIPage} private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { @@ -30,7 +31,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val requestedIncomplete = Option(UIUtils.stripXSS(request.getParameter("showIncomplete"))).getOrElse("false").toBoolean - val allAppsSize = parent.getApplicationList().count(_.completed != requestedIncomplete) + val allAppsSize = parent.getApplicationList(). + count(isApplicationCompleted(_) != requestedIncomplete) val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess() val lastUpdatedTime = parent.getLastUpdatedTime() val providerConfig = parent.getProviderConfig() @@ -88,4 +90,9 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") private def makePageLink(showIncomplete: Boolean): String = { UIUtils.prependBaseUri("/?" 
+ "showIncomplete=" + showIncomplete) } + + private def isApplicationCompleted(appInfo: ApplicationInfo): Boolean = { + appInfo.attempts.nonEmpty && appInfo.attempts.head.completed + + } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 5f11dc51fc0df..75484f5c9f30f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -192,7 +192,7 @@ class HistoryServer( } def getApplicationInfo(appId: String): Option[ApplicationInfo] = { - provider.getApplicationInfo(appId) + provider.getApplicationInfo(appId) } override def writeEventLogs( diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 9580b4346d214..6da44cbc44c4d 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -29,7 +29,6 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.status.api.v1 -import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationEnvironmentInfo, ApplicationInfo, RuntimeInfo} import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.scope._ @@ -51,7 +50,7 @@ private[spark] class AppStatusListener( import config._ private var sparkVersion = SPARK_VERSION - private var appInfo: ApplicationInfo = null + private var appInfo: v1.ApplicationInfo = null private var coresPerTask: Int = 1 // How often to update live entities. 
-1 means "never update" when replaying applications, @@ -78,7 +77,7 @@ private[spark] class AppStatusListener( override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { assert(event.appId.isDefined, "Application without IDs are not supported.") - val attempt = ApplicationAttemptInfo( + val attempt = v1.ApplicationAttemptInfo( event.appAttemptId, new Date(event.time), new Date(-1), @@ -88,7 +87,7 @@ private[spark] class AppStatusListener( false, sparkVersion) - appInfo = ApplicationInfo( + appInfo = v1.ApplicationInfo( event.appId.get, event.appName, None, @@ -104,12 +103,12 @@ private[spark] class AppStatusListener( val details = event.environmentDetails val jvmInfo = Map(details("JVM Information"): _*) - val runtime = new RuntimeInfo( + val runtime = new v1.RuntimeInfo( jvmInfo.get("Java Version").orNull, jvmInfo.get("Java Home").orNull, jvmInfo.get("Scala Version").orNull) - val envInfo = new ApplicationEnvironmentInfo( + val envInfo = new v1.ApplicationEnvironmentInfo( runtime, details.getOrElse("Spark Properties", Nil), details.getOrElse("System Properties", Nil), @@ -123,7 +122,7 @@ private[spark] class AppStatusListener( override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { val old = appInfo.attempts.head - val attempt = ApplicationAttemptInfo( + val attempt = v1.ApplicationAttemptInfo( old.attemptId, old.startTime, new Date(event.time), @@ -133,7 +132,7 @@ private[spark] class AppStatusListener( true, old.appSparkVersion) - appInfo = ApplicationInfo( + appInfo = v1.ApplicationInfo( appInfo.id, appInfo.name, None, diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index fa638e0693ab6..58e41a822e533 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -17,17 +17,15 @@ package org.apache.spark.status -import java.io.File -import java.util.{Arrays, List => JList} +import java.util.{List => JList} import scala.collection.JavaConverters._ import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.scheduler.SparkListener import org.apache.spark.status.api.v1 -import org.apache.spark.status.api.v1.{ApplicationEnvironmentInfo, ApplicationInfo} import org.apache.spark.ui.scope._ -import org.apache.spark.util.{Distribution, Utils} +import org.apache.spark.util.{Distribution} import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} /** @@ -37,11 +35,11 @@ private[spark] class AppStatusStore( val store: KVStore, listener: Option[AppStatusListener] = None) { - def applicationInfo(): ApplicationInfo = { + def applicationInfo(): v1.ApplicationInfo = { store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info } - def environmentInfo(): ApplicationEnvironmentInfo = { + def environmentInfo(): v1.ApplicationEnvironmentInfo = { val klass = classOf[ApplicationEnvironmentInfoWrapper] store.read(klass, klass.getName()).info } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 1b174eb6bbb9a..0c4ac90b1fe2d 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -33,9 +33,6 @@ case class ApplicationInfo private[spark]( memoryPerExecutorMB: Option[Int], attempts: Seq[ApplicationAttemptInfo]) { - def completed: Boolean = { - attempts.nonEmpty && 
attempts.head.completed - } } @JsonIgnoreProperties( diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index d22a19e8af74a..4e4395d0fb959 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -45,6 +45,7 @@ import org.scalatest.selenium.WebBrowser import org.apache.spark._ import org.apache.spark.deploy.history.config._ +import org.apache.spark.status.api.v1.ApplicationInfo import org.apache.spark.status.api.v1.JobData import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -505,6 +506,10 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers getAppUI.store.jobsList(List(JobExecutionStatus.RUNNING).asJava) } + def isApplicationCompleted(appInfo: ApplicationInfo): Boolean = { + appInfo.attempts.nonEmpty && appInfo.attempts.head.completed + } + activeJobs() should have size 0 completedJobs() should have size 1 getNumJobs("") should be (1) @@ -537,7 +542,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers assert(4 === getNumJobsRestful(), s"two jobs back-to-back not updated, server=$server\n") } val jobcount = getNumJobs("/jobs") - assert(!provider.getListing().next.completed) + assert(!isApplicationCompleted(provider.getListing().next)) listApplications(false) should contain(appId) @@ -545,7 +550,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers resetSparkContext() // check the app is now found as completed eventually(stdTimeout, stdInterval) { - assert(provider.getListing().next.completed, + assert(isApplicationCompleted(provider.getListing().next), s"application never completed, server=$server\n") } From 9d46804e06668045e397b7c16f2b72fa1efecfbc Mon Sep 17 00:00:00 2001 From: Sandor Murakozi Date: Thu, 7 Dec 2017 23:16:07 +0100 Subject: [PATCH 3/4] Changes requested during PR review #2 --- .../scala/org/apache/spark/deploy/history/HistoryPage.scala | 5 ++--- .../main/scala/org/apache/spark/status/AppStatusStore.scala | 5 +++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 67ac430765139..d3dd58996a0bb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -31,8 +31,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val requestedIncomplete = Option(UIUtils.stripXSS(request.getParameter("showIncomplete"))).getOrElse("false").toBoolean - val allAppsSize = parent.getApplicationList(). 
- count(isApplicationCompleted(_) != requestedIncomplete) + val allAppsSize = parent.getApplicationList() + .count(isApplicationCompleted(_) != requestedIncomplete) val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess() val lastUpdatedTime = parent.getLastUpdatedTime() val providerConfig = parent.getProviderConfig() @@ -93,6 +93,5 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") private def isApplicationCompleted(appInfo: ApplicationInfo): Boolean = { appInfo.attempts.nonEmpty && appInfo.attempts.head.completed - } } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 58e41a822e533..22d768b3cb990 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -17,7 +17,8 @@ package org.apache.spark.status -import java.util.{List => JList} +import java.io.File +import java.util.{Arrays, List => JList} import scala.collection.JavaConverters._ @@ -25,7 +26,7 @@ import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.scheduler.SparkListener import org.apache.spark.status.api.v1 import org.apache.spark.ui.scope._ -import org.apache.spark.util.{Distribution} +import org.apache.spark.util.{Distribution, Utils} import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} /** From 68eb3b9607315bcdeab1e88a72b39d83e3eb56f6 Mon Sep 17 00:00:00 2001 From: Sandor Murakozi Date: Fri, 8 Dec 2017 09:25:36 +0100 Subject: [PATCH 4/4] Removed empty class body for ApplicationInfo --- core/src/main/scala/org/apache/spark/status/api/v1/api.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 0c4ac90b1fe2d..45eaf935fb083 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -31,9 +31,7 @@ case class ApplicationInfo private[spark]( maxCores: Option[Int], coresPerExecutor: Option[Int], memoryPerExecutorMB: Option[Int], - attempts: Seq[ApplicationAttemptInfo]) { - -} + attempts: Seq[ApplicationAttemptInfo]) @JsonIgnoreProperties( value = Array("startTimeEpoch", "endTimeEpoch", "lastUpdatedEpoch"),
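
Editor's note, for readers skimming the series: the net effect of these four patches is that the history server's private ApplicationHistoryInfo / ApplicationAttemptInfo classes are deleted and the public REST types in org.apache.spark.status.api.v1 (converted to case classes) are used everywhere, which lets the ApplicationsListResource.appHistoryInfoToPublicAppInfo conversion disappear. The sketch below is a minimal, self-contained reduction of the resulting data model, not the real Spark sources: the case-class fields mirror the diffs above, but ShsListingSketch, AttemptInfoWrapper's trimmed constructor, and main are illustrative stand-ins. It shows why the case-class conversion turns the store-to-listing conversion into a one-line copy, and where the old completed helper ends up.

import java.util.Date

object ShsListingSketch {

  // Simplified stand-ins for the consolidated status/api/v1 types
  // (field lists copied from the diffs in this series).
  case class ApplicationAttemptInfo(
      attemptId: Option[String],
      startTime: Date,
      endTime: Date,
      lastUpdated: Date,
      duration: Long,
      sparkUser: String,
      completed: Boolean,
      appSparkVersion: String)

  case class ApplicationInfo(
      id: String,
      name: String,
      coresGranted: Option[Int],
      maxCores: Option[Int],
      coresPerExecutor: Option[Int],
      memoryPerExecutorMB: Option[Int],
      attempts: Seq[ApplicationAttemptInfo])

  // The KVStore keeps one wrapper per attempt; the real AttemptInfoWrapper
  // also carries log path, file size, and ACL fields, omitted here.
  class AttemptInfoWrapper(val info: ApplicationAttemptInfo)

  // Rebuilding a listing entry is now a single copy() that swaps in the
  // per-attempt payloads, replacing the old toAppHistoryInfo() /
  // toAppAttemptInfo() conversion pair removed in patch 1.
  class ApplicationInfoWrapper(
      val info: ApplicationInfo,
      val attempts: List[AttemptInfoWrapper]) {
    def toApplicationInfo(): ApplicationInfo =
      info.copy(attempts = attempts.map(_.info))
  }

  // With the completed method dropped from the shared type (patches 2 and 4),
  // callers that still need it (HistoryPage, HistoryServerSuite) derive it
  // from the most recent attempt, exactly as isApplicationCompleted does.
  def isApplicationCompleted(app: ApplicationInfo): Boolean =
    app.attempts.nonEmpty && app.attempts.head.completed

  def main(args: Array[String]): Unit = {
    val attempt = ApplicationAttemptInfo(
      attemptId = Some("1"),
      startTime = new Date(1000L),
      endTime = new Date(5000L),
      lastUpdated = new Date(5000L),
      duration = 4000L,
      sparkUser = "alice",
      completed = true,
      appSparkVersion = "2.3.0")
    val stored = new ApplicationInfoWrapper(
      ApplicationInfo("app-1", "demo", None, None, None, None, Nil),
      List(new AttemptInfoWrapper(attempt)))
    println(isApplicationCompleted(stored.toApplicationInfo())) // true
  }
}

One design point worth noting: because the v1 types are now the single source of truth, anything persisted in the listing database changes shape with the public API, which is why the PR keeps the wrapper classes (with their KVIndex annotations and ACL fields) as the storage layer and exposes only the copy()-assembled ApplicationInfo to callers.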