From f2e0f3a8b37ea5ed82731f111b5e011c310e0695 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 19 Feb 2016 17:46:43 +0800 Subject: [PATCH 1/5] add ConnectedComponentsWithDegree --- .../lib/ConnectedComponentsWithDegree.scala | 86 +++++++++++ .../ConnectedComponentsWithDegreeSuite.scala | 133 ++++++++++++++++++ 2 files changed, 219 insertions(+) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegree.scala create mode 100644 graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegreeSuite.scala diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegree.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegree.scala new file mode 100644 index 000000000000..56a28c350e04 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegree.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.lib + +import scala.reflect.ClassTag + +import org.apache.spark.graphx._ + +/** + * Created by zrf on 16/2/19. + */ +object ConnectedComponentsWithDegree { + /** + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the largest degree in the connected component and the corresponding vertex + * id. If several vertices have the same largest degree, the one with lowest id is chosen. + * + * @tparam VD the vertex attribute type (discarded in the computation) + * @tparam ED the edge attribute type (preserved in the computation) + * @param graph the graph for which to compute the connected components + * @return a graph with vertex attributes containing the largest degree and the corresponding id + * in each connected component + */ + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], + numIter: Int = Int.MaxValue): Graph[(VertexId, Int), ED] = { + + val degGraph = graph.outerJoinVertices(graph.degrees)( + (vid, vd, degOpt) => (vid, degOpt.getOrElse(0))) + + + def cmp(a: (VertexId, Int), b: (VertexId, Int)): Int = { + if (a._2 > b._2) { + 1 + } else if (a._2 < b._2) { + -1 + } else if (a._1 < b._1) { + 1 + } else if (a._1 > b._1) { + -1 + } else { + 0 + } + } + + def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { + if (cmp(a, b) >= 0) { + a + } else { + b + } + } + + val pregelGraph = Pregel(graph = degGraph, + initialMsg = (Long.MaxValue, Int.MinValue), + maxIterations = numIter, + activeDirection = EdgeDirection.Either)( + vprog = (id, attr, msg) => max(attr, msg), + sendMsg = edge => { + cmp(edge.srcAttr, edge.dstAttr) match { + case 1 => Iterator((edge.dstId, edge.srcAttr)) + case -1 => Iterator((edge.srcId, edge.dstAttr)) + case 0 => Iterator.empty + } + }, + mergeMsg = max + ) + degGraph.unpersist() + pregelGraph + } // end of connectedComponentswithdegree +} + diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegreeSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegreeSuite.scala new file mode 100644 index 000000000000..6074644cec1d --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegreeSuite.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.lib + +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.GraphGenerators +import org.apache.spark.rdd._ + +/** + * Created by zrf on 16/2/19. + */ +class ConnectedComponentsWithDegreeSuite extends SparkFunSuite with LocalSparkContext { + + test("Grid Connected Components") { + withSpark { sc => + val gridGraph = GraphGenerators.gridGraph(sc, 10, 10) + val ccGraph = gridGraph.connectedComponents() + val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum() + assert(maxCCid === 0) + } + } // end of Grid connected components + + + test("Reverse Grid Connected Components") { + withSpark { sc => + val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse + val ccGraph = gridGraph.connectedComponents() + val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum() + assert(maxCCid === 0) + } + } // end of Grid connected components + + + test("Chain Connected Components") { + withSpark { sc => + val chain1 = (0 until 9).map(x => (x, x + 1)) + val chain2 = (10 until 20).map(x => (x, x + 1)) + val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s, d) => (s.toLong, d.toLong) } + val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0) + val ccGraph = twoChains.connectedComponents() + val vertices = ccGraph.vertices.collect() + for ( (id, cc) <- vertices ) { + if (id < 10) { + assert(cc === 0) + } else { + assert(cc === 10) + } + } + val ccMap = vertices.toMap + for (id <- 0 until 20) { + if (id < 10) { + assert(ccMap(id) === 0) + } else { + assert(ccMap(id) === 10) + } + } + } + } // end of chain connected components + + test("Reverse Chain Connected Components") { + withSpark { sc => + val chain1 = (0 until 9).map(x => (x, x + 1)) + val chain2 = (10 until 20).map(x => (x, x + 1)) + val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s, d) => (s.toLong, d.toLong) } + val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse + val ccGraph = twoChains.connectedComponents() + val vertices = ccGraph.vertices.collect() + for ( (id, cc) <- vertices ) { + if (id < 10) { + assert(cc === 0) + } else { + assert(cc === 10) + } + } + val ccMap = vertices.toMap + for ( id <- 0 until 20 ) { + if (id < 10) { + assert(ccMap(id) === 0) + } else { + assert(ccMap(id) === 10) + } + } + } + } // end of reverse chain connected components + + test("Connected Components on a Toy Connected Graph") { + withSpark { sc => + // Create an RDD for the vertices + val users: RDD[(VertexId, (String, String))] = + sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), + (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), + (4L, ("peter", "student")))) + // Create an RDD for edges + val relationships: RDD[Edge[String]] = + sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), + Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), + Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague"))) + // Edges are: + // 2 ---> 5 ---> 3 + // | \ + // V \| + // 4 ---> 0 7 + // + // Define a default user in case there are relationship with missing user + val defaultUser = ("John Doe", "Missing") + // Build the initial Graph + val graph = Graph(users, relationships, defaultUser) + val ccGraph = graph.connectedComponents() + val vertices = ccGraph.vertices.collect() + for ( (id, cc) <- vertices ) { + assert(cc === 0) + } + } + } // end of toy connected components + +} From f4d46c332d34c4c8cb744801c61bdb1c11452f81 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 19 Feb 2016 17:57:13 +0800 Subject: [PATCH 2/5] add to GraphOps --- .../org/apache/spark/graphx/GraphOps.scala | 11 ++++++++++ .../graphx/lib/ConnectedComponentsSuite.scala | 20 +++++++++---------- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index d537b6141cc9..bedf7cd15819 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -434,6 +434,17 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali ConnectedComponents.run(graph, maxIterations) } + /** + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the largest degree in the connected component and the corresponding vertex + * id. If several vertices have the same largest degree, the one with lowest id is chosen. + * + * @see [[org.apache.spark.graphx.lib.ConnectedComponentsWithDegree#run]] + */ + def connectedComponentsWithDegree(numIter: Int = Int.MaxValue): Graph[(VertexId, Int), ED] = { + ConnectedComponentsWithDegree.run(graph, numIter) + } + /** * Compute the number of triangles passing through each vertex. * diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala index 1b8142356337..caf9373a4b61 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala @@ -28,8 +28,8 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { test("Grid Connected Components") { withSpark { sc => val gridGraph = GraphGenerators.gridGraph(sc, 10, 10) - val ccGraph = gridGraph.connectedComponents() - val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum() + val ccGraph = gridGraph.connectedComponentsWithDegree() + val maxCCid = ccGraph.vertices.map { case (vid, (ccId, degree)) => ccId }.sum() assert(maxCCid === 0) } } // end of Grid connected components @@ -53,7 +53,7 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0) val ccGraph = twoChains.connectedComponents() val vertices = ccGraph.vertices.collect() - for ( (id, cc) <- vertices ) { + for ((id, cc) <- vertices) { if (id < 10) { assert(cc === 0) } else { @@ -79,7 +79,7 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse val ccGraph = twoChains.connectedComponents() val vertices = ccGraph.vertices.collect() - for ( (id, cc) <- vertices ) { + for ((id, cc) <- vertices) { if (id < 10) { assert(cc === 0) } else { @@ -87,7 +87,7 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { } } val ccMap = vertices.toMap - for ( id <- 0 until 20 ) { + for (id <- 0 until 20) { if (id < 10) { assert(ccMap(id) === 0) } else { @@ -102,13 +102,13 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { // Create an RDD for the vertices val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), - (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), - (4L, ("peter", "student")))) + (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), + (4L, ("peter", "student")))) // Create an RDD for edges val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), - Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), - Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague"))) + Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), + Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague"))) // Edges are: // 2 ---> 5 ---> 3 // | \ @@ -121,7 +121,7 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { val graph = Graph(users, relationships, defaultUser) val ccGraph = graph.connectedComponents() val vertices = ccGraph.vertices.collect() - for ( (id, cc) <- vertices ) { + for ((id, cc) <- vertices) { assert(cc === 0) } } From e3d93973970e46d28bb4dc118109cab418f3f379 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 19 Feb 2016 18:01:15 +0800 Subject: [PATCH 3/5] add to test --- .../graphx/lib/ConnectedComponentsSuite.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala index caf9373a4b61..0f365c9d924a 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala @@ -38,8 +38,8 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { test("Reverse Grid Connected Components") { withSpark { sc => val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse - val ccGraph = gridGraph.connectedComponents() - val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum() + val ccGraph = gridGraph.connectedComponentsWithDegree() + val maxCCid = ccGraph.vertices.map { case (vid, (ccId, degree)) => ccId }.sum() assert(maxCCid === 0) } } // end of Grid connected components @@ -51,9 +51,9 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { val chain2 = (10 until 20).map(x => (x, x + 1)) val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s, d) => (s.toLong, d.toLong) } val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0) - val ccGraph = twoChains.connectedComponents() + val ccGraph = twoChains.connectedComponentsWithDegree() val vertices = ccGraph.vertices.collect() - for ((id, cc) <- vertices) { + for ((id, (cc, degree)) <- vertices) { if (id < 10) { assert(cc === 0) } else { @@ -63,9 +63,9 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { val ccMap = vertices.toMap for (id <- 0 until 20) { if (id < 10) { - assert(ccMap(id) === 0) + assert(ccMap(id)._1 === 0) } else { - assert(ccMap(id) === 10) + assert(ccMap(id)._1 === 10) } } } @@ -77,9 +77,9 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { val chain2 = (10 until 20).map(x => (x, x + 1)) val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s, d) => (s.toLong, d.toLong) } val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse - val ccGraph = twoChains.connectedComponents() + val ccGraph = twoChains.connectedComponentsWithDegree() val vertices = ccGraph.vertices.collect() - for ((id, cc) <- vertices) { + for ((id, (cc, degree)) <- vertices) { if (id < 10) { assert(cc === 0) } else { @@ -89,9 +89,9 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { val ccMap = vertices.toMap for (id <- 0 until 20) { if (id < 10) { - assert(ccMap(id) === 0) + assert(ccMap(id)._1 === 0) } else { - assert(ccMap(id) === 10) + assert(ccMap(id)._1 === 10) } } } @@ -119,9 +119,9 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { val defaultUser = ("John Doe", "Missing") // Build the initial Graph val graph = Graph(users, relationships, defaultUser) - val ccGraph = graph.connectedComponents() + val ccGraph = graph.connectedComponentsWithDegree() val vertices = ccGraph.vertices.collect() - for ((id, cc) <- vertices) { + for ((id, (cc, dgree)) <- vertices) { assert(cc === 0) } } From bd7d498b930e7c9d354281888e89f38bb1e24e9a Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Tue, 1 Mar 2016 17:58:52 +0800 Subject: [PATCH 4/5] add test --- .../org/apache/spark/graphx/GraphOps.scala | 27 +++++-- .../lib/ConnectedComponentsWithDegree.scala | 53 ++++++++----- .../ConnectedComponentsWithDegreeSuite.scala | 76 +++++++++---------- 3 files changed, 89 insertions(+), 67 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index bedf7cd15819..98cfd2879fb2 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -435,14 +435,25 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali } /** - * Compute the connected component membership of each vertex and return a graph with the vertex - * value containing the largest degree in the connected component and the corresponding vertex - * id. If several vertices have the same largest degree, the one with lowest id is chosen. - * - * @see [[org.apache.spark.graphx.lib.ConnectedComponentsWithDegree#run]] - */ - def connectedComponentsWithDegree(numIter: Int = Int.MaxValue): Graph[(VertexId, Int), ED] = { - ConnectedComponentsWithDegree.run(graph, numIter) + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the largest degree in the connected component and the corresponding vertex + * id. If several vertices have the same largest degree, the one with lowest id is chosen. + * + * @see [[org.apache.spark.graphx.lib.ConnectedComponentsWithDegree#run]] + */ + def connectedComponentsWithDegree(): Graph[(VertexId, Int), ED] = { + ConnectedComponentsWithDegree.run(graph) + } + + /** + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the largest degree in the connected component and the corresponding vertex + * id. If several vertices have the same largest degree, the one with lowest id is chosen. + * + * @see [[org.apache.spark.graphx.lib.ConnectedComponentsWithDegree#run]] + */ + def connectedComponentsWithDegree(maxIterations: Int): Graph[(VertexId, Int), ED] = { + ConnectedComponentsWithDegree.run(graph, maxIterations) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegree.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegree.scala index 56a28c350e04..857a0c72b58e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegree.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegree.scala @@ -21,28 +21,30 @@ import scala.reflect.ClassTag import org.apache.spark.graphx._ -/** - * Created by zrf on 16/2/19. - */ +/** Connected components algorithm based on degree propagation. */ object ConnectedComponentsWithDegree { /** - * Compute the connected component membership of each vertex and return a graph with the vertex - * value containing the largest degree in the connected component and the corresponding vertex - * id. If several vertices have the same largest degree, the one with lowest id is chosen. - * - * @tparam VD the vertex attribute type (discarded in the computation) - * @tparam ED the edge attribute type (preserved in the computation) - * @param graph the graph for which to compute the connected components - * @return a graph with vertex attributes containing the largest degree and the corresponding id - * in each connected component - */ + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the largest degree in the connected component and the corresponding vertex + * id. If several vertices have the same largest degree, the one with lowest id is chosen. + * + * @tparam VD the vertex attribute type (discarded in the computation) + * @tparam ED the edge attribute type (preserved in the computation) + * @param graph the graph for which to compute the connected components + * @param maxIterations the maximum number of iterations to run for + * @return a graph with vertex attributes containing the largest degree and the corresponding id + * in each connected component + */ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], - numIter: Int = Int.MaxValue): Graph[(VertexId, Int), ED] = { - + maxIterations: Int): Graph[(VertexId, Int), ED] = { + require(maxIterations > 0, s"Maximum of iterations must be greater than 0," + + s" but got ${maxIterations}") val degGraph = graph.outerJoinVertices(graph.degrees)( (vid, vd, degOpt) => (vid, degOpt.getOrElse(0))) - + // First compare the degree, and then the vertex id. The greater the degree is, the greater + // the tuple is. And if two tuples have the same degree, the less the vertex id is, the greater + // the tuple is. def cmp(a: (VertexId, Int), b: (VertexId, Int)): Int = { if (a._2 > b._2) { 1 @@ -57,6 +59,7 @@ object ConnectedComponentsWithDegree { } } + // return the greater tuple def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { if (cmp(a, b) >= 0) { a @@ -67,7 +70,7 @@ object ConnectedComponentsWithDegree { val pregelGraph = Pregel(graph = degGraph, initialMsg = (Long.MaxValue, Int.MinValue), - maxIterations = numIter, + maxIterations = maxIterations, activeDirection = EdgeDirection.Either)( vprog = (id, attr, msg) => max(attr, msg), sendMsg = edge => { @@ -82,5 +85,19 @@ object ConnectedComponentsWithDegree { degGraph.unpersist() pregelGraph } // end of connectedComponentswithdegree -} + /** + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the largest degree in the connected component and the corresponding vertex + * id. If several vertices have the same largest degree, the one with lowest id is chosen. + * + * @tparam VD the vertex attribute type (discarded in the computation) + * @tparam ED the edge attribute type (preserved in the computation) + * @param graph the graph for which to compute the connected components + * @return a graph with vertex attributes containing the largest degree and the corresponding id + * in each connected component + */ + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[(VertexId, Int), ED] = { + run(graph, Int.MaxValue) + } +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegreeSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegreeSuite.scala index 6074644cec1d..0e13a53ce51b 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegreeSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsWithDegreeSuite.scala @@ -23,27 +23,31 @@ import org.apache.spark.graphx._ import org.apache.spark.graphx.util.GraphGenerators import org.apache.spark.rdd._ -/** - * Created by zrf on 16/2/19. - */ + class ConnectedComponentsWithDegreeSuite extends SparkFunSuite with LocalSparkContext { test("Grid Connected Components") { withSpark { sc => - val gridGraph = GraphGenerators.gridGraph(sc, 10, 10) - val ccGraph = gridGraph.connectedComponents() - val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum() - assert(maxCCid === 0) + val n = 5 + val gridGraph = GraphGenerators.gridGraph(sc, n, n) + val dccGraph = gridGraph.connectedComponentsWithDegree() + val dccIds = dccGraph.vertices.map { case (vid, dccId) => dccId }.distinct().collect() + assert(dccIds.length === 1) + assert(dccIds.head._1 === n + 1) + assert(dccIds.head._2 === 4) } } // end of Grid connected components test("Reverse Grid Connected Components") { withSpark { sc => - val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse - val ccGraph = gridGraph.connectedComponents() - val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum() - assert(maxCCid === 0) + val n = 5 + val gridGraph = GraphGenerators.gridGraph(sc, n, n).reverse + val dccGraph = gridGraph.connectedComponentsWithDegree() + val dccIds = dccGraph.vertices.map { case (vid, dccId) => dccId }.distinct().collect() + assert(dccIds.length === 1) + assert(dccIds.head._1 === n + 1) + assert(dccIds.head._2 === 4) } } // end of Grid connected components @@ -54,21 +58,15 @@ class ConnectedComponentsWithDegreeSuite extends SparkFunSuite with LocalSparkCo val chain2 = (10 until 20).map(x => (x, x + 1)) val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s, d) => (s.toLong, d.toLong) } val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0) - val ccGraph = twoChains.connectedComponents() - val vertices = ccGraph.vertices.collect() - for ( (id, cc) <- vertices ) { - if (id < 10) { - assert(cc === 0) - } else { - assert(cc === 10) - } - } - val ccMap = vertices.toMap - for (id <- 0 until 20) { + val dccGraph = twoChains.connectedComponentsWithDegree() + val vertices = dccGraph.vertices.collect() + for ( (id, dcc) <- vertices ) { if (id < 10) { - assert(ccMap(id) === 0) + assert(dcc._1 === 1) + assert(dcc._2 === 2) } else { - assert(ccMap(id) === 10) + assert(dcc._1 === 11) + assert(dcc._2 === 2) } } } @@ -80,21 +78,15 @@ class ConnectedComponentsWithDegreeSuite extends SparkFunSuite with LocalSparkCo val chain2 = (10 until 20).map(x => (x, x + 1)) val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s, d) => (s.toLong, d.toLong) } val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse - val ccGraph = twoChains.connectedComponents() - val vertices = ccGraph.vertices.collect() - for ( (id, cc) <- vertices ) { - if (id < 10) { - assert(cc === 0) - } else { - assert(cc === 10) - } - } - val ccMap = vertices.toMap - for ( id <- 0 until 20 ) { + val dccGraph = twoChains.connectedComponentsWithDegree() + val vertices = dccGraph.vertices.collect() + for ( (id, dcc) <- vertices ) { if (id < 10) { - assert(ccMap(id) === 0) + assert(dcc._1 === 1) + assert(dcc._2 === 2) } else { - assert(ccMap(id) === 10) + assert(dcc._1 === 11) + assert(dcc._2 === 2) } } } @@ -122,10 +114,12 @@ class ConnectedComponentsWithDegreeSuite extends SparkFunSuite with LocalSparkCo val defaultUser = ("John Doe", "Missing") // Build the initial Graph val graph = Graph(users, relationships, defaultUser) - val ccGraph = graph.connectedComponents() - val vertices = ccGraph.vertices.collect() - for ( (id, cc) <- vertices ) { - assert(cc === 0) + val dccGraph = graph.connectedComponentsWithDegree() + val vertices = dccGraph.vertices.collect() + for ( (id, dcc) <- vertices ) { + // vertex 5 has the max degree, which equals to 4 + assert(dcc._1 === 5) + assert(dcc._2 === 4) } } } // end of toy connected components From 1e0bd879b97dd03a6dc4d7d4804a24bb29b71dd8 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Mon, 7 Mar 2016 17:31:26 +0800 Subject: [PATCH 5/5] revert test for cc --- .../graphx/lib/ConnectedComponentsSuite.scala | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala index 0f365c9d924a..1b8142356337 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala @@ -28,8 +28,8 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { test("Grid Connected Components") { withSpark { sc => val gridGraph = GraphGenerators.gridGraph(sc, 10, 10) - val ccGraph = gridGraph.connectedComponentsWithDegree() - val maxCCid = ccGraph.vertices.map { case (vid, (ccId, degree)) => ccId }.sum() + val ccGraph = gridGraph.connectedComponents() + val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum() assert(maxCCid === 0) } } // end of Grid connected components @@ -38,8 +38,8 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { test("Reverse Grid Connected Components") { withSpark { sc => val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse - val ccGraph = gridGraph.connectedComponentsWithDegree() - val maxCCid = ccGraph.vertices.map { case (vid, (ccId, degree)) => ccId }.sum() + val ccGraph = gridGraph.connectedComponents() + val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum() assert(maxCCid === 0) } } // end of Grid connected components @@ -51,9 +51,9 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { val chain2 = (10 until 20).map(x => (x, x + 1)) val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s, d) => (s.toLong, d.toLong) } val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0) - val ccGraph = twoChains.connectedComponentsWithDegree() + val ccGraph = twoChains.connectedComponents() val vertices = ccGraph.vertices.collect() - for ((id, (cc, degree)) <- vertices) { + for ( (id, cc) <- vertices ) { if (id < 10) { assert(cc === 0) } else { @@ -63,9 +63,9 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { val ccMap = vertices.toMap for (id <- 0 until 20) { if (id < 10) { - assert(ccMap(id)._1 === 0) + assert(ccMap(id) === 0) } else { - assert(ccMap(id)._1 === 10) + assert(ccMap(id) === 10) } } } @@ -77,9 +77,9 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { val chain2 = (10 until 20).map(x => (x, x + 1)) val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s, d) => (s.toLong, d.toLong) } val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse - val ccGraph = twoChains.connectedComponentsWithDegree() + val ccGraph = twoChains.connectedComponents() val vertices = ccGraph.vertices.collect() - for ((id, (cc, degree)) <- vertices) { + for ( (id, cc) <- vertices ) { if (id < 10) { assert(cc === 0) } else { @@ -87,11 +87,11 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { } } val ccMap = vertices.toMap - for (id <- 0 until 20) { + for ( id <- 0 until 20 ) { if (id < 10) { - assert(ccMap(id)._1 === 0) + assert(ccMap(id) === 0) } else { - assert(ccMap(id)._1 === 10) + assert(ccMap(id) === 10) } } } @@ -102,13 +102,13 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { // Create an RDD for the vertices val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), - (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), - (4L, ("peter", "student")))) + (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), + (4L, ("peter", "student")))) // Create an RDD for edges val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), - Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), - Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague"))) + Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), + Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague"))) // Edges are: // 2 ---> 5 ---> 3 // | \ @@ -119,9 +119,9 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext { val defaultUser = ("John Doe", "Missing") // Build the initial Graph val graph = Graph(users, relationships, defaultUser) - val ccGraph = graph.connectedComponentsWithDegree() + val ccGraph = graph.connectedComponents() val vertices = ccGraph.vertices.collect() - for ((id, (cc, dgree)) <- vertices) { + for ( (id, cc) <- vertices ) { assert(cc === 0) } }