[SPARK-30503][ML] OnlineLDAOptimizer does not handle persistence correctly #27261
zhengruifeng wants to merge 1 commit into apache:master
Conversation
testCode:

```scala
import org.apache.spark.ml.clustering.LDA

val dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")
val lda = new LDA().setK(10).setMaxIter(100).setOptimizer("em")
sc.getPersistentRDDs
val start = System.currentTimeMillis; val model = lda.fit(dataset); val end = System.currentTimeMillis; end - start
sc.getPersistentRDDs
sc.getPersistentRDDs.size
sc.getPersistentRDDs.foreach(println)
```

this PR:

```
start: Long = 1579250257523
model: org.apache.spark.ml.clustering.LDAModel = DistributedLDAModel: uid=lda_2a48ae87b788, k=10, numFeatures=11
end: Long = 1579250268529
res1: Long = 11006

scala> sc.getPersistentRDDs.foreach(println)
(2441,EdgeRDD MapPartitionsRDD[2441] at mapPartitions at EdgeRDDImpl.scala:119)
(2438,VertexRDD, VertexRDD ZippedPartitionsRDD2[2438] at zipPartitions at VertexRDD.scala:322)
(29,VertexRDD, VertexRDD ZippedPartitionsRDD2[29] at zipPartitions at VertexRDD.scala:322)
(32,EdgeRDD MapPartitionsRDD[32] at mapPartitions at EdgeRDDImpl.scala:119)
```

master:

```
scala> val start = System.currentTimeMillis; val model = lda.fit(dataset); val end = System.currentTimeMillis; end - start
start: Long = 1579255989886
model: org.apache.spark.ml.clustering.LDAModel = DistributedLDAModel: uid=lda_f600c29d8e0a, k=10, numFeatures=11
end: Long = 1579256001181
res1: Long = 11295

scala> sc.getPersistentRDDs.size
res2: Int = 106
```

There seems to be no performance regression.
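To check for leaked cached RDDs more directly, one can diff `sc.getPersistentRDDs` before and after `fit`. A minimal sketch for the Spark shell, assuming the `lda` and `dataset` values defined in the test code above:

```scala
// Snapshot the cached RDD ids before fitting, then list anything
// that is still cached afterwards (run inside a Spark shell session).
val before = sc.getPersistentRDDs.keySet
val model = lda.fit(dataset)
val leaked = sc.getPersistentRDDs.filter { case (id, _) => !before.contains(id) }
leaked.values.foreach(rdd => println(s"still cached: $rdd"))
```

With the fix applied, only the final model's graph RDDs should remain in the leaked set; on master, the set grows with intermediate `EdgeRDD`s from every iteration.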
In Pregel, the previous graph is handled like:

```scala
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
  // Receive the messages and update the vertices.
  prevG = g
  g = g.joinVertices(messages)(vprog)
  graphCheckpointer.update(g)
  ...
  // Unpersist the RDDs hidden by newly-materialized RDDs
  oldMessages.unpersist()
  prevG.unpersistVertices()
  prevG.edges.unpersist()
  // count the iteration
  i += 1
}
```

It is somewhat strange that `persist` and `unpersist` are asymmetric in the checkpointer: `persist` caches the vertices and edges separately, while `unpersist` drops the whole graph:

```scala
override protected def persist(data: Graph[VD, ED]): Unit = {
  if (data.vertices.getStorageLevel == StorageLevel.NONE) {
    /* We need to use cache because persist does not honor the default storage level requested
     * when constructing the graph. Only cache does that.
     */
    data.vertices.cache()
  }
  if (data.edges.getStorageLevel == StorageLevel.NONE) {
    data.edges.cache()
  }
}

override protected def unpersist(data: Graph[VD, ED]): Unit = data.unpersist()
```
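The fix in this PR follows the Pregel pattern above: hold a reference to the previous graph, let the checkpointer persist the new one, materialize it, and only then unpersist the old vertices and edges explicitly, outside the checkpointer. A rough sketch of the pattern (the `nextIterationGraph` step is illustrative, not the exact patch):

```scala
// Pregel-style cleanup in an iterative GraphX loop (sketch):
val prevGraph = graph                // keep a handle on the old graph
graph = nextIterationGraph(graph)    // hypothetical step building the new graph
graphCheckpointer.update(graph)      // persist (and possibly checkpoint) the new graph
graph.vertices.count()               // materialize before dropping the old one
prevGraph.unpersistVertices()        // then drop the old vertices...
prevGraph.edges.unpersist()          // ...and the old edges, outside the checkpointer
```

Materializing before unpersisting matters: if the old graph were dropped first, the new graph's lineage could force recomputation of the just-evicted RDDs.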
Test build #116933 has finished for PR 27261 at commit
ping @srowen
```scala
val docTopicDistributions: VertexRDD[TopicCounts] =
  graph.aggregateMessages[(Boolean, TopicCounts)](sendMsg, mergeMsg)
    .mapValues(_._2)
val prevGraph = graph
```
I buy it, but where is graph persisted again?
@zhengruifeng I'm still kind of curious, but trust your judgment so I will merge it.
Thanks! I am on holiday, so I cannot reply quickly.
The current change follows what Pregel does.
I will keep looking into it, since persistence in GraphX is a little complex.
What changes were proposed in this pull request?
Unpersist the graph outside the checkpointer, as Pregel does.
Why are the changes needed?
As shown in SPARK-30503, intermediate edge RDDs are not unpersisted.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing test suites and a manual test.