[SPARK-18847][GRAPHX] PageRank gives incorrect results for graphs with sinks

## What changes were proposed in this pull request?

Graphs with sinks (vertices with no outgoing edges) do not produce the expected rank sum of n, the number of vertices (or 1 for personalized PageRank): rank that flows into a sink is never passed back out, so the total decays below the expected value. We fix this by normalizing to the expected sum at the end of each implementation.
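
For illustration, a minimal sketch (not part of the patch, and assuming a local `SparkContext` named `sc`) of how the lost rank mass can be observed on a toy graph with a sink:

```scala
import org.apache.spark.graphx.Graph

// Toy graph: 1 -> 2 -> 3 -> 1 plus 1 -> 4, where vertex 4 is a sink.
val edges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L), (1L, 4L)))
val g = Graph.fromEdgeTuples(edges, defaultValue = 1)

// Rank flowing into the sink is never redistributed, so before this patch the
// ranks summed to less than the number of vertices; with the normalization
// added here the sum comes out to 4.0 (or 1.0 for personalized PageRank).
val rankSum = g.staticPageRank(numIter = 20, resetProb = 0.15).vertices.values.sum()
println(s"rank sum = $rankSum")
```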

Additionally, this fixes the dynamic version of personalized PageRank, which gave incorrect answers that were not detected by the existing unit tests.

## How was this patch tested?

Revamped the existing unit tests and added new ones, with reference values (and reproduction code) from igraph and NetworkX.

Note that for the personalized PageRank comparisons we use the arpack algorithm in igraph, since prpack (the current default) redistributes rank to all vertices uniformly instead of just to the personalization source. We could adopt that alternate convention (redistribute rank to all vertices uniformly), but it would involve more extensive changes to the algorithms (the dynamic version would no longer be able to use Pregel).
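
The tests check computed ranks against these reference values with the suite's small compareRanks helper (not shown in this diff). A rough sketch of the idea, using a sum of squared differences (the actual helper may differ in detail):

```scala
import org.apache.spark.graphx.VertexRDD

// Rough sketch of a rank-comparison helper; vertices missing from `b` are
// treated as having rank 0, and the result is the sum of squared differences.
def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
  a.leftJoin(b) { case (_, rankA, rankB) =>
    val diff = rankA - rankB.getOrElse(0.0)
    diff * diff
  }.values.sum()
}
```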

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #16483 from aray/pagerank-sink2.
aray authored and rxin committed Mar 17, 2017
1 parent 376d782 commit bfdeea5
Showing 2 changed files with 144 additions and 59 deletions.
45 changes: 32 additions & 13 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -162,7 +162,8 @@ object PageRank extends Logging {
iteration += 1
}

rankGraph
// SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
normalizeRankSum(rankGraph, personalized)
}

/**
@@ -179,7 +180,8 @@ object PageRank extends Logging {
* @param resetProb The random reset probability
* @param sources The list of sources to compute personalized pagerank from
* @return the graph with vertex attributes
* containing the pagerank relative to all starting nodes (as a sparse vector) and
* containing the pagerank relative to all starting nodes (as a sparse vector
* indexed by the position of nodes in the sources list) and
* edge attributes the normalized edge weight
*/
def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
@@ -194,6 +196,8 @@ object PageRank extends Logging {

// TODO if one sources vertex id is outside of the int range
// we won't be able to store its activations in a sparse vector
require(sources.max <= Int.MaxValue.toLong,
s"This implementation currently only works for source vertex ids at most ${Int.MaxValue}")
val zero = Vectors.sparse(sources.size, List()).asBreeze
val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
@@ -245,8 +249,10 @@ object PageRank extends Logging {
i += 1
}

// SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
val rankSums = rankGraph.vertices.values.fold(zero)(_ :+ _)
rankGraph.mapVertices { (vid, attr) =>
Vectors.fromBreeze(attr)
Vectors.fromBreeze(attr :/ rankSums)
}
}

@@ -307,7 +313,7 @@ object PageRank extends Logging {
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to (initialPR, delta = 0)
.mapVertices { (id, attr) =>
if (id == src) (1.0, Double.NegativeInfinity) else (0.0, 0.0)
if (id == src) (0.0, Double.NegativeInfinity) else (0.0, 0.0)
}
.cache()

@@ -322,13 +328,12 @@ object PageRank extends Logging {
def personalizedVertexProgram(id: VertexId, attr: (Double, Double),
msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
var teleport = oldPR
val delta = if (src==id) resetProb else 0.0
teleport = oldPR*delta

val newPR = teleport + (1.0 - resetProb) * msgSum
val newDelta = if (lastDelta == Double.NegativeInfinity) newPR else newPR - oldPR
(newPR, newDelta)
val newPR = if (lastDelta == Double.NegativeInfinity) {
1.0
} else {
oldPR + (1.0 - resetProb) * msgSum
}
(newPR, newPR - oldPR)
}

def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
@@ -353,9 +358,23 @@ object PageRank extends Logging {
vertexProgram(id, attr, msgSum)
}

Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
val rankGraph = Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
vp, sendMessage, messageCombiner)
.mapVertices((vid, attr) => attr._1)
} // end of deltaPageRank

// SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
normalizeRankSum(rankGraph, personalized)
}

// Normalizes the sum of ranks to n (or 1 if personalized)
private def normalizeRankSum(rankGraph: Graph[Double, Double], personalized: Boolean) = {
val rankSum = rankGraph.vertices.values.sum()
if (personalized) {
rankGraph.mapVertices((id, rank) => rank / rankSum)
} else {
val numVertices = rankGraph.numVertices
val correctionFactor = numVertices.toDouble / rankSum
rankGraph.mapVertices((id, rank) => rank * correctionFactor)
}
}
}
158 changes: 112 additions & 46 deletions graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -50,7 +50,8 @@ object GridPageRank {
inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
}
}
(0L until (nRows * nCols)).zip(pr)
val prSum = pr.sum
(0L until (nRows * nCols)).zip(pr.map(_ * pr.length / prSum))
}

}
@@ -68,26 +69,34 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
val nVertices = 100
val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
val resetProb = 0.15
val tol = 0.0001
val numIter = 2
val errorTol = 1.0e-5

val staticRanks1 = starGraph.staticPageRank(numIter = 2, resetProb).vertices
val staticRanks2 = starGraph.staticPageRank(numIter = 3, resetProb).vertices.cache()
val staticRanks = starGraph.staticPageRank(numIter, resetProb).vertices.cache()
val staticRanks2 = starGraph.staticPageRank(numIter + 1, resetProb).vertices

// Static PageRank should only take 3 iterations to converge
val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
// Static PageRank should only take 2 iterations to converge
val notMatching = staticRanks.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
if (pr1 != pr2) 1 else 0
}.map { case (vid, test) => test }.sum()
assert(notMatching === 0)

val staticErrors = staticRanks2.map { case (vid, pr) =>
val p = math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) ))
val correct = (vid > 0 && pr == resetProb) || (vid == 0L && p < 1.0E-5)
if (!correct) 1 else 0
}
assert(staticErrors.sum === 0)
val dynamicRanks = starGraph.pageRank(tol, resetProb).vertices.cache()
assert(compareRanks(staticRanks, dynamicRanks) < errorTol)

// Computed in igraph 1.0 w/ R bindings:
// > page_rank(make_star(100, mode = "in"))
// Alternatively in NetworkX 1.11:
// > nx.pagerank(nx.DiGraph([(x, 0) for x in range(1,100)]))
// We multiply by the number of vertices to account for difference in normalization
val centerRank = 0.462394787 * nVertices
val othersRank = 0.005430356 * nVertices
val igraphPR = centerRank +: Seq.fill(nVertices - 1)(othersRank)
val ranks = VertexRDD(sc.parallelize(0L until nVertices zip igraphPR))
assert(compareRanks(staticRanks, ranks) < errorTol)
assert(compareRanks(dynamicRanks, ranks) < errorTol)

val dynamicRanks = starGraph.pageRank(0, resetProb).vertices.cache()
assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
}
} // end of test Star PageRank

@@ -96,51 +105,62 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
val nVertices = 100
val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
val resetProb = 0.15
val tol = 0.0001
val numIter = 2
val errorTol = 1.0e-5

val staticRanks1 = starGraph.staticPersonalizedPageRank(0, numIter = 1, resetProb).vertices
val staticRanks2 = starGraph.staticPersonalizedPageRank(0, numIter = 2, resetProb)
.vertices.cache()

// Static PageRank should only take 2 iterations to converge
val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
if (pr1 != pr2) 1 else 0
}.map { case (vid, test) => test }.sum
assert(notMatching === 0)
val staticRanks = starGraph.staticPersonalizedPageRank(0, numIter, resetProb).vertices.cache()

val staticErrors = staticRanks2.map { case (vid, pr) =>
val correct = (vid > 0 && pr == 0.0) ||
(vid == 0 && pr == resetProb)
if (!correct) 1 else 0
}
assert(staticErrors.sum === 0)

val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
val dynamicRanks = starGraph.personalizedPageRank(0, tol, resetProb).vertices.cache()
assert(compareRanks(staticRanks, dynamicRanks) < errorTol)

val parallelStaticRanks1 = starGraph
.staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices {
val parallelStaticRanks = starGraph
.staticParallelPersonalizedPageRank(Array(0), numIter, resetProb).mapVertices {
case (vertexId, vector) => vector(0)
}.vertices.cache()
assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol)
assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)

// Computed in igraph 1.0 w/ R bindings:
// > page_rank(make_star(100, mode = "in"), personalized = c(1, rep(0, 99)), algo = "arpack")
// NOTE: We use the arpack algorithm as prpack (the default) redistributes rank to all
// vertices uniformly instead of just to the personalization source.
// Alternatively in NetworkX 1.11:
// > nx.pagerank(nx.DiGraph([(x, 0) for x in range(1,100)]),
// personalization=dict([(x, 1 if x == 0 else 0) for x in range(0,100)]))
// We multiply by the number of vertices to account for difference in normalization
val igraphPR0 = 1.0 +: Seq.fill(nVertices - 1)(0.0)
val ranks0 = VertexRDD(sc.parallelize(0L until nVertices zip igraphPR0))
assert(compareRanks(staticRanks, ranks0) < errorTol)
assert(compareRanks(dynamicRanks, ranks0) < errorTol)

val parallelStaticRanks2 = starGraph
.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
case (vertexId, vector) => vector(0)
}.vertices.cache()
assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol)

// We have one outbound edge from 1 to 0
val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb)
val otherStaticRanks = starGraph.staticPersonalizedPageRank(1, numIter, resetProb)
.vertices.cache()
val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
val otherParallelStaticRanks2 = starGraph
.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
val otherDynamicRanks = starGraph.personalizedPageRank(1, tol, resetProb).vertices.cache()
val otherParallelStaticRanks = starGraph
.staticParallelPersonalizedPageRank(Array(0, 1), numIter, resetProb).mapVertices {
case (vertexId, vector) => vector(1)
}.vertices.cache()
assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol)
assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol)
assert(compareRanks(otherDynamicRanks, otherStaticRanks) < errorTol)
assert(compareRanks(otherStaticRanks, otherParallelStaticRanks) < errorTol)
assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks) < errorTol)

// Computed in igraph 1.0 w/ R bindings:
// > page_rank(make_star(100, mode = "in"),
// personalized = c(0, 1, rep(0, 98)), algo = "arpack")
// NOTE: We use the arpack algorithm as prpack (the default) redistributes rank to all
// vertices uniformly instead of just to the personalization source.
// Alternatively in NetworkX 1.11:
// > nx.pagerank(nx.DiGraph([(x, 0) for x in range(1,100)]),
// personalization=dict([(x, 1 if x == 1 else 0) for x in range(0,100)]))
val centerRank = 0.4594595
val sourceRank = 0.5405405
val igraphPR1 = centerRank +: sourceRank +: Seq.fill(nVertices - 2)(0.0)
val ranks1 = VertexRDD(sc.parallelize(0L until nVertices zip igraphPR1))
assert(compareRanks(otherStaticRanks, ranks1) < errorTol)
assert(compareRanks(otherDynamicRanks, ranks1) < errorTol)
assert(compareRanks(otherParallelStaticRanks, ranks1) < errorTol)
}
} // end of test Star PersonalPageRank

@@ -229,4 +249,50 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {

}
}

test("Loop with sink PageRank") {
withSpark { sc =>
val edges = sc.parallelize((1L, 2L) :: (2L, 3L) :: (3L, 1L) :: (1L, 4L) :: Nil)
val g = Graph.fromEdgeTuples(edges, 1)
val resetProb = 0.15
val tol = 0.0001
val numIter = 20
val errorTol = 1.0e-5

val staticRanks = g.staticPageRank(numIter, resetProb).vertices.cache()
val dynamicRanks = g.pageRank(tol, resetProb).vertices.cache()

assert(compareRanks(staticRanks, dynamicRanks) < errorTol)

// Computed in igraph 1.0 w/ R bindings:
// > page_rank(graph_from_literal( A -+ B -+ C -+ A -+ D))
// Alternatively in NetworkX 1.11:
// > nx.pagerank(nx.DiGraph([(1,2),(2,3),(3,1),(1,4)]))
// We multiply by the number of vertices to account for difference in normalization
val igraphPR = Seq(0.3078534, 0.2137622, 0.2646223, 0.2137622).map(_ * 4)
val ranks = VertexRDD(sc.parallelize(1L to 4L zip igraphPR))
assert(compareRanks(staticRanks, ranks) < errorTol)
assert(compareRanks(dynamicRanks, ranks) < errorTol)

val p1staticRanks = g.staticPersonalizedPageRank(1, numIter, resetProb).vertices.cache()
val p1dynamicRanks = g.personalizedPageRank(1, tol, resetProb).vertices.cache()
val p1parallelDynamicRanks =
g.staticParallelPersonalizedPageRank(Array(1, 2, 3, 4), numIter, resetProb)
.vertices.mapValues(v => v(0)).cache()

// Computed in igraph 1.0 w/ R bindings:
// > page_rank(graph_from_literal( A -+ B -+ C -+ A -+ D), personalized = c(1, 0, 0, 0),
// algo = "arpack")
// NOTE: We use the arpack algorithm as prpack (the default) redistributes rank to all
// vertices uniformly instead of just to the personalization source.
// Alternatively in NetworkX 1.11:
// > nx.pagerank(nx.DiGraph([(1,2),(2,3),(3,1),(1,4)]), personalization={1:1, 2:0, 3:0, 4:0})
val igraphPR2 = Seq(0.4522329, 0.1921990, 0.1633691, 0.1921990)
val ranks2 = VertexRDD(sc.parallelize(1L to 4L zip igraphPR2))
assert(compareRanks(p1staticRanks, ranks2) < errorTol)
assert(compareRanks(p1dynamicRanks, ranks2) < errorTol)
assert(compareRanks(p1parallelDynamicRanks, ranks2) < errorTol)

}
}
}
