[SPARK-18845][GraphX] PageRank has incorrect initialization value that leads to slow convergence #16271

24 changes: 12 additions & 12 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -115,17 +115,17 @@ object PageRank extends Logging {
val src: VertexId = srcId.getOrElse(-1L)

      // Initialize the PageRank graph with each edge attribute having
-     // weight 1/outDegree and each vertex with attribute resetProb.
+     // weight 1/outDegree and each vertex with attribute 1.0.
      // When running personalized pagerank, only the source vertex
-     // has an attribute resetProb. All others are set to 0.
+     // has an attribute 1.0. All others are set to 0.
      var rankGraph: Graph[Double, Double] = graph
        // Associate the degree with each vertex
        .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
        // Set the weight on the edges based on the degree
        .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
        // Set the vertex attributes to the initial pagerank values
        .mapVertices { (id, attr) =>
-         if (!(id != src && personalized)) resetProb else 0.0
+         if (!(id != src && personalized)) 1.0 else 0.0
        }

def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
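To see why the initial value matters, here is a minimal, Spark-free sketch (illustrative only, on a hypothetical 3-cycle rather than any graph from this PR) of the update the distributed code above implements: rank'(v) = resetProb + (1 - resetProb) * sum of rank(u)/outDegree(u) over in-neighbors u. Starting every rank at 1.0 puts the total rank mass at its fixed-point value immediately, while starting at resetProb leaves the total mass crawling toward it at a geometric (1 - resetProb) rate per iteration, which is the slow convergence the PR title describes.

    // Illustrative sketch: each vertex on a 3-cycle has one in-neighbor of
    // out-degree 1, and its true rank is exactly 1.0.
    val resetProb = 0.15
    def step(r: Array[Double]): Array[Double] =
      Array.tabulate(3)(i => resetProb + (1 - resetProb) * r((i + 2) % 3))

    var fromOne   = Array.fill(3)(1.0)        // new initialization
    var fromReset = Array.fill(3)(resetProb)  // old initialization
    for (_ <- 1 to 20) { fromOne = step(fromOne); fromReset = step(fromReset) }
    println(fromOne.sum)    // 3.0: started at the fixed point, stays there
    println(fromReset.sum)  // ~2.90 after 20 iterations, still short of 3.0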
@@ -150,8 +150,8 @@ object PageRank extends Logging {
(src: VertexId, id: VertexId) => resetProb
}

-     rankGraph = rankGraph.joinVertices(rankUpdates) {
-       (id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum
+     rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
+       (id, oldRank, msgSumOpt) => rPrb(src, id) + (1.0 - resetProb) * msgSumOpt.getOrElse(0.0)
      }.cache()

rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
@@ -196,7 +196,7 @@ object PageRank extends Logging {
// we won't be able to store its activations in a sparse vector
val zero = Vectors.sparse(sources.size, List()).asBreeze
val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
-       val v = Vectors.sparse(sources.size, Array(i), Array(resetProb)).asBreeze
+       val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
(vid, v)
}.toMap
val sc = graph.vertices.sparkContext
@@ -225,11 +225,11 @@ object PageRank extends Logging {
        ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
        (a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src)

-     rankGraph = rankGraph.joinVertices(rankUpdates) {
-       (vid, oldRank, msgSum) =>
-         val popActivations: BV[Double] = msgSum :* (1.0 - resetProb)
+     rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
+       (vid, oldRank, msgSumOpt) =>
+         val popActivations: BV[Double] = msgSumOpt.getOrElse(zero) :* (1.0 - resetProb)
          val resetActivations = if (sourcesInitMapBC.value contains vid) {
-           sourcesInitMapBC.value(vid)
+           sourcesInitMapBC.value(vid) :* resetProb
          } else {
            zero
          }

Member: Is this a related but slightly separate fix?

Contributor Author: It's an intertwined bug in the implementation, introduced at the same time (when moving away from Pregel in 15a5645). The only vertices not included in the original joinVertices are source vertices (those with no incoming edges). Normally (in the absence of sinks) source vertices would have a PageRank equal to the reset probability. Since source vertices were not included in the join, their rank was fixed at their initial value, which fortunately was the correct value. When we change the initial value of all vertices to 1.0, it exposes this error.
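The following is a small, self-contained sketch (hypothetical demo code, not part of this PR) of the join semantics the thread above discusses: joinVertices leaves any vertex absent from the update RDD, such as a source vertex that receives no messages, at its old value, while outerJoinVertices visits every vertex and hands the missing update in as None.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.{Graph, VertexRDD}

    object JoinSemanticsDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))
        // Vertex 1 is a source: no incoming edges, so it never receives a message.
        val edges = sc.parallelize(Seq((1L, 2L), (2L, 3L)))
        val g = Graph.fromEdgeTuples(edges, defaultValue = 1.0)

        // Simulated rank updates: only message recipients (vertices 2 and 3) appear.
        val updates: VertexRDD[Double] =
          g.aggregateMessages[Double](ctx => ctx.sendToDst(0.5), _ + _)

        // joinVertices: vertex 1 is not in `updates`, so it keeps its old 1.0.
        val joined = g.joinVertices(updates)((_, _, msg) => 0.15 + 0.85 * msg)
        // outerJoinVertices: vertex 1 is visited too and falls back to msg = 0.
        val outer = g.outerJoinVertices(updates)(
          (_, _, msgOpt) => 0.15 + 0.85 * msgOpt.getOrElse(0.0))

        joined.vertices.collect().sorted.foreach(println) // (1,1.0), (2,0.575), (3,0.575)
        outer.vertices.collect().sorted.foreach(println)  // (1,0.15), (2,0.575), (3,0.575)
        sc.stop()
      }
    }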
@@ -307,7 +307,7 @@ object PageRank extends Logging {
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to (initialPR, delta = 0)
.mapVertices { (id, attr) =>
-       if (id == src) (resetProb, Double.NegativeInfinity) else (0.0, 0.0)
+       if (id == src) (1.0, Double.NegativeInfinity) else (0.0, 0.0)
}
.cache()

@@ -323,7 +323,7 @@ object PageRank extends Logging {
msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
var teleport = oldPR
-     val delta = if (src==id) 1.0 else 0.0
+     val delta = if (src==id) resetProb else 0.0
teleport = oldPR*delta

val newPR = teleport + (1.0 - resetProb) * msgSum
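As a quick sanity check (plain Scala, values taken directly from the diff above), the new source teleport reproduces the standard reset term on the first iteration: with the source now initialized to 1.0 and delta set to resetProb, teleport comes out to resetProb exactly, as in the non-personalized formula.

    val resetProb = 0.15
    val oldPR = 1.0                  // new initial value at the source vertex
    val delta = resetProb            // the src == id branch above
    val teleport = oldPR * delta     // 1.0 * 0.15 = 0.15
    val msgSum = 0.0                 // nothing received yet on iteration one
    val newPR = teleport + (1.0 - resetProb) * msgSum
    assert(newPR == resetProb)       // matches the static reset term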
graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala

@@ -41,7 +41,7 @@ object GridPageRank {
}
}
// compute the pagerank
-     var pr = Array.fill(nRows * nCols)(resetProb)
+     var pr = Array.fill(nRows * nCols)(1.0)
for (iter <- 0 until nIter) {
val oldPr = pr
pr = new Array[Double](nRows * nCols)
@@ -70,10 +70,10 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
val resetProb = 0.15
val errorTol = 1.0e-5

-     val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices
-     val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache()
+     val staticRanks1 = starGraph.staticPageRank(numIter = 2, resetProb).vertices
+     val staticRanks2 = starGraph.staticPageRank(numIter = 3, resetProb).vertices.cache()

Member: A few extra iterations makes the test more robust?

Contributor Author: Not really more robust, since the star graph has a sink and the result is thus still wrong pending SPARK-18847. But the extra iteration is needed with this change, to fully propagate the corrected rank of source vertices in the first iteration, as explained above.

-     // Static PageRank should only take 2 iterations to converge
+     // Static PageRank should only take 3 iterations to converge
val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
if (pr1 != pr2) 1 else 0
}.map { case (vid, test) => test }.sum()
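To make the iteration counts concrete, here is a hand-rolled sketch (illustrative, using a 4-vertex star rather than the suite's larger starGraph) of static PageRank on a star with spokes pointing at the center. Under the new all-1.0 initialization, the spokes are corrected to resetProb in iteration 1, the center absorbs the corrected spoke ranks in iteration 2, and iteration 3 merely confirms the fixed point, which is why the test now compares numIter = 2 against numIter = 3.

    val resetProb = 0.15
    var ranks = Array.fill(4)(1.0)                    // vertex 0 is the center
    for (iter <- 1 to 3) {
      val contrib = ranks(1) + ranks(2) + ranks(3)    // each spoke has out-degree 1
      val next = Array.fill(4)(resetProb)             // spokes receive no messages
      next(0) = resetProb + (1 - resetProb) * contrib
      ranks = next
      println(s"iter $iter: ${ranks.mkString(", ")}")
    }
    // iter 1: center 2.7 (computed from stale spoke ranks), spokes 0.15
    // iter 2: center 0.5325, spokes 0.15
    // iter 3: unchanged, so ranks at numIter = 2 and numIter = 3 agree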
@@ -203,4 +203,30 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
}
}

test("Loop with source PageRank") {
withSpark { sc =>
val edges = sc.parallelize((1L, 2L) :: (2L, 3L) :: (3L, 4L) :: (4L, 2L) :: Nil)
val g = Graph.fromEdgeTuples(edges, 1)
val resetProb = 0.15
val tol = 0.0001
val numIter = 50
val errorTol = 1.0e-5

val staticRanks = g.staticPageRank(numIter, resetProb).vertices
val dynamicRanks = g.pageRank(tol, resetProb).vertices
assert(compareRanks(staticRanks, dynamicRanks) < errorTol)

// Computed in igraph 1.0 w/ R bindings:
// > page_rank(graph_from_literal( A -+ B -+ C -+ D -+ B))
// Alternatively in NetworkX 1.11:
// > nx.pagerank(nx.DiGraph([(1,2),(2,3),(3,4),(4,2)]))
// We multiply by the number of vertices to account for difference in normalization
val igraphPR = Seq(0.0375000, 0.3326045, 0.3202138, 0.3096817).map(_ * 4)
val ranks = VertexRDD(sc.parallelize(1L to 4L zip igraphPR))
assert(compareRanks(staticRanks, ranks) < errorTol)
assert(compareRanks(dynamicRanks, ranks) < errorTol)

}
}
}
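A brief note on the normalization mentioned in the test comment: igraph and NetworkX return ranks that sum to 1, whereas the ranks compared here sum (roughly) to the number of vertices, hence the scaling by 4. A quick check (plain Scala, reusing the reference values above):

    val igraphPR = Seq(0.0375000, 0.3326045, 0.3202138, 0.3096817)
    assert(math.abs(igraphPR.sum - 1.0) < 1e-6)  // probability-normalized
    val scaled = igraphPR.map(_ * 4)             // comparable to the ranks above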