From f328eb86eba5e2414ed3a53a64a762b0f2b77f02 Mon Sep 17 00:00:00 2001 From: Carlos Balduz Date: Mon, 20 Apr 2015 18:51:02 +0200 Subject: [PATCH 1/2] [SPARK-6978][GraphX] Compute all the direct and indirect relations for each vertex Right now you can only get the direct connections of each vertex in a graph using collectNeighbors or collectNeighborIds. However, it would be very useful to be able to also get all the indirect connections of each vertex, with the degree of each of these connections. I have implemented this using Pregel, you call VertexConnections and specify the graph and the maximum degree you wish to retrieve (this second parameter is optional, if not specified it will retrieve every connection) and returns a map where for each key (degree) it contains a collection with all of the vertices. --- .../spark/graphx/lib/VertexConnections.scala | 130 ++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/VertexConnections.scala diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/VertexConnections.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/VertexConnections.scala new file mode 100644 index 0000000000000..30cef8a88cd24 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/VertexConnections.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.lib + +import org.apache.spark.graphx._ +import scala.reflect.ClassTag + +/** + * Computes all the direct and indirect connections of each vertex on a graph up to a + * maximum degree. + */ +object VertexConnections { + /** + * @tparam ED the edge attribute type (not used in the computation) + * + * @param maxDegree the maximum number of Pregel iterations to be performed + * @param graph the graph for which to compute the connections + * @param removeDegree0 if true, the returned graph will not contain vertices with degree 0 + * + * @return a graph where each vertex attribute is a map containing all the connections and + * the degree of each connection + */ + def apply[VD: ClassTag, ED: ClassTag]( + graph: Graph[VD, ED], + maxDegree: Int = Int.MaxValue, + removeDegree0: Boolean = true) + : Graph[Map[Int, Set[VD]], ED] = + { + type RelationsMap = Map[Int, Set[VD]] + + def makeMap(x: (Int, Set[VD])*): RelationsMap = (Map(x: _*)) + + /** + * Add two maps from different vertices + */ + def addMaps(rel1: collection.mutable.Map[Int, Set[VD]], + rel2: collection.mutable.Map[Int, Set[VD]]): RelationsMap = { + val firstValues: Set[VD] = rel1.values.toSet.flatten + + val notIncludedMap: collection.mutable.Map[Int, Set[VD]] = rel2.map{case (key, value) => + val notIncluded: Set[VD] = value -- firstValues + key -> notIncluded + } + + notIncludedMap.foreach{case (key, value) => + if (key > 0) { + val finalSet: Set[VD] = rel1.getOrElse(key + 1, Set.empty) ++ value + if (finalSet.size > 0) rel1.put(key + 1, finalSet) + } + } + + Map(rel1.toSeq: _*) + } + + /** + * This function merges two maps from the same vertex. + */ + def mergeMaps(rel1: RelationsMap, rel2: RelationsMap): RelationsMap = { + val firstValues: Set[VD] = rel1.values.toSet.flatten + + val notIncludedMap: RelationsMap = rel2.map{case (key, value) => + val notIncluded: Set[VD] = value -- firstValues + key -> notIncluded + } + + val finalSet = (rel1.keys ++ notIncludedMap.keys).map { k => + (k,rel1.getOrElse(k, Set.empty) ++ notIncludedMap.getOrElse(k, Set.empty)) + } + + finalSet.toMap + } + + /** + * Called on every vertex to merge all the inbound messages after each Pregel iteration + */ + def vertexProgram(id: VertexId, attr: RelationsMap, msg: RelationsMap): RelationsMap = { + mergeMaps(attr, msg) + } + + /** + * Function applied to all of the edges that received messages in the current iteration + */ + def sendMessage(edge: EdgeTriplet[RelationsMap, _]): Iterator[(VertexId, RelationsMap)] = { + val copySrc = collection.mutable.Map(edge.srcAttr.toSeq: _*) + val copyDst = collection.mutable.Map(edge.dstAttr.toSeq: _*) + + copySrc.put(1,copySrc.getOrElse(1, Set.empty) ++ copyDst(0)) + copyDst.put(1,copyDst.getOrElse(1, Set.empty) ++ copySrc(0)) + + val newSrcAttr = addMaps(copySrc, copyDst) + val newDstAttr = addMaps(copyDst, copySrc) + + if (edge.srcAttr != newSrcAttr || edge.dstAttr != newDstAttr) { + Iterator((edge.srcId, newSrcAttr), (edge.dstId, newDstAttr)) + } + else Iterator.empty + } + + val initialMessage = makeMap() + + val relGraph: Graph[RelationsMap, ED] = + if (!removeDegree0) { + graph.mapVertices{ (vid, attr) => makeMap((0, Set(attr))) } + } else { + graph.filter(graph => { + val degrees: VertexRDD[Int] = graph.degrees + graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} + }, + vpred = (vid: VertexId, deg:Int) => deg > 0 + ).mapVertices{ (vid, attr) => makeMap((0, Set(attr))) } + } + + Pregel(relGraph, initialMessage, maxDegree)(vertexProgram, sendMessage, mergeMaps) + } +} From a35ec6d1c4bc2be1bd3230330a50d5d61782113e Mon Sep 17 00:00:00 2001 From: Carlos Balduz Date: Wed, 2 Sep 2015 13:02:08 +0200 Subject: [PATCH 2/2] Fixed Scala style --- .../spark/graphx/lib/VertexConnections.scala | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/VertexConnections.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/VertexConnections.scala index 30cef8a88cd24..02533846d6d95 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/VertexConnections.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/VertexConnections.scala @@ -21,7 +21,7 @@ import org.apache.spark.graphx._ import scala.reflect.ClassTag /** - * Computes all the direct and indirect connections of each vertex on a graph up to a + * Computes all the direct and indirect connections of each vertex on a graph up to a * maximum degree. */ object VertexConnections { @@ -32,39 +32,39 @@ object VertexConnections { * @param graph the graph for which to compute the connections * @param removeDegree0 if true, the returned graph will not contain vertices with degree 0 * - * @return a graph where each vertex attribute is a map containing all the connections and - * the degree of each connection + * @return a graph where each vertex attribute is a map containing all the connections and + * the degree of each connection */ def apply[VD: ClassTag, ED: ClassTag]( graph: Graph[VD, ED], maxDegree: Int = Int.MaxValue, removeDegree0: Boolean = true) - : Graph[Map[Int, Set[VD]], ED] = - { + : Graph[Map[Int, Set[VD]], ED] = + { type RelationsMap = Map[Int, Set[VD]] - + def makeMap(x: (Int, Set[VD])*): RelationsMap = (Map(x: _*)) - + /** * Add two maps from different vertices */ - def addMaps(rel1: collection.mutable.Map[Int, Set[VD]], + def addMaps(rel1: collection.mutable.Map[Int, Set[VD]], rel2: collection.mutable.Map[Int, Set[VD]]): RelationsMap = { val firstValues: Set[VD] = rel1.values.toSet.flatten - + val notIncludedMap: collection.mutable.Map[Int, Set[VD]] = rel2.map{case (key, value) => val notIncluded: Set[VD] = value -- firstValues key -> notIncluded } - - notIncludedMap.foreach{case (key, value) => + + notIncludedMap.foreach{case (key, value) => if (key > 0) { val finalSet: Set[VD] = rel1.getOrElse(key + 1, Set.empty) ++ value - if (finalSet.size > 0) rel1.put(key + 1, finalSet) + if (finalSet.size > 0) rel1.put(key + 1, finalSet) } } - - Map(rel1.toSeq: _*) + + Map(rel1.toSeq: _*) } /** @@ -72,59 +72,59 @@ object VertexConnections { */ def mergeMaps(rel1: RelationsMap, rel2: RelationsMap): RelationsMap = { val firstValues: Set[VD] = rel1.values.toSet.flatten - + val notIncludedMap: RelationsMap = rel2.map{case (key, value) => val notIncluded: Set[VD] = value -- firstValues key -> notIncluded } - - val finalSet = (rel1.keys ++ notIncludedMap.keys).map { k => - (k,rel1.getOrElse(k, Set.empty) ++ notIncludedMap.getOrElse(k, Set.empty)) + + val finalSet = (rel1.keys ++ notIncludedMap.keys).map { k => + (k, rel1.getOrElse(k, Set.empty) ++ notIncludedMap.getOrElse(k, Set.empty)) } finalSet.toMap } - + /** * Called on every vertex to merge all the inbound messages after each Pregel iteration */ def vertexProgram(id: VertexId, attr: RelationsMap, msg: RelationsMap): RelationsMap = { mergeMaps(attr, msg) } - + /** * Function applied to all of the edges that received messages in the current iteration */ def sendMessage(edge: EdgeTriplet[RelationsMap, _]): Iterator[(VertexId, RelationsMap)] = { val copySrc = collection.mutable.Map(edge.srcAttr.toSeq: _*) val copyDst = collection.mutable.Map(edge.dstAttr.toSeq: _*) - - copySrc.put(1,copySrc.getOrElse(1, Set.empty) ++ copyDst(0)) - copyDst.put(1,copyDst.getOrElse(1, Set.empty) ++ copySrc(0)) + + copySrc.put(1, copySrc.getOrElse(1, Set.empty) ++ copyDst(0)) + copyDst.put(1, copyDst.getOrElse(1, Set.empty) ++ copySrc(0)) val newSrcAttr = addMaps(copySrc, copyDst) val newDstAttr = addMaps(copyDst, copySrc) if (edge.srcAttr != newSrcAttr || edge.dstAttr != newDstAttr) { - Iterator((edge.srcId, newSrcAttr), (edge.dstId, newDstAttr)) + Iterator((edge.srcId, newSrcAttr), (edge.dstId, newDstAttr)) } else Iterator.empty } - + val initialMessage = makeMap() - - val relGraph: Graph[RelationsMap, ED] = + + val relGraph: Graph[RelationsMap, ED] = if (!removeDegree0) { - graph.mapVertices{ (vid, attr) => makeMap((0, Set(attr))) } + graph.mapVertices{ (vid, attr) => makeMap((0, Set(attr))) } } else { graph.filter(graph => { val degrees: VertexRDD[Int] = graph.degrees graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} }, - vpred = (vid: VertexId, deg:Int) => deg > 0 + vpred = (vid: VertexId, deg: Int) => deg > 0 ).mapVertices{ (vid, attr) => makeMap((0, Set(attr))) } } - + Pregel(relGraph, initialMessage, maxDegree)(vertexProgram, sendMessage, mergeMaps) } }