From b8c67c0c3a8ae54f3b3df2e1fd5ece2bc76acbb0 Mon Sep 17 00:00:00 2001 From: Ji Dai Date: Mon, 1 May 2017 14:28:13 -0700 Subject: [PATCH 1/2] [SPARK-20454] [GraphX] Two Improvements of ShortestPaths in GraphX MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch makes two improvements that broaden the use of shortest paths. (1) Output all shortest paths when more than one exists. It computes shortest paths from a given source vertex to all other vertices. If several paths between two vertices have the same shortest distance, all of them are output. (2) Support both weighted and unweighted graphs. The general problem is the weighted graph, and that is what the code works on: pass ’true’ as the weightedGraph parameter. Unweighted graphs are handled as well: pass ’false’, and the weight of every edge is set to 1. The contribution is my original work and that I license the work to the project under the project’s open source license. --- .../spark/graphx/lib/ShortestPaths.scala | 155 ++++++++++++++++-- .../spark/graphx/lib/ShortestPathsSuite.scala | 74 +++++++++ 2 files changed, 214 insertions(+), 15 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala index f0c6bcb93445c..252da82b10667 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala @@ -22,13 +22,19 @@ import scala.reflect.ClassTag import org.apache.spark.graphx._ /** - * Computes shortest paths to the given set of landmark vertices, returning a graph where each - * vertex attribute is a map containing the shortest-path distance to each reachable landmark. 
- */ + * Computes shortest paths to the given set of landmark vertices, returning a graph where each + * vertex attribute is a map containing the shortest-path distance to each reachable landmark. + */ object ShortestPaths { /** Stores a map from the vertex id of a landmark to the distance to that landmark. */ type SPMap = Map[VertexId, Int] + /** + * Stores, for one vertex, a pair of the shortest distance from the source vertex + * and the list of all shortest paths from the source to that vertex + */ + type SSSPPair = (Double, List[Seq[VertexId]]) + private def makeMap(x: (VertexId, Int)*) = Map(x: _*) private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) } @@ -38,18 +44,18 @@ object ShortestPaths { k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue)) }.toMap - /** - * Computes shortest paths to the given set of landmark vertices. - * - * @tparam ED the edge attribute type (not used in the computation) - * - * @param graph the graph for which to compute the shortest paths - * @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each - * landmark. - * - * @return a graph where each vertex attribute is a map containing the shortest-path distance to - * each reachable landmark vertex. - */ + /** + * Computes shortest paths to the given set of landmark vertices. + * + * @tparam ED the edge attribute type (not used in the computation) + * + * @param graph the graph for which to compute the shortest paths + * @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each + * landmark. + * + * @return a graph where each vertex attribute is a map containing the shortest-path distance to + * each reachable landmark vertex. 
+   */
   def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
     val spGraph = graph.mapVertices { (vid, attr) =>
       if (landmarks.contains(vid)) makeMap(vid -> 0) else makeMap()
@@ -69,4 +75,123 @@ object ShortestPaths {
 
     Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
   }
+
+  /**
+   * Computes shortest paths from the source vertex to all other vertices.
+   * Weights of edges in the graph should be positive
+   *
+   * @tparam VD the vertex attribute type (input vertex attributes are ignored)
+   * @param graph the graph to search; in weighted mode its edge attributes are the weights
+   * @param sourceId id of the source vertex; must match exactly one vertex of the graph
+   * @param weightedGraph true: edge attributes are the weights; false: every edge counts as 1.0
+   * @return a graph whose vertex attribute is the pair (shortest distance from the source,
+   *         all shortest paths from the source to this vertex); vertices that are never
+   *         reached keep the initial (Double.PositiveInfinity, empty list)
+   * @throws IllegalArgumentException if sourceId does not match exactly one vertex, or if
+   *         weightedGraph is true and some edge weight is not greater than 0
+   */
+  def run[VD: ClassTag](
+      graph: Graph[VD, Double],
+      sourceId: VertexId,
+      weightedGraph: Boolean): Graph[SSSPPair, Double] = {
+
+    // guarantee the source vertex exists in the graph
+    val sourceCount = graph.vertices.filter(v => v._1 == sourceId).count()
+    require(sourceCount == 1, "invalid source Id, this id doesn't exist")
+
+    // weights only matter in weighted mode; `!(w > 0)` also rejects NaN weights
+    val badEdgeCount = if (weightedGraph) graph.edges.filter(e => !(e.attr > 0)).count() else 0L
+    require(badEdgeCount == 0, "edge weight should be greater than 0")
+
+    // initialize the graph:
+    // the source starts at distance 0.0 with the single path Seq(sourceId);
+    // every other vertex starts at Double.PositiveInfinity with an empty path list,
+    // i.e. it counts as unreachable until a message says otherwise.
+    // In unweighted mode every edge attribute is replaced by 1.0.
+ val initialGraph : Graph[(Double, List[Seq[VertexId]]), Double] = { + val temp = graph.mapVertices { + (id, _) => { + if (id == sourceId) (0.0, List(Seq[VertexId](sourceId))) + else (Double.PositiveInfinity, List[Seq[VertexId]]()) + } + } + // unweighted graph: replace every edge attribute with 1.0 + if(!weightedGraph) { + temp.mapEdges(_ => 1.0) + } else { + temp + } + } + + // initial message: infinite distance and an empty path list + val initialMsg: SSSPPair = (Double.PositiveInfinity, List[Seq[VertexId]]()) + + // maximum number of iterations: Int.MaxValue, i.e. effectively uncapped + val maxIterations: Int = Int.MaxValue + + // active direction is 'Out' + val activeDirection: EdgeDirection = EdgeDirection.Out + + // Vertex program: + // runs on each vertex, receives the inbound message and + // computes a new vertex value. + // On the first iteration the vertex program is invoked on + // all vertices and is passed the default message. + // On subsequent iterations the vertex program is only invoked + // on those vertices that receive messages. + // 'distAndPaths' is an SSSPPair: + // distAndPaths._1 is the shortest distance the current vertex holds from the source, + // distAndPaths._2 is the list of shortest paths the current vertex holds from the source. + // If the msg carries a shorter distance, the vertex value is replaced by the msg; + // if the msg carries the same distance as the current vertex, + // the paths of the msg are appended to the paths of the current vertex + // and duplicate paths are removed; otherwise the vertex keeps its value. 
+    def vprog(id: VertexId, distAndPaths: SSSPPair, msg: SSSPPair): SSSPPair = {
+      if (distAndPaths._1 < msg._1) distAndPaths
+      else if (distAndPaths._1 == msg._1) {
+        (distAndPaths._1, (distAndPaths._2 ::: msg._2).distinct)
+      }
+      else msg
+    }
+
+    // Send Message
+    // applied to the out edges of vertices that received a message in the current iteration;
+    // decides which message, if any, goes to the edge's destination for the next iteration.
+    // A message is sent when the path through the edge's source is shorter than, or exactly
+    // as long as, the distance the destination currently holds (ties are forwarded so that
+    // the destination collects every shortest path). The message carries the candidate
+    // distance and each of the source's paths extended by the destination id.
+    // A candidate distance that is not finite is never sent.
+    def sendMsg(triplet: EdgeTriplet[SSSPPair, Double]): Iterator[(VertexId, SSSPPair)] = {
+      val candidateDist = triplet.srcAttr._1 + triplet.attr
+      // Infinity + w <= Infinity holds, so without the finiteness check vertices that are
+      // unreachable from the source and lie on a cycle would message each other endlessly.
+      if (candidateDist < Double.PositiveInfinity && candidateDist <= triplet.dstAttr._1) {
+        val candidatePaths = triplet.srcAttr._2.map(path => path :+ triplet.dstId)
+        Iterator((triplet.dstId, (candidateDist, candidatePaths)))
+      } else {
+        Iterator.empty
+      }
+    }
+
+    // Merge Message
+    // user defined function to merge multiple messages arriving at the same vertex
+    // at the start of a Superstep before applying the vertex program 'vprog'
+    // get two messages of type - SSSPPair, and compare them,
+    // get the one which holds the shorter distance
+    // if two messages come in with the equal shortest dist, combine the two msg into one
+    // e.g. 
a vertex gets 2 messages (msg1 and msg2) with different dist, + // mergeMsg compares the dist in msg1 and msg2 + // and keeps only the msg which holds the shorter distance + def mergeMsg(msg1: SSSPPair, msg2: SSSPPair): SSSPPair = { + if (msg1._1 < msg2._1) msg1 + else if (msg1._1 == msg2._1) { + (msg1._1, (msg1._2 ::: msg2._2).distinct) + } + else msg2 + } + + initialGraph.pregel(initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg) + } } + diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala index 994395bbffa56..833829a0abf52 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala @@ -40,4 +40,78 @@ class ShortestPathsSuite extends SparkFunSuite with LocalSparkContext { } } + test("Shortest Path single source all paths in weighted graph") { + withSpark { sc => + val edgeArr: Array[Edge[Double]] = Array(Edge(2, 1, 7), + Edge(2, 4, 2), + Edge(3, 2, 4), + Edge(3, 6, 3), + Edge(4, 1, 1), + Edge(5, 2, 2), + Edge(5, 3, 3), + Edge(5, 6, 8), + Edge(5, 7, 2), + Edge(7, 6, 4), + Edge(7, 4, 1)) + + val edges = sc.parallelize(edgeArr) + val graph = Graph.fromEdges(edges, true) + val results = ShortestPaths.run(graph, 5, true).vertices.collect.map { + case(id, distAndPaths) => distAndPaths._2.map{ + path => (distAndPaths._1, path) + } + }.flatMap(pair => pair) + + val shortestPaths = Set( + (3.0, List[VertexId](5, 7, 4)), + (4.0, List[VertexId](5, 7, 4, 1)), + (0.0, List[VertexId](5)), + (6.0, List[VertexId](5, 7, 6)), + (6.0, List[VertexId](5, 3, 6)), + (2.0, List[VertexId](5, 2)), + (3.0, List[VertexId](5, 3)), + (2.0, List[VertexId](5, 7)) + ) + + assert(results.toSet === shortestPaths) + } + } + + test("Shortest Path single source all paths in unweighted graph") { + withSpark { sc => + val edgeArr: Array[Edge[Double]] = 
Array(Edge(2, 1, 7), + Edge(2, 4, 2), + Edge(3, 2, 4), + Edge(3, 6, 3), + Edge(4, 1, 1), + Edge(5, 2, 2), + Edge(5, 3, 3), + Edge(5, 6, 8), + Edge(5, 7, 2), + Edge(7, 6, 4), + Edge(7, 4, 1)) + + val edges = sc.parallelize(edgeArr) + val graph = Graph.fromEdges(edges, true) + val results = ShortestPaths.run(graph, 5, false).vertices.collect.map { + case(id, distAndPaths) => distAndPaths._2.map{ + path => (distAndPaths._1, path) + } + }.flatMap(pair => pair) + + val shortestPaths = Set( + (2.0, List[VertexId](5, 2, 4)), + (2.0, List[VertexId](5, 7, 4)), + (2.0, List[VertexId](5, 2, 1)), + (0.0, List[VertexId](5)), + (1.0, List[VertexId](5, 6)), + (1.0, List[VertexId](5, 2)), + (1.0, List[VertexId](5, 3)), + (1.0, List[VertexId](5, 7)) + ) + + assert(results.toSet === shortestPaths) + } + } } + From 401391c703464efffc5c730fb775a18c061fc5c3 Mon Sep 17 00:00:00 2001 From: Ji Dai Date: Tue, 9 May 2017 14:52:43 -0700 Subject: [PATCH 2/2] Update ShortestPaths.scala --- .../main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala index 252da82b10667..b6034b14ca082 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala @@ -77,6 +77,8 @@ object ShortestPaths { } /** + * Copyright [2017] [Ji Dai, Suning Commerce R&D Center USA at Palo Alto, CA] + * * Computes shortest paths from the source vertex to all other vertices. * Weights of edges in the graph should be positive *