Skip to content

Commit

Permalink
[SPARK-17649][CORE] Log how many Spark events got dropped in Asynchro…
Browse files Browse the repository at this point in the history
…nousListenerBus (branch 1.6)

## What changes were proposed in this pull request?

Backport #15220 to 1.6.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15226 from zsxwing/SPARK-17649-branch-1.6.
  • Loading branch information
zsxwing committed Sep 26, 2016
1 parent 94524ce commit 7aded55
Showing 1 changed file with 26 additions and 1 deletion.
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.util

import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.util.DynamicVariable

import org.apache.spark.SparkContext
Expand Down Expand Up @@ -51,6 +52,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
// Indicate if `stop()` is called
private val stopped = new AtomicBoolean(false)

/** A counter for dropped events. It will be reset every time we log it. */
private val droppedEventsCounter = new AtomicLong(0L)

/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L

// Indicate if we are processing some event
// Guarded by `self`
private var processingEvent = false
Expand Down Expand Up @@ -117,6 +124,24 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
eventLock.release()
} else {
onDropEvent(event)
droppedEventsCounter.incrementAndGet()
}

val droppedEvents = droppedEventsCounter.get
if (droppedEvents > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
}
}
}
}

Expand Down

0 comments on commit 7aded55

Please sign in to comment.