Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Shortest-path computations to graphx.lib with unit tests. #10

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7496d6b
Add Shortest-path computations to graphx.lib with unit tests.
andy327 Feb 17, 2014
f8f6d91
Revert "Add Shortest-path computations to graphx.lib with unit tests."
andy327 Feb 17, 2014
a3bdb0e
Merge branch 'master' of https://github.com/apache/incubator-spark
koertkuipers Feb 24, 2014
2cbfe45
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Feb 26, 2014
6cd90a5
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Feb 26, 2014
2d5e788
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Mar 2, 2014
ee9d90b
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Mar 3, 2014
5c5b197
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Mar 5, 2014
ba6e530
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Mar 8, 2014
d47865f
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Mar 17, 2014
9319fac
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Mar 18, 2014
745a7a1
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Mar 18, 2014
9ee0d89
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Apr 5, 2014
25fbe10
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Apr 6, 2014
4986f80
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Apr 7, 2014
44d19e5
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Apr 7, 2014
47e22db
Remove algebird dependency from ShortestPaths.
andy327 Apr 23, 2014
c9d1ee8
Merge branch 'master' of https://github.com/apache/spark
koertkuipers Apr 23, 2014
88d80da
Merge from master.
andy327 Apr 23, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx.lib

import org.apache.spark.graphx._

object ShortestPaths {
type SPMap = Map[VertexId, Int] // map of landmarks -> minimum distance to landmark
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scala map data-structures can be pretty costly and inefficient. Instead you could use an array containing the distances and then maintain a global map (shared by broadcast variable) with the mapping from vertex id to index in the array. This should also reduce the memory overhead substantially since each vertex will not need to maintain its own locally Map data structure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially remap the "landmarks" to a consecutive landmark id set and then on the initial creation of spGraph you would require using the single broadcast map but from then on no map data structures would be required.

def SPMap(x: (VertexId, Int)*) = Map(x: _*)
def increment(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
def plus(spmap1: SPMap, spmap2: SPMap): SPMap =
(spmap1.keySet ++ spmap2.keySet).map{
k => k -> scala.math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
}.toMap

/**
* Compute the shortest paths to each landmark for each vertex and
* return an RDD with the map of landmarks to their shortest-path
* lengths.
*
* @tparam VD the shortest paths map for the vertex
* @tparam ED the incremented shortest-paths map of the originating
* vertex (discarded in the computation)
*
* @param graph the graph for which to compute the shortest paths
* @param landmarks the list of landmark vertex ids
*
* @return a graph with vertex attributes containing a map of the
* shortest paths to each landmark
*/
def run[VD, ED](graph: Graph[VD, ED], landmarks: Seq[VertexId])
(implicit m1: Manifest[VD], m2: Manifest[ED]): Graph[SPMap, SPMap] = {

val spGraph = graph
.mapVertices{ (vid, attr) =>
if (landmarks.contains(vid)) SPMap(vid -> 0)
else SPMap()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we switch to an array implementation of the map then perhaps set the distance to MaxInt (or MaxDouble if we switch to weighted edge).

}
.mapTriplets{ edge => edge.srcAttr }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mapTriplets call do? It doesn't seem that we access edge.attr later on.


val initialMessage = SPMap()

def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
plus(attr, msg)
}

def sendMessage(edge: EdgeTriplet[SPMap, SPMap]): Iterator[(VertexId, SPMap)] = {
val newAttr = increment(edge.srcAttr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth considering adding support for edge weights instead of assuming all edges are length 1.

if (edge.dstAttr != plus(newAttr, edge.dstAttr)) Iterator((edge.dstId, newAttr))
else Iterator.empty
}

def messageCombiner(s1: SPMap, s2: SPMap): SPMap = {
plus(s1, s2)
}

Pregel(spGraph, initialMessage)(
vertexProgram, sendMessage, messageCombiner)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx.lib

import org.scalatest.FunSuite

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._

class ShortestPathsSuite extends FunSuite with LocalSparkContext {

test("Shortest Path Computations") {
withSpark { sc =>
val shortestPaths = Set((1,Map(1 -> 0, 4 -> 2)), (2,Map(1 -> 1, 4 -> 2)), (3,Map(1 -> 2, 4 -> 1)),
(4,Map(1 -> 2, 4 -> 0)), (5,Map(1 -> 1, 4 -> 1)), (6,Map(1 -> 3, 4 -> 1)))
val edgeSeq = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6)).flatMap{ case e => Seq(e, e.swap) }
val edges = sc.parallelize(edgeSeq).map { case (v1, v2) => (v1.toLong, v2.toLong) }
val graph = Graph.fromEdgeTuples(edges, 1)
val landmarks = Seq(1, 4).map(_.toLong)
val results = ShortestPaths.run(graph, landmarks).vertices.collect.map { case (v, spMap) => (v, spMap.mapValues(_.get)) }
assert(results.toSet === shortestPaths)
}
}

}