From 15f78cd76787573201d2f19051bb7b4c21754ede Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Tue, 17 Mar 2015 17:53:08 +0900
Subject: [PATCH] Support incremental updates for an existing graph
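
This adds an incremental-update path for time-evolving graphs: new edges
(for example, built from streaming data in Spark Streaming) are routed to
the existing edge partitions with the graph's PartitionStrategy and merged
per partition, instead of rebuilding the whole graph from scratch. The
defaultVertexAttr parameter seeds the vertex attributes of the returned
graph. A rough usage sketch; the variable names initialEdges and
streamedEdges are illustrative only, not part of this patch:

  val graph = Graph.fromEdges[Int, Int](initialEdges, 0)
    .partitionBy(PartitionStrategy.EdgePartition2D, 2)
  val updated = graph.append(streamedEdges, PartitionStrategy.EdgePartition2D)

Passing the same strategy to partitionBy and append keeps the new edges
consistent with the existing partition layout; the GraphSuite test added
below follows this pattern.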
---
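Implementation note: the merge itself is a per-partition concatenation. New
edges are keyed by PartitionID, shuffled with the edge RDD's own
partitioner, and zipped against the existing edge partitions, so the base
edges are never reshuffled; partitionBy(edges.partitioner.get) also
guarantees both RDDs have the same number of partitions, which
zipPartitions requires. A minimal, standalone sketch of this zipPartitions
pattern on plain pair RDDs (assumes a SparkContext named sc; the names and
sizes are illustrative only):

  import org.apache.spark.HashPartitioner
  val base = sc.parallelize(0 until 100).map(i => (i % 4, i))
    .partitionBy(new HashPartitioner(4))
  val incoming = sc.parallelize(100 until 150).map(i => (i % 4, i))
    .partitionBy(base.partitioner.get)
  // Concatenate each base partition with its co-located incoming records;
  // keys never move between partitions, so the partitioning is preserved.
  val merged = base.zipPartitions(incoming, preservesPartitioning = true) {
    (baseIter, incomingIter) => baseIter ++ incomingIter
  }
  assert(merged.count == 150)
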
 .../scala/org/apache/spark/graphx/Graph.scala |  9 +++++++++
 .../spark/graphx/impl/EdgePartition.scala     |  8 ++++++++
 .../graphx/impl/EdgePartitionBuilder.scala    |  4 ++++
 .../apache/spark/graphx/impl/GraphImpl.scala  | 20 ++++++++++++++++++-
 .../org/apache/spark/graphx/GraphSuite.scala  | 20 +++++++++++++++++++
 5 files changed, 60 insertions(+), 1 deletion(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 8494d06b1cdb7..7ccea9ec0b28d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -147,6 +147,15 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    */
   def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
 
+  /**
+   * Append new vertices and edges into an existing Graph.
+   * This is useful for time-evolving graphs; new vertices and edges are built
+   * from streaming data in Spark Streaming, and then incrementally
+   * appended into an existing graph.
+   */
+  def append(newEdges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy,
+      defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED]
+
   /**
    * Transforms each vertex attribute in the graph using the map function.
    *
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 373af75448374..741065f01df6e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -95,6 +95,14 @@ class EdgePartition[
       activeSet)
   }
 
+  /** Append new edges into existing edges. */
+  def appendEdges(newEdges: Iterator[Edge[ED]]): EdgePartition[ED, VD] = {
+    val builder = new EdgePartitionBuilder[ED, VD]()
+    for (e <- iterator) builder.add(e)
+    for (e <- newEdges) builder.add(e)
+    builder.toEdgePartition
+  }
+
   /** Return a new `EdgePartition` without any locally cached vertex attributes. */
   def withoutVertexAttributes[VD2: ClassTag](): EdgePartition[ED, VD2] = {
     val newVertexAttrs = new Array[VD2](vertexAttrs.length)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 906d42328fcb9..fa6134e530dfd 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -34,6 +34,10 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
     edges += Edge(src, dst, d)
   }
 
+  def add(newEdge: Edge[ED]): Unit = {
+    edges += newEdge
+  }
+
   def toEdgePartition: EdgePartition[ED, VD] = {
     val edgeArray = edges.trim().array
     new Sorter(Edge.edgeArraySortDataFormat[ED])
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 90a74d23a26cc..79814ac34835f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -101,7 +101,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
       partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = {
     val edTag = classTag[ED]
     val vdTag = classTag[VD]
-    val newEdges = edges.withPartitionsRDD(edges.map { e =>
+    val newEdges = edges.withPartitionsRDD[ED, VD](edges.map { e =>
       val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
       (part, (e.srcId, e.dstId, e.attr))
     }
@@ -118,6 +118,24 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
   }
 
+  def append(newEdges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy,
+      defaultVertexAttr: VD): Graph[VD, ED] = {
+    val numPartitions = edges.partitions.size
+    val msgEdgePartitions = newEdges.map { e =>
+      val pid: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
+      (pid, e)
+    }.partitionBy(edges.partitioner.get)
+    val newEdgeRdd = edges.withPartitionsRDD[ED, VD](
+      edges.partitionsRDD.zipPartitions(msgEdgePartitions, true) { (baseIter, newIter) =>
+        baseIter.map { case (pid, ePart) =>
+          val newEdgePart = ePart.appendEdges(newIter.map(e => e._2))
+          (pid, newEdgePart)
+        }
+      })
+    GraphImpl.fromEdgeRDD[VD, ED](
+      newEdgeRdd, defaultVertexAttr, edges.targetStorageLevel, edges.targetStorageLevel)
+  }
+
   override def reverse: Graph[VD, ED] = {
     new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
   }
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index b61d9f0fbe5e4..adb7c16202e1a 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -149,6 +149,26 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("Graph#append updates an existing graph") {
+    withSpark { sc =>
+      val edges = sc.parallelize(0L until 100L).map { i => Edge(i, 0L, 0) }
+      val strategy = PartitionStrategy.fromString("EdgePartition2D")
+      val graph = Graph.fromEdges[Int, Int](edges, 0).partitionBy(strategy, 2)
+      val verts = graph.aggregateMessages[Int]({ case ctx =>
+        ctx.sendToDst(1)
+      }, _ + _)
+      assert(verts.map(v => v._2).reduce(_ + _) == 100)
+
+      // Append new edges into an existing graph
+      val newEdges = sc.parallelize(100L until 150L).map { i => Edge(i, 0L, 0) }
+      val newGraph = graph.append(newEdges, strategy)
+      val newVerts = newGraph.aggregateMessages[Int]({ case ctx =>
+        ctx.sendToDst(1)
+      }, _ + _)
+      assert(newVerts.map(v => v._2).reduce(_ + _) == 150)
+    }
+  }
+
   test("mapVertices") {
     withSpark { sc =>
       val n = 5