[SPARK-24958][CORE] Add memory from procfs to executor metrics. #22612
Conversation
Some improvements in integration. Integration with the unit tests of the upstream open PR. Fix an issue with memory info computation. Fix scalastyle errors. Some changes to address comments. (cherry picked from commit 04875b8)
…able and some improvements (cherry picked from commit 29a44c7)
(cherry picked from commit 3671427)
… heartbeat signals. Spark executors' process-tree total memory information can be really useful. Currently such information is not available. The goal of this PR is to compute this information for each executor, add it to the heartbeat signals, and compute the peaks at the driver. This PR is tested by running the current unit tests and also some added ones. I have also tested this on our internal cluster and have verified the results.
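The mechanics of the procfs sampling can be illustrated with a short sketch. This is not the PR's actual code: the field offsets come from proc(5), and the names here are hypothetical.

```scala
// Hypothetical sketch of the core idea behind this PR: /proc/<pid>/stat exposes
// a process's virtual memory size (field 23, in bytes) and resident set size
// (field 24, in pages); proc(5) numbers fields from 1, the arrays below from 0.
case class ProcMemInfo(vmemBytes: Long, rssPages: Long)

def parseProcStat(statLine: String): ProcMemInfo = {
  val fields = statLine.split(" ")
  ProcMemInfo(fields(22).toLong, fields(23).toLong)
}

// A fabricated /proc/<pid>/stat line, padded so the indexed fields line up.
val sample =
  (Seq("1234", "(java)", "S") ++ Seq.fill(19)("0") ++ Seq("4096000", "500")).mkString(" ")
println(parseProcStat(sample)) // prints ProcMemInfo(4096000,500)
```

Summing these values over every pid in the executor's process tree gives the totals that are then attached to the heartbeat.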
ok to test
import org.apache.spark.internal.{config, Logging}

private[spark] case class ProcfsBasedSystemsMetrics(jvmVmemTotal: Long,
    jvmRSSTotal: Long,
nit: each arg on its own line, just a double indent (4 spaces)
class ProcfsBasedSystemsSuite extends SparkFunSuite {

    val p = new ProcfsBasedSystems
indentation is off
@@ -59,6 +60,43 @@ case object JVMOffHeapMemory extends ExecutorMetricType {
  }
}

case object ProcessTreeJVMRSSMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
    ExecutorMetricType.pTreeInfo.updateAllMetrics()
I still don't like how this is actually updating all the other metrics -- it makes the code more confusing to follow, as you have to know there is a relationship between all of the metrics. I understand that you want to do the work once and grab all the metrics, but we should find a better way to do that. I see how the current api makes that hard to do.
I have two suggestions:
1. Change the API to have getMetricValue() also pass in System.currentTimeMillis(). Then every single metric type would compare the passed-in time against the last time you computed the metrics -- if it was stale, it would recompute everything and update the time.
2. Change the API to allow one "metric getter" object to actually supply multiple metrics. You'd have a simple implementation which would just provide one metric, and you'd change all the existing metrics to extend that simple case, but your implementation would provide multiple metrics in one go.
I actually think (2) is better (that is what I did in the memory-monitor plugin) though it's a bit more work. You might need to play with this a bit.
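Suggestion (2) could look roughly like the following sketch. These names are illustrative, not Spark's actual API.

```scala
// Illustrative sketch of suggestion (2): a "metric getter" that may supply
// several named metrics from one computation. Not Spark's real API.
trait MetricGetter {
  def names: Seq[String]
  def getMetricValues(): Array[Long]
}

// The simple case: one getter, one metric; existing metrics would extend this.
abstract class SingleValueMetricGetter(name: String) extends MetricGetter {
  def getValue(): Long
  final val names: Seq[String] = Seq(name)
  final def getMetricValues(): Array[Long] = Array(getValue())
}

object JVMHeapDemo extends SingleValueMetricGetter("JVMHeapMemory") {
  def getValue(): Long = Runtime.getRuntime.totalMemory()
}

// The procfs getter would walk the process tree once and report all its
// values together, so no metric triggers a recompute for the others.
object ProcessTreeDemo extends MetricGetter {
  val names = Seq("ProcessTreeJVMVMemory", "ProcessTreeJVMRSSMemory")
  def getMetricValues(): Array[Long] = Array(0L, 0L) // one procfs walk here
}
```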
Will wait for other people's comments.
I think it makes sense for the metrics provider API to return a Map[String, Long] for a set of "named" metrics - we've talked before about attaching a schema to the metrics bundle passed around by this API. So, similar to option 2 above.
Should make ProcessTreeMemory extend ExecutorMetricType so that the individual metrics can be returned from it. The current code also assumes the metrics are calculated only in ProcessTreeJVMRSSMemory, with subsequent calls reusing them. We shouldn't depend on the order here.
Sorry for the delayed response. Thanks for adding the total memory metrics -- these will be very useful. Agreed that doing the work once is better, but that having ProcessTreeJVMRSSMemory.getMetricValue() update all the metrics is confusing, especially if a user at some point wants to call getMetricValue() for one of the other metrics, and not ProcessTreeJVMRSSMemory.
@squito 's #1 is probably the easiest to make the change for with the existing code. However, #2 with @mccheah's suggestion to return Map sounds best/cleanest as an API, with @dhruve 's suggestion to consolidate into ProcessTreeMemory -- I prefer this approach as well.
Right now the call to getMetricValue is done in Heartbeater.getCurrentMetrics(), and it's mapping ExecutorMetricType.values to the array of actual values. Translating the returned maps to an array (with index mapping to name rather than ExecutorMetricType) will involve some more code. In retrospect, getting the current metrics is probably better done by ExecutorMetrics itself, rather than having Heartbeater exposed to the implementation details -- would you be able to move the logic there?
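One way the flattening described here could fit inside ExecutorMetrics is sketched below; the schema and metric names are made up for illustration, not taken from the PR.

```scala
// Hypothetical sketch: each provider returns a Map[String, Long], and
// ExecutorMetrics flattens the maps into a fixed array indexed by metric
// name, so Heartbeater never sees the implementation details.
object MetricSchema {
  val metricNames: IndexedSeq[String] =
    IndexedSeq("JVMHeapMemory", "ProcessTreeJVMVMemory", "ProcessTreeJVMRSSMemory")
  val indexByName: Map[String, Int] = metricNames.zipWithIndex.toMap
}

def currentMetrics(providers: Seq[() => Map[String, Long]]): Array[Long] = {
  val metrics = new Array[Long](MetricSchema.metricNames.length)
  for (p <- providers; (name, value) <- p())
    metrics(MetricSchema.indexByName(name)) = value
  metrics
}

val snapshot = currentMetrics(Seq(
  () => Map("JVMHeapMemory" -> 10L),
  () => Map("ProcessTreeJVMVMemory" -> 20L, "ProcessTreeJVMRSSMemory" -> 5L)))
println(snapshot.mkString(", ")) // prints 10, 20, 5
```

The array stays compact for the heartbeat while the maps carry the names only at the API boundary.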
fReader.close
val procInfoSplit = procInfo.split(" ")
if ( procInfoSplit != null ) {
  if (procInfoSplit(1).toLowerCase.contains("java")) {
the checks for java & python seem error prone (maybe you have some external program called "pass-data-from-java-to-fizzbuzz-cpp") ... but I'm also not sure what else you could do here.
So I'm checking for the child processes of a Spark process, and there you will see either java or pythonX.Y. I don't think that Spark will create a process with a custom name.
It might not be as uncommon to have arbitrary forked processes from Spark executors as we think. Recent discussions from Spark Summit 2018 to my knowledge propose sending off work to be done by specialized hardware or specialized software. I'm not an expert in these use cases so someone can correct this observation if they like. Mostly just flagging that for completeness.
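The fragility being discussed can be seen in a small sketch of the substring check (a hypothetical helper, not the PR's code):

```scala
import java.util.Locale

// Sketch of the name-based classification under discussion: the comm field of
// /proc/<pid>/stat is matched by substring, with everything else as "other".
def classify(comm: String): String = {
  val lower = comm.toLowerCase(Locale.US)
  if (lower.contains("java")) "jvm"
  else if (lower.contains("python")) "python"
  else "other"
}

println(classify("(java)"))      // prints jvm
println(classify("(python2.7)")) // prints python
println(classify("(R)"))         // prints other
// ...and squito's hypothetical binary would indeed be misclassified:
println(classify("(pass-data-from-java-to-fizzbuzz-cpp)")) // prints jvm
```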
Test build #96900 has finished for PR 22612 at commit
Test build #96908 has finished for PR 22612 at commit
Didn't get this Scala check error about using root.Locale when I ran the test locally. Will think about whether I should use that suggestion later tonight.
An overall great addition to the Spark metrics system. Some feedback as follows.
// project.
private[spark] class ProcfsBasedSystems(procfsDir: String = "/proc/") extends Logging {
  val procfsStatFile = "stat"
  var pageSize: Long = computePageSize()
These don't need type declarations, I think.
It might make sense for the constant fields to be separated into a companion object, but in practice I don't think we've been consistent in our usage of companion objects for constants and one-off computes.
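The convention being suggested might look like this minimal sketch (names and values are illustrative, and the page size is a stand-in; the real code computes it, e.g. via `getconf PAGESIZE`):

```scala
// Sketch of the companion-object convention: constants live in the object,
// per-instance state and parameters in the class.
object ProcfsSketch {
  val ProcfsStatFile = "stat" // constant, shared by all instances
}

class ProcfsSketch(procfsDir: String = "/proc/") {
  val pageSize: Long = 4096L // assumed constant for this sketch
  def statPath(pid: Int): String = procfsDir + pid + "/" + ProcfsSketch.ProcfsStatFile
}

println(new ProcfsSketch().statPath(42)) // prints /proc/42/stat
```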
Thank you very much for the review. Will apply some of your comments soon and others later.
" As a result reporting of ProcessTree metrics is stopped", e)
isAvailable = false
return -1
case t: Throwable => logDebug("Some exception occurred when trying to" +
Catching Throwable is generally scary -- can this mask out-of-memory and errors of that sort? Can we scope down the exception type to handle here?
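One common way to scope the handler down in Scala is scala.util.control.NonFatal, which deliberately does not match OutOfMemoryError and other fatal errors, so they still propagate. A sketch with a hypothetical method, not the PR's code:

```scala
import java.io.IOException
import scala.util.control.NonFatal

// Sketch: handle expected procfs read failures explicitly, let other
// recoverable errors fall through to a NonFatal case, and let
// OutOfMemoryError and friends propagate (NonFatal does not match them).
def safeCompute(body: => Long): Long =
  try body
  catch {
    case _: IOException => -1L // expected: e.g. a procfs entry vanished mid-read
    case NonFatal(_)    => -2L // unexpected but recoverable
  }

println(safeCompute(5L))                              // prints 5
println(safeCompute(throw new IOException("gone")))   // prints -1
println(safeCompute(throw new RuntimeException("?"))) // prints -2
```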
Yeah, sorry. I didn't want to push this; I just added it to avoid a test failure. Will remove it in the next patch.
@@ -84,6 +122,8 @@ case object MappedPoolMemory extends MBeanExecutorMetricType( 
  "java.nio:type=BufferPool,name=mapped")

private[spark] object ExecutorMetricType {
  final val pTreeInfo = new ProcfsBasedSystems
Can ProcfsBasedSystems just be an object in and of itself?
I first considered this class to have a companion object, but it didn't work, mostly related to how the ExecutorMetricType is defined. I don't remember the exact details.
This is a weird place to keep this, unless there is some really good reason for it. I think it should go inside ProcessTreeMetrics.
Also, I'm not sure what the problem was with making it an object; seems to work for me. It's a bit different now as there are arguments to the constructor for testing -- but you could still have an object which extends the class:
private[spark] object ProcfsBasedSystems extends ProcfsBasedSystems("/proc/")
though that doesn't really seem to have much value.
What are the benefits of the companion object vs. the current approach? I can revert to the companion-object model and do testing again to see what the problem was before, but I just wanted to understand the benefits before investing time.
Today I spent some time on the companion object solution, figured out the problem that I was facing before, and was able to fix it. I will send the updated PR sometime tonight or tomorrow. Thanks.
Normally having an object helps make it clear that there is a singleton; it's easier to share properly and easier to figure out how to get a handle on it. Given that we'll have a class anyway, I don't think there is a ton of value in having a companion object.
I do still think the instance you create here should go somewhere else.
Test build #96914 has finished for PR 22612 at commit
The test failure isn't related to this change and is most likely flaky. I will push another patch shortly; it will not address all the comments, but most of them.
Test build #96921 has finished for PR 22612 at commit
Test build #97018 has finished for PR 22612 at commit
Test build #97015 has finished for PR 22612 at commit
if (procInfoSplit != null) {
  val vmem = procInfoSplit(22).toLong
  val rssPages = procInfoSplit(23).toLong
  if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
Could this just be vmem and rssPages, rather than splitting into JVM, Python, and other? Can you explain more about how the separate values would be used?
This is separated since it turns out that knowing the main actors, like the JVM, separately can have some value for the user. We just consider the JVM (the pure Scala case) and Python (the PySpark case). Other categories can be added per interest in the future, but for now we put everything else under the "Other" category.
@edwinalu It would be nice to have a breakdown of the total memory being consumed. It's easier to tune the parameters knowing what is consuming all the memory. For example, if your container died OOMing, it helps to know if it was because of Python or the JVM. Also, R fits in the other category, so it makes sense to have all 3 of them as of now.
We don't have much pyspark ourselves, but yes, it seems useful to have the breakdown, and it's easy to sum the values for the total.
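The per-category accumulation being discussed could be sketched like this; the field names loosely mirror the PR's but are hypothetical.

```scala
// Sketch of the breakdown: per-category RSS totals that still sum to the
// overall total, so nothing is lost by splitting jvm / python / other.
case class TreeMetricsSketch(jvmRSS: Long = 0L, pythonRSS: Long = 0L, otherRSS: Long = 0L) {
  def totalRSS: Long = jvmRSS + pythonRSS + otherRSS
  def add(category: String, rss: Long): TreeMetricsSketch = category match {
    case "jvm"    => copy(jvmRSS = jvmRSS + rss)
    case "python" => copy(pythonRSS = pythonRSS + rss)
    case _        => copy(otherRSS = otherRSS + rss) // e.g. R processes
  }
}

val m = TreeMetricsSketch().add("jvm", 100L).add("python", 50L).add("r", 25L)
println(m.totalRSS) // prints 175
```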
…f an array of metrics
Test build #97166 has finished for PR 22612 at commit
The test failed because it assumes a certain order when metrics are reported in JSON. I will think about changing the test, since by using a map in ExecutorMetricType we don't assume any specific order when reporting the metrics.
@@ -28,35 +30,63 @@ import org.apache.spark.metrics.ExecutorMetricType
@DeveloperApi
class ExecutorMetrics private[spark] extends Serializable {

  // Metrics are indexed by MetricGetter.values
  private val metrics = new Array[Long](ExecutorMetricType.values.length)
  private var metrics = mutable.Map.empty[String, Long]
There's a fair amount of overhead in storing values in a Map -- both the data structure and the String keys. All of this would then be sent in the heartbeat, adding overhead to the heartbeats. The array representation is a lot more compact. Unfortunately there's no EnumMap in Scala. I'd prefer to keep the array storage, and use a map for returning and passing in values.
Overhead is a valid concern. However, this isn't a huge overhead, and it isn't always there, as we have configs for whether the user wants executor metrics computed. I could also make the map smaller by changing it from Map[String, Long] to something like Map[somesmallertype, Long], mapping to metric names only when writing to or reading from JSON.
I used a map since I saw a few comments here and also in 21221 about using a map instead of an array. We should definitely decide about this now, before the next release, since we can't change the API after that. I'm open to moving back to using an Array if that is more reasonable.
I think I have found a better solution: use an array like before, and make use of a map for clarity. I will try to push this new solution by Wednesday night after finishing the tests. Thank you very much.
Test build #99515 has finished for PR 22612 at commit
The test is flaky and not related to my change. It passed when we re-ran it last time. Jenkins retest this please.
couple of minor things, but almost there
@tgravescs @mccheah do you want to take another look?
Test build #99526 has finished for PR 22612 at commit
Test build #99529 has finished for PR 22612 at commit
@squito @tgravescs @mccheah let me know if you have other concerns. Thank you in advance.
just one important comment about error handling, and a couple of other tiny style things.
for (p <- pids) {
  allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
}
allMetrics
One weird thing about the error handling here -- if you get any error from computeProcessTree through all calls to addProcfsMetricsFromOneProcess, then you'll set isAvailable = false, but you'll still return allMetrics. Depending on when the error occurs, you might have some partially accumulated metrics in there. I think if there is any error, you probably want to return empty metrics.
Also, you'll leave isAvailable = false forever after that -- is that OK? I guess I don't really have strong feelings about this one; maybe it's OK to give up forever for this executor.
It is possible to be more accurate here when an error happens, although I think I need to revert to some older version of the method signature, since right now there is cohesion around allMetrics. I will do something tomorrow.
I don't want to introduce the overhead of constantly checking, but I can think about it and implement some sort of check after we set isAvailable = false because of an error.
for (p <- pids) {
  allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
  // if we had an error getting any of the metrics, we don't want to report partial
  // metrics, as that would be misleading.
  if (!isAvailable) {
    return ProcfsMetrics(0, 0, 0, 0, 0, 0)
  }
}
or just after the loop, if you don't want to check at each iteration (though the overhead is pretty tiny compared to addProcfsMetricsFromOneProcess).
My comment about overhead was referring to your question about whether it is OK to keep isAvailable = false after an error. I guess we can go with the option of not changing isAvailable to false after an error in computing the process tree, and just set a flag to check afterward and return zero in case of error, the way you suggested in your comment above. That way we will not stop reporting for this executor, and we only return zero in case of error. I'm fine with this as well.
Test build #99684 has finished for PR 22612 at commit
The failure isn't related to my change. Jenkins retest this please.
lgtm (assuming tests pass). will give folks that commented on this previously a bit of time to comment on it -- @tgravescs @dhruve @edwinalu @LantaoJin
Test build #4454 has finished for PR 22612 at commit
Test build #99748 has finished for PR 22612 at commit
Test build #99772 has finished for PR 22612 at commit
Same flaky test. I think in the mornings there is probably more load and the flakiness is higher. Jenkins retest this please.
Test build #4460 has finished for PR 22612 at commit
Thank you @squito for the reviews. I hope we can merge this soon if there are no concerns from @tgravescs @dhruve @edwinalu @LantaoJin. Thanks, everyone, in advance.
merged to master, thanks @rezasafi
Thank you very much @squito
This adds the entire memory used by Spark's executor (as measured by procfs) to the executor metrics. The memory usage is collected from the entire process tree under the executor. The metrics are subdivided into memory used by Java, by Python, and by other processes, to aid users in diagnosing the source of high memory usage.

The additional metrics are sent to the driver in heartbeats, using the mechanism introduced by SPARK-23429. This also slightly extends that approach to allow one ExecutorMetricType to collect multiple metrics.

Added unit tests and also tested on a live cluster.

Closes apache#22612 from rezasafi/ptreememory2.

Authored-by: Reza Safi <rezasafi@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>