From ad428d564ae6b831f0dc44df4582f10c3c11b5e4 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 19 Feb 2016 16:12:53 +0800 Subject: [PATCH 1/5] add numIter --- .../main/scala/org/apache/spark/graphx/GraphOps.scala | 4 ++-- .../apache/spark/graphx/lib/ConnectedComponents.scala | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index d048fb5d561f3..ac9bebbfc614c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -411,8 +411,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]] */ - def connectedComponents(): Graph[VertexId, ED] = { - ConnectedComponents.run(graph) + def connectedComponents(numIter = Int.MaxValue): Graph[VertexId, ED] = { + ConnectedComponents.run(graph, numIter) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala index f72cbb15242ec..0fea565cfe302 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -29,13 +29,14 @@ object ConnectedComponents { * * @tparam VD the vertex attribute type (discarded in the computation) * @tparam ED the edge attribute type (preserved in the computation) - * * @param graph the graph for which to compute the connected components - * + * @param numIter the maximum number of iterations to run for * @return a graph with vertex attributes containing the smallest vertex in each * connected component */ - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = { + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], + numIter: Int = Int.MaxValue): Graph[VertexId, ED] = { + require(numIter > 0) val ccGraph = graph.mapVertices { case (vid, _) => vid } def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = { if (edge.srcAttr < edge.dstAttr) { @@ -47,7 +48,8 @@ object ConnectedComponents { } } val initialMessage = Long.MaxValue - val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)( + val pregelGraph = Pregel(ccGraph, initialMessage, + numIter, EdgeDirection.Either)( vprog = (id, attr, msg) => math.min(attr, msg), sendMsg = sendMessage, mergeMsg = (a, b) => math.min(a, b)) From 918e7812db5efa22329c0c40b99c234c2ebc7c3b Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 19 Feb 2016 17:24:08 +0800 Subject: [PATCH 2/5] fix example --- .../scala/org/apache/spark/examples/graphx/SynthBenchmark.scala | 2 +- graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala index 41ca5cbb9f083..b480eb76c7e9c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala @@ -117,7 +117,7 @@ object SynthBenchmark { println(s"Total PageRank = $totalPR") } else if (app == "cc") { println("Running Connected Components") - val numComponents = graph.connectedComponents.vertices.map(_._2).distinct().count() + val numComponents = graph.connectedComponents().vertices.map(_._2).distinct().count() println(s"Number of components = $numComponents") } val runTime = System.currentTimeMillis() - startTime diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index ac9bebbfc614c..0d98360481dfc 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -411,7 +411,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]] */ - def connectedComponents(numIter = Int.MaxValue): Graph[VertexId, ED] = { + def connectedComponents(numIter: Int = Int.MaxValue): Graph[VertexId, ED] = { ConnectedComponents.run(graph, numIter) } From 1b23afb176b563f6e1a1a59646fce766ec5f89c4 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Sat, 20 Feb 2016 10:34:38 +0800 Subject: [PATCH 3/5] add an api --- .../spark/examples/graphx/SynthBenchmark.scala | 2 +- .../scala/org/apache/spark/graphx/GraphOps.scala | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala index b480eb76c7e9c..41ca5cbb9f083 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala @@ -117,7 +117,7 @@ object SynthBenchmark { println(s"Total PageRank = $totalPR") } else if (app == "cc") { println("Running Connected Components") - val numComponents = graph.connectedComponents().vertices.map(_._2).distinct().count() + val numComponents = graph.connectedComponents.vertices.map(_._2).distinct().count() println(s"Number of components = $numComponents") } val runTime = System.currentTimeMillis() - startTime diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 0d98360481dfc..f6b08c2ecd20a 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -405,13 +405,23 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali PageRank.run(graph, numIter, resetProb) } + /** + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the lowest vertex id in the connected component containing that vertex. + * + * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]] + */ + def connectedComponents(): Graph[VertexId, ED] = { + ConnectedComponents.run(graph, Int.MaxValue) + } + /** * Compute the connected component membership of each vertex and return a graph with the vertex * value containing the lowest vertex id in the connected component containing that vertex. * * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]] */ - def connectedComponents(numIter: Int = Int.MaxValue): Graph[VertexId, ED] = { + def connectedComponents(numIter: Int): Graph[VertexId, ED] = { ConnectedComponents.run(graph, numIter) } From 53b93553cde0b55332fe0d2dd4db50929b14cb8b Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Sat, 20 Feb 2016 13:21:49 +0800 Subject: [PATCH 4/5] options renamed --- .../src/main/scala/org/apache/spark/graphx/GraphOps.scala | 4 ++-- .../org/apache/spark/graphx/lib/ConnectedComponents.scala | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index f6b08c2ecd20a..fb8e014cae461 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -421,8 +421,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]] */ - def connectedComponents(numIter: Int): Graph[VertexId, ED] = { - ConnectedComponents.run(graph, numIter) + def connectedComponents(maxIterations: Int): Graph[VertexId, ED] = { + ConnectedComponents.run(graph, maxIterations) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala index 0fea565cfe302..763148b00d6f7 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -30,13 +30,13 @@ object ConnectedComponents { * @tparam VD the vertex attribute type (discarded in the computation) * @tparam ED the edge attribute type (preserved in the computation) * @param graph the graph for which to compute the connected components - * @param numIter the maximum number of iterations to run for + * @param maxIterations the maximum number of iterations to run for * @return a graph with vertex attributes containing the smallest vertex in each * connected component */ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], - numIter: Int = Int.MaxValue): Graph[VertexId, ED] = { - require(numIter > 0) + maxIterations: Int = Int.MaxValue): Graph[VertexId, ED] = { + require(maxIterations > 0) val ccGraph = graph.mapVertices { case (vid, _) => vid } def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = { if (edge.srcAttr < edge.dstAttr) { @@ -49,7 +49,7 @@ object ConnectedComponents { } val initialMessage = Long.MaxValue val pregelGraph = Pregel(ccGraph, initialMessage, - numIter, EdgeDirection.Either)( + maxIterations, EdgeDirection.Either)( vprog = (id, attr, msg) => math.min(attr, msg), sendMsg = sendMessage, mergeMsg = (a, b) => math.min(a, b)) From 774f0fcc9bcb2d5c468a7d56c4144453d31258a5 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Sat, 20 Feb 2016 20:35:57 +0800 Subject: [PATCH 5/5] add an run api for ConnectedComponents --- .../scala/org/apache/spark/graphx/GraphOps.scala | 2 +- .../spark/graphx/lib/ConnectedComponents.scala | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index fb8e014cae461..97a82239a97ee 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -412,7 +412,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]] */ def connectedComponents(): Graph[VertexId, ED] = { - ConnectedComponents.run(graph, Int.MaxValue) + ConnectedComponents.run(graph) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala index 763148b00d6f7..40cf0735e274b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -35,7 +35,7 @@ object ConnectedComponents { * connected component */ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], - maxIterations: Int = Int.MaxValue): Graph[VertexId, ED] = { + maxIterations: Int): Graph[VertexId, ED] = { require(maxIterations > 0) val ccGraph = graph.mapVertices { case (vid, _) => vid } def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = { @@ -56,4 +56,18 @@ object ConnectedComponents { ccGraph.unpersist() pregelGraph } // end of connectedComponents + + /** + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the lowest vertex id in the connected component containing that vertex. + * + * @tparam VD the vertex attribute type (discarded in the computation) + * @tparam ED the edge attribute type (preserved in the computation) + * @param graph the graph for which to compute the connected components + * @return a graph with vertex attributes containing the smallest vertex in each + * connected component + */ + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = { + run(graph, Int.MaxValue) + } }