
[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API #21221

Closed
wants to merge 31 commits

Conversation

@edwinalu (Contributor) commented May 2, 2018

Add new executor-level memory metrics (JVM used memory, on/off-heap execution memory, on/off-heap storage memory, on/off-heap unified memory, direct memory, and mapped memory), and expose them via the executors REST API. This information provides insight into how executor and driver JVM memory is used across the different memory regions, and can help determine good values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction.

What changes were proposed in this pull request?

An ExecutorMetrics class is added, with jvmUsedHeapMemory, jvmUsedNonHeapMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, offHeapStorageMemory, onHeapUnifiedMemory, offHeapUnifiedMemory, directMemory, and mappedMemory. The new ExecutorMetrics is sent by executors to the driver as part of the Heartbeat. A heartbeat is added for the driver as well, to collect these metrics for the driver.
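
For illustration, a minimal sketch of such a metrics holder, with the field names taken from the description above (the actual ExecutorMetrics in the PR may be structured differently, e.g. backed by an indexed array):

```
// Sketch only: a plain holder for the executor-level memory metrics listed above.
private[spark] case class ExecutorMetrics(
    jvmUsedHeapMemory: Long,
    jvmUsedNonHeapMemory: Long,
    onHeapExecutionMemory: Long,
    offHeapExecutionMemory: Long,
    onHeapStorageMemory: Long,
    offHeapStorageMemory: Long,
    onHeapUnifiedMemory: Long,
    offHeapUnifiedMemory: Long,
    directMemory: Long,
    mappedMemory: Long)
```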

The EventLoggingListener stores the peak values for each metric, per active stage and executor. When a StageCompleted event is seen, a StageExecutorMetrics event is logged for each executor, with the peak values for the stage.

The AppStatusListener records the peak values for each memory metric.

The new memory metrics are added to the executors REST API.

How was this patch tested?

New unit tests have been added. This was also tested on our cluster.

…xecutors REST API

Add new executor level memory metrics (JVM used memory, on/off heap execution memory, on/off heap storage
memory), and expose via the executors REST API. This information will help provide insight into how executor
and driver JVM memory is used, and for the different memory regions. It can be used to help determine good
values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction.

Add an ExecutorMetrics class, with jvmUsedMemory, onHeapExecutionMemory, offHeapExecutionMemory,
onHeapStorageMemory, and offHeapStorageMemory. The new ExecutorMetrics will be sent by executors to the
driver as part of Heartbeat. A heartbeat will be added for the driver as well, to collect these metrics
for the driver.

Modify the EventLoggingListener to log ExecutorMetricsUpdate events if there is a new peak value for any
of the memory metrics for an executor and stage. Only the ExecutorMetrics will be logged, and not the
TaskMetrics, to minimize additional logging.

Modify the AppStatusListener to record the peak values for each memory metric.

Add the new memory metrics to the executors REST API.
@squito (Contributor) commented May 3, 2018

Jenkins, ok to test

@SparkQA commented May 3, 2018

Test build #90087 has finished for PR 21221 at commit ad10d28.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented May 9, 2018

Jenkins, retest this please

@SparkQA commented May 9, 2018

Test build #90422 has finished for PR 21221 at commit ad10d28.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

/** Reports heartbeat metrics for the driver. */
private def reportHeartBeat(): Unit = {
Member: Why do we need this for the driver? If Spark runs in local mode, there's a local executor, which will report heartbeats.

Contributor Author:

With cluster mode, including YARN, there isn't a local executor, so the metrics for the driver would not be collected. Perhaps this could be modified to skip this step for local mode.

Member:

With cluster mode, including YARN, there isn't a local executor, so the metrics for the driver would not be collected.

Yes. But the question is whether we can use the executor's getCurrentExecutorMetrics() method for collecting memory metrics for the driver. IIRC, the driver does not acquire memory from the execution memory pool, at least.

Contributor Author:

It's a bit redundant for fields that aren't used by the driver -- for the driver, execution memory gets set to 0.

*/
private[spark] class Heartbeater(reportHeartbeat: () => Unit, intervalMs: Long) {
// Executor for the heartbeat task
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
Member: I'm wondering whether the prefix name of the heartbeater thread should be "executor-heartbeater"?

Contributor Author (@edwinalu, May 11, 2018):

How about "heartbeater", since it could be for the driver as well? Alternatively, we can also pass in the name to the constructor.

Member: "Pass in the name to the constructor" is better (if we do need to do this for the driver).

Contributor Author:

Changed.
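
Based on the constructor shape visible in the diffs above (reportHeartbeat, name, intervalMs), a sketch of a name-parameterized Heartbeater might look like this; the start/stop bodies here are assumptions for illustration, not the exact code in the PR:

```
import java.util.concurrent.TimeUnit

import org.apache.spark.util.ThreadUtils

// Sketch: a heartbeater whose thread name is supplied by the caller,
// so the same class can be reused for both the driver and executors.
private[spark] class Heartbeater(
    reportHeartbeat: () => Unit,
    name: String,
    intervalMs: Long) {
  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name)

  def start(): Unit = {
    // Schedule the heartbeat task at a fixed interval.
    heartbeater.scheduleAtFixedRate(
      new Runnable { override def run(): Unit = reportHeartbeat() },
      intervalMs, intervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    heartbeater.shutdown()
    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```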

(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
executorMap.foreach {
executorEntry => {
for ((executorId, peakExecutorMetrics) <- executorEntry) {
Member:

How about case (executorId, peakExecutorMetrics) => ? It would be more readable.

Contributor Author: The for loop (line 187) is going through the hashmap entries of executorId to peakExecutorMetrics, so there are multiple values. Could you please provide more detail on how "case (executorId, peakExecutorMetrics) =>" would work? If the for loop is OK, then I can add some comments.

Member: I revisited the code; I think you're right. My mistake, sorry.
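
For readers following along, the two styles discussed here are roughly the following (an illustrative snippet with a placeholder value type, not the PR's actual map of PeakExecutorMetrics):

```
import scala.collection.mutable

// Placeholder map; in the PR this would hold executorId -> peak metrics.
val executorMap = mutable.HashMap[String, Long]("exec-1" -> 123L, "exec-2" -> 456L)

// Tuple destructuring with a for comprehension (the original style):
for ((executorId, peakMetrics) <- executorMap) {
  println(s"$executorId -> $peakMetrics")
}

// Pattern-matching partial function (the style suggested above):
executorMap.foreach { case (executorId, peakMetrics) =>
  println(s"$executorId -> $peakMetrics")
}
```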

@@ -169,6 +179,27 @@ private[spark] class EventLoggingListener(

// Events that trigger a flush
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
// log the peak executor metrics for the stage, for each executor
val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
val executorMap = liveStageExecutorMetrics.remove(
Member: Do we always post a SparkListenerStageCompleted event for failed stages (I can't remember clearly)? If not, I think we should clean up other attempts of the same stage here.

Contributor Author:

Yes, it's safer to clean up earlier attempts -- I can add some code to iterate through earlier attemptIDs.
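
A sketch of what that cleanup could look like, assuming the tracking map is keyed by (stageId, attemptId); the names and the iteration over earlier attempts are illustrative, not the PR's exact code:

```
import scala.collection.mutable

// Illustrative: peak metric values per (stageId, attemptId), per executor.
val liveStageExecutorMetrics =
  mutable.HashMap[(Int, Int), mutable.HashMap[String, Long]]()

def onStageCompleted(stageId: Int, attemptId: Int): Unit = {
  // Remove the completed attempt, and also clean up any earlier attempts of
  // the same stage that never produced a StageCompleted event.
  for (attempt <- 0 to attemptId) {
    liveStageExecutorMetrics.remove((stageId, attempt))
  }
}
```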

* Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no
* values have been recorded yet.
*/
private[spark] class PeakExecutorMetrics {
Member:

Do we really need this class? It seems ExecutorMetrics can already do the same work.

Contributor Author:

I got some errors when trying to add methods to ExecutorMetrics. I don't remember the details, but can try this again.

Contributor: Can you revisit this given the other refactoring that has taken place?

And if you do need this extra class, please include a comment here explaining the metrics array and referencing MetricGetter.

Contributor:

ping

Contributor Author: With ExecutorMetrics removed, it seems useful to have a class for tracking and setting peak metric values that can be used by both EventLoggingListener and AppStatusListener.

@@ -93,6 +94,10 @@ private[spark] class EventLoggingListener(
// Visible for tests only.
private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)

// map of live stages, to peak executor metrics for the stage
private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int),
Member: Why should we track executor memory metrics for each stage?

Contributor Author: This is tracking peak metric values for executors for each stage, so that the peak values for the stage can be dumped at stage end. The purpose is to reduce the amount of logging to at most (number of stages × number of executors) ExecutorMetricsUpdate events.

I originally tried logging for new peak values, resetting when a new stage begins -- this is simpler, but can lead to more events being logged.

Having stage-level information is useful for users trying to identify which stages are more memory intensive. This information could be useful when they are trying to reduce the amount of memory used, since they would know which stages (and the relevant code) to focus on.

…enabled to enable/disable executor metrics update logging.

Code review comments.
@SparkQA commented May 15, 2018

Test build #90613 has finished for PR 21221 at commit 10ed328.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) left a comment: Some minor things as I do another round and page things back in.

def getCurrentExecutorMetrics(
    memoryManager: MemoryManager,
    direct: BufferPoolMXBean,
    mapped: BufferPoolMXBean): ExecutorMetrics = {
Contributor: Does it make more sense to move this inside Heartbeater? Then you don't need to pass in any BufferPoolMXBeans. Also rename it to "getCurrentMemoryMetrics".

Contributor Author:

Yes, and easier to share the code between driver and executor.
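
For context, the JVM-level numbers can be polled from the standard management beans without passing pools around; a sketch under that assumption (the overall shape and names are illustrative, and the PR's actual method may differ):

```
import java.lang.management.{BufferPoolMXBean, ManagementFactory}

import scala.collection.JavaConverters._

// Sketch: sample JVM heap/non-heap usage and the direct/mapped buffer pools.
object MemoryMetricsSampler {
  private val memoryBean = ManagementFactory.getMemoryMXBean
  private val bufferPools =
    ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala

  // "direct" and "mapped" are the standard JVM buffer pool names.
  private def poolUsed(name: String): Long =
    bufferPools.find(_.getName == name).map(_.getMemoryUsed).getOrElse(0L)

  def sample(): Map[String, Long] = Map(
    "jvmUsedHeapMemory"    -> memoryBean.getHeapMemoryUsage.getUsed,
    "jvmUsedNonHeapMemory" -> memoryBean.getNonHeapMemoryUsage.getUsed,
    "directMemory"         -> poolUsed("direct"),
    "mappedMemory"         -> poolUsed("mapped"))
}
```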

@@ -81,7 +84,7 @@ private[spark] class EventLoggingListener(
private val compressionCodecName = compressionCodec.map { c =>
CompressionCodec.getShortName(c.getClass.getName)
}

logInfo("spark.eventLog.logExecutorMetricsUpdates.enabled is " + shouldLogExecutorMetricsUpdates)
Contributor: This doesn't really seem necessary at all, definitely not at INFO level (and the indentation is wrong).

Contributor Author:

Removed. Thanks, I hadn't meant to push that.

@@ -93,6 +96,10 @@ private[spark] class EventLoggingListener(
// Visible for tests only.
private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)

// map of live stages, to peak executor metrics for the stage
private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int),
mutable.HashMap[String, PeakExecutorMetrics]]()
Contributor: You could just import mutable.HashMap (added bonus -- it fits on one line).

Contributor Author:

Changed.

liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
}

// log the peak executor metrics for the stage, for each executor
Contributor:

I'd add a comment here that this will log metrics for all executors that were alive while the stage was running, whether or not they ran any tasks for that stage (I think that's what it will do here, right?)

Contributor Author: Yes, it's all running executors, and it does not filter based on whether they have tasks for the stage. I've updated the comment.

@@ -209,6 +210,16 @@ class DAGScheduler(
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

/** driver heartbeat for collecting metrics */
private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat, "driver-heartbeater",
Contributor: Let's not put this in the DAGScheduler please -- this class is fragile enough as it is :)

I think this should just go in SparkContext.

Contributor Author:

Moved.

SparkListenerExecutorAdded(0L, executorId.toString, new ExecutorInfo("host1", 1, Map.empty))
}

/** Create an executor added event for the specified executor Id. */
Contributor:

added -> removed

though for that matter -- I'd just remove the doc comments on all these teeny helper methods

Contributor Author:

I'll remove -- they are pretty self-explanatory.

i += 1
}
checkEvent(lines(i), event)
i += 1
Contributor:

I found this pretty confusing at first. I suggest renaming i to logIdx and including a comment about the j loop. Also we tend to use (1 to 2).foreach. eg.

// just before the SparkListenerStageCompleted gets logged, we expect to get a 
// SparkListenerExecutorMetricsUpdate for each executor
(1 to 2).foreach { _ =>
  checkExecutorMetricsUpdate(lines(logIdx), stageCompleted.stageInfo.stageId,
    expectedMetricsEvents)
  logIdx += 1
}
// also check that we get the expected SparkListenerStageCompleted
checkEvent(lines(logIdx), event)
logIdx += 1

Contributor Author:

Changed for both.

assert(line.contains(event.getClass.toString.split("\\.").last))
event match {
case executorMetrics: SparkListenerExecutorMetricsUpdate =>
JsonProtocol.sparkEventFromJson(parse(line)) match {
Contributor:

you can pull JsonProtocol.sparkEventFromJson(parse(line)) out to avoid repeating, along with the type comparison.

val parsed = JsonProtocol.sparkEventFromJson(parse(line))
assert(parsed.getClass === event.getClass)
event match {
 ...

(also assertTypeError does something else entirely: http://doc.scalatest.org/2.2.6/#org.scalatest.Assertions)

Contributor Author:

Thanks, modified.

private def checkEvent(line: String, event: SparkListenerEvent): Unit = {
assert(line.contains(event.getClass.toString.split("\\.").last))
event match {
case executorMetrics: SparkListenerExecutorMetricsUpdate =>
Contributor:

you're never using this w/ SparkListenerExecutorMetricsUpdate, right?

Contributor Author:

Nope, with the change in design to logging the executor metrics updates at stage end, this part is skipped -- I'll remove this.

updated = true
}
if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) {
_offHeapStorageMemory = executorMetrics.offHeapStorageMemory
Contributor:

I know spark has this kind of code all over the place already, but I really hate how error prone it is -- way too easy for a copy paste error to result in comparing the wrong two metrics, or updating the wrong value, or forgetting to update this when another metric is added, etc.

I just opened this edwinalu#1 as another way to do this that would eliminate a ton of boilerplate IMO.

Contributor Author:

Thanks! This is cleaner, and will make it easier to add new metrics. It is very easy to have a copy/paste error. I can merge and make the test changes -- let me know if that sounds good, or if you'd like to make some more changes first.

Contributor:

The more you can take it over from here, the better :) But let me know if there is anything which is confusing, or if the TODOs that I've left actually don't seem possible etc. and I can take a closer look.

Contributor Author:

Will do. Thanks!
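
To make the boilerplate-reduction idea concrete, a generic compare-and-update loop over positionally indexed metrics might look roughly like this (an illustrative sketch in the spirit of the MetricGetter approach discussed above, not the code from edwinalu#1):

```
// Sketch: track peak values one slot per metric, so adding a new metric only
// requires adding a new getter rather than another hand-written if-block.
class PeakMetrics(numMetrics: Int) {
  private val peaks = Array.fill(numMetrics)(Long.MinValue)

  /** Returns true if any peak was updated by the new sample. */
  def compareAndUpdate(sample: Array[Long]): Boolean = {
    require(sample.length == peaks.length, "unexpected number of metrics")
    var updated = false
    var i = 0
    while (i < sample.length) {
      if (sample(i) > peaks(i)) {
        peaks(i) = sample(i)
        updated = true
      }
      i += 1
    }
    updated
  }
}
```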

@SparkQA commented Jun 3, 2018

Test build #91424 has finished for PR 21221 at commit 7879e66.

  • This patch fails to build.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • sealed trait MetricGetter
  • abstract class MemoryManagerMetricGetter(f: MemoryManager => Long) extends MetricGetter
  • abstract class MBeanMetricGetter(mBeanName: String) extends MetricGetter

@felixcheung (Member): ok to test

@felixcheung (Member): This probably needs to be rebased.

@SparkQA commented Jun 10, 2018

Test build #91642 has finished for PR 21221 at commit 7879e66.

  • This patch fails to build.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • sealed trait MetricGetter
  • abstract class MemoryManagerMetricGetter(f: MemoryManager => Long) extends MetricGetter
  • abstract class MBeanMetricGetter(mBeanName: String) extends MetricGetter

@edwinalu (Contributor Author): @squito, I'm modifying ExecutorMetrics to take in the metrics array -- this will be easier for tests where we pass in set values, and seems fine for the actual code. It will check that the length of the passed-in array is the same as MetricGetter.values.length. Let me know if you have any concerns.

@felixcheung, I'll finish the current changes, then rebase.

@edwinalu (Contributor Author):

@squito For PeakMemoryMetrics in api.scala, changing to the array gives REST API output of:

"peakMemoryMetrics" : {
"metrics" : [ 755008624, 100519936, 0, 0, 47962185, 0, 47962185, 0, 98230, 0 ]
}

instead of:

"peakMemoryMetrics" : {
"jvmUsedHeapMemory" : 629553808,
"jvmUsedNonHeapMemory" : 205304696,
"onHeapExecutionMemory" : 0,
"offHeapExecutionMemory" : 0,
"onHeapStorageMemory" : 905801,
"offHeapStorageMemory" : 0,
"onHeapUnifiedMemory" : 905801,
"offHeapUnifiedMemory" : 0,
"directMemory" : 397602,
"mappedMemory" : 0
}

Would it be OK to revert back to the original version of PeakMemoryMetrics, where each field is listed as a separate element?

@squito (Contributor) commented Jun 11, 2018

Well, I think you should change the way PeakExecutorMetrics gets converted to JSON, so that it uses a name from the relevant MetricGetter. You should be able to customize the way it gets converted to JSON here:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala#L50
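
For illustration, a custom Jackson serializer can emit named fields from a positional array; the class shape and metric names below are assumptions for the sketch (the real wiring lives in JacksonMessageWriter, linked above, where such a serializer would be registered on the ObjectMapper via a SimpleModule):

```
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}

// Illustrative array-backed peak metrics holder.
case class PeakMemoryMetrics(metrics: Array[Long])

// Sketch: write each array slot under a stable metric name instead of a bare
// "metrics" array. The names below are placeholders.
class PeakMemoryMetricsSerializer extends JsonSerializer[PeakMemoryMetrics] {
  private val names = Seq(
    "jvmUsedHeapMemory", "jvmUsedNonHeapMemory", "onHeapExecutionMemory",
    "offHeapExecutionMemory", "onHeapStorageMemory", "offHeapStorageMemory",
    "onHeapUnifiedMemory", "offHeapUnifiedMemory", "directMemory", "mappedMemory")

  override def serialize(
      value: PeakMemoryMetrics,
      gen: JsonGenerator,
      serializers: SerializerProvider): Unit = {
    gen.writeStartObject()
    names.zip(value.metrics).foreach { case (name, v) =>
      gen.writeNumberField(name, v)
    }
    gen.writeEndObject()
  }
}
```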

edwinalu and others added 6 commits June 13, 2018 16:23
… move logic for getting

metrics to Heartbeater), and modify tests for the new ExecutorMetrics format.
@felixcheung (Member) left a comment: LGTM, minor comments

@@ -216,8 +217,7 @@ private[spark] class Executor(

def stop(): Unit = {
env.metricsSystem.report()
heartbeater.shutdown()
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
heartbeater.stop()
Member:

future: try {} catch { case NonFatal(e)?

Contributor Author:

Added.
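
A sketch of the suggested guard; the wrapper object and the stand-in function parameter are illustrative, since in the real code the call would wrap heartbeater.stop() inside Executor.stop():

```
import scala.util.control.NonFatal

object SafeStop {
  // Sketch: a non-fatal failure while stopping the heartbeater should not
  // abort the rest of the shutdown sequence.
  def stopSafely(stopHeartbeater: () => Unit): Unit = {
    try {
      stopHeartbeater()
    } catch {
      case NonFatal(e) =>
        System.err.println(s"Unable to stop heartbeater: $e")
    }
  }
}
```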

private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
  ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
    .booleanConf
    .createWithDefault(true)
Member: Should this be "false" for now until we can test this out more, just to be on the safe side?

Contributor Author: That would be safer. I'll change it to false, and we can change it to true after people have had a chance to test it out.
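
For anyone trying this out once the default is flipped to false, the flag can be enabled explicitly. A usage sketch, e.g. in spark-shell (the config key is the one shown in the diff above; spark.eventLog.enabled is the standard event-log switch, and the same keys can equally be set in spark-defaults.conf or via --conf):

```
import org.apache.spark.SparkConf

// Sketch: opt back in to per-stage executor metrics logging while the default is false.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.logStageExecutorMetrics.enabled", "true")
```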

@felixcheung (Member):

Jenkins, retest this please

@SparkQA commented Aug 16, 2018

Test build #94842 has finished for PR 21221 at commit a14b82a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 17, 2018

Test build #94865 has finished for PR 21221 at commit 2897281.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mccheah (Contributor) commented Aug 20, 2018

We're going to delay merging this until after the 2.4 branch is cut. We can include this in Spark 2.5.

@mccheah (Contributor) commented Sep 6, 2018

@edwinalu - this can merge now that Spark 2.4's release branch has been cut, but there are conflicting files now. Can you clear the conflicts so we can merge this?

@SparkQA commented Sep 7, 2018

Test build #95776 has finished for PR 21221 at commit ee4aa1d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -103,6 +103,12 @@ public final void onExecutorMetricsUpdate(
onEvent(executorMetricsUpdate);
}

@Override
public final void onStageExecutorMetrics(
SparkListenerStageExecutorMetrics executorMetrics) {
Member:

nit: remove extra spaces for better indent

@SparkQA commented Sep 7, 2018

Test build #95801 has finished for PR 21221 at commit 571285b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mccheah (Contributor) commented Sep 7, 2018

Thanks, I think this looks good. With a prior +1 from @felixcheung and @squito I'm going to merge this now. Let us know if there are any further concerns and we can follow up.

@asfgit closed this in 9241e1e on Sep 7, 2018
@edwinalu (Contributor Author) commented Sep 7, 2018

Thanks!

@gatorsmile (Member):

For the other reviewers, this was merged to master (not 2.4)

@gatorsmile (Member):

@mccheah When you merged the code, could you also leave a comment about which branches you merged it to?

dbtsai pushed a commit that referenced this pull request Sep 13, 2019
### What changes were proposed in this pull request?

In Apache Spark 3.0.0, [SPARK-23429](#21221) added the ability to collect executor metrics via heartbeats and to expose them through a REST API. This PR aims to extend that to additionally support the `Prometheus` format.

### Why are the changes needed?

Prometheus.io is a CNCF project used widely with K8s.
- https://github.com/prometheus/prometheus

### Does this PR introduce any user-facing change?

Yes. New web interfaces are added along with the existing JSON API.

|              |                JSON End Point                    |            Prometheus End Point         |
| ------- | ------------------------------------ | --------------------------------- |
| Driver   | /api/v1/applications/{id}/executors/   | /metrics/executors/prometheus/   |

### How was this patch tested?

Manually connect to the new end-points with `curl` and compare with JSON.

**SETUP**
```
$ sbin/start-master.sh
$ sbin/start-slave.sh spark://`hostname`:7077
$ bin/spark-shell --master spark://`hostname`:7077 --conf spark.ui.prometheus.enabled=true
```

**JSON (existing after SPARK-23429)**
```
$ curl -s http://localhost:4040/api/v1/applications/app-20190911204823-0000/executors
[ {
  "id" : "driver",
  "hostPort" : "localhost:52615",
  "isActive" : true,
  "rddBlocks" : 0,
  "memoryUsed" : 0,
  "diskUsed" : 0,
  "totalCores" : 0,
  "maxTasks" : 0,
  "activeTasks" : 0,
  "failedTasks" : 0,
  "completedTasks" : 0,
  "totalTasks" : 0,
  "totalDuration" : 0,
  "totalGCTime" : 0,
  "totalInputBytes" : 0,
  "totalShuffleRead" : 0,
  "totalShuffleWrite" : 0,
  "isBlacklisted" : false,
  "maxMemory" : 384093388,
  "addTime" : "2019-09-12T03:48:23.875GMT",
  "executorLogs" : { },
  "memoryMetrics" : {
    "usedOnHeapStorageMemory" : 0,
    "usedOffHeapStorageMemory" : 0,
    "totalOnHeapStorageMemory" : 384093388,
    "totalOffHeapStorageMemory" : 0
  },
  "blacklistedInStages" : [ ],
  "peakMemoryMetrics" : {
    "JVMHeapMemory" : 229995952,
    "JVMOffHeapMemory" : 145872280,
    "OnHeapExecutionMemory" : 0,
    "OffHeapExecutionMemory" : 0,
    "OnHeapStorageMemory" : 0,
    "OffHeapStorageMemory" : 0,
    "OnHeapUnifiedMemory" : 0,
    "OffHeapUnifiedMemory" : 0,
    "DirectPoolMemory" : 75891,
    "MappedPoolMemory" : 0,
    "ProcessTreeJVMVMemory" : 0,
    "ProcessTreeJVMRSSMemory" : 0,
    "ProcessTreePythonVMemory" : 0,
    "ProcessTreePythonRSSMemory" : 0,
    "ProcessTreeOtherVMemory" : 0,
    "ProcessTreeOtherRSSMemory" : 0,
    "MinorGCCount" : 8,
    "MinorGCTime" : 82,
    "MajorGCCount" : 3,
    "MajorGCTime" : 128
  },
  "attributes" : { },
  "resources" : { }
}, {
  "id" : "0",
  "hostPort" : "127.0.0.1:52619",
  "isActive" : true,
  "rddBlocks" : 0,
  "memoryUsed" : 0,
  "diskUsed" : 0,
  "totalCores" : 16,
  "maxTasks" : 16,
  "activeTasks" : 0,
  "failedTasks" : 0,
  "completedTasks" : 0,
  "totalTasks" : 0,
  "totalDuration" : 0,
  "totalGCTime" : 0,
  "totalInputBytes" : 0,
  "totalShuffleRead" : 0,
  "totalShuffleWrite" : 0,
  "isBlacklisted" : false,
  "maxMemory" : 384093388,
  "addTime" : "2019-09-12T03:48:25.907GMT",
  "executorLogs" : {
    "stdout" : "http://127.0.0.1:8081/logPage/?appId=app-20190911204823-0000&executorId=0&logType=stdout",
    "stderr" : "http://127.0.0.1:8081/logPage/?appId=app-20190911204823-0000&executorId=0&logType=stderr"
  },
  "memoryMetrics" : {
    "usedOnHeapStorageMemory" : 0,
    "usedOffHeapStorageMemory" : 0,
    "totalOnHeapStorageMemory" : 384093388,
    "totalOffHeapStorageMemory" : 0
  },
  "blacklistedInStages" : [ ],
  "attributes" : { },
  "resources" : { }
} ]
```

**Prometheus**
```
$ curl -s http://localhost:4040/metrics/executors/prometheus
metrics_app_20190911204823_0000_driver_executor_rddBlocks_Count 0
metrics_app_20190911204823_0000_driver_executor_memoryUsed_Count 0
metrics_app_20190911204823_0000_driver_executor_diskUsed_Count 0
metrics_app_20190911204823_0000_driver_executor_totalCores_Count 0
metrics_app_20190911204823_0000_driver_executor_maxTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_activeTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_failedTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_completedTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_totalTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_totalDuration_Value 0
metrics_app_20190911204823_0000_driver_executor_totalGCTime_Value 0
metrics_app_20190911204823_0000_driver_executor_totalInputBytes_Count 0
metrics_app_20190911204823_0000_driver_executor_totalShuffleRead_Count 0
metrics_app_20190911204823_0000_driver_executor_totalShuffleWrite_Count 0
metrics_app_20190911204823_0000_driver_executor_maxMemory_Count 384093388
metrics_app_20190911204823_0000_driver_executor_usedOnHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_usedOffHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_totalOnHeapStorageMemory_Count 384093388
metrics_app_20190911204823_0000_driver_executor_totalOffHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_JVMHeapMemory_Count 230406336
metrics_app_20190911204823_0000_driver_executor_JVMOffHeapMemory_Count 146132592
metrics_app_20190911204823_0000_driver_executor_OnHeapExecutionMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OffHeapExecutionMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OnHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OffHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OnHeapUnifiedMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OffHeapUnifiedMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_DirectPoolMemory_Count 97049
metrics_app_20190911204823_0000_driver_executor_MappedPoolMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreeJVMVMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreeJVMRSSMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreePythonVMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreePythonRSSMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreeOtherVMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreeOtherRSSMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_MinorGCCount_Count 8
metrics_app_20190911204823_0000_driver_executor_MinorGCTime_Count 82
metrics_app_20190911204823_0000_driver_executor_MajorGCCount_Count 3
metrics_app_20190911204823_0000_driver_executor_MajorGCTime_Count 128
metrics_app_20190911204823_0000_0_executor_rddBlocks_Count 0
metrics_app_20190911204823_0000_0_executor_memoryUsed_Count 0
metrics_app_20190911204823_0000_0_executor_diskUsed_Count 0
metrics_app_20190911204823_0000_0_executor_totalCores_Count 16
metrics_app_20190911204823_0000_0_executor_maxTasks_Count 16
metrics_app_20190911204823_0000_0_executor_activeTasks_Count 0
metrics_app_20190911204823_0000_0_executor_failedTasks_Count 0
metrics_app_20190911204823_0000_0_executor_completedTasks_Count 0
metrics_app_20190911204823_0000_0_executor_totalTasks_Count 0
metrics_app_20190911204823_0000_0_executor_totalDuration_Value 0
metrics_app_20190911204823_0000_0_executor_totalGCTime_Value 0
metrics_app_20190911204823_0000_0_executor_totalInputBytes_Count 0
metrics_app_20190911204823_0000_0_executor_totalShuffleRead_Count 0
metrics_app_20190911204823_0000_0_executor_totalShuffleWrite_Count 0
metrics_app_20190911204823_0000_0_executor_maxMemory_Count 384093388
metrics_app_20190911204823_0000_0_executor_usedOnHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_0_executor_usedOffHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_0_executor_totalOnHeapStorageMemory_Count 384093388
metrics_app_20190911204823_0000_0_executor_totalOffHeapStorageMemory_Count 0
```

Closes #25770 from dongjoon-hyun/SPARK-29064.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
PavithraRamachandran pushed a commit to PavithraRamachandran/spark that referenced this pull request Sep 15, 2019