From 657ec97e1e13abb9e379d45cb5a594779b81fca3 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Thu, 10 Sep 2015 12:50:15 +0800 Subject: [PATCH] SPARK-10526 Display cores/memory on ExecutorsTab --- .../spark/deploy/worker/ExecutorRunner.scala | 1 + .../CoarseGrainedExecutorBackend.scala | 16 +++++++--- .../apache/spark/scheduler/WorkerOffer.scala | 2 +- .../cluster/CoarseGrainedClusterMessage.scala | 1 + .../CoarseGrainedSchedulerBackend.scala | 8 ++--- .../scheduler/cluster/ExecutorData.scala | 3 +- .../scheduler/cluster/ExecutorInfo.scala | 1 + .../cluster/SparkDeploySchedulerBackend.scala | 1 + .../mesos/CoarseMesosSchedulerBackend.scala | 4 +++ .../cluster/mesos/MesosSchedulerBackend.scala | 10 ++++--- .../spark/scheduler/local/LocalBackend.scala | 4 +-- .../status/api/v1/ExecutorListResource.scala | 4 +-- .../org/apache/spark/status/api/v1/api.scala | 2 ++ .../apache/spark/ui/exec/ExecutorsPage.scala | 27 +++++++++++++---- .../apache/spark/ui/exec/ExecutorsTab.scala | 8 +++-- .../org/apache/spark/ui/jobs/UIData.scala | 2 ++ .../org/apache/spark/util/JsonProtocol.scala | 4 ++- .../ExecutorAllocationManagerSuite.scala | 12 ++++---- .../apache/spark/HeartbeatReceiverSuite.scala | 4 +-- .../StandaloneDynamicAllocationSuite.scala | 2 +- .../scheduler/TaskSchedulerImplSuite.scala | 30 +++++++++++-------- .../mesos/MesosSchedulerBackendSuite.scala | 17 ++++++----- .../apache/spark/util/JsonProtocolSuite.scala | 5 ++-- .../spark/deploy/yarn/ExecutorRunnable.scala | 1 + 24 files changed, 112 insertions(+), 57 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 3aef0515cbf6e..304dda7e4af32 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -116,6 +116,7 @@ private[deploy] class ExecutorRunner( case "{{EXECUTOR_ID}}" => execId.toString case "{{HOSTNAME}}" => host case "{{CORES}}" => cores.toString + case "{{MEMORY}}" => memory + "MB" case "{{APP_ID}}" => appId case other => other } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index fcd76ec52742a..c239edb5b40e7 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -33,7 +33,7 @@ import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.serializer.SerializerInstance -import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils} +import org.apache.spark.util.{MemoryParam, ThreadUtils, SignalLogger, Utils} private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, @@ -41,6 +41,7 @@ private[spark] class CoarseGrainedExecutorBackend( executorId: String, hostPort: String, cores: Int, + memory: Long, userClassPath: Seq[URL], env: SparkEnv) extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { @@ -60,7 +61,7 @@ private[spark] class CoarseGrainedExecutorBackend( // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) ref.ask[RegisteredExecutor.type]( - RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls)) + RegisterExecutor(executorId, self, 
hostPort, cores, memory, extractLogUrls)) }(ThreadUtils.sameThread).onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => Utils.tryLogNonFatalError { @@ -140,6 +141,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { executorId: String, hostname: String, cores: Int, + memory: Long, appId: String, workerUrl: Option[String], userClassPath: Seq[URL]) { @@ -190,7 +192,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { // Start the CoarseGrainedExecutorBackend endpoint. val sparkHostPort = hostname + ":" + boundPort env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( - env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env)) + env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, memory, userClassPath, env)) workerUrl.foreach { url => env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url)) } @@ -204,6 +206,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { var executorId: String = null var hostname: String = null var cores: Int = 0 + var memory: Long = 0 var appId: String = null var workerUrl: Option[String] = None val userClassPath = new mutable.ListBuffer[URL]() @@ -223,6 +226,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { case ("--cores") :: value :: tail => cores = value.toInt argv = tail + case ("--memory") :: MemoryParam(value) :: tail => + // convert from megabyte to byte + memory = value.toLong * 1024 * 1024 + argv = tail case ("--app-id") :: value :: tail => appId = value argv = tail @@ -247,7 +254,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { printUsageAndExit() } - run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath) + run(driverUrl, executorId, hostname, cores, memory, appId, workerUrl, userClassPath) } private def printUsageAndExit() = { @@ -261,6 +268,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { | --executor-id | --hostname | --cores + | --memory | --app-id | --worker-url | --user-class-path diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala index 810b36cddf835..699a8746ac6d0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala @@ -21,4 +21,4 @@ package org.apache.spark.scheduler * Represents free resources available on an executor. 
*/ private[spark] -case class WorkerOffer(executorId: String, host: String, cores: Int) +case class WorkerOffer(executorId: String, host: String, cores: Int, memory: Long) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 06f5438433b6e..94f99108e948b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -45,6 +45,7 @@ private[spark] object CoarseGrainedClusterMessages { executorRef: RpcEndpointRef, hostPort: String, cores: Int, + memory: Long, logUrls: Map[String, String]) extends CoarseGrainedClusterMessage { Utils.checkHostPort(hostPort, "Expected host port") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 5730a87f960a0..20d7b0f02d945 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -128,7 +128,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) => + case RegisterExecutor(executorId, executorRef, hostPort, cores, memory, logUrls) => Utils.checkHostPort(hostPort, "Host port expected " + hostPort) if (executorDataMap.contains(executorId)) { context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) @@ -138,7 +138,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) val (host, _) = Utils.parseHostPort(hostPort) - val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls) + val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, memory, logUrls) // This must be synchronized because variables mutated // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { @@ -179,7 +179,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Filter out executors under killing val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_)) val workOffers = activeExecutors.map { case (id, executorData) => - new WorkerOffer(id, executorData.executorHost, executorData.freeCores) + new WorkerOffer(id, executorData.executorHost, executorData.freeCores, executorData.totalMemory) }.toSeq launchTasks(scheduler.resourceOffers(workOffers)) } @@ -195,7 +195,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp if (!executorsPendingToRemove.contains(executorId)) { val executorData = executorDataMap(executorId) val workOffers = Seq( - new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)) + new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores, executorData.totalMemory)) launchTasks(scheduler.resourceOffers(workOffers)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala index 626a2b7d69abe..47d2d4c7f23c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala @@ -34,5 +34,6 @@ private[cluster] class ExecutorData( override val executorHost: String, var freeCores: Int, override val totalCores: Int, + override val totalMemory: Long, override val logUrlMap: Map[String, String] -) extends ExecutorInfo(executorHost, totalCores, logUrlMap) +) extends ExecutorInfo(executorHost, totalCores, totalMemory, logUrlMap) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala index 7f218566146a1..f108881aaf269 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala @@ -26,6 +26,7 @@ import org.apache.spark.annotation.DeveloperApi class ExecutorInfo( val executorHost: String, val totalCores: Int, + val totalMemory: Long, val logUrlMap: Map[String, String]) { def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo] diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index bbe51b4a09a22..2a05f0034a7dd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -57,6 +57,7 @@ private[spark] class SparkDeploySchedulerBackend( "--executor-id", "{{EXECUTOR_ID}}", "--hostname", "{{HOSTNAME}}", "--cores", "{{CORES}}", + "--memory", "{{MEMORY}}", "--app-id", "{{APP_ID}}", "--worker-url", "{{WORKER_URL}}") val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 452c32d5411cd..77a71f7791e6c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -168,6 +168,7 @@ private[spark] class CoarseMesosSchedulerBackend( val uri = conf.getOption("spark.executor.uri") .orElse(Option(System.getenv("SPARK_EXECUTOR_URI"))) + val memory = getResource(offer.getResourcesList, "mem").toLong*1024*1024 if (uri.isEmpty) { val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath @@ -178,7 +179,9 @@ private[spark] class CoarseMesosSchedulerBackend( s" --executor-id ${offer.getSlaveId.getValue}" + s" --hostname ${offer.getHostname}" + s" --cores $numCores" + + s" --memory $memory" + s" --app-id $appId") + } else { // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". 
@@ -191,6 +194,7 @@ private[spark] class CoarseMesosSchedulerBackend( s" --executor-id $executorId" + s" --hostname ${offer.getHostname}" + s" --cores $numCores" + + s" --memory $memory" + s" --app-id $appId") command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 2e424054be785..e58fd3fef1af1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -246,10 +246,12 @@ private[spark] class MesosSchedulerBackend( // cores for the Mesos executor by offering fewer cores to the Spark executor (getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt } + val memory = getResource(o.getResourcesList, "mem").toLong*1024*1024 new WorkerOffer( o.getSlaveId.getValue, o.getHostname, - cpus) + cpus, + memory) } val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap @@ -285,11 +287,11 @@ private[spark] class MesosSchedulerBackend( val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? mesosTasks.foreach { case (slaveId, tasks) => - slaveIdToWorkerOffer.get(slaveId).foreach(o => + slaveIdToWorkerOffer.get(slaveId).foreach(o => { listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId, // TODO: Add support for log urls for Mesos - new ExecutorInfo(o.host, o.cores, Map.empty))) - ) + new ExecutorInfo(o.host, o.cores, o.memory, Map.empty))) + }) logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}") d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 4d48fcfea44e7..35f13b4e8d407 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -79,7 +79,7 @@ private[spark] class LocalEndpoint( } def reviveOffers() { - val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) + val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores, Runtime.getRuntime.freeMemory())) for (task <- scheduler.resourceOffers(offers).flatten) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber, @@ -121,7 +121,7 @@ private[spark] class LocalBackend( listenerBus.post(SparkListenerExecutorAdded( System.currentTimeMillis, executorEndpoint.localExecutorId, - new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty))) + new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Runtime.getRuntime.maxMemory(), Map.empty))) } override def stop() { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala index 8ad4656b4dada..76dc1774970f5 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala @@ -29,8 +29,8 @@ private[v1] class ExecutorListResource(ui: SparkUI) { def executorList(): 
Seq[ExecutorSummary] = { val listener = ui.executorsListener val storageStatusList = listener.storageStatusList - (0 until storageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId) + storageStatusList.map { status => + ExecutorsPage.getExecInfo(listener, status) } } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 2bec64f2ef02b..089347d214153 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -48,6 +48,8 @@ class ExecutorStageSummary private[spark]( class ExecutorSummary private[spark]( val id: String, val hostPort: String, + val totalCores: Int, + val totalMemory: Long, val rddBlocks: Int, val memoryUsed: Long, val diskUsed: Long, diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 01cddda4c62cd..91498083046f4 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -20,6 +20,9 @@ package org.apache.spark.ui.exec import java.net.URLEncoder import javax.servlet.http.HttpServletRequest +import org.apache.spark.storage.StorageStatus +import org.apache.spark.ui.jobs.UIData.ExecutorUIData + import scala.xml.Node import org.apache.spark.status.api.v1.ExecutorSummary @@ -52,12 +55,12 @@ private[ui] class ExecutorsPage( private val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { - val storageStatusList = listener.storageStatusList + val storageStatusList = listener.storageStatusList.filter(!_.blockManagerId.isDriver) val maxMem = storageStatusList.map(_.maxMem).sum val memUsed = storageStatusList.map(_.memUsed).sum val diskUsed = storageStatusList.map(_.diskUsed).sum - val execInfo = for (statusId <- 0 until storageStatusList.size) yield - ExecutorsPage.getExecInfo(listener, statusId) + val execInfo = storageStatusList.map(status => + ExecutorsPage.getExecInfo(listener, status)) val execInfoSorted = execInfo.sortBy(_.id) val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty @@ -66,6 +69,8 @@ private[ui] class ExecutorsPage( Executor ID Address + Cores + Memory RDD Blocks Storage Memory Disk Used @@ -120,6 +125,8 @@ private[ui] class ExecutorsPage( {info.id} {info.hostPort} + {info.totalCores} + {Utils.bytesToString(info.totalMemory)} {info.rddBlocks} {Utils.bytesToString(memoryUsed)} / @@ -176,10 +183,18 @@ private[ui] class ExecutorsPage( private[spark] object ExecutorsPage { /** Represent an executor's info as a map given a storage status index */ - def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = { - val status = listener.storageStatusList(statusId) + def getExecInfo(listener: ExecutorsListener, status: StorageStatus): ExecutorSummary = { val execId = status.blockManagerId.executorId val hostPort = status.blockManagerId.hostPort + val executorData = listener.executorIdToData.get(execId) + var totalCores = 0 + var totalMemory = 0L + executorData match { + case Some(d) => + totalCores = d.cores + totalMemory = d.memory + case _ => + } val rddBlocks = status.numBlocks val memUsed = status.memUsed val maxMem = status.maxMem @@ -197,6 +212,8 @@ private[spark] object ExecutorsPage { new ExecutorSummary( execId, hostPort, + totalCores, + totalMemory, rddBlocks, memUsed, diskUsed, diff --git
a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index a88fc4c37d3c9..ecd20aad14b57 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -19,7 +19,7 @@ package org.apache.spark.ui.exec import scala.collection.mutable.HashMap -import org.apache.spark.{Resubmitted, ExceptionFailure, SparkContext} +import org.apache.spark.{Logging, Resubmitted, ExceptionFailure, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.scheduler._ import org.apache.spark.storage.{StorageStatus, StorageStatusListener} @@ -43,7 +43,7 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec * A SparkListener that prepares information to be displayed on the ExecutorsTab */ @DeveloperApi -class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener { +class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener with Logging { val executorToTasksActive = HashMap[String, Int]() val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() @@ -62,7 +62,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized { val eid = executorAdded.executorId executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap - executorIdToData(eid) = ExecutorUIData(executorAdded.time) + log.info("executor added:" + executorAdded.executorId) + executorIdToData(eid) = ExecutorUIData(executorAdded.executorInfo.totalCores, + executorAdded.executorInfo.totalMemory, executorAdded.time) } override def onExecutorRemoved( diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index f008d40180611..22ea2170f3570 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -111,6 +111,8 @@ private[spark] object UIData { var errorMessage: Option[String] = None) case class ExecutorUIData( + val cores: Int, + val memory: Long, val startTime: Long, var finishTime: Option[Long] = None, var finishReason: Option[String] = None) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index cbc94fd6d54d9..e445b8102f9d7 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -419,6 +419,7 @@ private[spark] object JsonProtocol { def executorInfoToJson(executorInfo: ExecutorInfo): JValue = { ("Host" -> executorInfo.executorHost) ~ ("Total Cores" -> executorInfo.totalCores) ~ + ("Total Memory" -> executorInfo.totalMemory) ~ ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) } @@ -873,8 +874,9 @@ private[spark] object JsonProtocol { def executorInfoFromJson(json: JValue): ExecutorInfo = { val executorHost = (json \ "Host").extract[String] val totalCores = (json \ "Total Cores").extract[Int] + val totalMemory = (json \ "Total Memory").extract[Long] val logUrls = mapFromJson(json \ "Log Urls").toMap - new ExecutorInfo(executorHost, totalCores, logUrls) + new ExecutorInfo(executorHost, totalCores, totalMemory, logUrls) } /** -------------------------------- * diff --git 
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 116f027a0f987..26ae8fa2a3bc4 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -155,7 +155,7 @@ class ExecutorAllocationManagerSuite // Verify that running a task doesn't affect the target sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3))) sc.listenerBus.postToAll(SparkListenerExecutorAdded( - 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) + 0L, "executor-1", new ExecutorInfo("host1", 1, 1024*1024*1024L, Map.empty))) sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 1) @@ -615,13 +615,13 @@ class ExecutorAllocationManagerSuite // New executors have registered sc.listenerBus.postToAll(SparkListenerExecutorAdded( - 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) + 0L, "executor-1", new ExecutorInfo("host1", 1, 1024*1024*1024L, Map.empty))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(removeTimes(manager).contains("executor-1")) sc.listenerBus.postToAll(SparkListenerExecutorAdded( - 0L, "executor-2", new ExecutorInfo("host2", 1, Map.empty))) + 0L, "executor-2", new ExecutorInfo("host2", 1, 1024*1024*1024L, Map.empty))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) assert(removeTimes(manager).size === 2) @@ -648,7 +648,7 @@ class ExecutorAllocationManagerSuite sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) sc.listenerBus.postToAll(SparkListenerExecutorAdded( - 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) + 0L, "executor-1", new ExecutorInfo("host1", 1, 1024*1024*1024L, Map.empty))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 0) @@ -660,7 +660,7 @@ class ExecutorAllocationManagerSuite assert(executorIds(manager).isEmpty) assert(removeTimes(manager).isEmpty) sc.listenerBus.postToAll(SparkListenerExecutorAdded( - 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) + 0L, "executor-1", new ExecutorInfo("host1", 1, 1024*1024*1024L, Map.empty))) sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) assert(executorIds(manager).size === 1) @@ -668,7 +668,7 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).size === 0) sc.listenerBus.postToAll(SparkListenerExecutorAdded( - 0L, "executor-2", new ExecutorInfo("host1", 1, Map.empty))) + 0L, "executor-2", new ExecutorInfo("host1", 1, 1024*1024*1024L, Map.empty))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) assert(removeTimes(manager).size === 1) diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 139b8dc25f4b4..b0b7918efdbe8 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -171,9 +171,9 @@ class HeartbeatReceiverSuite val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1) val 
dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2) fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type]( - RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty)) + RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, 0, Map.empty)) fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type]( - RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty)) + RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, 0, Map.empty)) heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null)) heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null)) diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 1f2a0f0d309ce..27735970b68e1 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -374,7 +374,7 @@ class StandaloneDynamicAllocationSuite val endpointRef = mock(classOf[RpcEndpointRef]) val mockAddress = mock(classOf[RpcAddress]) when(endpointRef.address).thenReturn(mockAddress) - val message = RegisterExecutor(id, endpointRef, s"localhost:$port", 10, Map.empty) + val message = RegisterExecutor(id, endpointRef, s"localhost:$port", 10, 1024*1024*1024L, Map.empty) val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend] backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index c2edd4c317d6e..4700ae7b06955 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -39,8 +39,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L } val numFreeCores = 1 - val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores), - new WorkerOffer("executor1", "host1", numFreeCores)) + val totalMemory = 1024*1024*1024L + val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores, totalMemory), + new WorkerOffer("executor1", "host1", numFreeCores, totalMemory)) // Repeatedly try to schedule a 1-task job, and make sure that it doesn't always // get scheduled on the same executor. While there is a chance this test will fail // because the task randomly gets placed on the first executor all 1000 times, the @@ -62,6 +63,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L test("Scheduler correctly accounts for multiple CPUs per task") { sc = new SparkContext("local", "TaskSchedulerImplSuite") val taskCpus = 2 + val totalMemory = 1024*1024*1024L sc.conf.set("spark.task.cpus", taskCpus.toString) val taskScheduler = new TaskSchedulerImpl(sc) @@ -72,8 +74,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L override def executorAdded(execId: String, host: String) {} } // Give zero core offers. 
Should not generate any tasks - val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0), - new WorkerOffer("executor1", "host1", 0)) + val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0, totalMemory), + new WorkerOffer("executor1", "host1", 0, totalMemory)) val taskSet = FakeTask.createTaskSet(1) taskScheduler.submitTasks(taskSet) var taskDescriptions = taskScheduler.resourceOffers(zeroCoreWorkerOffers).flatten @@ -81,16 +83,16 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L // No tasks should run as we only have 1 core free. val numFreeCores = 1 - val singleCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores), - new WorkerOffer("executor1", "host1", numFreeCores)) + val singleCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores, totalMemory), + new WorkerOffer("executor1", "host1", numFreeCores, totalMemory)) taskScheduler.submitTasks(taskSet) taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten assert(0 === taskDescriptions.length) // Now change the offers to have 2 cores in one executor and verify if it // is chosen. - val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus), - new WorkerOffer("executor1", "host1", numFreeCores)) + val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus, totalMemory), + new WorkerOffer("executor1", "host1", numFreeCores, totalMemory)) taskScheduler.submitTasks(taskSet) taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten assert(1 === taskDescriptions.length) @@ -100,6 +102,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L test("Scheduler does not crash when tasks are not serializable") { sc = new SparkContext("local", "TaskSchedulerImplSuite") val taskCpus = 2 + val totalMemory = 1024*1024*1024L sc.conf.set("spark.task.cpus", taskCpus.toString) val taskScheduler = new TaskSchedulerImpl(sc) @@ -113,8 +116,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L taskScheduler.setDAGScheduler(dagScheduler) val taskSet = new TaskSet( Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null) - val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus), - new WorkerOffer("executor1", "host1", numFreeCores)) + val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus, totalMemory), + new WorkerOffer("executor1", "host1", numFreeCores, totalMemory)) taskScheduler.submitTasks(taskSet) var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten assert(0 === taskDescriptions.length) @@ -158,6 +161,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L sc = new SparkContext("local", "TaskSchedulerImplSuite") val taskScheduler = new TaskSchedulerImpl(sc) taskScheduler.initialize(new FakeSchedulerBackend) + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. 
new DAGScheduler(sc, taskScheduler) { override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} @@ -165,7 +169,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L } val numFreeCores = 1 - val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores)) + val totalMemory = 1024*1024*1024L + val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores, totalMemory)) val attempt1 = FakeTask.createTaskSet(10) // submit attempt 1, offer some resources, some tasks get scheduled @@ -203,7 +208,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L } val numFreeCores = 10 - val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores)) + val totalMemory = 1024*1024*1024L + val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores, totalMemory)) val attempt1 = FakeTask.createTaskSet(10) // submit attempt 1, offer some resources, some tasks get scheduled diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala index 319b3173e7a6e..ebd2428cae89d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala @@ -48,7 +48,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val listenerBus = mock[LiveListenerBus] listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, 1024*1024*1024L, Map.empty))) val sc = mock[SparkContext] when(sc.getSparkHome()).thenReturn(Option("/spark-home")) @@ -87,7 +87,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val listenerBus = mock[LiveListenerBus] listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, 1024*1024*1024L, Map.empty))) val sc = mock[SparkContext] when(sc.executorMemory).thenReturn(100) @@ -146,7 +146,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val listenerBus = mock[LiveListenerBus] listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, 1024*1024*1024L, Map.empty))) val sc = mock[SparkContext] when(sc.executorMemory).thenReturn(100) @@ -169,12 +169,14 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi expectedWorkerOffers.append(new WorkerOffer( mesosOffers.get(0).getSlaveId.getValue, mesosOffers.get(0).getHostname, - (minCpu - backend.mesosExecutorCores).toInt + (minCpu - backend.mesosExecutorCores).toInt, + 1024*1024*1024L )) expectedWorkerOffers.append(new WorkerOffer( mesosOffers.get(2).getSlaveId.getValue, mesosOffers.get(2).getHostname, - (minCpu - backend.mesosExecutorCores).toInt + (minCpu - backend.mesosExecutorCores).toInt, + 1024*1024*1024L )) val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) @@ -227,7 +229,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val listenerBus = 
mock[LiveListenerBus] listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, 1024*1024*1024L, Map.empty))) val sc = mock[SparkContext] when(sc.executorMemory).thenReturn(100) @@ -273,7 +275,8 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi expectedWorkerOffers.append(new WorkerOffer( mesosOffers.get(0).getSlaveId.getValue, mesosOffers.get(0).getHostname, - 2 // Deducting 1 for executor + 2, // Deducting 1 for executor + 1024*1024*1024L )) val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 343a4139b0ca8..f15557a9beb0d 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -81,7 +81,7 @@ class JsonProtocolSuite extends SparkFunSuite { 42L, "Garfield", Some("appAttempt"), Some(logUrlMap)) val applicationEnd = SparkListenerApplicationEnd(42L) val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1", - new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap)) + new ExecutorInfo("Hostee.awesome.com", 11, 1024*1024*1024L, logUrlMap)) val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason") val executorMetricsUpdate = SparkListenerExecutorMetricsUpdate("exec3", Seq( (1L, 2, 3, makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, @@ -116,7 +116,7 @@ class JsonProtocolSuite extends SparkFunSuite { testTaskMetrics(makeTaskMetrics( 33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500)) - testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap)) + testExecutorInfo(new ExecutorInfo("host", 43, 1024*1024*1024L, logUrlMap)) // StorageLevel testStorageLevel(StorageLevel.NONE) @@ -1596,6 +1596,7 @@ class JsonProtocolSuite extends SparkFunSuite { | "Executor Info": { | "Host": "Hostee.awesome.com", | "Total Cores": 11, + | "Total Memory": 1073741824, | "Log Urls" : { | "stderr" : "mystderr", | "stdout" : "mystdout" diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 9abd09b3cc7a5..af86a52046db5 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -225,6 +225,7 @@ class ExecutorRunnable( "--executor-id", slaveId.toString, "--hostname", hostname.toString, "--cores", executorCores.toString, + "--memory", executorMemoryString, "--app-id", appId) ++ userClassPath ++ Seq(
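
Note on the new memory plumbing (illustration only, not part of the patch): the worker substitutes {{MEMORY}} with "<megabytes>MB", CoarseGrainedExecutorBackend parses "--memory" back to megabytes via MemoryParam and keeps the value in bytes, and ExecutorsPage renders those bytes with Utils.bytesToString. The minimal Scala sketch below traces that flow end to end; MemoryFlagSketch and parseMemoryMb are hypothetical stand-ins written for this note, and the simplified parser only accepts the "MB"/"GB" suffixes used here.

    // Hypothetical, self-contained sketch of the memory flag flow added by this patch.
    object MemoryFlagSketch {

      // Simplified stand-in for the MB-string parsing the backend gets via MemoryParam.
      def parseMemoryMb(s: String): Int = {
        val lower = s.trim.toLowerCase
        if (lower.endsWith("gb")) lower.dropRight(2).trim.toInt * 1024
        else if (lower.endsWith("mb")) lower.dropRight(2).trim.toInt
        else sys.error(s"unsupported memory string: $s")
      }

      // Rough equivalent of the human-readable rendering done in the UI.
      def bytesToString(bytes: Long): String = f"${bytes / (1024.0 * 1024 * 1024)}%.1f GB"

      def main(args: Array[String]): Unit = {
        val executorMemoryMb = 2048
        // Worker side: ExecutorRunner replaces {{MEMORY}} with "<mb>MB".
        val cliValue = executorMemoryMb + "MB"
        // Backend side: "--memory" is parsed back to MB, then stored as bytes.
        val memoryBytes = parseMemoryMb(cliValue).toLong * 1024 * 1024
        // UI side: ExecutorsPage shows the total as a human-readable size.
        println(s"--memory $cliValue -> $memoryBytes bytes -> ${bytesToString(memoryBytes)}")
      }
    }

Running the sketch prints "--memory 2048MB -> 2147483648 bytes -> 2.0 GB", which mirrors what the new Memory column on the ExecutorsTab is meant to display.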