From 159f9d7a25c2821edb968e2327b1d3de7d3852cf Mon Sep 17 00:00:00 2001 From: Anderson de Andrade Date: Fri, 27 May 2016 17:26:05 -0400 Subject: [PATCH 1/2] Add local checkpointing to GraphX. --- graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 5 +++++ .../scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 5 +++++ .../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala | 6 ++++++ .../scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala | 5 +++++ 4 files changed, 21 insertions(+) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 869caa340f52b..2beaca8363797 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -104,6 +104,11 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab */ def checkpoint(): Unit + /** + * Mark this Graph for local checkpointing using Spark's existing caching layer. + */ + def localCheckpoint(): Graph[VD, ED] + /** * Return whether this Graph has been checkpointed or not. * This returns true iff both the vertices RDD and edges RDD have been checkpointed. 
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index c88b2f65a86cd..4ffe675b7607c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -76,6 +76,11 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( partitionsRDD.checkpoint() } + override def localCheckpoint(): this.type = { + partitionsRDD.localCheckpoint() + this + } + override def isCheckpointed: Boolean = { firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index da95314440d86..b73cd8d57e021 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -70,6 +70,12 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( replicatedVertexView.edges.checkpoint() } + override def localCheckpoint(): Graph[VD, ED] = { + vertices.localCheckpoint() + replicatedVertexView.edges.localCheckpoint() + this + } + override def isCheckpointed: Boolean = { vertices.isCheckpointed && replicatedVertexView.edges.isCheckpointed } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 7f4e7e9d79d6b..475a5af705247 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -77,6 +77,11 @@ class VertexRDDImpl[VD] private[graphx] ( partitionsRDD.checkpoint() } + override def localCheckpoint(): this.type = { + partitionsRDD.localCheckpoint() + this + } + override def isCheckpointed: Boolean = { 
firstParent[ShippableVertexPartition[VD]].isCheckpointed } From 75fea68ff86d012a744d609ecceab017f6f02cae Mon Sep 17 00:00:00 2001 From: Anderson de Andrade Date: Tue, 31 May 2016 19:30:41 -0400 Subject: [PATCH 2/2] Fix MiMa test. --- project/MimaExcludes.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 8b95909179036..43b5fdc791944 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -165,6 +165,9 @@ object MimaExcludes { // SPARK-12591 Register OpenHashMapBasedStateMap for Kryo ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.serializer.KryoInputDataInputBridge"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.serializer.KryoOutputDataOutputBridge") + ) ++ Seq( + // SPARK-12431 Add local checkpointing to GraphX + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.localCheckpoint") ) case v if v.startsWith("1.5") => Seq(