[SPARK-34898][CORE] We should log SparkListenerExecutorMetricsUpdateEvent of driver appropriately when spark.eventLog.logStageExecutorMetrics is true #31992

Closed
wants to merge 8 commits
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._

-import org.apache.spark.{SPARK_VERSION, SparkConf}
+import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.EventLogFileWriter
import org.apache.spark.executor.ExecutorMetrics
@@ -249,6 +249,9 @@ private[spark] class EventLoggingListener(
}

   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+    if (event.execId == SparkContext.DRIVER_IDENTIFIER) {
+      logEvent(event)
+    }
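The guard added above can be modeled with a small, self-contained sketch. The types here are hypothetical simplifications, not Spark's actual classes; in Spark, `SparkContext.DRIVER_IDENTIFIER` is the string `"driver"`.

```scala
// Simplified model of the PR's filter: only the driver's metrics-update
// events are written to the event log; executor updates are skipped here.
// (Hypothetical types; Spark's real SparkContext.DRIVER_IDENTIFIER is "driver".)
object DriverMetricsFilter {
  val DRIVER_IDENTIFIER = "driver"

  case class MetricsUpdate(execId: String)

  // Mirrors the guard added in onExecutorMetricsUpdate.
  def shouldLogToEventLog(event: MetricsUpdate): Boolean =
    event.execId == DRIVER_IDENTIFIER
}
```

With this filter, an update from exec id `"driver"` passes through while updates from numeric executor ids do not.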
Contributor:

Do this only when shouldLogStageExecutorMetrics is enabled?

Contributor Author:

> Do this only when shouldLogStageExecutorMetrics is enabled?

I don't think it should be controlled by shouldLogStageExecutorMetrics, since the driver's metrics are not related to stage executor metrics.

Contributor:

Currently, we have a single event for both driver and executor metrics updates, differentiated by exec id.
I don't have strong opinions on this, but if we have a flag (shouldLogStageExecutorMetrics) controlling whether metrics are to be updated, we should apply it consistently IMO.

Contributor Author:

> Currently, we have a single event for both driver and executor metrics updates, differentiated by exec id.
> I don't have strong opinions on this, but if we have a flag (shouldLogStageExecutorMetrics) controlling whether metrics are to be updated, we should apply it consistently IMO.

@mridulm Following up on this comment, how about the current version?

if (shouldLogStageExecutorMetrics) {
  event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
    liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) =>
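The compromise discussed above can be sketched as follows: driver metrics updates are logged to the event log unconditionally, while per-stage executor peak tracking stays gated behind the flag. The names here are hypothetical simplifications of the real listener.

```scala
// Sketch of the combined handler behavior under discussion (hypothetical
// simplified types, not Spark's actual listener): driver updates are logged
// unconditionally; executor peak tracking is gated by the flag.
case class Update(execId: String)

class Handler(shouldLogStageExecutorMetrics: Boolean) {
  val logged = scala.collection.mutable.Buffer[Update]()
  var trackedPeaks = 0

  def onExecutorMetricsUpdate(event: Update): Unit = {
    // Always persist the driver's own metrics update.
    if (event.execId == "driver") logged += event
    // Peak tracking for executors remains behind the existing flag.
    if (shouldLogStageExecutorMetrics) trackedPeaks += 1
  }
}
```

With the flag off, a driver update is still logged while no executor peaks are tracked.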
@@ -515,14 +515,15 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
try {
val lines = readLines(logData)
val logStart = SparkListenerLogStart(SPARK_VERSION)
-      assert(lines.size === 22)
+      assert(lines.size === 25)
assert(lines(0).contains("SparkListenerLogStart"))
assert(lines(1).contains("SparkListenerApplicationStart"))
assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
var logIdx = 1
events.foreach { event =>
event match {
-        case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
+        case metricsUpdate: SparkListenerExecutorMetricsUpdate
+            if metricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER =>
case stageCompleted: SparkListenerStageCompleted =>
val execIds = Set[String]()
(1 to 3).foreach { _ =>
@@ -618,6 +619,10 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
assert(expected.stageInfo.stageId === actual.stageInfo.stageId)
case (expected: SparkListenerTaskEnd, actual: SparkListenerTaskEnd) =>
assert(expected.stageId === actual.stageId)
+      case (expected: SparkListenerExecutorMetricsUpdate,
+          actual: SparkListenerExecutorMetricsUpdate) =>
+        assert(expected.execId == actual.execId)
+        assert(expected.execId == SparkContext.DRIVER_IDENTIFIER)
case (expected: SparkListenerEvent, actual: SparkListenerEvent) =>
assert(expected === actual)
}