Skip to content

Commit

Permalink
update checkpoint interval and some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ding committed Feb 17, 2017
1 parent f2efef6 commit 194dc27
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
4 changes: 3 additions & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
*
* @param maxIterations the maximum number of iterations to run for
*
* @param checkpointInterval the checkpoint interval
*
* @param activeDirection the direction of edges incident to a vertex that received a message in
* the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
* out-edges of vertices that received a message in the previous round will run.
Expand All @@ -362,7 +364,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
def pregel[A: ClassTag](
initialMsg: A,
maxIterations: Int = Int.MaxValue,
checkpointInterval: Int = 25,
checkpointInterval: Int = 10,
activeDirection: EdgeDirection = EdgeDirection.Either)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
Expand Down
8 changes: 4 additions & 4 deletions graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ object Pregel extends Logging {
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either,
checkpointInterval: Int = 25)
checkpointInterval: Int = 10)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
Expand Down Expand Up @@ -149,8 +149,8 @@ object Pregel extends Logging {

val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
// and periodic checkpoint messages so it can be materialized on the next line, and avoid
// to have a long lineage chain.
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
Expand All @@ -168,7 +168,7 @@ object Pregel extends Logging {
// count the iteration
i += 1
}
messages.unpersist(blocking = false)
messageCheckpointer.unpersist(messages)
graphCheckpointer.deleteAllCheckpoints()
messageCheckpointer.deleteAllCheckpoints()
g
Expand Down
2 changes: 1 addition & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.numTrees"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.setFeatureSubsetStrategy"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.numTrees"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy")
)
}

Expand Down

0 comments on commit 194dc27

Please sign in to comment.