Skip to content

Commit

Permalink
[SPARK-9109] [GRAPHX] Keep the cached edge in the graph
Browse files Browse the repository at this point in the history
The change here is to keep the cached RDDs in the graph object so that when the graph.unpersist() is called these RDDs are correctly unpersisted.

```java
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory
import org.apache.spark.graphx.util.GraphGenerators

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
graph.cache().numEdges

graph.unpersist()

sc.getPersistentRDDs.foreach( r => println( r._2.toString))
```

Author: tien-dungle <tien-dung.le@realimpactanalytics.com>

Closes #7469 from tien-dungle/SPARK-9109_Graphx-unpersist and squashes the following commits:

8d87997 [tien-dungle] Keep the cached edge in the graph

(cherry picked from commit 587c315)
Signed-off-by: Ankur Dave <ankurdave@gmail.com>
  • Loading branch information
tien-dungle authored and ankurdave committed Jul 17, 2015
1 parent bb14015 commit f34f3d7
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,9 @@ object GraphImpl {
edgeStorageLevel: StorageLevel,
vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
.withTargetStorageLevel(edgeStorageLevel).cache()
.withTargetStorageLevel(edgeStorageLevel)
val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
.withTargetStorageLevel(vertexStorageLevel).cache()
.withTargetStorageLevel(vertexStorageLevel)
GraphImpl(vertexRDD, edgeRDD)
}

Expand All @@ -346,9 +346,14 @@ object GraphImpl {
def apply[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {

vertices.cache()

// Convert the vertex partitions in edges to the correct type
val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
.mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
.cache()

GraphImpl.fromExistingRDDs(vertices, newEdges)
}

Expand Down

0 comments on commit f34f3d7

Please sign in to comment.