[SPARK-11898] [MLlib] Use broadcast for the global tables in Word2Vec #9878
Test build #46468 has finished for PR 9878 at commit
I think this looks good.
Broadcasting is a good idea, but these values are modified (based on aggregated updates) on each iteration. You'll need to make a new broadcast variable on each iteration.
This may also change the results of your timing tests; would you be able to re-run them?
Hi Joseph, actually I found that a broadcast variable can somehow get updated automatically. An example:
import scala.util.Random

val arr = (0 to 16).toArray
val bc = sc.broadcast(arr)
val rdd = sc.parallelize(1 to 8)
for (w <- 1 to 10) {
  // Read element 2 through the broadcast variable on each task
  val result = rdd.map(i => bc.value(2)).collect().mkString(", ")
  println(result)
  // Mutate the driver-side array after it has been broadcast
  arr(2) = new Random().nextInt()
}
The code prints different numbers across the 10 iterations.
I'm not sure if it's by design.
@hhbyyh Joseph is correct. What you see only happens to work since you are running locally in one JVM.
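The single-JVM point can be illustrated without Spark. The sketch below is only a rough analogy, not Spark's actual broadcast machinery: it assumes that a local run hands tasks a reference to the driver's own array, while a cluster run ships a serialized copy. The names `BroadcastCopyDemo` and `shipCopy` are made up for this illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object BroadcastCopyDemo {
  // Round-trip the array through Java serialization, roughly what happens
  // when data is shipped to an executor running in another JVM.
  def shipCopy(arr: Array[Int]): Array[Int] = {
    val bytes = new ByteArrayOutputStream()
    val output = new ObjectOutputStream(bytes)
    output.writeObject(arr)
    output.close()
    val input = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try input.readObject().asInstanceOf[Array[Int]] finally input.close()
  }

  def main(args: Array[String]): Unit = {
    val arr = (0 to 16).toArray
    val sameJvmView = arr          // analogy for local mode: the very same object
    val remoteView = shipCopy(arr) // analogy for a cluster: an independent copy

    arr(2) = 42                    // driver-side mutation after "broadcasting"

    println(sameJvmView(2)) // prints 42: the mutation is visible
    println(remoteView(2))  // prints 2: the copy is unaffected
  }
}
```

Under these assumptions, a mutation made on the driver after broadcasting is seen only by code that shares the driver's object, which is why the global tables need a fresh broadcast after each update.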
I see where I went wrong.
On the cluster I tested a different version:
// bc.value is evaluated on the driver here, so the closure below captures the array itself
val value = bc.value
val result = rdd.map(i => value(2)).collect().mkString(", ")
Anyway, you are correct. Thanks.
I changed it to create a new broadcast variable in each iteration.
Test build #46599 has finished for PR 9878 at commit
LGTM again. Good thing @jkbradley reviewed, so I will pause a bit for him to give an OK, if possible.
Yes, I think it's correct now. My only question is whether we should explicitly unpersist the broadcast vars after synAgg is created (by a collect, so it should be safe to unpersist then). I don't really know how quickly they would be cleaned up after going out of scope. (Do you?)
@jkbradley I tried adding unpersist(false) at the end, but it seems to have made no difference.
I still see no difference. Yet I think it's still reasonable to add the unpersist.
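The pattern settled on above (a new broadcast per iteration, released once the collect has finished) might look roughly like the following. This is a sketch under stated assumptions, not the actual Word2Vec code: `RebroadcastSketch`, `run`, `weights`, and the averaging update are hypothetical stand-ins, and it runs against a local master only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RebroadcastSketch {
  // `weights` plays the role of a global table that the driver
  // updates from aggregated results after every iteration (hypothetical).
  def run(sc: SparkContext, numIterations: Int): Array[Double] = {
    val data = sc.parallelize(1 to 8)
    var weights = Array.fill(4)(0.0)

    for (_ <- 1 to numIterations) {
      // Broadcast the current values; an earlier broadcast would be stale.
      val bcWeights = sc.broadcast(weights)

      // collect() is an action, so every task has finished reading
      // bcWeights by the time it returns.
      val updates = data.map(i => bcWeights.value.map(_ + i)).collect()

      // Driver-side aggregation: element-wise mean of the per-record results.
      weights = updates.transpose.map(col => col.sum / col.length)

      // Release the executor copies without blocking the driver.
      bcWeights.unpersist(blocking = false)
    }
    weights
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("rebroadcast-sketch")
    val sc = new SparkContext(conf)
    try println(run(sc, 2).mkString(", ")) finally sc.stop()
  }
}
```

Calling `unpersist` only after the action returns keeps the release safe, since no running task can still need the value at that point.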
Test build #46759 has finished for PR 9878 at commit
@jkbradley OK with merging?
Merged to master
Thanks for merging it. Yes, it looks good to me. Thanks @hhbyyh
jira: https://issues.apache.org/jira/browse/SPARK-11898
syn0Global and syn1Global in Word2Vec are quite large objects, of size (vocab * vectorSize * 8), yet they are passed to the workers using basic task serialization.
Using broadcast can greatly improve performance. My benchmark shows that, for a 1M vocabulary and the default vectorSize of 100, changing to broadcast can help,
This will also help extend the upper limit for Word2Vec.
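For a sense of scale, the size formula in the description can be worked through for the benchmark configuration. The 8 bytes per element is taken from the description as given; `TableSize` and `tableBytes` are names made up for this illustration.

```scala
object TableSize {
  // Size of one global table, using the formula from the description:
  // vocab * vectorSize * bytesPerElement (8 is the description's figure).
  def tableBytes(vocabSize: Long, vectorSize: Long, bytesPerElement: Long = 8L): Long =
    vocabSize * vectorSize * bytesPerElement

  def main(args: Array[String]): Unit = {
    // 1M vocabulary with the default vectorSize of 100
    val bytes = tableBytes(1000000L, 100L)
    println(bytes) // prints 800000000
  }
}
```

By that formula each table is about 0.8 GB in this configuration, which is the payload that plain task serialization would otherwise carry.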