From 36cba8ca763e7df5a20d5ab015812354ade365c9 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 26 Jan 2016 16:20:33 -0800 Subject: [PATCH 1/4] Fix stack overflow issue when updateStateByKey is followed by a checkpointed dstream Add a local property to indicate if checkpointing all RDDs that are marked with the checkpoint flag, and enable it in Streaming --- .../main/scala/org/apache/spark/rdd/RDD.scala | 9 +++++++ .../org/apache/spark/CheckpointSuite.scala | 15 +++++++++++ .../streaming/scheduler/JobGenerator.scala | 4 +++ .../spark/streaming/CheckpointSuite.scala | 27 +++++++++++++++++++ 4 files changed, 55 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 9dad7944144d8..d4c449d1dd8a2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1535,6 +1535,10 @@ abstract class RDD[T: ClassTag]( private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None + // Whether recursively checkpoint all RDDs that are marked with the checkpoint flag. + private val recursiveCheckpoint = + Option(sc.getLocalProperty("spark.checkpoint.recursive")).map(_.toBoolean).getOrElse(false) + /** Returns the first parent RDD */ protected[spark] def firstParent[U: ClassTag]: RDD[U] = { dependencies.head.rdd.asInstanceOf[RDD[U]] @@ -1578,6 +1582,11 @@ abstract class RDD[T: ClassTag]( if (!doCheckpointCalled) { doCheckpointCalled = true if (checkpointData.isDefined) { + if (recursiveCheckpoint) { + // Checkpoint dependencies first because dependencies will be set to + // ReliableCheckpointRDD after checkpointing. + dependencies.foreach(_.rdd.doCheckpoint()) + } checkpointData.get.checkpoint() } else { dependencies.foreach(_.rdd.doCheckpoint()) diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index 390764ba242fd..a17a9dec375bf 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -512,6 +512,21 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS assert(rdd.isCheckpointedAndMaterialized === true) assert(rdd.partitions.size === 0) } + + runTest("recursive RDD checkpoint") { reliableCheckpoint: Boolean => + sc.setLocalProperty("spark.checkpoint.recursive", "true") + try { + val rdd1 = sc.parallelize(1 to 10) + checkpoint(rdd1, reliableCheckpoint) + val rdd2 = rdd1.map(_ + 1) + checkpoint(rdd2, reliableCheckpoint) + rdd2.count() + assert(rdd1.isCheckpointed === true) + assert(rdd2.isCheckpointed === true) + } finally { + sc.setLocalProperty("spark.checkpoint.recursive", null) + } + } } /** RDD partition that has large serialized size. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index a5a01e77639c4..cb510d25e8a24 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -243,6 +243,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { // Example: BlockRDDs are created in this thread, and it needs to access BlockManager // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. 
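// [Editor's note -- illustrative sketch, not part of the patch] The flag read above is an
// ordinary per-thread local property on SparkContext, and each RDD captures it when it is
// constructed, so it has to be set before the RDDs are built on that thread. Assuming an
// existing SparkContext `sc` (the key below is this patch's name; later patches in the series
// rename it):
sc.setLocalProperty("spark.checkpoint.recursive", "true")   // enable for jobs built on this thread
// ... create RDDs, mark them with checkpoint(), run an action ...
sc.setLocalProperty("spark.checkpoint.recursive", null)     // clear it afterwards
// The RDD-side read uses the usual parsing idiom and defaults to false when the key is unset:
//   Option(sc.getLocalProperty("spark.checkpoint.recursive")).map(_.toBoolean).getOrElse(false)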
SparkEnv.set(ssc.env) + + // Enable "spark.checkpoint.recursive" to make sure that all RDDs marked with the checkpoint + // flag are all checkpointed to avoid the stack overflow issue. See SPARK-6847 + ssc.sparkContext.setLocalProperty("spark.checkpoint.recursive", "true") Try { jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch graph.generateJobs(time) // generate jobs using allocated block diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 4a6b91fbc745e..4b56cb01bc918 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -821,6 +821,33 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester checkpointWriter.stop() } + test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") { + ssc = new StreamingContext(master, framework, batchDuration) + val batchCounter = new BatchCounter(ssc) + ssc.checkpoint(checkpointDir) + val inputDStream = new CheckpointInputDStream(ssc) + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + Some(values.sum + state.getOrElse(0)) + } + @volatile var recursiveCheckpoint = false + @volatile var rddsBothCheckpointed = false + inputDStream.map(i => (i, i)). + updateStateByKey[Int](updateFunc).checkpoint(batchDuration). + map(i => i).checkpoint(batchDuration). + foreachRDD { rdd => + recursiveCheckpoint = + Option(rdd.sparkContext.getLocalProperty("spark.checkpoint.recursive")). + map(_.toBoolean).getOrElse(false) + val stateRDD = rdd.firstParent + rdd.count() + rddsBothCheckpointed = stateRDD.isCheckpointed && rdd.isCheckpointed + } + ssc.start() + batchCounter.waitUntilBatchesCompleted(1, 10000) + assert(recursiveCheckpoint === true) + assert(rddsBothCheckpointed === true) + } + /** * Advances the manual clock on the streaming scheduler by given number of batches. * It also waits for the expected amount of time for each batch. From ef3983ba07ddb3c14f2946ef85f0445d491ce840 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 26 Jan 2016 21:11:24 -0800 Subject: [PATCH 2/4] Address TD's comments --- .../main/scala/org/apache/spark/rdd/RDD.scala | 10 +-- .../org/apache/spark/CheckpointSuite.scala | 6 +- .../streaming/scheduler/JobGenerator.scala | 7 ++- .../streaming/scheduler/JobScheduler.scala | 7 ++- .../spark/streaming/CheckpointSuite.scala | 61 +++++++++++++++---- 5 files changed, 67 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index d4c449d1dd8a2..38a2bc041f77c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1535,9 +1535,9 @@ abstract class RDD[T: ClassTag]( private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None - // Whether recursively checkpoint all RDDs that are marked with the checkpoint flag. - private val recursiveCheckpoint = - Option(sc.getLocalProperty("spark.checkpoint.recursive")).map(_.toBoolean).getOrElse(false) + // Whether checkpoint all RDDs that are marked with the checkpoint flag. 
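// [Editor's sketch -- a user-level equivalent of the streaming test above, not part of the
// patch; host, port, and the checkpoint path are illustrative] This is the shape of pipeline
// that hits SPARK-6847: a stateful stream whose state RDD is marked for checkpointing, followed
// by another dstream that is also checkpointed.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(
  new SparkConf().setMaster("local[2]").setAppName("spark-6847-repro"), Seconds(1))
ssc.checkpoint("/tmp/streaming-checkpoint")

val updateFunc = (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))

ssc.socketTextStream("localhost", 9999)
  .map(word => (word, 1))
  .updateStateByKey(updateFunc)        // the state RDD is marked for checkpointing
  .checkpoint(Seconds(10))
  .map(pair => pair)                   // any downstream transformation
  .checkpoint(Seconds(10))             // a second RDD in the same lineage is also marked
  .foreachRDD(rdd => rdd.count())
ssc.start()
ssc.awaitTermination()
// Before this fix, only the most downstream marked RDD was checkpointed in each batch, so the
// state RDD's lineage kept growing batch after batch until walking it overflowed the stack.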
+ private val checkpointAllMarked = + Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED)).map(_.toBoolean).getOrElse(false) /** Returns the first parent RDD */ protected[spark] def firstParent[U: ClassTag]: RDD[U] = { @@ -1582,7 +1582,7 @@ abstract class RDD[T: ClassTag]( if (!doCheckpointCalled) { doCheckpointCalled = true if (checkpointData.isDefined) { - if (recursiveCheckpoint) { + if (checkpointAllMarked) { // Checkpoint dependencies first because dependencies will be set to // ReliableCheckpointRDD after checkpointing. dependencies.foreach(_.rdd.doCheckpoint()) @@ -1706,6 +1706,8 @@ abstract class RDD[T: ClassTag]( */ object RDD { + private[spark] val CHECKPOINT_ALL_MARKED = "spark.checkpoint.checkpointAllMarked" + // The following implicit functions were in SparkContext before 1.3 and users had to // `import SparkContext._` to enable them. Now we move them here to make the compiler find // them automatically. However, we still keep the old functions in SparkContext for backward diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index a17a9dec375bf..eda866cf281c4 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -513,8 +513,8 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS assert(rdd.partitions.size === 0) } - runTest("recursive RDD checkpoint") { reliableCheckpoint: Boolean => - sc.setLocalProperty("spark.checkpoint.recursive", "true") + runTest("checkpoint all marked RDDs") { reliableCheckpoint: Boolean => + sc.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED, "true") try { val rdd1 = sc.parallelize(1 to 10) checkpoint(rdd1, reliableCheckpoint) @@ -524,7 +524,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS assert(rdd1.isCheckpointed === true) assert(rdd2.isCheckpointed === true) } finally { - sc.setLocalProperty("spark.checkpoint.recursive", null) + sc.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED, null) } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index cb510d25e8a24..6761596906d99 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.scheduler import scala.util.{Failure, Success, Try} import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time} import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils} @@ -244,9 +245,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. SparkEnv.set(ssc.env) - // Enable "spark.checkpoint.recursive" to make sure that all RDDs marked with the checkpoint - // flag are all checkpointed to avoid the stack overflow issue. See SPARK-6847 - ssc.sparkContext.setLocalProperty("spark.checkpoint.recursive", "true") + // Enable "spark.checkpoint.checkpointAllMarked" to make sure that all RDDs marked with the + // checkpoint flag are all checkpointed to avoid the stack overflow issue. 
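// [Editor's sketch, not part of the patch] Why dependencies have to be checkpointed before the
// RDD itself: checkpointing truncates lineage, replacing the dependencies with a checkpoint RDD,
// after which any marked ancestor is no longer reachable. The default behaviour (flag unset)
// therefore leaves the parent unmaterialized. Assumes an existing SparkContext `sc` with a
// checkpoint directory configured.
val rdd1 = sc.parallelize(1 to 10)
rdd1.checkpoint()                       // marked, but skipped under the default behaviour
val rdd2 = rdd1.map(_ + 1)
rdd2.checkpoint()
rdd2.count()                            // materializes rdd2's checkpoint only
println(rdd2.dependencies.head.rdd)     // now a ReliableCheckpointRDD rather than rdd1
println(rdd1.isCheckpointed)            // false -- the marked parent was never checkpointed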
See SPARK-6847 + ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED, "true") Try { jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch graph.generateJobs(time) // generate jobs using allocated block diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 9535c8e5b768a..82d4df9927962 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -23,10 +23,10 @@ import scala.collection.JavaConverters._ import scala.util.Failure import org.apache.spark.Logging -import org.apache.spark.rdd.PairRDDFunctions +import org.apache.spark.rdd.{PairRDDFunctions, RDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.ui.UIUtils -import org.apache.spark.util.{EventLoop, ThreadUtils, Utils} +import org.apache.spark.util.{EventLoop, ThreadUtils} private[scheduler] sealed trait JobSchedulerEvent @@ -210,6 +210,9 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { s"""Streaming job from $batchLinkText""") ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString) ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString) + // Enable "spark.checkpoint.checkpointAllMarked" to make sure that all RDDs marked with the + // checkpoint flag are all checkpointed to avoid the stack overflow issue. See SPARK-6847 + ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED, "true") // We need to assign `eventLoop` to a temp variable. Otherwise, because // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 4b56cb01bc918..6174d6d96fe3a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -822,6 +822,30 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester } test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") { + // In this test, there are two updateStateByKey operators. The RDD DAG is as follows: + // + // batch 1 batch 2 batch 3 ... + // + // 1) input rdd input rdd input rdd + // | | | + // 2) cogroup rdd ---> cogroup rdd ---> cogroup rdd ... + // | / | / | + // 3) map rdd --- map rdd --- map rdd ... + // | + // 4) cogroup rdd ---> cogroup rdd ---> cogroup rdd ... + // | / | / | + // 5) map rdd --- map rdd --- map rdd ... + // + // Every batch depends on its previous batch, so "updateStateByKey" needs to do checkpoint to + // break the RDD chain. However, before SPARK-6847, when the state RDD (layer 5) of the second + // "updateStateByKey" does checkpoint, it won't checkpoint the state RDD (layer 3) of the first + // "updateStateByKey" (Note: "updateStateByKey" has already marked that its state RDD (layer 3) + // should be checkpointed). Hence, the connections between layer 2 and layer 3 won't be broken + // and the RDD chain will grow infinitely and cause StackOverflow. + // + // Therefore SPARK-6847 introduces "spark.checkpoint.checkpointAllMarked" to force checkpointing + // all marked RDDs in the DAG to resolve this issue. 
(For the previous example, it will break + // connections between layer 2 and layer 3) ssc = new StreamingContext(master, framework, batchDuration) val batchCounter = new BatchCounter(ssc) ssc.checkpoint(checkpointDir) @@ -829,23 +853,36 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester val updateFunc = (values: Seq[Int], state: Option[Int]) => { Some(values.sum + state.getOrElse(0)) } - @volatile var recursiveCheckpoint = false - @volatile var rddsBothCheckpointed = false - inputDStream.map(i => (i, i)). - updateStateByKey[Int](updateFunc).checkpoint(batchDuration). - map(i => i).checkpoint(batchDuration). - foreachRDD { rdd => - recursiveCheckpoint = - Option(rdd.sparkContext.getLocalProperty("spark.checkpoint.recursive")). + @volatile var checkpointAllMarkedRDDsEnable = false + @volatile var rddsCheckpointed = false + inputDStream.map(i => (i, i)) + .updateStateByKey(updateFunc).checkpoint(batchDuration) + .updateStateByKey(updateFunc).checkpoint(batchDuration) + .foreachRDD { rdd => + checkpointAllMarkedRDDsEnable = + Option(rdd.sparkContext.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED)). map(_.toBoolean).getOrElse(false) - val stateRDD = rdd.firstParent + + val stateRDDs = { + def findAllMarkedRDDs(_rdd: RDD[_], buffer: ArrayBuffer[RDD[_]]): Unit = { + if (_rdd.checkpointData.isDefined) { + buffer += _rdd + } + _rdd.dependencies.foreach(dep => findAllMarkedRDDs(dep.rdd, buffer)) + } + + val buffer = new ArrayBuffer[RDD[_]] + findAllMarkedRDDs(rdd, buffer) + buffer.toSeq + } rdd.count() - rddsBothCheckpointed = stateRDD.isCheckpointed && rdd.isCheckpointed + // Check the two state RDDs are both checkpointed + rddsCheckpointed = stateRDDs.size == 2 && stateRDDs.forall(_.isCheckpointed) } ssc.start() batchCounter.waitUntilBatchesCompleted(1, 10000) - assert(recursiveCheckpoint === true) - assert(rddsBothCheckpointed === true) + assert(checkpointAllMarkedRDDsEnable === true) + assert(rddsCheckpointed === true) } /** From 97e39c045f3ee16713b2015150ba12a0815d7fc4 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 29 Jan 2016 16:26:36 -0800 Subject: [PATCH 3/4] Address Andrew's comments --- .../main/scala/org/apache/spark/rdd/RDD.scala | 18 +++++--- .../org/apache/spark/CheckpointSuite.scala | 14 ++++-- .../streaming/scheduler/JobGenerator.scala | 6 +-- .../streaming/scheduler/JobScheduler.scala | 6 +-- .../spark/streaming/CheckpointSuite.scala | 45 ++++++++++--------- 5 files changed, 54 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 38a2bc041f77c..4d51eac73f7d2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1535,9 +1535,14 @@ abstract class RDD[T: ClassTag]( private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None - // Whether checkpoint all RDDs that are marked with the checkpoint flag. + // Whether to checkpoint all ancestor RDDs that are marked for checkpointing. By default, + // we stop as soon as we find the first such RDD, an optimization that allows us to write + // less data but is not safe for all workloads. E.g. in streaming we may checkpoint both + // an RDD and its parent in every batch, in which case the parent may never be checkpointed + // and its lineage never truncated, leading to OOMs in the long run (SPARK-6847). 
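// [Editor's sketch, not from the patch] A user-level way to watch the failure mode described in
// the comments above: when marked ancestors are never materialized, lineage depth grows by a
// constant amount per batch, and the recursive walks over that chain (checkpointing,
// serialization, scheduling) are what eventually overflow the stack.
import org.apache.spark.rdd.RDD

def lineageDepth(rdd: RDD[_]): Int =
  if (rdd.dependencies.isEmpty) 1
  else 1 + rdd.dependencies.map(dep => lineageDepth(dep.rdd)).max

// e.g. inside foreachRDD { rdd => println(s"lineage depth = ${lineageDepth(rdd)}") }
// the printed depth keeps increasing before this fix and stays bounded after it.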
private val checkpointAllMarked = - Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED)).map(_.toBoolean).getOrElse(false) + Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS)) + .map(_.toBoolean).getOrElse(false) /** Returns the first parent RDD */ protected[spark] def firstParent[U: ClassTag]: RDD[U] = { @@ -1583,8 +1588,10 @@ abstract class RDD[T: ClassTag]( doCheckpointCalled = true if (checkpointData.isDefined) { if (checkpointAllMarked) { - // Checkpoint dependencies first because dependencies will be set to - // ReliableCheckpointRDD after checkpointing. + // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint + // them in parallel. + // Checkpoint parents first because our lineage will be truncated after we + // checkpoint ourselves dependencies.foreach(_.rdd.doCheckpoint()) } checkpointData.get.checkpoint() @@ -1706,7 +1713,8 @@ abstract class RDD[T: ClassTag]( */ object RDD { - private[spark] val CHECKPOINT_ALL_MARKED = "spark.checkpoint.checkpointAllMarked" + private[spark] val CHECKPOINT_ALL_MARKED_ANCESTORS = + "spark.checkpoint.checkpointAllMarkedAncestors" // The following implicit functions were in SparkContext before 1.3 and users had to // `import SparkContext._` to enable them. Now we move them here to make the compiler find diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index eda866cf281c4..ce35856dce3f7 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -513,18 +513,24 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS assert(rdd.partitions.size === 0) } - runTest("checkpoint all marked RDDs") { reliableCheckpoint: Boolean => - sc.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED, "true") + runTest("checkpointAllMarkedAncestors") { reliableCheckpoint: Boolean => + testCheckpointAllMarkedAncestors(reliableCheckpoint, checkpointAllMarkedAncestors = true) + testCheckpointAllMarkedAncestors(reliableCheckpoint, checkpointAllMarkedAncestors = false) + } + + private def testCheckpointAllMarkedAncestors( + reliableCheckpoint: Boolean, checkpointAllMarkedAncestors: Boolean): Unit = { + sc.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, checkpointAllMarkedAncestors.toString) try { val rdd1 = sc.parallelize(1 to 10) checkpoint(rdd1, reliableCheckpoint) val rdd2 = rdd1.map(_ + 1) checkpoint(rdd2, reliableCheckpoint) rdd2.count() - assert(rdd1.isCheckpointed === true) + assert(rdd1.isCheckpointed === checkpointAllMarkedAncestors) assert(rdd2.isCheckpointed === true) } finally { - sc.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED, null) + sc.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, null) } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 6761596906d99..a3ad5eaa40edc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -245,9 +245,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. 
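// [Editor's note -- sketch, not part of the patch] Local properties are per-thread, and each RDD
// captures this flag on the thread that constructs it. That is why the property is set in two
// places: here in JobGenerator, on the thread that builds each batch's RDDs while generating
// jobs, and again in JobScheduler's job handler, on the threads that run output operations
// (where RDDs created inside user code such as foreachRDD bodies are constructed). From user
// code the effective value can be observed the same way the streaming test later in this patch
// does (`stream` is an assumed DStream):
//
//   stream.foreachRDD { rdd =>
//     val flagOn = Option(rdd.sparkContext
//         .getLocalProperty("spark.checkpoint.checkpointAllMarkedAncestors"))
//       .map(_.toBoolean).getOrElse(false)
//     // flagOn is true for batches generated with this patch applied
//   }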
SparkEnv.set(ssc.env) - // Enable "spark.checkpoint.checkpointAllMarked" to make sure that all RDDs marked with the - // checkpoint flag are all checkpointed to avoid the stack overflow issue. See SPARK-6847 - ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED, "true") + // Checkpoint all RDDs marked for checkpointing to ensure their lineages are + // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847). + ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true") Try { jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch graph.generateJobs(time) // generate jobs using allocated block diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 82d4df9927962..3fed3d88354c7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -210,9 +210,9 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { s"""Streaming job from $batchLinkText""") ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString) ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString) - // Enable "spark.checkpoint.checkpointAllMarked" to make sure that all RDDs marked with the - // checkpoint flag are all checkpointed to avoid the stack overflow issue. See SPARK-6847 - ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED, "true") + // Checkpoint all RDDs marked for checkpointing to ensure their lineages are + // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847). + ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true") // We need to assign `eventLoop` to a temp variable. Otherwise, because // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 6174d6d96fe3a..17c25ef45c51a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -828,12 +828,16 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester // // 1) input rdd input rdd input rdd // | | | + // v v v // 2) cogroup rdd ---> cogroup rdd ---> cogroup rdd ... // | / | / | + // v / v / v // 3) map rdd --- map rdd --- map rdd ... - // | + // | | | + // v v v // 4) cogroup rdd ---> cogroup rdd ---> cogroup rdd ... // | / | / | + // v / v / v // 5) map rdd --- map rdd --- map rdd ... // // Every batch depends on its previous batch, so "updateStateByKey" needs to do checkpoint to @@ -853,35 +857,36 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester val updateFunc = (values: Seq[Int], state: Option[Int]) => { Some(values.sum + state.getOrElse(0)) } - @volatile var checkpointAllMarkedRDDsEnable = false + @volatile var shouldCheckpointAllMarkedRDDs = false @volatile var rddsCheckpointed = false inputDStream.map(i => (i, i)) .updateStateByKey(updateFunc).checkpoint(batchDuration) .updateStateByKey(updateFunc).checkpoint(batchDuration) .foreachRDD { rdd => - checkpointAllMarkedRDDsEnable = - Option(rdd.sparkContext.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED)). 
- map(_.toBoolean).getOrElse(false) - - val stateRDDs = { - def findAllMarkedRDDs(_rdd: RDD[_], buffer: ArrayBuffer[RDD[_]]): Unit = { - if (_rdd.checkpointData.isDefined) { - buffer += _rdd - } - _rdd.dependencies.foreach(dep => findAllMarkedRDDs(dep.rdd, buffer)) + /** + * Find all RDDs that are marked for checkpointing in the specified RDD and its ancestors. + */ + def findAllMarkedRDDs(rdd: RDD[_]): List[RDD[_]] = { + val markedRDDs = rdd.dependencies.flatMap(dep => findAllMarkedRDDs(dep.rdd)).toList + if (rdd.checkpointData.isDefined) { + rdd :: markedRDDs + } else { + markedRDDs } + } + + shouldCheckpointAllMarkedRDDs = + Option(rdd.sparkContext.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS)). + map(_.toBoolean).getOrElse(false) - val buffer = new ArrayBuffer[RDD[_]] - findAllMarkedRDDs(rdd, buffer) - buffer.toSeq + val stateRDDs = findAllMarkedRDDs(rdd) + rdd.count() + // Check the two state RDDs are both checkpointed + rddsCheckpointed = stateRDDs.size == 2 && stateRDDs.forall(_.isCheckpointed) } - rdd.count() - // Check the two state RDDs are both checkpointed - rddsCheckpointed = stateRDDs.size == 2 && stateRDDs.forall(_.isCheckpointed) - } ssc.start() batchCounter.waitUntilBatchesCompleted(1, 10000) - assert(checkpointAllMarkedRDDsEnable === true) + assert(shouldCheckpointAllMarkedRDDs === true) assert(rddsCheckpointed === true) } From 20e45095506067f3f5195470e3a390cd4872e531 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Sat, 30 Jan 2016 15:00:41 -0800 Subject: [PATCH 4/4] Rename and fix indentation --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 ++-- .../org/apache/spark/streaming/CheckpointSuite.scala | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 4d51eac73f7d2..f26f49f3cbcc8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1540,7 +1540,7 @@ abstract class RDD[T: ClassTag]( // less data but is not safe for all workloads. E.g. in streaming we may checkpoint both // an RDD and its parent in every batch, in which case the parent may never be checkpointed // and its lineage never truncated, leading to OOMs in the long run (SPARK-6847). - private val checkpointAllMarked = + private val checkpointAllMarkedAncestors = Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS)) .map(_.toBoolean).getOrElse(false) @@ -1587,7 +1587,7 @@ abstract class RDD[T: ClassTag]( if (!doCheckpointCalled) { doCheckpointCalled = true if (checkpointData.isDefined) { - if (checkpointAllMarked) { + if (checkpointAllMarkedAncestors) { // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint // them in parallel. 
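// [Editor's note on the TODO above -- an illustrative user-level sketch, not the internal
// implementation the TODO refers to] For independent RDDs, checkpoints can already be
// materialized concurrently today, because SparkContext accepts jobs from multiple threads:
// mark each RDD, then trigger an action for each one from its own thread. Inside a single
// lineage the parents-first ordering below still has to be respected, which is why the internal
// parallel version remains a TODO. Assumes an existing `sc` with a checkpoint directory set.
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val rdds = Seq(sc.parallelize(1 to 1000), sc.parallelize(1001 to 2000))
rdds.foreach(_.checkpoint())                          // mark every RDD first
val jobs = rdds.map(rdd => Future { rdd.count() })    // one action per RDD, run concurrently
jobs.foreach(job => Await.result(job, Duration.Inf))  // wait for all checkpoints to be written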
// Checkpoint parents first because our lineage will be truncated after we diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 17c25ef45c51a..786703eb9a84e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -880,10 +880,10 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester map(_.toBoolean).getOrElse(false) val stateRDDs = findAllMarkedRDDs(rdd) - rdd.count() - // Check the two state RDDs are both checkpointed - rddsCheckpointed = stateRDDs.size == 2 && stateRDDs.forall(_.isCheckpointed) - } + rdd.count() + // Check the two state RDDs are both checkpointed + rddsCheckpointed = stateRDDs.size == 2 && stateRDDs.forall(_.isCheckpointed) + } ssc.start() batchCounter.waitUntilBatchesCompleted(1, 10000) assert(shouldCheckpointAllMarkedRDDs === true)
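[Editor's closing note] With the full series applied, Spark Streaming turns the new behaviour on
automatically: JobGenerator sets "spark.checkpoint.checkpointAllMarkedAncestors" on the thread
that generates each batch's jobs, and JobScheduler sets it on the threads that run output
operations, so updateStateByKey pipelines followed by other checkpointed dstreams no longer
accumulate lineage. Outside streaming the behaviour remains opt-in per thread. A minimal sketch
of the opt-in, assuming an existing SparkContext `sc` with a checkpoint directory configured
(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS is private[spark], so application code uses the string key):

  sc.setLocalProperty("spark.checkpoint.checkpointAllMarkedAncestors", "true")
  try {
    val parent = sc.parallelize(1 to 10)
    parent.checkpoint()                  // would be skipped under the default behaviour
    val child = parent.map(_ * 2)
    child.checkpoint()
    child.count()                        // materializes both checkpoints, parent first
    assert(parent.isCheckpointed && child.isCheckpointed)
  } finally {
    sc.setLocalProperty("spark.checkpoint.checkpointAllMarkedAncestors", null)
  }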