
[SPARK-24958][WIP] Report executors' process tree total memory information to heartbeat signals #21916

Closed · wants to merge 36 commits

Conversation

@rezasafi (Contributor) commented Jul 30, 2018

This is a work in progress for SPARK-24958 and this PR is opened on top of the PR for SPARK-23429:
#21221
To view the changes that are only related to SPARK-24958 you can check the following view:
rezasafi#1
Spark executors' process tree total memory information can be very useful. Currently such information is not available. The goal of this PR is to compute this information for each executor, add it to the heartbeat signals, and compute the peaks at the driver.

This PR is tested by running the current unit tests and the ones that are added by the PR for SPARK-23429. I have also tested this on our internal cluster and have verified that it is working.
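For context, here is a minimal sketch of what "process tree total memory" involves. This is illustrative only (hypothetical names, Linux /proc layout assumed), not the PR's actual code: walk the process tree via pgrep and sum the resident set size of each member from /proc/<pid>/statm.

```scala
import scala.io.Source
import scala.sys.process._
import scala.util.Try

// Hypothetical sketch, not the PR's code: total resident memory of a process
// and all of its descendants, read from /proc (Linux only).
object ProcTreeSketch {
  // Field 2 of /proc/<pid>/statm is the resident set size, in pages.
  def rssPagesFromStatm(line: String): Long = line.trim.split("\\s+")(1).toLong

  // Recursively collect a process and its descendants via `pgrep -P <pid>`.
  // `!!` throws on non-zero exit (pgrep exits 1 when there are no matches),
  // so failures are treated as "no children".
  def processTreePids(root: Int): Seq[Int] = {
    val children = Try(Seq("pgrep", "-P", root.toString).!!).getOrElse("")
      .split("\n").filter(_.matches("[0-9]+")).map(_.toInt).toSeq
    root +: children.flatMap(processTreePids)
  }

  // Total RSS of the tree, in pages (multiply by the page size for bytes).
  def treeRssPages(root: Int): Long =
    processTreePids(root).map { pid =>
      Try(rssPagesFromStatm(Source.fromFile(s"/proc/$pid/statm").mkString))
        .getOrElse(0L)
    }.sum
}
```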

edwinalu and others added 24 commits May 2, 2018 14:58
…xecutors REST API

Add new executor level memory metrics (JVM used memory, on/off heap execution memory, on/off heap storage
memory), and expose via the executors REST API. This information will help provide insight into how executor
and driver JVM memory is used, and for the different memory regions. It can be used to help determine good
values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction.

Add an ExecutorMetrics class, with jvmUsedMemory, onHeapExecutionMemory, offHeapExecutionMemory,
onHeapStorageMemory, and offHeapStorageMemory. The new ExecutorMetrics will be sent by executors to the
driver as part of Heartbeat. A heartbeat will be added for the driver as well, to collect these metrics
for the driver.

Modify the EventLoggingListener to log ExecutorMetricsUpdate events if there is a new peak value for any
of the memory metrics for an executor and stage. Only the ExecutorMetrics will be logged, and not the
TaskMetrics, to minimize additional logging.

Modify the AppStatusListener to record the peak values for each memory metric.

Add the new memory metrics to the executors REST API.
…enabled to enable/disable executor metrics update logging.

Code review comments.
… move logic for getting

metrics to Heartbeater), and modify tests for the new ExecutorMetrics format.
- remove timestamp
- change ExecutorMetrics to Array[Long]
- create new SparkListenerStageExecutorMetrics for recording stage executor metric peaks in
  the history log

Fix issue where metrics for a removed executor were ignored (save dead executors while
there are currently active stages that the executor was alive for).
…nerExecutorMetricsUpdate

not optional. These are no longer logged, and backward compatibility should not be an issue.
These events should only be used to send task and executor updates for heartbeats, and
executors and driver should be the same Spark version.
… optional again, in

case of existing users of SparkListenerExecutorMetricsUpdate.
…nd add ExecutorMetrics, with getMetricValue()

method for accessing executor metric values. Rename MetricGetter to ExecutorMetricType.

Should ExecutorMetricType be moved to executor package, or ExecutorMetrics be moved to metrics package?
Should Json (de)serialization functions be moved from api.scala to ExecutorMetrics?
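The commit messages above describe metrics traveling as an Array[Long] (one slot per ExecutorMetricType) with the driver tracking peak values per executor and stage. A condensed sketch of that peak-tracking idea (field names taken from the commit messages; the shape here is illustrative, not Spark's final API):

```scala
// Illustrative sketch of the peak tracking described above: metrics travel as
// an Array[Long], one slot per metric, and the driver keeps the element-wise
// maximum seen so far for each executor and stage.
object ExecutorMetricsSketch {
  // Slot order, mirroring the metric names listed in the commit messages.
  val metricNames: Seq[String] = Seq(
    "jvmUsedMemory",
    "onHeapExecutionMemory", "offHeapExecutionMemory",
    "onHeapStorageMemory", "offHeapStorageMemory")

  // Update `peaks` in place with any new per-slot maxima; return true if
  // anything changed (i.e. an event would be logged for this heartbeat).
  def updatePeaks(peaks: Array[Long], update: Array[Long]): Boolean = {
    var changed = false
    for (i <- peaks.indices if update(i) > peaks(i)) {
      peaks(i) = update(i)
      changed = true
    }
    changed
  }
}
```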
@holdensmagicalunicorn

@rezasafi, thanks! I am a bot who has found some folks who might be able to help with the review: @JoshRosen, @vanzin and @pwendell

@AmplabJenkins

Can one of the admins verify this patch?

@ankuriitg (Contributor) left a comment:
Overall LGTM. Some minor comments about code structure.

def updateProcessTree(): Unit = {
Contributor:
Why not just create a new Process tree instead of updating the existing tree?

Contributor (Author):
Thanks @ankuriitg for the review. I will apply your comments ASAP. For this one, I may make some other improvements before switching to simply recreating the process tree each time. I understand that in this version updating looks more complex than just recreating it.


def getChildPIds(pid: Int): ArrayBuffer[Int] = {
val cmd = Array("pgrep", "-P", pid.toString)
val input = Runtime.getRuntime.exec(cmd).getInputStream
Contributor:
It would be better to handle exceptions, if any
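One way to address this comment (a sketch with illustrative names, not the PR's code) is to guard the external pgrep call with Try, so that an IOException or a non-zero exit yields an empty result instead of crashing the heartbeat thread:

```scala
import scala.util.Try

// Sketch of the reviewer's suggestion: wrap the external `pgrep` call so a
// failure (command missing, I/O error, ...) is treated as "no children".
object SafeChildPids {
  // Pure helper: keep only purely numeric lines from pgrep's output.
  def parsePids(output: String): Seq[Int] =
    output.split("\n").filter(_.matches("[0-9]+")).map(_.toInt).toSeq

  def childPidsOf(pid: Int): Seq[Int] =
    Try {
      val proc = Runtime.getRuntime.exec(Array("pgrep", "-P", pid.toString))
      val out = scala.io.Source.fromInputStream(proc.getInputStream).mkString
      proc.waitFor()
      parsePids(out)
    }.getOrElse(Seq.empty)
}
```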

val childPids = new String(childPidsInByte.toArray, "UTF-8").split("\n")
val childPidsInInt: ArrayBuffer[Int] = new ArrayBuffer[Int]()
for(p <- childPids ) {
if (p.matches("[0-9][0-9]*")) {
Contributor:

Is it the same as "[0-9]+"? If yes, that would be more concise

Contributor (Author):

I will change this. My original goal was to guard against cases where pgrep -P pid returns error messages or other non-numeric output, but it seems that it won't.
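For reference, the two regexes accept exactly the same strings, so the change is purely cosmetic. A quick check:

```scala
// "[0-9][0-9]*" ("a digit, then zero or more digits") is equivalent to
// "[0-9]+" ("one or more digits"); the latter is just more concise.
val samples = Seq("0", "42", "007", "", "4a", "-1", "four")
val same = samples.forall(s => s.matches("[0-9][0-9]*") == s.matches("[0-9]+"))
```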

for(p <- childPids ) {
if (p.matches("[0-9][0-9]*")) {
childPidsInInt += Integer.parseInt(p)
}
Contributor:

LOG an error/warning in else?

new FileInputStream(
new File(pidDir, PROCFS_STAT_FILE)), Charset.forName("UTF-8"))
val in: BufferedReader = new BufferedReader(fReader)
val procInfo = in.readLine
Contributor:

Can you please add a comment why we are reading just first line?

Contributor (Author):

This is what Hadoop's ProcfsBasedProcessTree is doing as well. I wasn't able to find a reference, but in my testing reading just one line was also enough.
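For background (based on the proc(5) man page, not taken from the PR): /proc/<pid>/stat is a single line of space-separated fields, which is why one readLine suffices. A hypothetical parser for the memory-related fields:

```scala
// /proc/<pid>/stat is one line (see proc(5)): field 1 is the pid, field 2 the
// command in parentheses, field 23 vsize (bytes), field 24 rss (pages).
// The command may itself contain spaces, so split only after the closing ')'.
def parseStatMemory(line: String): (Long, Long) = {
  val rest = line.substring(line.lastIndexOf(')') + 2).split(" ")
  // rest(0) is field 3 (the state), so 1-indexed field n sits at rest(n - 3)
  (rest(23 - 3).toLong, rest(24 - 3).toLong) // (vsize, rss)
}
```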

* meaning not available
*/
final val pTreeInfo: ProcessTreeMetrics = new ProcfsBasedSystems
if (pTreeInfo.isAvailable) {
Contributor:

Why not create the process tree on instantiation of ProcfsBasedSystems class. You can maybe also make that a lazy val, so that it is instantiated only when needed.

Contributor (Author):

I will change this as well. It is a final val since the other metrics were also final. I will check the lazy val, but there is probably not much difference since this initialization will be executed just one time anyway.
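The reviewer's lazy-val suggestion could look roughly like this (illustrative names and a counter added only to demonstrate the single initialization; a lazy val defers the work to first access and then caches it):

```scala
// Sketch: instead of eagerly mutating a process tree, expose it as a lazy val
// so the (possibly expensive) construction runs once, at first use.
class ProcfsMetricsSketch(rootPid: Int) {
  private var builds = 0 // only here to demonstrate single initialization

  lazy val processTree: Seq[Int] = {
    builds += 1
    buildTree(rootPid)
  }

  def buildCount: Int = builds

  // Placeholder: a real version would walk children via `pgrep -P`.
  private def buildTree(pid: Int): Seq[Int] = Seq(pid)
}
```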

Some improvements in integration

Integration with the unit tests of the upstream open PR

Fix an issue with memory info computation.

Fix scalastyle errors

Some changes to address comments
@rezasafi (Author) commented Oct 2, 2018

I will close this and open the actual PR shortly. Thank you everyone for the great reviews.

6 participants