[SPARK-6847][Core][Streaming]Fix stack overflow issue when updateStateByKey is followed by a checkpointed dstream #10934
Conversation
Add a local property to indicate whether to checkpoint all RDDs that are marked with the checkpoint flag, and enable it in Streaming.
/cc @andrewor14 @tdas
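The mechanism the PR description relies on, a "local property" that is set on the job-submitting thread and read back when the job runs, can be sketched with a self-contained thread-local toy (the helper names here are illustrative, not Spark's API; only the property key `spark.checkpoint.recursive` comes from the diff below):

```python
import threading

_local = threading.local()

def set_local_property(key, value):
    # Local properties are per-thread: jobs submitted from this thread see them.
    props = getattr(_local, "props", None)
    if props is None:
        props = _local.props = {}
    props[key] = value

def get_local_property(key):
    # Returns None when the property was never set on this thread.
    return getattr(_local, "props", {}).get(key)

# Streaming would enable the flag on the thread that submits its jobs:
set_local_property("spark.checkpoint.recursive", "true")
print(get_local_property("spark.checkpoint.recursive"))  # true
```

In Spark the analogous calls are `sc.setLocalProperty` / `sc.getLocalProperty`; the point of a local property (rather than a global conf) is that only jobs submitted by the streaming scheduler opt in.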
@@ -1535,6 +1535,10 @@ abstract class RDD[T: ClassTag](

  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

  // Whether recursively checkpoint all RDDs that are marked with the checkpoint flag.
  private val recursiveCheckpoint =
    Option(sc.getLocalProperty("spark.checkpoint.recursive")).map(_.toBoolean).getOrElse(false)
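The parse-or-default lookup in the diff can be sketched as a small Python analogue (the helper name `get_bool_local_property` is hypothetical; the property key and default come from the diff):

```python
def get_bool_local_property(props, key, default=False):
    """Mirrors Option(sc.getLocalProperty(key)).map(_.toBoolean).getOrElse(default):
    a missing key yields the default; a present value is parsed as a boolean.
    (Unlike Scala's toBoolean, garbage input here falls back to False rather
    than raising -- a simplification for the sketch.)"""
    value = props.get(key)
    if value is None:
        return default
    return value.strip().lower() == "true"

# A job whose submitting thread set the flag sees True; otherwise the default False.
print(get_bool_local_property({"spark.checkpoint.recursive": "true"},
                              "spark.checkpoint.recursive"))  # True
print(get_bool_local_property({}, "spark.checkpoint.recursive"))  # False
```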
Well, it's always recursive. The difference is whether to checkpoint all RDDs that have been marked or not.
Better name suggestion for this one?
"spark.checkpoint.checkpointAllMarked" ?? @andrewor14 thoughts.
Btw, shouldnt this be a constant variable in some object?
This is a hard one... I think checkpointAllMarkedAncestors is least ambiguous.
Test build #50141 has finished for PR 10934 at commit
retest this please
Test build #50173 has finished for PR 10934 at commit
@@ -1535,6 +1535,10 @@ abstract class RDD[T: ClassTag](

  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

  // Whether checkpoint all RDDs that are marked with the checkpoint flag.
We need to expand on this comment:
// Whether to checkpoint all ancestor RDDs that are marked for checkpointing. By default,
// we stop as soon as we find the first such RDD, an optimization that allows us to write
// less data but is not safe for all workloads. E.g. in streaming we may checkpoint both
// an RDD and its parent in every batch, in which case the parent may never be checkpointed
// and its lineage never truncated, leading to OOMs in the long run (SPARK-6847).
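The semantics the suggested comment describes (stop at the first marked RDD by default vs. checkpoint every marked ancestor under the flag) can be sketched with a toy model; `ToyRDD` and its fields are hypothetical stand-ins, not Spark's actual implementation:

```python
class ToyRDD:
    """Minimal stand-in for an RDD's checkpoint bookkeeping (illustrative only)."""
    def __init__(self, name, parents=(), marked=False):
        self.name = name
        self.parents = list(parents)
        self.marked = marked          # "marked for checkpointing" flag
        self.checkpointed = False

    def do_checkpoint(self, checkpoint_all_marked_ancestors):
        if self.marked:
            if checkpoint_all_marked_ancestors:
                # Under the flag, checkpoint every marked ancestor first.
                for p in self.parents:
                    p.do_checkpoint(checkpoint_all_marked_ancestors)
            # By default we stop here: the first marked RDD is checkpointed
            # and its ancestors are never visited.
            self.checkpointed = True
        else:
            # Not marked: keep searching upward in the lineage.
            for p in self.parents:
                p.do_checkpoint(checkpoint_all_marked_ancestors)

# Streaming-style lineage: both a state RDD and its parent are marked each batch.
parent = ToyRDD("parent", marked=True)
child = ToyRDD("child", parents=[parent], marked=True)

child.do_checkpoint(checkpoint_all_marked_ancestors=False)
print(parent.checkpointed)  # False: stopped at the first marked RDD (the child)

child.do_checkpoint(checkpoint_all_marked_ancestors=True)
print(parent.checkpointed)  # True: the marked ancestor is checkpointed too
```

This is exactly the SPARK-6847 failure mode: with the default behavior the parent is never checkpointed, its lineage is never truncated, and it grows without bound across batches.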
Looks great! I only have documentation and test suggestions.
Test build #50421 has finished for PR 10934 at commit
  rdd.count()
  // Check the two state RDDs are both checkpointed
  rddsCheckpointed = stateRDDs.size == 2 && stateRDDs.forall(_.isCheckpointed)
}
hm indentation is weird here?
LGTM, I'll merge this once you address the minor comments.
Test build #50448 has finished for PR 10934 at commit
Merged into master.
I did not merge this into 1.6 and before for 2 reasons:
Let me know if you disagree.