From 9149ca233722e3d70aefb223dfd6a16ee8dbf924 Mon Sep 17 00:00:00 2001
From: Andrew Ray
Date: Mon, 12 Dec 2016 12:34:49 -0600
Subject: [PATCH 1/6] fix

---
 .../main/scala/org/apache/spark/graphx/lib/PageRank.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index feb3f47667f8c..51236ec93e2c9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -125,7 +125,7 @@ object PageRank extends Logging {
       .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
       // Set the vertex attributes to the initial pagerank values
       .mapVertices { (id, attr) =>
-        if (!(id != src && personalized)) resetProb else 0.0
+        if (!(id != src && personalized)) 1.0 else 0.0
       }

     def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
@@ -196,7 +196,7 @@ object PageRank extends Logging {
     // we won't be able to store its activations in a sparse vector
     val zero = Vectors.sparse(sources.size, List()).asBreeze
     val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
-      val v = Vectors.sparse(sources.size, Array(i), Array(resetProb)).asBreeze
+      val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
       (vid, v)
     }.toMap
     val sc = graph.vertices.sparkContext
@@ -307,7 +307,7 @@ object PageRank extends Logging {
       .mapTriplets( e => 1.0 / e.srcAttr )
       // Set the vertex attributes to (initialPR, delta = 0)
      .mapVertices { (id, attr) =>
-        if (id == src) (resetProb, Double.NegativeInfinity) else (0.0, 0.0)
+        if (id == src) (1.0, Double.NegativeInfinity) else (0.0, 0.0)
       }
       .cache()

From b145376d88b6f5e58e2b9d051d9c268a36b9f939 Mon Sep 17 00:00:00 2001
From: Andrew Ray
Date: Tue, 13 Dec 2016 09:51:06 -0600
Subject: [PATCH 2/6] fix initial value for grid graph independent calculation

---
 .../test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index b6305c8d00aba..ba3865116546f 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -41,7 +41,7 @@ object GridPageRank {
       }
     }
     // compute the pagerank
-    var pr = Array.fill(nRows * nCols)(resetProb)
+    var pr = Array.fill(nRows * nCols)(1.0)
     for (iter <- 0 until nIter) {
       val oldPr = pr
       pr = new Array[Double](nRows * nCols)

From d39d2f07ab1a1aadb24dbd67bbbe37400beaadb4 Mon Sep 17 00:00:00 2001
From: Andrew Ray
Date: Tue, 13 Dec 2016 10:25:15 -0600
Subject: [PATCH 3/6] use outer join so that sources are updated and fix reset probability for personalized

---
 .../org/apache/spark/graphx/lib/PageRank.scala | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 51236ec93e2c9..db18d780de017 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -150,8 +150,8 @@ object PageRank extends Logging {
         (src: VertexId, id: VertexId) => resetProb
       }

-      rankGraph = rankGraph.joinVertices(rankUpdates) {
-        (id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum
+      rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
+        (id, oldRank, msgSumOpt) => rPrb(src, id) + (1.0 - resetProb) * msgSumOpt.getOrElse(0.0)
       }.cache()

       rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
@@ -225,11 +225,11 @@ object PageRank extends Logging {
         ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
         (a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src)

-      rankGraph = rankGraph.joinVertices(rankUpdates) {
-        (vid, oldRank, msgSum) =>
-          val popActivations: BV[Double] = msgSum :* (1.0 - resetProb)
+      rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
+        (vid, oldRank, msgSumOpt) =>
+          val popActivations: BV[Double] = msgSumOpt.getOrElse(zero) :* (1.0 - resetProb)
           val resetActivations = if (sourcesInitMapBC.value contains vid) {
-            sourcesInitMapBC.value(vid)
+            sourcesInitMapBC.value(vid) :* resetProb
           } else {
             zero
           }
@@ -323,7 +323,7 @@ object PageRank extends Logging {
         msgSum: Double): (Double, Double) = {
       val (oldPR, lastDelta) = attr
       var teleport = oldPR
-      val delta = if (src==id) 1.0 else 0.0
+      val delta = if (src==id) resetProb else 0.0
       teleport = oldPR*delta

       val newPR = teleport + (1.0 - resetProb) * msgSum

From 7ea03a88a3d9caa0ab7a7e6e681b8bf00b5cc128 Mon Sep 17 00:00:00 2001
From: Andrew Ray
Date: Tue, 13 Dec 2016 10:36:10 -0600
Subject: [PATCH 4/6] fix star page rank test to account for sources getting updated in the first iteration which then changes the center in the second iteration

---
 .../scala/org/apache/spark/graphx/lib/PageRankSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index ba3865116546f..4e5ec2ea56ccd 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -70,10 +70,10 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
     val resetProb = 0.15
     val errorTol = 1.0e-5

-    val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices
-    val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache()
+    val staticRanks1 = starGraph.staticPageRank(numIter = 2, resetProb).vertices
+    val staticRanks2 = starGraph.staticPageRank(numIter = 3, resetProb).vertices.cache()

-    // Static PageRank should only take 2 iterations to converge
+    // Static PageRank should only take 3 iterations to converge
     val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
       if (pr1 != pr2) 1 else 0
     }.map { case (vid, test) => test }.sum()

From 33cd79400d546b60a8fd87c8a7a0612f97ea8ebb Mon Sep 17 00:00:00 2001
From: Andrew Ray
Date: Tue, 13 Dec 2016 15:54:24 -0600
Subject: [PATCH 5/6] additional unit test with comparison to igraph/networkx

---
 .../spark/graphx/lib/PageRankSuite.scala | 26 +++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index 4e5ec2ea56ccd..6afbb5a959894 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -203,4 +203,30 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
       assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
     }
   }
+
+  test("Loop with source PageRank") {
+    withSpark { sc =>
+      val edges = sc.parallelize((1L, 2L) :: (2L, 3L) :: (3L, 4L) :: (4L, 2L) :: Nil)
+      val g = Graph.fromEdgeTuples(edges, 1)
+      val resetProb = 0.15
+      val tol = 0.0001
+      val numIter = 50
+      val errorTol = 1.0e-5
+
+      val staticRanks = g.staticPageRank(numIter, resetProb).vertices
+      val dynamicRanks = g.pageRank(tol, resetProb).vertices
+      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+
+      // Computed in igraph 1.0 w/ R bindings:
+      // > page_rank(graph_from_literal( A -+ B -+ C -+ D -+ B))
+      // Alternatively in NetworkX 1.11:
+      // > nx.pagerank(nx.DiGraph([(1,2),(2,3),(3,4),(4,2)]))
+      // We multiply by the number of vertices to account for difference in normalization
+      val igraphPR = Seq(0.0375000, 0.3326045, 0.3202138, 0.3096817).map(_ * 4)
+      val ranks = VertexRDD(sc.parallelize(1L to 4L zip igraphPR))
+      assert(compareRanks(staticRanks, ranks) < errorTol)
+      assert(compareRanks(dynamicRanks, ranks) < errorTol)
+
+    }
+  }
 }

From 8be9a9765d10331ae1b5c15ff753bb5c2697acfc Mon Sep 17 00:00:00 2001
From: Andrew Ray
Date: Wed, 14 Dec 2016 11:33:29 -0600
Subject: [PATCH 6/6] update comment about rankGraph initialization

---
 .../src/main/scala/org/apache/spark/graphx/lib/PageRank.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index db18d780de017..37b6e453592e5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -115,9 +115,9 @@ object PageRank extends Logging {
     val src: VertexId = srcId.getOrElse(-1L)

     // Initialize the PageRank graph with each edge attribute having
-    // weight 1/outDegree and each vertex with attribute resetProb.
+    // weight 1/outDegree and each vertex with attribute 1.0.
     // When running personalized pagerank, only the source vertex
-    // has an attribute resetProb. All others are set to 0.
+    // has an attribute 1.0. All others are set to 0.
     var rankGraph: Graph[Double, Double] = graph
       // Associate the degree with each vertex
       .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
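
For anyone wanting to reproduce the expected values of the new "Loop with source
PageRank" test without a Spark cluster, the following standalone Scala sketch runs
the same unnormalized power iteration that the fixed code implements: every rank
starts at 1.0 and is updated as resetProb + (1 - resetProb) * sum of incoming
rank/outDegree contributions. The object name and the iteration count of 100 are
illustrative choices, not part of these patches.

object LoopPageRankSketch {
  def main(args: Array[String]): Unit = {
    // Loop graph from the new test: 1 -> 2 -> 3 -> 4 -> 2
    val edges = Seq((1, 2), (2, 3), (3, 4), (4, 2))
    val n = 4
    val resetProb = 0.15
    val outDeg = edges.groupBy(_._1).mapValues(_.size)
    // After the fix, every vertex starts with rank 1.0 (index 0 is unused so
    // vertex ids line up with array indices).
    var pr = Array.fill(n + 1)(1.0)
    for (_ <- 0 until 100) {
      // Each vertex sends its current rank split evenly over its out-edges.
      val contrib = Array.fill(n + 1)(0.0)
      for ((src, dst) <- edges) contrib(dst) += pr(src) / outDeg(src)
      pr = Array.tabulate(n + 1) { i =>
        if (i == 0) 0.0 else resetProb + (1.0 - resetProb) * contrib(i)
      }
    }
    (1 to n).foreach(i => println(f"vertex $i: ${pr(i)}%.4f"))
  }
}

Running this prints approximately 0.1500, 1.3304, 1.2809 and 1.2387, which are the
igraph/NetworkX values quoted in the test comment multiplied by the number of
vertices (4), reflecting that GraphX keeps ranks unnormalized (they sum to roughly
the vertex count rather than to 1).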