Skip to content

Commit

Permalink
Use the actual reference queue length
Browse files Browse the repository at this point in the history
The previous code used the length of the referenceBuffer, which is
the number of elements registered for clean-up, rather than the
number of elements registered AND de-referenced.

What we want is the length of the referenceQueue. However, Java
does not expose this, so we must access it through reflection.
Since this is potentially expensive, we need to limit the number
of times we access the queue length this way.
  • Loading branch information
andrewor14 committed Aug 14, 2014
1 parent 0b7e768 commit 104b366
Showing 1 changed file with 46 additions and 7 deletions.
53 changes: 46 additions & 7 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark

import java.lang.ref.{ReferenceQueue, WeakReference}
import java.lang.reflect.Field

import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

Expand Down Expand Up @@ -64,9 +65,26 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

// Thread whose run() simply delegates to keepCleaning()
private val cleaningThread = new Thread() {
  override def run(): Unit = keepCleaning()
}

// Maximum size the reference buffer may reach before an error message is logged
private val referenceBufferCapacity = 10000
/**
 * State used to monitor the length of the reference queue and to log an error once that
 * length exceeds a fixed capacity. Java's ReferenceQueue exposes neither the queue length
 * nor the enqueue method, so the length is read through reflection. Because this is
 * expensive, the field should be accessed only once in a while.
 */
// Queue length above which the "queue full" error is logged
private val queueCapacity = 10000

// Set once the "queue full" error has been logged, so it is not repeated
private var queueFullErrorMessageLogged = false

// Reflective handle on ReferenceQueue's private `queueLength` field; None if it cannot be exposed
private val queueLengthAccessor: Option[Field] =
  try {
    val lengthField = classOf[ReferenceQueue[AnyRef]].getDeclaredField("queueLength")
    lengthField.setAccessible(true)
    Some(lengthField)
  } catch {
    case e: Exception =>
      logDebug("Failed to expose java.lang.ref.ReferenceQueue's queueLength field: " + e)
      None
  }

// The queue length is inspected only on iterations that are a multiple of this interval
private val logQueueLengthInterval = 1000

/**
* Whether the cleaning thread will block on cleanup tasks.
Expand Down Expand Up @@ -117,13 +135,11 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
/**
 * Register an object for cleanup.
 *
 * A weak reference wrapping `task` is appended to the reference buffer and tied to the
 * reference queue. No capacity check is done here: the size of the reference buffer counts
 * every registered object, not the ones that have been de-referenced and are waiting in the
 * reference queue, so it is the wrong quantity to compare against a queue capacity.
 *
 * @param objectForCleanup object whose reachability is tracked through the weak reference
 * @param task cleanup task associated with the object
 */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
  referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
}

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
var iteration = 0
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
Expand All @@ -140,10 +156,14 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
}
if (iteration % logQueueLengthInterval == 0) {
logQueueLength()
}
}
} catch {
case e: Exception => logError("Error in cleaning thread", e)
}
iteration += 1
}
}

Expand Down Expand Up @@ -190,16 +210,35 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
/**
 * Log an error stating that the reference queue has grown past `queueCapacity`.
 *
 * The `queueFullErrorMessageLogged` flag is set on the first call, so later calls log nothing.
 */
private def logQueueFullErrorMessage(): Unit = {
  if (!queueFullErrorMessageLogged) {
    queueFullErrorMessageLogged = true
    logError(s"Reference queue size in ContextCleaner has exceeded $queueCapacity! " +
      "This means the rate at which we clean up RDDs, shuffles, and/or broadcasts is too slow.")
    // Suggesting to turn blocking off is only meaningful while blocking is currently on
    if (blockOnCleanupTasks) {
      logError("Consider setting spark.cleaner.referenceTracking.blocking to false. " +
        "Note that there is a known issue (SPARK-3015) in disabling blocking, especially if " +
        "the workload involves creating many RDDs in quick successions.")
    }
  }
}

/**
 * Read the reference queue's length reflectively, log it at debug level, and trigger the
 * "queue full" error message when it is above `queueCapacity`.
 * Does nothing if the reflective accessor is unavailable.
 * This is an expensive operation and should be called sparingly.
 */
private def logQueueLength(): Unit = {
  try {
    for (lengthField <- queueLengthAccessor) {
      // The field is read as a primitive long from the live queue instance
      val queueSize = lengthField.getLong(referenceQueue)
      logDebug("Reference queue size is " + queueSize)
      if (queueSize > queueCapacity) {
        logQueueFullErrorMessage()
      }
    }
  } catch {
    case e: Exception =>
      logDebug("Failed to access reference queue's length through reflection: " + e)
  }
}

// Shorthand for the block manager's master, reached through the SparkContext's environment
private def blockManagerMaster = sc.env.blockManager.master
// Shorthand for the broadcast manager of the SparkContext's environment
private def broadcastManager = sc.env.broadcastManager
// Shorthand for the environment's map output tracker, cast to MapOutputTrackerMaster
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
Expand Down

0 comments on commit 104b366

Please sign in to comment.