[SPARK-2490] Change recursive visiting on RDD dependencies to iterative approach #1418
Conversation
… to avoid stackoverflowerror.
Another example of this problem is the PageRank example bundled in Spark. At this time, since the problem with the Java serializer still exists, to avoid causing StackOverflowError after too many iterations it is necessary to periodically call checkpoint() on the ranks RDD, as in the diff below.
Thanks for submitting this. I think we can still stack overflow in serialization, but I agree it's better to do this non-recursively.
Actually it's late. I will review this tomorrow.
ok to test
Jenkins, ok to test.
Thanks for commenting. How about the review?
Jenkins, test this please
if (i % 50 == 0) {
  ranks.cache()
  ranks.checkpoint()
  ranks.collect()
Don't do a collect; if you want to force it to be computed, just do a foreach. Otherwise collect will try to bring all the data back to the driver.
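A minimal sketch of the suggested change, assuming the ranks RDD from the diff above (not the author's exact code): forcing evaluation with foreach keeps the data on the executors, whereas collect would ship every element back to the driver.

if (i % 50 == 0) {
  ranks.cache()
  ranks.checkpoint()
  // Force evaluation without returning data to the driver.
  ranks.foreach(_ => ())
}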
QA tests have started for PR 1418. This patch merges cleanly.
QA results for PR 1418:
Hey @viirya, instead of modifying the PageRank example, what do you think of leaving it as-is until we have automatic checkpointing of long lineage chains? I think that will be better because the example is meant to be easy to understand and not that many people will run PageRank with hundreds of iterations (this particular algorithm usually converges much faster).
BTW the Jenkins failure is due to a code style issue: an if block without braces.
Jenkins, this is ok to test
}
}
}
}
visit(rdd)
waitingForVisit.push(rdd)
while (!waitingForVisit.isEmpty)
This should have braces around the loop body
BTW you can run sbt scalastyle to check these style things locally.
@mateiz Thanks for the suggestion. I have left the PageRank example as-is, and the braces are added to comply with the code style.
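For illustration, a sketch of how the flagged loop might read once braces are added, based on the names in the diff above (waitingForVisit, visit); this is an assumption about the final shape of the patch, not the exact merged code.

waitingForVisit.push(rdd)
while (!waitingForVisit.isEmpty) {
  // Braces around the multi-line loop body satisfy scalastyle.
  visit(waitingForVisit.pop())
}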
Jenkins, test this please
Jenkins, retest this please.
QA tests have started for PR 1418. This patch merges cleanly.
QA results for PR 1418:
val parents = new Stack[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
def visit(r: RDD[_]) {
why define a function here? seems like this is only used once? why not just inline it in the while?
I kept the code in the function getParentShuffleDependencies because it contains multiple levels of indentation, so putting it under the case statement would not be readable. I can inline it if this is an issue.
parents.toList
}

private def getParentShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
This should actually be called getAncestorShuffleDependencies because it's not just direct parents, it can be grandparents and such as well.
Also, please add a comment at the top of this saying what it finds. In particular, it only finds missing ones.
Finally, would it be possible to merge this with the code that calls it in getShuffleMapStage, e.g. have a method called registerShuffleDependencies? Might be easier to follow.
Added a new commit with several changes, including the function name and comments. Please review whether it is fine.
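For readers following along, a rough sketch of the iterative traversal under discussion, using the renamed getAncestorShuffleDependencies suggested above; it approximates the approach (a method that would live inside the DAGScheduler), not the exact code that was merged.

import scala.collection.mutable.{HashSet, Stack}
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Walk the dependency graph with an explicit stack instead of recursion,
// collecting the shuffle dependencies of ancestor RDDs.
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
  val parents = new Stack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (!waitingForVisit.isEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shufDep: ShuffleDependency[_, _, _] =>
          // Record the shuffle dependency and keep walking its parent RDD.
          parents.push(shufDep)
          waitingForVisit.push(shufDep.rdd)
        case dep =>
          waitingForVisit.push(dep.rdd)
      }
    }
  }
  parents
}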
Jenkins, test this please
QA tests have started for PR 1418. This patch merges cleanly.
QA results for PR 1418:
Jenkins, test this please
QA tests have started for PR 1418. This patch merges cleanly.
QA results for PR 1418:
Thanks Liang-Chi; I've merged this in.
…ve approach

When performing some transformations on RDDs after many iterations, the dependencies of RDDs could be very long. It can easily cause StackOverflowError when recursively visiting these dependencies in Spark core. For example:

var rdd = sc.makeRDD(Array(1))
for (i <- 1 to 1000) {
  rdd = rdd.coalesce(1).cache()
  rdd.collect()
}

This PR changes recursive visiting on rdd's dependencies to iterative approach to avoid StackOverflowError.

In addition to the recursive visiting, since the Java serializer has a known bug (http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4152790) that causes StackOverflowError too when serializing/deserializing a large graph of objects. So applying this PR only solves part of the problem. Using KryoSerializer to replace Java serializer might be helpful. However, since KryoSerializer is not supported for spark.closure.serializer now, I can not test if KryoSerializer can solve Java serializer's problem completely.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes apache#1418 from viirya/remove_recursive_visit and squashes the following commits:

6b2c615 [Liang-Chi Hsieh] change function name; comply with code style.
5f072a7 [Liang-Chi Hsieh] add comments to explain Stack usage.
8742dbb [Liang-Chi Hsieh] comply with code style.
900538b [Liang-Chi Hsieh] change recursive visiting on rdd's dependencies to iterative approach to avoid stackoverflowerror.
When performing some transformations on RDDs after many iterations, the dependencies of an RDD can become very long, which easily causes StackOverflowError when Spark core visits these dependencies recursively. For example:

var rdd = sc.makeRDD(Array(1))
for (i <- 1 to 1000) {
  rdd = rdd.coalesce(1).cache()
  rdd.collect()
}

This PR changes the recursive visiting of an RDD's dependencies to an iterative approach to avoid StackOverflowError.

Beyond the recursive visiting, the Java serializer has a known bug that also causes StackOverflowError when serializing/deserializing a large graph of objects, so this PR only solves part of the problem. Replacing the Java serializer with KryoSerializer might help. However, since KryoSerializer is not currently supported for spark.closure.serializer, I cannot test whether KryoSerializer solves the Java serializer's problem completely.
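As an illustration of the serializer point above, a hedged sketch of switching the data serializer to Kryo (the app name here is made up; the closure serializer setting itself could not be switched to Kryo at the time of this PR):

import org.apache.spark.{SparkConf, SparkContext}

// Kryo handles data serialization; closures still go through the Java serializer.
val conf = new SparkConf()
  .setAppName("LongLineageExample") // hypothetical app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)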