From 3e900b18859c2e48467da564c146948b96509f75 Mon Sep 17 00:00:00 2001 From: Kenny Bastani Date: Fri, 14 Aug 2015 03:06:44 -0700 Subject: [PATCH] SPARK-9975 Add Normalized Closeness Centrality to Spark GraphX * Added normalized closeness centrality to GraphX library * Added closeness centrality test suite * Added closeness centrality operation to GraphOps --- .../org/apache/spark/graphx/GraphOps.scala | 11 ++++ .../graphx/lib/ClosenessCentrality.scala | 66 +++++++++++++++++++ .../graphx/lib/ClosenessCentralitySuite.scala | 60 +++++++++++++++++ 3 files changed, 137 insertions(+) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/ClosenessCentrality.scala create mode 100644 graphx/src/test/scala/org/apache/spark/graphx/lib/ClosenessCentralitySuite.scala diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 9451ff1e5c0e2..8e54b6f9b6e5e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -435,4 +435,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] = { StronglyConnectedComponents.run(graph, numIter) } + + /** + * Compute the closeness centrality of each vertex and return a graph with the + * vertex value containing the reciprocal of the average weighted sum of its distance + * to all other vertices in the graph. 
+   *
+   * @see [[org.apache.spark.graphx.lib.ClosenessCentrality#run]]
+   */
+  def closenessCentrality(): Graph[Double, ED] = {
+    ClosenessCentrality.run(graph)
+  }
 } // end of GraphOps
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ClosenessCentrality.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ClosenessCentrality.scala
new file mode 100644
index 0000000000000..7b5eb604d1844
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ClosenessCentrality.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.lib
+
+import org.apache.spark.graphx._
+import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
+import scala.language.reflectiveCalls
+import scala.language.implicitConversions
+
+/**
+ * Compute the closeness centrality of each vertex: the reciprocal of the
+ * average shortest-path distance from that vertex to every other vertex
+ * in the graph.
+ *
+ * The algorithm is relatively straightforward and can be computed in three steps:
+ *
+ *  1. run [[ShortestPaths]] with every vertex id of the graph as a landmark,
+ *  2. average each vertex's resulting shortest-path distances, and
+ *  3. take the reciprocal of that average (falling back to 0.0 when the
+ *     reciprocal is NaN or infinite).
+ */
+object ClosenessCentrality {
+
+  /**
+   * Calculate the closeness centrality of each vertex in the input graph.
+   *
+   * @tparam VD the vertex attribute type (discarded in the computation)
+   * @tparam ED the edge attribute type (preserved in the computation)
+   *
+   * @param graph the graph for which to compute the closeness centrality
+   *
+   * @return a graph with vertex attributes containing its closeness centrality
+   */
+  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Double, ED] = {
+    // Every vertex id is used as a landmark, so all-pairs shortest paths are
+    // computed. NOTE(review): this collects all vertex ids to the driver and is
+    // therefore only feasible for modest-sized graphs.
+    val landmarks = graph.vertices.map(_._1).collect()
+    val centralities: RDD[(VertexId, Double)] =
+      ShortestPaths.run(graph, landmarks).vertices.map { case (id, spMap) =>
+        // Closeness is the reciprocal of the mean shortest-path distance.
+        // An empty or all-zero distance map yields NaN or an infinity; report
+        // 0.0 for such vertices instead of propagating the non-finite value.
+        val reciprocal = 1.0 / average(spMap.values)
+        (id, if (reciprocal.isNaN || reciprocal.isInfinity) 0.0 else reciprocal)
+      }
+    Graph(centralities, graph.edges)
+  }
+
+  /** Arithmetic mean of a numeric collection (NaN when the collection is empty). */
+  def average[T](ts: Iterable[T])(implicit num: Numeric[T]): Double = {
+    num.toDouble(ts.sum) / ts.size
+  }
+
+  /**
+   * Enrich an iterable of numerics with an `avg` member. Retained for source
+   * compatibility; prefer calling [[average]] directly, since this structural
+   * type is dispatched through runtime reflection.
+   */
+  implicit def iterableWithAvg[T: Numeric](data: Iterable[T]): Object {def avg: Double} = new {
+    def avg = average(data)
+  }
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ClosenessCentralitySuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ClosenessCentralitySuite.scala
new file mode 100644
index 0000000000000..9e2bcaad6fcb0
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ClosenessCentralitySuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.lib
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.graphx._
+import org.apache.spark.rdd.RDD
+
+/**
+ * Tests [[ClosenessCentrality]] on a directed 6-cycle. Each vertex's
+ * shortest-path distances to the six vertices are {0, 1, 2, 3, 4, 5},
+ * whose mean is 2.5, so every vertex's closeness is 1 / 2.5 = 0.4.
+ */
+class ClosenessCentralitySuite extends SparkFunSuite with LocalSparkContext {
+
+  test("Closeness Centrality Computations") {
+    withSpark { sc =>
+      // Expected (vertexId, closeness) pairs: 0.4 for every vertex (see above).
+      val closenessCentrality = Set(
+        (1, .4), (2, .4), (3, .4),
+        (4, .4), (5, .4), (0, .4))
+
+      // Create an RDD for the vertices
+      val vertices: RDD[(VertexId, Double)] = sc.parallelize(Seq(
+        (0L, 1.0),
+        (1L, 1.0),
+        (2L, 1.0),
+        (3L, 1.0),
+        (4L, 1.0),
+        (5L, 1.0)))
+
+      // Create an RDD for edges: a single directed cycle 0 -> 1 -> ... -> 5 -> 0
+      val edges: RDD[Edge[Double]] = sc.parallelize(Seq(
+        Edge(0L, 1L, 0.0),
+        Edge(1L, 2L, 0.0),
+        Edge(2L, 3L, 0.0),
+        Edge(3L, 4L, 0.0),
+        Edge(4L, 5L, 0.0),
+        Edge(5L, 0L, 0.0)))
+
+      // Build the initial Graph
+      val graph = Graph(vertices, edges)
+
+      // Collect the computed centralities to the driver for comparison.
+      val results = ClosenessCentrality.run(graph).vertices.map(a => (a._1, a._2)).collect().toSet[(VertexId, Double)]
+
+      assert(results === closenessCentrality)
+    }
+  }
+
+}