
[SPARK-31711][CORE] Register the executor source with the metrics system when running in local mode. #28528

5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,7 +42,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource}
+import org.apache.spark.executor.{Executor, ExecutorMetrics, ExecutorMetricsSource}
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -623,6 +623,9 @@ class SparkContext(config: SparkConf) extends Logging {

    // Post init
    _taskScheduler.postStartHook()
    if (isLocal) {
      _env.metricsSystem.registerSource(Executor.executorSourceLocalModeOnly)
    }
    _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
    _env.metricsSystem.registerSource(new JVMCPUSource())
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -135,6 +135,11 @@ private[spark] class Executor(
    env.metricsSystem.registerSource(new JVMCPUSource())
    executorMetricsSource.foreach(_.register(env.metricsSystem))
    env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
  } else {
    // This enables the registration of the executor source in local mode.
    // The actual registration happens in SparkContext: it cannot be done
    // here because the appId is not available yet.
    Executor.executorSourceLocalModeOnly = executorSource
  }

// Whether to load classes in user jars before those in Spark jars
@@ -979,4 +984,7 @@ private[spark] object Executor {
  // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
  // used instead.
  val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]

  // Used to store the executor source, for local mode only
  var executorSourceLocalModeOnly: ExecutorSource = null

Review comment (zsxwing, Member):

Nit: Looks like this one is never cleaned. It would be great to avoid using a global `executorSourceLocalModeOnly` to save the state of a specific executor. Can we move this to SparkEnv so that the state of one test won't leak into other tests?


Reply (Contributor Author):
Thanks @zsxwing , I'll have a look at it.

}
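The mechanism above can be illustrated outside of Spark. The following is a minimal, self-contained sketch of the deferred-registration pattern this PR uses: in local mode the executor cannot register its source directly (the appId is not known yet), so it parks the source in a companion-object field and the driver-side context registers it later. All names here (`SimpleSource`, `SimpleMetricsSystem`, `LocalExecutor`) are hypothetical stand-ins for the Spark classes, not Spark APIs.

```scala
// A trivial metrics registry, standing in for Spark's MetricsSystem.
trait SimpleSource { def sourceName: String }

class SimpleMetricsSystem {
  private var sources = List.empty[SimpleSource]
  def registerSource(s: SimpleSource): Unit = sources ::= s
  def getSourcesByName(name: String): Seq[SimpleSource] =
    sources.filter(_.sourceName == name)
}

class ExecutorLikeSource extends SimpleSource { val sourceName = "executor" }

// Stand-in for Executor: registers its source directly in cluster mode,
// but in local mode defers registration by stashing the source in the
// companion object, mirroring Executor.executorSourceLocalModeOnly.
class LocalExecutor(isLocal: Boolean, metricsSystem: SimpleMetricsSystem) {
  private val executorSource = new ExecutorLikeSource
  if (!isLocal) {
    metricsSystem.registerSource(executorSource)
  } else {
    LocalExecutor.executorSourceLocalModeOnly = executorSource
  }
}

object LocalExecutor {
  var executorSourceLocalModeOnly: ExecutorLikeSource = null
}

object Demo {
  def main(args: Array[String]): Unit = {
    val ms = new SimpleMetricsSystem
    new LocalExecutor(isLocal = true, ms)  // executor defers registration
    // Driver side (SparkContext in the real code) completes it later:
    ms.registerSource(LocalExecutor.executorSourceLocalModeOnly)
    assert(ms.getSourcesByName("executor").nonEmpty)
  }
}
```

As the review comment notes, the global mutable field is the weak point of this pattern: state set by one "executor" leaks to the next user of the companion object unless it is explicitly cleared.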
@@ -80,4 +80,16 @@ class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {
}
}

  test("SPARK-31711: Test executor source registration in local mode") {
    val conf = new SparkConf()
    val sc = new SparkContext("local", "test", conf)
    try {
      val metricsSystem = sc.env.metricsSystem

      // Executor source should be registered
      assert(metricsSystem.getSourcesByName("executor").nonEmpty)
    } finally {
      sc.stop()
    }
  }
}
8 changes: 6 additions & 2 deletions docs/monitoring.md
@@ -1153,6 +1153,11 @@ This is the component with the largest amount of instrumented metrics
- namespace=JVMCPU
- jvmCpuTime

- namespace=executor

Review comment (Contributor):

This looks the same as the ExecutorMetrics, but I think they are actually different in that this doesn't give you the JVM metrics - correct? Perhaps we need to update the one below.


Reply (Contributor Author):

I agree that the naming could be improved; in particular, metrics under namespace=executor and namespace=ExecutorMetrics are similar in scope. However, the implementations go via quite different paths, [[ExecutorSource]] vs. [[ExecutorMetricsSource]]. Merging the two could be the subject of future refactoring.

Just to clarify: metrics in the namespace=ExecutorMetrics are already available in local mode. The goal of this PR is to make metrics in the namespace=executor also available in local mode.

- **note:** These metrics are available in the driver in local mode only.
- A full list of available metrics in this
namespace can be found in the corresponding entry for the Executor component instance.

- namespace=ExecutorMetrics
 - **note:** these metrics are conditional on a configuration parameter:
`spark.metrics.executorMetricsSource.enabled` (default is true)
@@ -1165,8 +1170,7 @@ This is the component with the largest amount of instrumented metrics
custom plugins into Spark.

### Component instance = Executor
-These metrics are exposed by Spark executors. Note, currently they are not available
-when running in local mode.
+These metrics are exposed by Spark executors.

- namespace=executor (metrics are of type counter or gauge)
- bytesRead.count
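To actually see the local-mode executor metrics that this PR exposes, a sink has to be configured. The fragment below is a sketch of a `conf/metrics.properties` file that routes all metrics, including the `executor` namespace, to the console; the sink class and property names follow Spark's standard metrics configuration, but the period chosen here is just an example.

```
# metrics.properties (sketch): dump all registered metrics to the console
# every 10 seconds, so the local-mode "executor" namespace can be inspected.
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds
```

Running a `local` master application with this file in place (or pointed to via `spark.metrics.conf`) should periodically print the driver-registered executor source alongside the other driver metrics.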