Skip to content

Commit

Permalink
[SPARK-3623][GraphX] GraphX should support the checkpoint operation
Browse files Browse the repository at this point in the history
Author: GuoQiang Li <witgo@qq.com>

Closes #2631 from witgo/SPARK-3623 and squashes the following commits:

a70c500 [GuoQiang Li] Remove java related
4d1e249 [GuoQiang Li] Add comments
e682724 [GuoQiang Li] Graph should support the checkpoint operation
  • Loading branch information
witgo authored and ankurdave committed Dec 6, 2014
1 parent 6eb1b6f commit e895e0c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 0 deletions.
8 changes: 8 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
*/
def cache(): Graph[VD, ED]

/**
* Mark this Graph for checkpointing. It will be saved to a file inside the checkpoint
* directory set with SparkContext.setCheckpointDir() and all references to its parent
* RDDs will be removed. It is strongly recommended that this Graph is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
def checkpoint(): Unit

/**
* Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative
* algorithms that modify the vertex attributes but reuse the edges. This method can be used to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
this
}

override def checkpoint(): Unit = {
vertices.checkpoint()
replicatedVertexView.edges.checkpoint()
}

override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
vertices.unpersist(blocking)
// TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone
Expand Down
21 changes: 21 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.graphx

import org.scalatest.FunSuite

import com.google.common.io.Files

import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph._
import org.apache.spark.graphx.PartitionStrategy._
Expand Down Expand Up @@ -365,4 +367,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}

test("checkpoint") {
val checkpointDir = Files.createTempDir()
checkpointDir.deleteOnExit()
withSpark { sc =>
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
val rdd = sc.parallelize(ring)
val graph = Graph.fromEdges(rdd, 1.0F)
graph.checkpoint()
graph.edges.map(_.attr).count()
graph.vertices.map(_._2).count()

val edgesDependencies = graph.edges.partitionsRDD.dependencies
val verticesDependencies = graph.vertices.partitionsRDD.dependencies
assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
}
}

}

0 comments on commit e895e0c

Please sign in to comment.