Skip to content

Commit

Permalink
review commit
Browse files Browse the repository at this point in the history
  • Loading branch information
witgo committed Aug 1, 2014
1 parent cef1f1a commit f367358
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
val broadcastedConf = rdd.context.broadcast(
new SerializableWritable(rdd.context.hadoopConfiguration))
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (rdd.conf.getBoolean("spark.cleaner.checkpointData.enabled", false)) {
if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
rdd.context.cleaner.foreach { cleaner =>
cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,10 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
}

test("automatically cleanup checkpoint data") {
sc.stop()
val conf=new SparkConf().setMaster("local[2]").setAppName("cleanupCheckpointData").
set("spark.cleaner.checkpointData.enabled","true")
sc =new SparkContext(conf)
set("spark.cleaner.referenceTracking.cleanCheckpoints","true")
sc = new SparkContext(conf)
val checkpointDir = java.io.File.createTempFile("temp", "")
checkpointDir.deleteOnExit()
checkpointDir.delete()
Expand All @@ -223,8 +224,10 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val fs = path.getFileSystem(sc.hadoopConfiguration)
assert(fs.exists(path))
}

// Test that GC causes checkpoint data cleanup after dereferencing the RDD
val postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil)
rdd = null
rdd = null // Make RDD out of scope
runGC()
postGCTester.assertCleanup()
RDDCheckpointData.rddCheckpointDataPath(sc, rddId).foreach { path =>
Expand Down

0 comments on commit f367358

Please sign in to comment.