From 387b5787fcff0903770fc6dfea082dd2c4aca756 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Tue, 26 Aug 2014 11:43:09 -0700
Subject: [PATCH 1/4] Made ContextCleaner not block on shuffles

---
 .../scala/org/apache/spark/ContextCleaner.scala    | 17 ++++++++++++++++-
 .../spark/storage/BlockManagerMaster.scala         | 12 +++++++-----
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 3848734d6f639..b2145ea6f3c6b 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -76,6 +76,20 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private val blockOnCleanupTasks = sc.conf.getBoolean(
     "spark.cleaner.referenceTracking.blocking", true)
 
+  /**
+   * Whether to disable blocking on shuffle cleanup tasks. This override is effective only when
+   * blocking on cleanup tasks is enabled.
+   *
+   * When the context cleaner is configured to block on every delete request, it can throw timeout
+   * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
+   * parameter disables blocking on shuffle cleanups even when `blockOnCleanupTasks` is true.
+   * Note that this does not affect the cleanup of RDDs and broadcasts.
+   * This is intended to be a temporary workaround, until the real Akka issue (referred to in
+   * the comment above `blockOnCleanupTasks`) is resolved.
+   */
+  private val disableBlockOnShuffleCleanupTasks = sc.conf.getBoolean(
+    "spark.cleaner.referenceTracking.disableBlockingForShuffles", true)
+
   @volatile private var stopped = false
 
   /** Attach a listener object to get information of when objects are cleaned. */
@@ -128,7 +142,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
           case CleanRDD(rddId) =>
             doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
           case CleanShuffle(shuffleId) =>
-            doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
+            doCleanupShuffle(shuffleId,
+              blocking = (blockOnCleanupTasks && !disableBlockOnShuffleCleanupTasks))
           case CleanBroadcast(broadcastId) =>
             doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
         }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 669307765d1fa..e67b3dc5ce02e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -101,7 +101,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   def removeRdd(rddId: Int, blocking: Boolean) {
     val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
     future.onFailure {
-      case e: Throwable => logError("Failed to remove RDD " + rddId, e)
+      case e: Exception =>
+        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}")
     }
     if (blocking) {
       Await.result(future, timeout)
@@ -112,7 +113,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   def removeShuffle(shuffleId: Int, blocking: Boolean) {
     val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
     future.onFailure {
-      case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e)
+      case e: Exception =>
+        logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}")
     }
     if (blocking) {
       Await.result(future, timeout)
@@ -124,9 +126,9 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
     val future = askDriverWithReply[Future[Seq[Int]]](
       RemoveBroadcast(broadcastId, removeFromMaster))
     future.onFailure {
-      case e: Throwable =>
-        logError("Failed to remove broadcast " + broadcastId +
-          " with removeFromMaster = " + removeFromMaster, e)
+      case e: Exception =>
+        logWarning(s"Failed to remove broadcast $broadcastId" +
+          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}")
     }
     if (blocking) {
       Await.result(future, timeout)
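For readers unfamiliar with the pattern this patch leans on, the sketch below is a minimal, self-contained rendering of the blocking-versus-non-blocking structure in removeShuffle above. It is not Spark code: simulateDriverRemoveShuffle is a hypothetical stand-in for the askDriverWithReply actor round trip, and println stands in for logWarning. The point it illustrates: the failure handler only logs, and a timeout can only surface when blocking is true.

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object CleanupSketch {
  // Stand-in for the driver round trip: pretend two executors acknowledge.
  private def simulateDriverRemoveShuffle(shuffleId: Int): Future[Seq[Boolean]] =
    Future { Thread.sleep(100); Seq(true, true) }

  def removeShuffle(shuffleId: Int, blocking: Boolean): Unit = {
    val future = simulateDriverRemoveShuffle(shuffleId)
    // Failures are logged, never rethrown, so a lost reply cannot kill the caller.
    future.onFailure {
      case e: Exception => println(s"Failed to remove shuffle $shuffleId - ${e.getMessage}")
    }
    // Only a blocking caller waits, and only a blocking caller can see a
    // timeout -- which is exactly where SPARK-3139 hit the shuffle cleanup.
    if (blocking) Await.result(future, 30.seconds)
  }

  def main(args: Array[String]): Unit = {
    removeShuffle(0, blocking = false)  // fire and forget
    removeShuffle(1, blocking = true)   // waits up to the timeout
  }
}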
From e337cc2d6731ed709afe78a958242abd89c3df56 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Tue, 26 Aug 2014 12:34:38 -0700
Subject: [PATCH 2/4] Changed semantics based on PR comments.

---
 .../org/apache/spark/ContextCleaner.scala | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index b2145ea6f3c6b..cffcd620381e7 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -77,18 +77,18 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     "spark.cleaner.referenceTracking.blocking", true)
 
   /**
-   * Whether to disable blocking on shuffle cleanup tasks. This override is effective only when
-   * blocking on cleanup tasks is enabled.
+   * Whether the cleaning thread will block on shuffle cleanup tasks.
+   * This overrides the global setting `blockOnCleanupTasks`.
    *
    * When the context cleaner is configured to block on every delete request, it can throw timeout
    * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
-   * parameter disables blocking on shuffle cleanups even when `blockOnCleanupTasks` is true.
-   * Note that this does not affect the cleanup of RDDs and broadcasts.
-   * This is intended to be a temporary workaround, until the real Akka issue (referred to in
-   * the comment above `blockOnCleanupTasks`) is resolved.
+   * parameter by default disables blocking on shuffle cleanups. Note that this does not affect
+   * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround,
+   * until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
+   * resolved.
    */
-  private val disableBlockOnShuffleCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.disableBlockingForShuffles", true)
+  private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
+    "spark.cleaner.referenceTracking.blocking.shuffle", blockOnCleanupTasks)
 
   @volatile private var stopped = false
 
@@ -142,8 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
           case CleanRDD(rddId) =>
             doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
           case CleanShuffle(shuffleId) =>
-            doCleanupShuffle(shuffleId,
-              blocking = (blockOnCleanupTasks && !disableBlockOnShuffleCleanupTasks))
+            doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
           case CleanBroadcast(broadcastId) =>
             doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
         }
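At this point in the series, shuffle blocking is its own configuration key whose default falls back to the global flag (patch 3 below changes that default). A sketch of how a user would set the two flags independently; the two key names come straight from the diff, while the master and app name are illustrative:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("CleanerConfigExample")
  // Global cleanup blocking (the SPARK-3015 workaround) stays on...
  .set("spark.cleaner.referenceTracking.blocking", "true")
  // ...while shuffle cleanup can opt out independently.
  .set("spark.cleaner.referenceTracking.blocking.shuffle", "false")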
From 2181329d704b7aa79612c2026bca04e5b7fd8e8c Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Tue, 26 Aug 2014 16:32:03 -0700
Subject: [PATCH 3/4] Mark shuffle cleanup as non-blocking.

---
 core/src/main/scala/org/apache/spark/ContextCleaner.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index cffcd620381e7..c1e1139fd848e 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -88,7 +88,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    * resolved.
    */
   private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.blocking.shuffle", blockOnCleanupTasks)
+    "spark.cleaner.referenceTracking.blocking.shuffle", false)
 
   @volatile private var stopped = false
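This one-line change decouples the two defaults: the shuffle flag no longer inherits from blockOnCleanupTasks and is simply false unless set explicitly. A small self-contained sketch of the resulting resolution logic; the getBoolean helper below is a hypothetical stand-in for SparkConf.getBoolean, not Spark's API:

// With an empty configuration, RDD and broadcast cleanups block
// while shuffle cleanup does not.
def getBoolean(conf: Map[String, String], key: String, default: Boolean): Boolean =
  conf.get(key).map(_.toBoolean).getOrElse(default)

val conf = Map.empty[String, String]
val blockOnCleanupTasks =
  getBoolean(conf, "spark.cleaner.referenceTracking.blocking", true)          // true
val blockOnShuffleCleanupTasks =
  getBoolean(conf, "spark.cleaner.referenceTracking.blocking.shuffle", false) // false
assert(blockOnCleanupTasks && !blockOnShuffleCleanupTasks)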
From 9c84202631ccc82a99179e7a9dbfdff3a1d32c55 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Tue, 26 Aug 2014 20:38:25 -0700
Subject: [PATCH 4/4] Restored default blocking behavior in ContextCleanerSuite,
 and added docs noting that spark.cleaner.referenceTracking.blocking does not
 control shuffle cleanup.

---
 core/src/main/scala/org/apache/spark/ContextCleaner.scala     | 4 ++--
 .../src/test/scala/org/apache/spark/ContextCleanerSuite.scala | 3 +++
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index c1e1139fd848e..ede1e23f4fcc5 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -65,7 +65,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
   /**
-   * Whether the cleaning thread will block on cleanup tasks.
+   * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
+   * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
    *
    * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
    * workaround for the issue, which is ultimately caused by the way the BlockManager actors
@@ -78,7 +79,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   /**
    * Whether the cleaning thread will block on shuffle cleanup tasks.
-   * This overrides the global setting `blockOnCleanupTasks`.
    *
    * When the context cleaner is configured to block on every delete request, it can throw timeout
    * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 4bc4346c0a288..2744894277ae8 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -52,6 +52,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[Ha
     .setMaster("local[2]")
     .setAppName("ContextCleanerSuite")
     .set("spark.cleaner.referenceTracking.blocking", "true")
+    .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
     .set("spark.shuffle.manager", shuffleManager.getName)
 
   before {
@@ -243,6 +244,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
       .setMaster("local-cluster[2, 1, 512]")
       .setAppName("ContextCleanerSuite")
       .set("spark.cleaner.referenceTracking.blocking", "true")
+      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
       .set("spark.shuffle.manager", shuffleManager.getName)
     sc = new SparkContext(conf2)
@@ -319,6 +321,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[Sor
       .setMaster("local-cluster[2, 1, 512]")
       .setAppName("ContextCleanerSuite")
       .set("spark.cleaner.referenceTracking.blocking", "true")
+      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
       .set("spark.shuffle.manager", shuffleManager.getName)
     sc = new SparkContext(conf2)
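To close the loop, here is a hedged end-to-end sketch of what the final semantics mean for an application: shuffle cleanup is fire-and-forget unless the application opts back into blocking, as the test suites above do. The two configuration keys come from the patches; everything else (app name, data, the GC-and-sleep nudge) is illustrative of how ContextCleanerSuite exercises the cleaner, not a prescribed recipe.

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleCleanupExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ShuffleCleanupExample")
      // Opt back into blocking shuffle cleanup, as ContextCleanerSuite does;
      // leave this unset to get the new non-blocking default.
      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
    val sc = new SparkContext(conf)

    var pairs = sc.parallelize(1 to 1000, 4).map(i => (i % 10, i)).reduceByKey(_ + _)
    pairs.count()      // materializes shuffle state on the executors

    pairs = null       // drop the last strong reference to the shuffled RDD
    System.gc()        // lets ContextCleaner's weak-reference queue notice it,
                       // eventually enqueueing a CleanShuffle task
    Thread.sleep(1000) // crude: give the cleaning thread a moment to run
    sc.stop()
  }
}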