From 0b7e7685c5c1c7dda73c64a9b5ed929ff3484a8d Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 13 Aug 2014 15:19:42 -0700
Subject: [PATCH 1/6] Block on cleaning tasks by default + log error on queue full

---
 .../org/apache/spark/ContextCleaner.scala     | 36 ++++++++++++++++---
 1 file changed, 31 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index bf3c3a6ceb5ef..b134e66395ce9 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -64,12 +64,21 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
+  // Capacity of the reference buffer before we log an error message
+  private val referenceBufferCapacity = 10000
+  private var queueFullErrorMessageLogged = false
+
   /**
    * Whether the cleaning thread will block on cleanup tasks.
-   * This is set to true only for tests.
+   *
+   * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
+   * workaround for the issue, which is ultimately caused by the way the BlockManager actors
+   * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+   * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
+   * longer in scope.
    */
   private val blockOnCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.blocking", false)
+    "spark.cleaner.referenceTracking.blocking", true)
 
   @volatile private var stopped = false
 
@@ -108,6 +117,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   /** Register an object for cleanup. */
   private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
     referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
+    if (referenceBuffer.size > referenceBufferCapacity) {
+      logQueueFullErrorMessage()
+    }
   }
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
@@ -119,6 +131,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       reference.map(_.task).foreach { task =>
         logDebug("Got cleaning task " + task)
         referenceBuffer -= reference.get
+        logDebug("There are " + referenceBuffer.size + " more tasks in queue")
         task match {
           case CleanRDD(rddId) =>
             doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
@@ -171,12 +184,25 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
+  /**
+   * Log an error message to indicate that the queue has exceeded its capacity. Do this only once.
+   */
+  private def logQueueFullErrorMessage(): Unit = {
+    if (!queueFullErrorMessageLogged) {
+      queueFullErrorMessageLogged = true
+      logError(s"Reference queue size in ContextCleaner has exceeded $referenceBufferCapacity! " +
+        "This means the rate at which we clean up RDDs, shuffles, and/or broadcasts is too slow.")
+      if (!blockOnCleanupTasks) {
+        logError("Consider setting spark.cleaner.referenceTracking.blocking to false." +
+          "Note that there is a known issue (SPARK-3015) in disabling blocking, especially if " +
+          "the workload involves creating many RDDs in quick successions.")
+      }
+    }
+  }
+
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-
-  // Used for testing. These methods explicitly blocks until cleanup is completed
-  // to ensure that more reliable testing.
 }
 
 private object ContextCleaner {
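For readers coming to this patch cold: ContextCleaner tracks driver-side objects (RDDs, shuffles, broadcasts) with weak references and runs a cleanup task once the JVM has garbage-collected the referent. The sketch below is a minimal, self-contained illustration of that pattern, not Spark code: the names Task, CleanResource, TaskWeakReference, and CleanerSketch are invented for the example, and only the WeakReference and ReferenceQueue APIs come from the JDK.

    import java.lang.ref.{ReferenceQueue, WeakReference}
    import scala.collection.mutable.ArrayBuffer

    // Hypothetical stand-in for Spark's CleanupTask hierarchy.
    sealed trait Task
    case class CleanResource(id: Int) extends Task

    // A weak reference that remembers which cleanup task to run once its
    // referent has been collected, mirroring CleanupTaskWeakReference.
    class TaskWeakReference(val task: Task, referent: AnyRef, queue: ReferenceQueue[AnyRef])
      extends WeakReference[AnyRef](referent, queue)

    object CleanerSketch {
      private val queue = new ReferenceQueue[AnyRef]
      // Hold the weak references strongly so they survive until their
      // referents are enqueued by the GC.
      private val buffer = new ArrayBuffer[TaskWeakReference]

      def register(obj: AnyRef, task: Task): Unit = synchronized {
        buffer += new TaskWeakReference(task, obj, queue)
      }

      // One iteration of the cleaning loop: wait briefly for a collected
      // referent, then run the cleanup task attached to it.
      def cleanOnce(timeoutMs: Long): Unit = {
        Option(queue.remove(timeoutMs)).map(_.asInstanceOf[TaskWeakReference]).foreach { ref =>
          synchronized { buffer -= ref }
          ref.task match {
            case CleanResource(id) => println(s"cleaning resource $id")
          }
        }
      }
    }

Whether each of those cleanup calls then blocks until the cleanup completes is what spark.cleaner.referenceTracking.blocking controls; this patch flips that default to true, and users who prefer the old fire-and-forget behavior can still set the conf back to false.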
From 104b36666ae5b490c99b28cf204c6892fd85a1ae Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 13 Aug 2014 18:09:47 -0700
Subject: [PATCH 2/6] Use the actual reference queue length

The previous code used the length of the referenceBuffer, which is the
number of elements registered for clean-up, rather than the number of
elements registered AND de-referenced. What we want is the length of
the referenceQueue. However, Java does not expose this, so we must
access it through reflection. Since this is potentially expensive, we
need to limit the number of times we access the queue length this way.
---
 .../org/apache/spark/ContextCleaner.scala     | 53 ++++++++++++++++---
 1 file changed, 46 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index b134e66395ce9..d5cc14fa30bc9 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.lang.ref.{ReferenceQueue, WeakReference}
+import java.lang.reflect.Field
 
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 
@@ -64,9 +65,26 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
-  // Capacity of the reference buffer before we log an error message
-  private val referenceBufferCapacity = 10000
+  /**
+   * Keep track of the reference queue length and log an error if this exceeds a certain capacity.
+   * Unfortunately, Java's ReferenceQueue exposes neither the queue length nor the enqueue method,
+   * so we have to do this through reflection. This is expensive, however, so we should access
+   * this field only once in a while.
+   */
+  private val queueCapacity = 10000
   private var queueFullErrorMessageLogged = false
+  private val queueLengthAccessor: Option[Field] = {
+    try {
+      val f = classOf[ReferenceQueue[AnyRef]].getDeclaredField("queueLength")
+      f.setAccessible(true)
+      Some(f)
+    } catch {
+      case e: Exception =>
+        logDebug("Failed to expose java.lang.ref.ReferenceQueue's queueLength field: " + e)
+        None
+    }
+  }
+  private val logQueueLengthInterval = 1000
 
   /**
    * Whether the cleaning thread will block on cleanup tasks.
@@ -117,13 +135,11 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   /** Register an object for cleanup. */
   private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
     referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
-    if (referenceBuffer.size > referenceBufferCapacity) {
-      logQueueFullErrorMessage()
-    }
   }
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
   private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
+    var iteration = 0
     while (!stopped) {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
@@ -140,10 +156,14 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
           case CleanBroadcast(broadcastId) =>
             doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
         }
+        if (iteration % logQueueLengthInterval == 0) {
+          logQueueLength()
+        }
       }
     } catch {
       case e: Exception => logError("Error in cleaning thread", e)
     }
+      iteration += 1
   }
 }
 
@@ -190,9 +210,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private def logQueueFullErrorMessage(): Unit = {
     if (!queueFullErrorMessageLogged) {
       queueFullErrorMessageLogged = true
-      logError(s"Reference queue size in ContextCleaner has exceeded $referenceBufferCapacity! " +
+      logError(s"Reference queue size in ContextCleaner has exceeded $queueCapacity! " +
         "This means the rate at which we clean up RDDs, shuffles, and/or broadcasts is too slow.")
-      if (!blockOnCleanupTasks) {
+      if (blockOnCleanupTasks) {
         logError("Consider setting spark.cleaner.referenceTracking.blocking to false." +
           "Note that there is a known issue (SPARK-3015) in disabling blocking, especially if " +
           "the workload involves creating many RDDs in quick successions.")
@@ -200,6 +220,25 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
+  /**
+   * Log the length of the reference queue through reflection.
+   * This is an expensive operation and should be called sparingly.
+   */
+  private def logQueueLength(): Unit = {
+    try {
+      queueLengthAccessor.foreach { field =>
+        val length = field.getLong(referenceQueue)
+        logDebug("Reference queue size is " + length)
+        if (length > queueCapacity) {
+          logQueueFullErrorMessage()
+        }
+      }
+    } catch {
+      case e: Exception =>
+        logDebug("Failed to access reference queue's length through reflection: " + e)
+    }
+  }
+
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
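As the commit message explains, java.lang.ref.ReferenceQueue maintains a private queueLength counter but exposes no public accessor, so the patch reads it reflectively. Below is a condensed, standalone sketch of that technique; it assumes an OpenJDK-style ReferenceQueue that actually declares the field, which is exactly why both the lookup and the read are wrapped defensively. The object name QueueLengthSketch is invented for the example.

    import java.lang.ref.ReferenceQueue
    import java.lang.reflect.Field

    object QueueLengthSketch {
      // Look up the implementation-private field once; None on JVMs
      // whose ReferenceQueue is laid out differently.
      private val accessor: Option[Field] =
        try {
          val f = classOf[ReferenceQueue[AnyRef]].getDeclaredField("queueLength")
          f.setAccessible(true)
          Some(f)
        } catch {
          case _: Exception => None
        }

      def lengthOf(queue: ReferenceQueue[AnyRef]): Option[Long] =
        accessor.flatMap { f =>
          try Some(f.getLong(queue)) catch { case _: Exception => None }
        }
    }

Reflective field reads are much slower than ordinary ones, which is why the patch samples the length only every logQueueLengthInterval iterations of the cleaning loop rather than on every task.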
From 9fd1fe6e2adc2dca1d1dedc9ec3af430f7c3da25 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 13 Aug 2014 18:14:10 -0700
Subject: [PATCH 3/6] Remove outdated log

---
 core/src/main/scala/org/apache/spark/ContextCleaner.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index d5cc14fa30bc9..56bf76257628a 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -147,7 +147,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       reference.map(_.task).foreach { task =>
         logDebug("Got cleaning task " + task)
         referenceBuffer -= reference.get
-        logDebug("There are " + referenceBuffer.size + " more tasks in queue")
         task match {
           case CleanRDD(rddId) =>
             doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
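The debug line removed here reported referenceBuffer.size, which, as the previous commit message pointed out, counts objects registered for cleanup rather than objects the GC has already collected and enqueued, so it was not a meaningful "tasks in queue" number. A tiny JDK-only illustration of the difference (the object name is invented, and the GC behavior shown is best-effort rather than guaranteed):

    import java.lang.ref.{ReferenceQueue, WeakReference}

    object BufferVsQueue extends App {
      val queue = new ReferenceQueue[AnyRef]
      var obj: AnyRef = new Object
      val ref = new WeakReference[AnyRef](obj, queue)
      // One object is "registered" at this point, but the queue stays
      // empty until the referent is actually garbage-collected.
      println(queue.poll()) // null: obj is still strongly reachable
      obj = null
      System.gc()           // a request, not a guarantee
      Thread.sleep(100)     // give the reference handler time to enqueue
      println(queue.poll()) // typically `ref` once a collection has run
    }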
From a183b836f9e2b736e4103e0b78a55a9117b60d6a Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 13 Aug 2014 18:15:08 -0700
Subject: [PATCH 4/6] Switch order of code blocks (minor)

---
 .../org/apache/spark/ContextCleaner.scala     | 32 +++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 56bf76257628a..b12df49595b7b 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -203,22 +203,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  /**
-   * Log an error message to indicate that the queue has exceeded its capacity. Do this only once.
-   */
-  private def logQueueFullErrorMessage(): Unit = {
-    if (!queueFullErrorMessageLogged) {
-      queueFullErrorMessageLogged = true
-      logError(s"Reference queue size in ContextCleaner has exceeded $queueCapacity! " +
-        "This means the rate at which we clean up RDDs, shuffles, and/or broadcasts is too slow.")
-      if (blockOnCleanupTasks) {
-        logError("Consider setting spark.cleaner.referenceTracking.blocking to false." +
-          "Note that there is a known issue (SPARK-3015) in disabling blocking, especially if " +
-          "the workload involves creating many RDDs in quick successions.")
-      }
-    }
-  }
-
   /**
    * Log the length of the reference queue through reflection.
    * This is an expensive operation and should be called sparingly.
@@ -238,6 +222,22 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
+  /**
+   * Log an error message to indicate that the queue has exceeded its capacity. Do this only once.
+   */
+  private def logQueueFullErrorMessage(): Unit = {
+    if (!queueFullErrorMessageLogged) {
+      queueFullErrorMessageLogged = true
+      logError(s"Reference queue size in ContextCleaner has exceeded $queueCapacity! " +
+        "This means the rate at which we clean up RDDs, shuffles, and/or broadcasts is too slow.")
+      if (blockOnCleanupTasks) {
+        logError("Consider setting spark.cleaner.referenceTracking.blocking to false." +
+          "Note that there is a known issue (SPARK-3015) in disabling blocking, especially if " +
+          "the workload involves creating many RDDs in quick successions.")
+      }
+    }
+  }
+
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]

From 111192a3e578eec67fada6ef2750418f10d4fd15 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 13 Aug 2014 18:16:27 -0700
Subject: [PATCH 5/6] Add missing space in log message (minor)

---
 core/src/main/scala/org/apache/spark/ContextCleaner.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index b12df49595b7b..1b75687f55b24 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -231,7 +231,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       logError(s"Reference queue size in ContextCleaner has exceeded $queueCapacity! " +
         "This means the rate at which we clean up RDDs, shuffles, and/or broadcasts is too slow.")
       if (blockOnCleanupTasks) {
-        logError("Consider setting spark.cleaner.referenceTracking.blocking to false." +
+        logError("Consider setting spark.cleaner.referenceTracking.blocking to false. " +
           "Note that there is a known issue (SPARK-3015) in disabling blocking, especially if " +
           "the workload involves creating many RDDs in quick successions.")
       }
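The one-character change in the patch above (5/6) is easy to miss but matters: Scala, like Java, inserts nothing between concatenated string literals, so the wrapped message previously rendered as "...to false.Note that...". A minimal demonstration, with an object name invented for the example:

    object ConcatPitfall extends App {
      val bad = "Consider setting spark.cleaner.referenceTracking.blocking to false." +
        "Note that there is a known issue (SPARK-3015) in disabling blocking."
      val good = "Consider setting spark.cleaner.referenceTracking.blocking to false. " +
        "Note that there is a known issue (SPARK-3015) in disabling blocking."
      assert(bad.contains("false.Note"))   // literals fuse without a separator
      assert(good.contains("false. Note")) // the trailing space restores the gap
    }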
From ce9daf5ec47209a32ac61485d2f6e9bb26cc9d37 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 15 Aug 2014 18:32:46 -0700
Subject: [PATCH 6/6] Remove logic for logging queue length

This simplifies the PR significantly. Now it's strictly a bug fix.
---
 .../org/apache/spark/ContextCleaner.scala     | 62 -------------------
 1 file changed, 62 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 1b75687f55b24..3848734d6f639 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -18,7 +18,6 @@
 package org.apache.spark
 
 import java.lang.ref.{ReferenceQueue, WeakReference}
-import java.lang.reflect.Field
 
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 
@@ -65,27 +64,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
-  /**
-   * Keep track of the reference queue length and log an error if this exceeds a certain capacity.
-   * Unfortunately, Java's ReferenceQueue exposes neither the queue length nor the enqueue method,
-   * so we have to do this through reflection. This is expensive, however, so we should access
-   * this field only once in a while.
-   */
-  private val queueCapacity = 10000
-  private var queueFullErrorMessageLogged = false
-  private val queueLengthAccessor: Option[Field] = {
-    try {
-      val f = classOf[ReferenceQueue[AnyRef]].getDeclaredField("queueLength")
-      f.setAccessible(true)
-      Some(f)
-    } catch {
-      case e: Exception =>
-        logDebug("Failed to expose java.lang.ref.ReferenceQueue's queueLength field: " + e)
-        None
-    }
-  }
-  private val logQueueLengthInterval = 1000
-
   /**
    * Whether the cleaning thread will block on cleanup tasks.
    *
@@ -139,7 +117,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
   private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
-    var iteration = 0
     while (!stopped) {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
@@ -155,14 +132,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
           case CleanBroadcast(broadcastId) =>
             doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
         }
-        if (iteration % logQueueLengthInterval == 0) {
-          logQueueLength()
-        }
       }
     } catch {
       case e: Exception => logError("Error in cleaning thread", e)
     }
-      iteration += 1
   }
 }
 
@@ -203,41 +176,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  /**
-   * Log the length of the reference queue through reflection.
-   * This is an expensive operation and should be called sparingly.
-   */
-  private def logQueueLength(): Unit = {
-    try {
-      queueLengthAccessor.foreach { field =>
-        val length = field.getLong(referenceQueue)
-        logDebug("Reference queue size is " + length)
-        if (length > queueCapacity) {
-          logQueueFullErrorMessage()
-        }
-      }
-    } catch {
-      case e: Exception =>
-        logDebug("Failed to access reference queue's length through reflection: " + e)
-    }
-  }
-
-  /**
-   * Log an error message to indicate that the queue has exceeded its capacity. Do this only once.
-   */
-  private def logQueueFullErrorMessage(): Unit = {
-    if (!queueFullErrorMessageLogged) {
-      queueFullErrorMessageLogged = true
-      logError(s"Reference queue size in ContextCleaner has exceeded $queueCapacity! " +
-        "This means the rate at which we clean up RDDs, shuffles, and/or broadcasts is too slow.")
-      if (blockOnCleanupTasks) {
-        logError("Consider setting spark.cleaner.referenceTracking.blocking to false. " +
-          "Note that there is a known issue (SPARK-3015) in disabling blocking, especially if " +
-          "the workload involves creating many RDDs in quick successions.")
-      }
-    }
-  }
-
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]