From b08b3c994902629b54808c27841335fb6ca2715d Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Fri, 17 Apr 2015 09:56:11 +0800 Subject: [PATCH 1/2] Flaky test: o.a.s.ContextCleanerSuite automatically cleanup checkpoint --- .../main/scala/org/apache/spark/ContextCleaner.scala | 2 ++ .../scala/org/apache/spark/ContextCleanerSuite.scala | 12 ++++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 715b259057569..37198d887b07b 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -236,6 +236,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { try { logDebug("Cleaning rdd checkpoint data " + rddId) RDDCheckpointData.clearRDDCheckpointData(sc, rddId) + listeners.foreach(_.checkpointCleaned(rddId)) logInfo("Cleaned rdd checkpoint data " + rddId) } catch { @@ -260,4 +261,5 @@ private[spark] trait CleanerListener { def shuffleCleaned(shuffleId: Int) def broadcastCleaned(broadcastId: Long) def accumCleaned(accId: Long) + def checkpointCleaned(rddId: Long) } diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index 097e7076e5391..2d50d987b9db3 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -406,12 +406,14 @@ class CleanerTester( sc: SparkContext, rddIds: Seq[Int] = Seq.empty, shuffleIds: Seq[Int] = Seq.empty, - broadcastIds: Seq[Long] = Seq.empty) + broadcastIds: Seq[Long] = Seq.empty, + checkpointIds: Seq[Long] = Seq.empty) extends Logging { val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds + val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds val isDistributed = !sc.isLocal val cleanerListener = new CleanerListener { @@ -433,6 +435,11 @@ class CleanerTester( def accumCleaned(accId: Long): Unit = { logInfo("Cleaned accId " + accId + " cleaned") } + + def checkpointCleaned(rddId: Long): Unit = { + toBeCheckpointIds -= rddId + logInfo("checkpoint rddId " + rddId + " cleaned") + } } val MAX_VALIDATION_ATTEMPTS = 10 @@ -547,7 +554,8 @@ class CleanerTester( private def isAllCleanedUp = toBeCleanedRDDIds.isEmpty && toBeCleanedShuffleIds.isEmpty && - toBeCleanedBroadcstIds.isEmpty + toBeCleanedBroadcstIds.isEmpty && + toBeCheckpointIds.isEmpty private def getRDDBlocks(rddId: Int): Seq[BlockId] = { blockManager.master.getMatchingBlockIds( _ match { From 964aea74235cfaf0150270e256eef00dbabb0b85 Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Fri, 17 Apr 2015 17:25:06 +0800 Subject: [PATCH 2/2] review commits --- .../scala/org/apache/spark/ContextCleanerSuite.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index 2d50d987b9db3..c7868ddcf770f 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -224,7 +224,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { assert(fs.exists(path)) // the checkpoint is not cleaned by default (without the configuration set) - var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil) + var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Nil) rdd = null // Make RDD out of scope runGC() postGCTester.assertCleanup() @@ -245,7 +245,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get)) // Test that GC causes checkpoint data cleanup after dereferencing the RDD - postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil) + postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId)) rdd = null // Make RDD out of scope runGC() postGCTester.assertCleanup() @@ -429,7 +429,7 @@ class CleanerTester( def broadcastCleaned(broadcastId: Long): Unit = { toBeCleanedBroadcstIds -= broadcastId - logInfo("Broadcast" + broadcastId + " cleaned") + logInfo("Broadcast " + broadcastId + " cleaned") } def accumCleaned(accId: Long): Unit = { @@ -438,7 +438,7 @@ class CleanerTester( def checkpointCleaned(rddId: Long): Unit = { toBeCheckpointIds -= rddId - logInfo("checkpoint rddId " + rddId + " cleaned") + logInfo("checkpoint " + rddId + " cleaned") } } @@ -463,7 +463,8 @@ class CleanerTester( /** Verify that RDDs, shuffles, etc. occupy resources */ private def preCleanupValidate() { - assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty, "Nothing to cleanup") + assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty || + checkpointIds.nonEmpty, "Nothing to cleanup") // Verify the RDDs have been persisted and blocks are present rddIds.foreach { rddId =>