-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-27295][GraphX] Provision to provide the initial scores for source nodes while running Personalized Page Rank #24230
Conversation
…g Personalized Page Rank - SPARK-27295
I have added a new API with the same name. I tried avoiding the private method by simply calling the new API from the old API method. Is that okay? |
@@ -196,11 +196,45 @@ object PageRank extends Logging { | |||
require(sources.nonEmpty, s"The list of sources must be non-empty," + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can remove these checks if they are now checked in the other method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -196,11 +196,45 @@ object PageRank extends Logging { | |||
require(sources.nonEmpty, s"The list of sources must be non-empty," + | |||
s" but got ${sources.mkString("[", ",", "]")}") | |||
|
|||
val sourcesWithScores = sources zip Array.fill(sources.size)(1.0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use explicit '.' notation
But this can be sources.map((_, 1.0))
instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
val zero = Vectors.sparse(sources.size, List()).asBreeze | ||
// map of vid -> vector where for each vid, the _position of vid in source_ is set to 1.0 | ||
// map of vid -> vector where for each vid, the _position of vid in source_ is set to provided score | ||
val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While you're here, replace vid
with (vid, score)
so that you can avoid ._1
syntax
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One last thing - can you add a test for the new method? can just follow any existing tests for the existing method.
@EshwarSR Ah, OK, we have a little more work here. This method is also, or 'really', exposed by GraphOps.staticParallelPersonalizedPageRank. And that is tested by PageRankSuite. I think we need the same sort of new method in GraphOps, just one that calls the new variation on the method you added. Then see about refactoring the existing tests to add in one simple new test of the new functionality. |
@EshwarSR if you can make a few more changes here per the last comment, I think this can be merged. |
HI @srowen, got held up with other work. Will get it done ASAP. Thanks! |
Hi @srowen I've done the changes mentioned in your comment. |
* indexed by the position of nodes in the sources list) and | ||
* edge attributes the normalized edge weight | ||
* | ||
* @since 3.0.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, this has to be an annotation like @Since("3.0.0")
on the method, not within the scaladoc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got confused by seeing this. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
I fixed it now.
@@ -115,7 +115,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { | |||
assert(compareRanks(staticRanks, dynamicRanks) < errorTol) | |||
|
|||
val parallelStaticRanks = starGraph | |||
.staticParallelPersonalizedPageRank(Array(0), numIter, resetProb).mapVertices { | |||
.staticParallelPersonalizedPageRank(Array((0, 1.0)), numIter, resetProb).mapVertices { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than modify the existing tests, is it possible to create a new test that uses initial values that aren't 1? that would help verify the behavior is correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought logically it would be the same. So should I still create a separate test for it?
If yes, I need a little guidance here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose you aren't testing that the implementation correctly passes the initial scores and uses them, nor testing that the original method that causes 1 to be the default works now (unless some tests still cover this). There's no reason to make elaborate tests, but is there any simple test case you can copy/paste that shows the result is different and basically correct with initial scores that aren't 1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@EshwarSR if you can add any minimal test of initial scores that aren't 1, and probably leave the existing tests, this can be merged
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @srowen I just tried comparing the scores b/w networkx and our new implementation. The scores are not aligned. Hence I was debugging if there is any issue with the code. Do you observer any place there is an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know this code much at all. Yeah that's the kind of thing that's important to test -- do scores that aren't 1 work as intended?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried running the old implementation with hardcoded 1.0 as initial scores. Even the scores from that and networkx dont match. I'm sort of confused what is going wrong. I guess I need to see the networkx implementation to understand the exact difference between the implementations.
Can one of the admins verify this patch? |
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
What changes were proposed in this pull request?
The present implementation of parallel personalized page rank algorithm takes only node ids as the starting nodes for algorithm. And then it assigns initial value of 1.0 to all those source nodes.
But the user might also be interested in specifying the initial values for each node.