Skip to content

Commit

Permalink
[SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc
Browse files Browse the repository at this point in the history
create verticesDeduplicate with reduceByKey, using mergeFunc
then proceed with verticesDedup
  • Loading branch information
larryxiao committed Aug 12, 2014
1 parent b715aa0 commit 20d80a3
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,10 @@ object VertexRDD {
def apply[VD: ClassTag](
vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
): VertexRDD[VD] = {
val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
case Some(p) => vertices
case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
val verticesDedup = vertices.reduceByKey((VD1, VD2) => mergeFunc(VD1, VD2))
val vPartitioned: RDD[(VertexId, VD)] = verticesDedup.partitioner match {
case Some(p) => verticesDedup
case None => verticesDedup.copartitionWithVertices(new HashPartitioner(verticesDedup.partitions.size))
}
val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {
Expand Down

0 comments on commit 20d80a3

Please sign in to comment.