From 624cb3b7340f2d6624a4bf35cc4cd9f59739fa9c Mon Sep 17 00:00:00 2001
From: mcheah
Date: Mon, 5 Oct 2015 11:56:44 -0400
Subject: [PATCH] [SPARK-10926][CORE] Create WeakReferenceCleaner interface
 that ContextCleaner extends

Preparing the way for SPARK-10250, which will introduce other objects that
are cleaned up once they are detected to have been garbage collected.
---
 .../org/apache/spark/ContextCleaner.scala     | 116 ++++--------
 .../apache/spark/WeakReferenceCleaner.scala   |  91 ++++++++++++++
 .../spark/util/cleanup/CleanupTasks.scala     |  41 +++++++
 project/MimaExcludes.scala                    |  29 +++++
 4 files changed, 184 insertions(+), 93 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/WeakReferenceCleaner.scala
 create mode 100644 core/src/main/scala/org/apache/spark/util/cleanup/CleanupTasks.scala

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index d23c1533db758..a14a55ec352d3 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -17,35 +17,13 @@
 
 package org.apache.spark
 
-import java.lang.ref.{ReferenceQueue, WeakReference}
-
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
-
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
 import org.apache.spark.util.Utils
+import org.apache.spark.util.cleanup.{CleanAccum, CleanBroadcast, CleanCheckpoint}
+import org.apache.spark.util.cleanup.{CleanRDD, CleanShuffle, CleanupTask}
 
-/**
- * Classes that represent cleaning tasks.
- */
-private sealed trait CleanupTask
-private case class CleanRDD(rddId: Int) extends CleanupTask
-private case class CleanShuffle(shuffleId: Int) extends CleanupTask
-private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
-private case class CleanAccum(accId: Long) extends CleanupTask
-private case class CleanCheckpoint(rddId: Int) extends CleanupTask
-
-/**
- * A WeakReference associated with a CleanupTask.
- *
- * When the referent object becomes only weakly reachable, the corresponding
- * CleanupTaskWeakReference is automatically added to the given reference queue.
- */
-private class CleanupTaskWeakReference(
-    val task: CleanupTask,
-    referent: AnyRef,
-    referenceQueue: ReferenceQueue[AnyRef])
-  extends WeakReference(referent, referenceQueue)
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 
 /**
  * An asynchronous cleaner for RDD, shuffle, and broadcast state.
@@ -54,18 +32,11 @@ private class CleanupTaskWeakReference(
  * to be processed when the associated object goes out of scope of the application. Actual
  * cleanup is performed in a separate daemon thread.
  */
-private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
-
-  private val referenceBuffer = new ArrayBuffer[CleanupTaskWeakReference]
-    with SynchronizedBuffer[CleanupTaskWeakReference]
-
-  private val referenceQueue = new ReferenceQueue[AnyRef]
+private[spark] class ContextCleaner(sc: SparkContext) extends WeakReferenceCleaner {
 
   private val listeners = new ArrayBuffer[CleanerListener]
     with SynchronizedBuffer[CleanerListener]
 
-  private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
-
   /**
    * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
    * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
@@ -92,35 +63,11 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { private val blockOnShuffleCleanupTasks = sc.conf.getBoolean( "spark.cleaner.referenceTracking.blocking.shuffle", false) - @volatile private var stopped = false - /** Attach a listener object to get information of when objects are cleaned. */ def attachListener(listener: CleanerListener): Unit = { listeners += listener } - /** Start the cleaner. */ - def start(): Unit = { - cleaningThread.setDaemon(true) - cleaningThread.setName("Spark Context Cleaner") - cleaningThread.start() - } - - /** - * Stop the cleaning thread and wait until the thread has finished running its current task. - */ - def stop(): Unit = { - stopped = true - // Interrupt the cleaning thread, but wait until the current task has finished before - // doing so. This guards against the race condition where a cleaning thread may - // potentially clean similarly named variables created by a different SparkContext, - // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132). - synchronized { - cleaningThread.interrupt() - } - cleaningThread.join() - } - /** Register a RDD for cleanup when it is garbage collected. */ def registerRDDForCleanup(rdd: RDD[_]): Unit = { registerForCleanup(rdd, CleanRDD(rdd.id)) @@ -145,43 +92,30 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { registerForCleanup(rdd, CleanCheckpoint(parentId)) } - /** Register an object for cleanup. */ - private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = { - referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue) + /** Keep cleaning RDD, shuffle, and broadcast state. */ + override protected def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) { + super.keepCleaning() } - /** Keep cleaning RDD, shuffle, and broadcast state. 
*/
-  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
-    while (!stopped) {
-      try {
-        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
-          .map(_.asInstanceOf[CleanupTaskWeakReference])
-        // Synchronize here to avoid being interrupted on stop()
-        synchronized {
-          reference.map(_.task).foreach { task =>
-            logDebug("Got cleaning task " + task)
-            referenceBuffer -= reference.get
-            task match {
-              case CleanRDD(rddId) =>
-                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-              case CleanShuffle(shuffleId) =>
-                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-              case CleanBroadcast(broadcastId) =>
-                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
-              case CleanAccum(accId) =>
-                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
-              case CleanCheckpoint(rddId) =>
-                doCleanCheckpoint(rddId)
-            }
-          }
-        }
-      } catch {
-        case ie: InterruptedException if stopped => // ignore
-        case e: Exception => logError("Error in cleaning thread", e)
-      }
+  protected def handleCleanupForSpecificTask(task: CleanupTask): Unit = {
+    task match {
+      case CleanRDD(rddId) =>
+        doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+      case CleanShuffle(shuffleId) =>
+        doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+      case CleanBroadcast(broadcastId) =>
+        doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+      case CleanAccum(accId) =>
+        doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+      case CleanCheckpoint(rddId) =>
+        doCleanCheckpoint(rddId)
+      case unknown =>
+        logWarning(s"Got a cleanup task $unknown that cannot be handled by ContextCleaner.")
     }
   }
 
+  protected def cleanupThreadName(): String = "Spark Context Cleaner"
+
   /** Perform RDD cleanup. */
   def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
     try {
@@ -252,10 +186,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
 }
 
-private object ContextCleaner {
-  private val REF_QUEUE_POLL_TIMEOUT = 100
-}
-
 /**
  * Listener class used for testing when any item has been cleaned by the Cleaner class.
  */
diff --git a/core/src/main/scala/org/apache/spark/WeakReferenceCleaner.scala b/core/src/main/scala/org/apache/spark/WeakReferenceCleaner.scala
new file mode 100644
index 0000000000000..0dd6d4773dcb6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/WeakReferenceCleaner.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark
+
+import java.lang.ref.ReferenceQueue
+
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+
+import org.apache.spark.util.cleanup.{CleanupTask, CleanupTaskWeakReference}
+
+/**
+ * Utility trait that keeps a long-running daemon thread for cleaning up weak
+ * references after their referents have been garbage collected. Currently
+ * implemented only by ContextCleaner; SPARK-10250 will add more implementations.
+ */
+private[spark] trait WeakReferenceCleaner extends Logging {
+
+  private val referenceBuffer = new ArrayBuffer[CleanupTaskWeakReference]
+    with SynchronizedBuffer[CleanupTaskWeakReference]
+
+  private val referenceQueue = new ReferenceQueue[AnyRef]
+
+  private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
+
+  @volatile private var stopped = false
+
+  /** Start the cleaner. */
+  def start(): Unit = {
+    cleaningThread.setDaemon(true)
+    cleaningThread.setName(cleanupThreadName())
+    cleaningThread.start()
+  }
+
+  def stop(): Unit = {
+    stopped = true
+    synchronized {
+      // Interrupt the cleaning thread, but wait until the current task has finished before
+      // doing so. This guards against the race condition where a cleaning thread may
+      // potentially clean similarly named variables created by a different SparkContext,
+      // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
+      cleaningThread.interrupt()
+    }
+    cleaningThread.join()
+  }
+
+  protected def keepCleaning(): Unit = {
+    while (!stopped) {
+      try {
+        val reference = Option(referenceQueue.remove(WeakReferenceCleaner.REF_QUEUE_POLL_TIMEOUT))
+          .map(_.asInstanceOf[CleanupTaskWeakReference])
+        // Synchronize here to avoid being interrupted on stop()
+        synchronized {
+          reference.map(_.task).foreach { task =>
+            logDebug("Got cleaning task " + task)
+            referenceBuffer -= reference.get
+            handleCleanupForSpecificTask(task)
+          }
+        }
+      } catch {
+        case ie: InterruptedException if stopped => // ignore
+        case e: Exception => logError("Error in cleaning thread", e)
+      }
+    }
+  }
+
+  /** Register an object for cleanup. */
+  protected def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
+    referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
+  }
+
+  protected def handleCleanupForSpecificTask(task: CleanupTask): Unit
+  protected def cleanupThreadName(): String
+}
+
+private object WeakReferenceCleaner {
+  private val REF_QUEUE_POLL_TIMEOUT = 100
+}
diff --git a/core/src/main/scala/org/apache/spark/util/cleanup/CleanupTasks.scala b/core/src/main/scala/org/apache/spark/util/cleanup/CleanupTasks.scala
new file mode 100644
index 0000000000000..76dbdd8d3c44f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/cleanup/CleanupTasks.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.cleanup
+
+import java.lang.ref.{ReferenceQueue, WeakReference}
+
+/**
+ * Classes that represent cleaning tasks.
+ */
+private[spark] sealed trait CleanupTask
+private[spark] case class CleanRDD(rddId: Int) extends CleanupTask
+private[spark] case class CleanShuffle(shuffleId: Int) extends CleanupTask
+private[spark] case class CleanBroadcast(broadcastId: Long) extends CleanupTask
+private[spark] case class CleanAccum(accId: Long) extends CleanupTask
+private[spark] case class CleanCheckpoint(rddId: Int) extends CleanupTask
+
+/**
+ * A WeakReference associated with a CleanupTask.
+ *
+ * When the referent object becomes only weakly reachable, the corresponding
+ * CleanupTaskWeakReference is automatically added to the given reference queue.
+ */
+private[spark] class CleanupTaskWeakReference(
+    val task: CleanupTask,
+    referent: AnyRef,
+    referenceQueue: ReferenceQueue[AnyRef])
+  extends WeakReference(referent, referenceQueue)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 2d4d146f51339..14e1b05fcd9f5 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -80,6 +80,35 @@ object MimaExcludes {
         "org.apache.spark.ml.regression.LeastSquaresAggregator.add"),
       ProblemFilters.exclude[MissingMethodProblem](
         "org.apache.spark.ml.regression.LeastSquaresCostFun.this")
+    ) ++ Seq(
+      // The cleanup task types are private, but Mima is still confused by this
+      // change, similar to SPARK-10381.
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.CleanAccum"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.CleanAccum$"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.CleanBroadcast"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.CleanBroadcast$"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.CleanCheckpoint"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.CleanCheckpoint$"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.CleanupTask"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.CleanRDD"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.CleanRDD$"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.CleanShuffle"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.CleanShuffle$"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.CleanupTaskWeakReference"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.CleanupTaskWeakReference$")
     )
 
     case v if v.startsWith("1.5") => Seq(
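
Note for reviewers: below is a minimal sketch, not part of the patch, of how a
follow-up cleaner (such as the one anticipated by SPARK-10250) could reuse the
WeakReferenceCleaner trait introduced here. ExampleExecutorCleaner and
registerHolderForCleanup are hypothetical names, and the sketch reuses the
existing CleanRDD task type only because CleanupTask is sealed; a real
implementation would declare its own task types alongside the others in
CleanupTasks.scala.

package org.apache.spark

import org.apache.spark.util.cleanup.{CleanRDD, CleanupTask}

// Hypothetical cleaner, shown only to illustrate the WeakReferenceCleaner
// contract: implementors supply a thread name and a task handler, and call
// registerForCleanup() for each object whose garbage collection should
// trigger a cleanup task.
private[spark] class ExampleExecutorCleaner extends WeakReferenceCleaner {

  // Name given to the daemon thread spawned by WeakReferenceCleaner.start().
  override protected def cleanupThreadName(): String = "Example Executor Cleaner"

  // Associates a holder object with an RDD id. Once the holder becomes only
  // weakly reachable, the shared cleaning loop dequeues its weak reference and
  // passes CleanRDD(rddId) to handleCleanupForSpecificTask below.
  def registerHolderForCleanup(holder: AnyRef, rddId: Int): Unit = {
    registerForCleanup(holder, CleanRDD(rddId))
  }

  // Runs on the cleaning thread after the referent has been garbage collected.
  override protected def handleCleanupForSpecificTask(task: CleanupTask): Unit = {
    task match {
      case CleanRDD(rddId) =>
        logInfo(s"Would clean up state for RDD $rddId here")
      case unknown =>
        logWarning(s"Got a cleanup task $unknown that cannot be handled by ExampleExecutorCleaner.")
    }
  }
}

As with ContextCleaner, the lifecycle comes entirely from the trait: a caller
constructs the cleaner, invokes start() once during setup (spawning the daemon
thread), registers objects as they are created, and invokes stop() at shutdown.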