From 75d63cbd8f471cbc8c783cedfb6afe9fab3c7189 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Wed, 16 Apr 2014 13:28:23 -0700 Subject: [PATCH] Rebuild routing table after Graph.reverse GraphImpl.reverse used to reverse edges in each partition of the edge RDD but preserve the routing table and replicated vertex view, since reversing should not affect partitioning. However, the old routing table would then have incorrect information for srcAttrOnly and dstAttrOnly. These RDDs should be switched. A simple fix is for Graph.reverse to rebuild the routing table and replicated vertex view. Thanks to Bogdan Ghidireac for reporting this issue on the mailing list. --- .../scala/org/apache/spark/graphx/impl/GraphImpl.scala | 2 +- .../scala/org/apache/spark/graphx/GraphSuite.scala | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index c2b510a31ee3f..9eabccdee48db 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -102,7 +102,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( override def reverse: Graph[VD, ED] = { val newETable = edges.mapEdgePartitions((pid, part) => part.reverse) - new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) + GraphImpl(vertices, newETable) } override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = { diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index c65e36636fe10..d9ba4672ce0c5 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -172,6 +172,16 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("reverse with join elimination") { + withSpark { sc => + val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2))) + val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0))) + val graph = Graph(vertices, edges).reverse + val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _) + assert(result.collect.toSet === Set((1L, 2))) + } + } + test("subgraph") { withSpark { sc => // Create a star graph of 10 veritces.