Skip to content

Commit

Permalink
log the events that take too long time to process
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangxb1987 committed Sep 10, 2019
1 parent 52181fb commit 75f2781
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 4 deletions.
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Expand Up @@ -585,6 +585,21 @@ package object config {
.intConf
.createWithDefault(128)

private[spark] val LISTENER_BUS_LOG_SLOW_EVENT_ENABLED =
ConfigBuilder("spark.scheduler.listenerbus.logSlowEvent.enabled")
.doc("When enabled, log the event name that takes too much time to process. This helps us " +
"discover the event types that cause performance bottlenecks. The time threshold is " +
"controlled by spark.scheduler.listenerbus.logSlowEvent.threshold.")
.booleanConf
.createWithDefault(true)

private[spark] val LISTENER_BUS_LOG_SLOW_EVENT_TIME_THRESHOLD =
ConfigBuilder("spark.scheduler.listenerbus.logSlowEvent.threshold")
.doc("The time threshold of whether a event is considered to be taking too much time to " +
"process. Log the event type if spark.scheduler.listenerbus.logSlowEvent.enabled is true.")
.timeConf(TimeUnit.NANOSECONDS)
.createWithDefaultString("1s")

// This property sets the root namespace for metrics reporting
private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
.stringConf
Expand Down
Expand Up @@ -164,7 +164,6 @@ private class AsyncEventQueue(
logError(s"Dropping event from queue $name. " +
"This likely means one of the listeners is too slow and cannot keep up with " +
"the rate at which tasks are being started by the scheduler.")
logError("Current stack trace: \n" + dispatchThread.getStackTrace.mkString("\n"))
}
logTrace(s"Dropping event $event")

Expand All @@ -182,7 +181,6 @@ private class AsyncEventQueue(
val previous = new java.util.Date(prevLastReportTimestamp)
logWarning(s"Dropped $droppedCount events from $name since " +
s"${if (prevLastReportTimestamp == 0) "the application started" else s"$previous"}.")
logWarning("Current stack trace: \n" + dispatchThread.getStackTrace.mkString("\n"))
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/util/ListenerBus.scala
Expand Up @@ -25,7 +25,8 @@ import scala.util.control.NonFatal

import com.codahale.metrics.Timer

import org.apache.spark.internal.Logging
import org.apache.spark.SparkEnv
import org.apache.spark.internal.{config, Logging}

/**
* An event bus which posts events to its listeners.
Expand All @@ -37,6 +38,12 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
// Marked `private[spark]` for access in tests.
private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava

private lazy val logSlowEventEnabled =
SparkEnv.get.conf.get(config.LISTENER_BUS_LOG_SLOW_EVENT_ENABLED)

private lazy val logSlowEventThreshold =
SparkEnv.get.conf.get(config.LISTENER_BUS_LOG_SLOW_EVENT_TIME_THRESHOLD)

/**
* Returns a CodaHale metrics Timer for measuring the listener's event processing time.
* This method is intended to be overridden by subclasses.
Expand Down Expand Up @@ -111,7 +118,10 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
} finally {
if (maybeTimerContext != null) {
maybeTimerContext.stop()
val elapsed = maybeTimerContext.stop()
if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
logError(s"Process of event ${event} took ${elapsed / 1000000000d}s.")
}
}
}
}
Expand Down

0 comments on commit 75f2781

Please sign in to comment.