From 48364e64388355450c04605898eb443953e1a06e Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Thu, 29 Jan 2015 11:25:36 -0800
Subject: [PATCH] Checkpoint every 25 iterations in Pregel

---
 .../main/scala/org/apache/spark/graphx/Pregel.scala | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 5e55620147df..1fbbb87363cb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -126,6 +126,8 @@ object Pregel extends Logging {
     // Loop
     var prevG: Graph[VD, ED] = null
     var i = 0
+    val checkpoint = g.vertices.context.getCheckpointDir.nonEmpty
+    val checkpointFrequency = 25
     while (activeMessages > 0 && i < maxIterations) {
       // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
       val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
@@ -139,6 +141,14 @@ object Pregel extends Logging {
       // get to send messages. We must cache messages so it can be materialized on the next line,
       // allowing us to uncache the previous iteration.
       messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
+
+      if (checkpoint && i % checkpointFrequency == checkpointFrequency - 1) {
+        logInfo("Checkpointing in iteration " + i)
+        g.vertices.checkpoint()
+        g.edges.checkpoint()
+        messages.checkpoint()
+      }
+
       // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
       // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
       // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).