# [SPARK-6847][Core][Streaming] Fix stack overflow issue when updateStateByKey is followed by a checkpointed dstream #10934
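For context, a minimal model (not Spark code; `Node`, `depth`, and `LineageOverflowDemo` are illustrative names) of the failure mode this PR fixes: `updateStateByKey` derives each batch's state RDD from the previous batch's, so if checkpointing never truncates the lineage, the dependency chain grows by one node per batch and any recursive walk of it (serialization, `doCheckpoint`) eventually overflows the stack.

```scala
// Toy model of an ever-growing lineage chain; Node stands in for an RDD and
// `parent` for its dependency.
final case class Node(id: Int, parent: Option[Node]) {
  // Recursive traversal, analogous to walking RDD dependencies during
  // serialization or doCheckpoint().
  def depth: Int = parent match {
    case None    => 1
    case Some(p) => 1 + p.depth
  }
}

object LineageOverflowDemo {
  def main(args: Array[String]): Unit = {
    // Simulate many streaming batches with no lineage truncation.
    var state = Node(0, None)
    for (i <- 1 to 1000000) state = Node(i, Some(state))
    val overflowed =
      try { state.depth; false }
      catch { case _: StackOverflowError => true }
    println(s"overflowed = $overflowed") // the deep recursion blows the stack
  }
}
```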

Closed · wants to merge 4 commits

Changes from 1 commit
**core/src/main/scala/org/apache/spark/rdd/RDD.scala** (9 additions, 0 deletions)

@@ -1535,6 +1535,10 @@ abstract class RDD[T: ClassTag](

```scala
  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

  // Whether to recursively checkpoint all RDDs that are marked with the checkpoint flag.
  private val recursiveCheckpoint =
    Option(sc.getLocalProperty("spark.checkpoint.recursive")).map(_.toBoolean).getOrElse(false)
```
**Contributor:** Well, it's always recursive. The difference is whether to checkpoint all RDDs that have been marked or not.

**Member Author:** Any better name suggestions for this one?

**Contributor:** `"spark.checkpoint.checkpointAllMarked"`? @andrewor14, thoughts?

By the way, shouldn't this be a constant in some object?

**Contributor:** This is a hard one... I think `checkpointAllMarkedAncestors` is the least ambiguous.


```scala
  /** Returns the first parent RDD */
  protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
    dependencies.head.rdd.asInstanceOf[RDD[U]]
```

@@ -1578,6 +1582,11 @@

```scala
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        if (recursiveCheckpoint) {
```
**Contributor:** I wonder whether we can collect all the RDDs that need to be checkpointed, and then checkpoint them in parallel.

**Contributor:** Yeah! Can we add a TODO?
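A rough sketch of what the reviewers are suggesting here (hypothetical code, not part of this PR: `FakeRdd`, `collectMarked`, and `checkpointAll` are made-up names): first gather every RDD in the dependency graph that carries the checkpoint flag, then run the checkpoints concurrently instead of depth-first one at a time.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for an RDD: an id, a checkpoint flag, and its dependencies.
final case class FakeRdd(id: Int, marked: Boolean, deps: Seq[FakeRdd])

object ParallelCheckpointSketch {
  // Gather every transitively reachable node that is marked for checkpointing.
  // (A real version would also deduplicate shared ancestors in a diamond DAG.)
  def collectMarked(root: FakeRdd): Seq[FakeRdd] = {
    val fromDeps = root.deps.flatMap(collectMarked)
    if (root.marked) root +: fromDeps else fromDeps
  }

  // "Checkpoint" all marked nodes in parallel; here checkpointing a node just
  // yields its id, so the result lists which nodes were checkpointed.
  def checkpointAll(root: FakeRdd): Seq[Int] = {
    val futures = Future.traverse(collectMarked(root))(r => Future(r.id))
    Await.result(futures, 10.seconds)
  }
}
```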

```scala
          // Checkpoint dependencies first because dependencies will be set to
          // ReliableCheckpointRDD after checkpointing.
```
**Contributor:** Suggested wording:

```scala
          // Checkpoint parents first because our lineage will be truncated after we
          // checkpoint ourselves
```

```scala
          dependencies.foreach(_.rdd.doCheckpoint())
        }
        checkpointData.get.checkpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
```
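The control flow of the patched `doCheckpoint` can be modelled outside Spark like this (a sketch; `ToyRdd` is a hypothetical stand-in): with the flag off, recursion stops at the first RDD marked for checkpointing, so marked ancestors are never reached; with it on, dependencies are checkpointed before this RDD truncates its own lineage.

```scala
// Minimal model of the patched doCheckpoint(); not Spark code.
final class ToyRdd(val marked: Boolean, val deps: Seq[ToyRdd]) {
  var checkpointed = false
  private var doCheckpointCalled = false

  def doCheckpoint(checkpointAllMarked: Boolean): Unit = {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (marked) {
        // The new branch: checkpoint marked ancestors before truncating
        // our own lineage.
        if (checkpointAllMarked) deps.foreach(_.doCheckpoint(checkpointAllMarked))
        checkpointed = true
      } else {
        deps.foreach(_.doCheckpoint(checkpointAllMarked))
      }
    }
  }
}
```

With `checkpointAllMarked = false`, a marked parent behind a marked child is never checkpointed, which is exactly how the state RDD's lineage kept growing in SPARK-6847.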
**core/src/test/scala/org/apache/spark/CheckpointSuite.scala** (15 additions, 0 deletions)

@@ -512,6 +512,21 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS

```scala
    assert(rdd.isCheckpointedAndMaterialized === true)
    assert(rdd.partitions.size === 0)
  }

  runTest("recursive RDD checkpoint") { reliableCheckpoint: Boolean =>
    sc.setLocalProperty("spark.checkpoint.recursive", "true")
    try {
      val rdd1 = sc.parallelize(1 to 10)
      checkpoint(rdd1, reliableCheckpoint)
      val rdd2 = rdd1.map(_ + 1)
      checkpoint(rdd2, reliableCheckpoint)
      rdd2.count()
      assert(rdd1.isCheckpointed === true)
      assert(rdd2.isCheckpointed === true)
    } finally {
      sc.setLocalProperty("spark.checkpoint.recursive", null)
    }
  }
}

/** RDD partition that has large serialized size. */
```
@@ -243,6 +243,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {

```scala
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)

    // Enable "spark.checkpoint.recursive" to make sure that all RDDs marked with the checkpoint
    // flag are checkpointed, to avoid the stack overflow issue. See SPARK-6847.
    ssc.sparkContext.setLocalProperty("spark.checkpoint.recursive", "true")
```
**Contributor:** Why is this only in `JobGenerator`, and not also in the `JobScheduler` that is actually running the jobs?

**Member Author:** `localProperties` uses `InheritableThreadLocal`, so they will be inherited by child threads.

However, it looks like it's better not to depend on this implicit assumption and just set it explicitly.
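A standalone demo of that inheritance behaviour (`InheritableThreadLocal` is the JDK class underlying Spark's `localProperties`; the demo object itself is illustrative):

```scala
object InheritableDemo {
  // Child threads get a copy of the parent thread's value at creation time.
  val prop = new InheritableThreadLocal[String] {
    override def initialValue(): String = "unset"
  }

  def main(args: Array[String]): Unit = {
    prop.set("true")
    @volatile var seenInChild: String = null
    val child = new Thread(() => { seenInChild = prop.get() })
    child.start()
    child.join()
    println(seenInChild) // prints "true": inherited from the parent thread
  }
}
```

This is why setting the property in the `JobGenerator` thread happens to propagate to threads it spawns, and also why relying on it is fragile: threads created elsewhere (or from a pool) won't see the value.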

**Contributor:** Yes, please set it explicitly in both `JobGenerator` and `JobScheduler`. There are other variables like this that are set in the `JobScheduler`; please put it in the same place.

```scala
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
```
@@ -821,6 +821,33 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester

```scala
    checkpointWriter.stop()
  }

  test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
    ssc = new StreamingContext(master, framework, batchDuration)
```
**Contributor:** Would be nice to add a bit of context in the test, maybe with ASCII art showing the DAG structure, to explain how the RDDs are organized.

```scala
    val batchCounter = new BatchCounter(ssc)
    ssc.checkpoint(checkpointDir)
    val inputDStream = new CheckpointInputDStream(ssc)
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      Some(values.sum + state.getOrElse(0))
    }
    @volatile var recursiveCheckpoint = false
    @volatile var rddsBothCheckpointed = false
    inputDStream.map(i => (i, i)).
      updateStateByKey[Int](updateFunc).checkpoint(batchDuration).
      map(i => i).checkpoint(batchDuration).
```
**Contributor:** Would you test this using two `updateStateByKey`s?

Also, this code style is weird and hard to read.

```scala
      foreachRDD { rdd =>
        recursiveCheckpoint =
          Option(rdd.sparkContext.getLocalProperty("spark.checkpoint.recursive")).
            map(_.toBoolean).getOrElse(false)
        val stateRDD = rdd.firstParent
        rdd.count()
        rddsBothCheckpointed = stateRDD.isCheckpointed && rdd.isCheckpointed
      }
    ssc.start()
    batchCounter.waitUntilBatchesCompleted(1, 10000)
    assert(recursiveCheckpoint === true)
    assert(rddsBothCheckpointed === true)
  }

  /**
   * Advances the manual clock on the streaming scheduler by given number of batches.
   * It also waits for the expected amount of time for each batch.
```