add config value to control checkpoint interval in Pregel
ding committed Feb 17, 2017
1 parent 194dc27 commit 9d7e796
Showing 4 changed files with 16 additions and 14 deletions.
@@ -127,6 +127,16 @@ private[spark] abstract class PeriodicCheckpointer[T](
   /** Get list of checkpoint files for this given Dataset */
   protected def getCheckpointFiles(data: T): Iterable[String]
 
+  /**
+   * Call this to unpersist the Dataset.
+   */
+  def unpersistDataSet(): Unit = {
+    while (persistedQueue.nonEmpty) {
+      val dataToUnpersist = persistedQueue.dequeue()
+      unpersist(dataToUnpersist)
+    }
+  }
+
   /**
    * Call this at the end to delete any remaining checkpoint files.
    */
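For readers skimming the diff: the new unpersistDataSet() simply drains the checkpointer's queue of persisted datasets and unpersists each one, so callers such as Pregel no longer have to hand the last persisted RDD back themselves. Below is a self-contained sketch of that pattern; the names PersistTracker, track and releaseAll are made up for illustration and are not Spark APIs.

// Self-contained sketch of the queue-draining pattern behind unpersistDataSet().
// PersistTracker, track and releaseAll are illustrative names, not Spark APIs.
import scala.collection.mutable
import org.apache.spark.rdd.RDD

class PersistTracker[T] {
  private val persistedQueue = mutable.Queue.empty[RDD[T]]

  /** Persist an RDD and remember it so it can be released later. */
  def track(rdd: RDD[T]): RDD[T] = {
    rdd.persist()
    persistedQueue.enqueue(rdd)
    rdd
  }

  /** Same shape as unpersistDataSet(): drain the queue, unpersisting as we go. */
  def releaseAll(): Unit = {
    while (persistedQueue.nonEmpty) {
      persistedQueue.dequeue().unpersist(blocking = false)
    }
  }
}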
6 changes: 1 addition & 5 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -336,8 +336,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
    *
    * @param maxIterations the maximum number of iterations to run for
    *
-   * @param checkpointInterval the checkpoint interval
-   *
    * @param activeDirection the direction of edges incident to a vertex that received a message in
    * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
    * out-edges of vertices that received a message in the previous round will run.
@@ -364,14 +362,12 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
   def pregel[A: ClassTag](
       initialMsg: A,
       maxIterations: Int = Int.MaxValue,
-      checkpointInterval: Int = 10,
       activeDirection: EdgeDirection = EdgeDirection.Either)(
       vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
-    Pregel(graph, initialMsg, maxIterations, activeDirection,
-      checkpointInterval)(vprog, sendMsg, mergeMsg)
+    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
   }
 
   /**
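Because checkpointInterval is dropped from the public pregel signature, existing callers compile unchanged against the three remaining defaulted parameters. A minimal caller sketch follows, assuming a Graph[Long, Int] and a toy max-propagation program; none of this code is part of the commit itself.

// Illustrative only: a toy "propagate the maximum vertex value" job against the
// restored graph.pregel signature (no checkpointInterval argument).
import org.apache.spark.graphx.{EdgeDirection, Graph}

def propagateMax(graph: Graph[Long, Int]): Graph[Long, Int] =
  graph.pregel(Long.MinValue, 20, EdgeDirection.Either)(
    // vprog: keep the larger of the current value and the incoming message
    (id, attr, msg) => math.max(attr, msg),
    // sendMsg: push a vertex's value to a neighbour that is still smaller
    triplet =>
      if (triplet.srcAttr > triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
      else Iterator.empty,
    // mergeMsg: combine messages arriving at the same vertex
    (a, b) => math.max(a, b))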
7 changes: 4 additions & 3 deletions graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -116,8 +116,7 @@ object Pregel extends Logging {
      (graph: Graph[VD, ED],
       initialMsg: A,
       maxIterations: Int = Int.MaxValue,
-      activeDirection: EdgeDirection = EdgeDirection.Either,
-      checkpointInterval: Int = 10)
+      activeDirection: EdgeDirection = EdgeDirection.Either)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
@@ -126,6 +125,8 @@ object Pregel extends Logging {
     require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
       s" but got ${maxIterations}")
 
+    val checkpointInterval = graph.vertices.sparkContext.getConf
+      .getInt("spark.graphx.pregel.checkpointInterval", 10)
     var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg))
     val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED](
       checkpointInterval, graph.vertices.sparkContext)
@@ -168,7 +169,7 @@ object Pregel extends Logging {
       // count the iteration
       i += 1
     }
-    messageCheckpointer.unpersist(messages)
+    messageCheckpointer.unpersistDataSet()
     graphCheckpointer.deleteAllCheckpoints()
     messageCheckpointer.deleteAllCheckpoints()
     g
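With this change the interval is supplied through the Spark configuration rather than a pregel() argument. A hedged driver-side sketch (e.g. in spark-shell) is shown below; the config key and its default of 10 come from the diff above, while the app name, master, and checkpoint directory are placeholders.

// Sketch of driver-side configuration; only the config key and its default of 10
// come from this commit, the rest (app name, master, path) is illustrative.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("pregel-checkpoint-demo")
  .setMaster("local[*]")
  // checkpoint the graph and message RDDs every 5 Pregel iterations instead of the default 10
  .set("spark.graphx.pregel.checkpointInterval", "5")

val sc = new SparkContext(conf)
// Periodic checkpointing only takes effect once a checkpoint directory is set.
sc.setCheckpointDir("/tmp/pregel-checkpoints")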
7 changes: 1 addition & 6 deletions project/MimaExcludes.scala
@@ -57,12 +57,7 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskData.<init>$default$11"),
 
       // [SPARK-17161] Removing Python-friendly constructors not needed
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this"),
-
-      // SPARK-5484 Periodically do checkpoint in Pregel
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.Pregel.apply"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.GraphOps.pregel"),
-      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.graphx.GraphOps.pregel$default$3")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this")
     )
 
     // Exclude rules for 2.1.x