Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-17649][CORE] Log how many Spark events got dropped in AsynchronousListenerBus (branch 1.6) #15226

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.util

import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.util.DynamicVariable

import org.apache.spark.SparkContext
Expand Down Expand Up @@ -51,6 +52,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
// Indicate if `stop()` is called
private val stopped = new AtomicBoolean(false)

/** A counter for dropped events. It will be reset every time we log it. */
private val droppedEventsCounter = new AtomicLong(0L)

/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L

// Indicate if we are processing some event
// Guarded by `self`
private var processingEvent = false
Expand Down Expand Up @@ -117,6 +124,24 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
eventLock.release()
} else {
onDropEvent(event)
droppedEventsCounter.incrementAndGet()
}

val droppedEvents = droppedEventsCounter.get
if (droppedEvents > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use nanotime

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't nanotime be overkill ? Even if there is a single dropped event, this check will get executed with every post() so having currentTimeMillis (which is less costly) is preferable.

Copy link
Member Author

@zsxwing zsxwing Sep 26, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rxin this is not for measuring accurate elapsed time and we also want to log it using Date. According to the javadoc:

This method can only be used to measure elapsed time and is not related to any other notion of system or wall-clock time. The value returned represents nanoseconds since some fixed but arbitrary origin time (perhaps in the future, so values may be negative). The same origin is used by all invocations of this method in an instance of a Java virtual machine; other virtual machine instances are likely to use a different origin.

It's better to use System.currentTimeMillis() since we want to report the timestamp in the log.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
}
}
}
}

Expand Down