[SPARK-31711][CORE] Register the executor source with the metrics system when running in local mode. #28528
Conversation
ok to test
@@ -803,6 +803,13 @@ package object config {
      .booleanConf
      .createWithDefault(true)

+  private[spark] val METRICS_EXECUTOR_SOURCE_ENABLED =
+    ConfigBuilder("spark.metrics.executorSource.enabled")
Hi, @LucaCanali. This is a completely orthogonal topic from the PR title, "Register the executor source with the metrics system when running in local mode". Could you make a separate PR for this first?
Technically, spark.metrics.executorSource.enabled=false doesn't make sense to me. Do you need to disable this?
Thanks @dongjoon-hyun for the quick reaction on this. My reasoning for adding the possibility of turning the executor source off in this PR was that I thought it could help adoption, in the sense that it could be a safety net for those who may be impacted by the addition of this list of new driver metrics (in local mode). Also, it can be seen as a "symmetric partner" of spark.metrics.executorMetricsSource.enabled. I actually have no use case that needs to turn the executor source metrics off, so I can for sure split this part out.
Never mind. Can we reuse spark.metrics.executorMetricsSource.enabled in the local mode?
ExecutorMetricsSource is a new one in 3.0, but ExecutorSource is a very old one, dating back to Spark 1.x. If we don't have a use case, let's not add this configuration.
OK.
Test build #122624 has finished for PR 28528 at commit
a6e23f1 to 52c22ce
Test build #122631 has finished for PR 28528 at commit
FYI, the dependency failure will be fixed by the following.
Retest this please.
Test build #122651 has finished for PR 28528 at commit
Can we test this again, please?
Sorry for being late.
Retest this please.
Test build #123102 has finished for PR 28528 at commit
@@ -121,7 +121,7 @@ private[spark] class Executor(
   // create. The map key is a task id.
   private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]()

-  val executorMetricsSource =
+  private val executorMetricsSource =
This seems to be irrelevant to this PR.
  }
Please remove this.
@@ -134,8 +134,11 @@ private[spark] class Executor(
     env.metricsSystem.registerSource(new JVMCPUSource())
     executorMetricsSource.foreach(_.register(env.metricsSystem))
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
+  } else {
+    Executor.executorSource = executorSource
What happens if we call env.metricsSystem.registerSource(executorSource) here?
Good question, this is the actual pain point this PR tries to solve: one cannot simply register metrics on env here when running in local mode, because the appId would not yet be available, so the resulting output would be missing a key piece of information (the appId). That's why I propose to register the metrics in the SparkContext, together with other driver metrics. As you can see, other metrics namespaces use a similar strategy.
To get around the issue, for local mode, I propose the workaround of storing the executorSource in the companion object so it can be read later; I hope this is acceptable.
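The stash-then-register handoff described here can be sketched without Spark at all. All names below (MetricsRegistry, Worker, sourceForLocalModeOnly) are illustrative stand-ins for this comment only, not Spark's actual API:

```scala
// Spark-free sketch of the workaround: a class created before the metrics
// system knows the appId stashes its source in the companion object, and the
// later-initialized driver side registers it once the appId is known.
class MetricsRegistry(appId: String) {
  private var names: List[String] = Nil
  // Register a source under the app-scoped namespace, e.g. "appId.executor".
  def register(source: String): Unit = names = s"$appId.$source" :: names
  def registered: List[String] = names.reverse
}

class Worker {
  private val source = "executor"
  // Local mode: the registry is not usable yet (no appId), so stash the
  // source in the companion object for the driver to register later.
  Worker.sourceForLocalModeOnly = Some(source)
}

object Worker {
  var sourceForLocalModeOnly: Option[String] = None
}

// Later, once the appId is known (SparkContext init in the real code):
new Worker()
val registry = new MetricsRegistry("local-1589450000000")
Worker.sourceForLocalModeOnly.foreach(registry.register)
println(registry.registered)  // List(local-1589450000000.executor)
```

The cost of this pattern, as noted later in this thread, is mutable global state that outlives the instance that set it.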
I can think of other ways to do this more cleanly but it would be quite a bit more code change. I do think we should explicitly make a comment about the appId so that someone who comes and looks at this later realizes that
core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
@LucaCanali . Could you add more concrete examples into the PR description for the following claim, please?
I have added to the PR description some additional context and a short explanation of why I think Spark users can find this useful.
Test build #123122 has finished for PR 28528 at commit
Retest this please.
Test build #124309 has finished for PR 28528 at commit
It looks like there was a problem with the Jenkins build system?
e2ebe65 to 70d55ed
Test build #126177 has finished for PR 28528 at commit
Hi @dongjoon-hyun, do you have further comments or suggestions for improvements on this?
70d55ed to ae1b206
@@ -1153,6 +1153,11 @@ This is the component with the largest amount of instrumented metrics
 - namespace=JVMCPU
   - jvmCpuTime

+- namespace=executor
this looks the same as the ExecutorMetrics but I think they are actually different in that this doesn't give you the JVM metrics - correct? Perhaps we need to update the one below.
I agree that the naming could be improved; in particular, metrics under namespace=executor and namespace=ExecutorMetrics are similar in scope. However, the implementation goes via quite different paths: [[ExecutorSource]] vs. [[ExecutorMetricsSource]]. Merging the two could be the subject of future refactoring.
Just to clarify: metrics in the namespace=ExecutorMetrics are already available in local mode. The goal of this PR is to make metrics in the namespace="executor" also available in local mode.
@@ -623,6 +623,9 @@ class SparkContext(config: SparkConf) extends Logging {

   // Post init
   _taskScheduler.postStartHook()
+  if (isLocal) {
+    _env.metricsSystem.registerSource(Executor.executorSource)
So I haven't gone and looked, but if I don't configure the metrics system files, does this cause any overhead?
I have not directly measured it, but I'd say the overhead from computing the metrics in the executor namespace is small. Also, we are talking about a change that only affects Spark running in local mode.
However, one important point related to the user impact of this PR is a consequence of the fact that by default metrics are sunk using the MetricsServlet, which attaches to the WebUI. This is because of the default values "*.sink.servlet.class" = "org.apache.spark.metrics.sink.MetricsServlet" and "*.sink.servlet.path" = "/metrics/json" (see https://spark.apache.org/docs/latest/monitoring.html#metrics). This means that executor metrics in local mode will be visible under the WebUI too (http://localhost:4040/metrics/json/).
If we want to be extra cautious about this, we can introduce a config (this has been discussed above in this PR, and the discussion there pointed to not having the extra config).
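Besides the default servlet sink, a user could also route these metrics to files. A minimal metrics.properties sketch (sink property names as documented in the Spark monitoring guide and metrics.properties.template; the period and directory values here are arbitrary examples):

```properties
# Keep the default MetricsServlet sink (WebUI at /metrics/json) and
# additionally write all sources, including the executor source, to CSV files.
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=10
*.sink.csv.unit=seconds
*.sink.csv.directory=/tmp/spark-metrics
```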
@@ -135,6 +135,8 @@ private[spark] class Executor(
     env.metricsSystem.registerSource(new JVMCPUSource())
     executorMetricsSource.foreach(_.register(env.metricsSystem))
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
+  } else {
+    Executor.executorSource = executorSource
We now have 2 executorSource variables, one in the object and one in the class. That seems a bit weird/confusing. I see the reason why, but perhaps we can rename the object one to have LocalModeOnly in the name, or something similar, so it is not accidentally used in non-local mode.
Test build #128652 has finished for PR 28528 at commit
Test build #128650 has finished for PR 28528 at commit
… object following review.
Test build #128813 has finished for PR 28528 at commit
@tgraves, @dongjoon-hyun, thanks for the reviews and comments. Do you think this is getting ready, or are more work and changes needed?
Looks fine to me
Jenkins, test this please
Test build #129351 has started for PR 28528 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
test this please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #130008 has finished for PR 28528 at commit
test this please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #130617 has finished for PR 28528 at commit
sorry for my delay on this, I'm going to merge
merged to master, thanks @LucaCanali
Thank you @tgravescs
@@ -979,4 +984,7 @@ private[spark] object Executor {
   // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
   // used instead.
   val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]

+  // Used to store executorSource, for local mode only
+  var executorSourceLocalModeOnly: ExecutorSource = null
Nit: Looks like this one is never cleaned up. It would be great to avoid using a global executorSourceLocalModeOnly to save the state of a specific executor. Can we move this to SparkEnv so that the state of one test won't leak into other tests?
Thanks @zsxwing , I'll have a look at it.
What changes were proposed in this pull request?
This PR proposes to register the executor source with the Spark metrics system when running in local mode.
Why are the changes needed?
The Apache Spark metrics system provides many useful insights on the Spark workload.
In particular, the executor source metrics provide detailed info, including the number of active tasks, I/O metrics, and several task metrics details. The executor source metrics, contrary to other sources (for example the ExecutorMetrics source), are not available when running in local mode.
Having executor metrics in local mode can be useful when testing and troubleshooting Spark workloads in a development environment. The metrics can be fed to a dashboard to see the evolution of resource usage and can be used to troubleshoot performance, as in this example.
Currently users will have to deploy on a cluster to be able to collect executor source metrics, while the possibility of having them in local mode is handy for testing.
Does this PR introduce any user-facing change?
How was this patch tested?
SourceConfigSuite