Skip to content

Commit

Permalink
remove details of checkpointer in pregel implementation sketch
Browse files Browse the repository at this point in the history
  • Loading branch information
ding committed Feb 19, 2017
1 parent dae94aa commit dd6c366
Showing 1 changed file with 1 addition and 18 deletions.
19 changes: 1 addition & 18 deletions docs/graphx-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -722,8 +722,6 @@ class GraphOps[VD, ED] {
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
val checkpointInterval = graph.vertices.sparkContext.getConf
.getInt("spark.graphx.pregel.checkpointInterval", 10)
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg))
val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED](
checkpointInterval, graph.vertices.sparkContext)
Expand All @@ -736,12 +734,11 @@ class GraphOps[VD, ED] {
messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
var activeMessages = messages.count()

// Loop
// Loop until no messages remain or maxIterations is achieved
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages and update the vertices.
prevG = g
g = g.joinVertices(messages)(vprog)
graphCheckpointer.update(g)

Expand All @@ -751,24 +748,10 @@ class GraphOps[VD, ED] {
// to have a long lineage chain.
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
activeMessages = messages.count()

logInfo("Pregel finished iteration " + i)

// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)
// count the iteration
i += 1
}
messageCheckpointer.unpersistDataSet()
graphCheckpointer.deleteAllCheckpoints()
messageCheckpointer.deleteAllCheckpoints()
g
}
}
Expand Down

0 comments on commit dd6c366

Please sign in to comment.