Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][SPARK-1405][MLLIB]collapsed Gibbs sampling based latent Dirichlet allocation #1983

Closed
wants to merge 1 commit into from

Conversation

@witgo
Copy link
Contributor

witgo commented Aug 16, 2014

This PR is based on @yinxusen's #476
The performance test:

500topics:

Item value
The cluster resource 36 executors(36 cores, 216g memory)
The corpus size 196558 document, 27719730 words
The number of iterations 100
The number of term 4853087
The number of topics 500
alpha 0.1
beta 0.01
The running time 266 minutes

1000 topics:

Item value
The cluster resource 36 executors(36 cores, 216g memory)
The corpus size 196558 document, 27719730 words
The number of iterations 100
The number of term 4853087
The number of topics 1000
alpha 0.1
beta 0.01
The running time 443 minutes

2000 topics:

Item value
The cluster resource 36 executors(36 cores, 216g memory)
The corpus size 253064 document, 29696335 words
The number of iterations 150
The number of term 75496
The number of topics 2000
alpha 0.1
beta 0.01
The running time 795 minutes

conf/spark-defaults.conf:

spark.akka.frameSize   20
spark.executor.instances 36
spark.rdd.compress true
spark.executor.memory   6g
spark.default.parallelism  72
spark.broadcast.blockSize  8192
spark.storage.memoryFraction 0.4
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 16, 2014

QA tests have started for PR 1983 at commit 58888c1.

  • This patch merges cleanly.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 16, 2014

QA tests have finished for PR 1983 at commit 58888c1.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleBlockManager(blockManager: BlockManager,
    • case class Document(docId: Int, content: Iterable[Int], var topics: Iterable[Int] = null,
    • class LDAParamsAccumulableParam extends AccumulableParam[LDAParams, (Int, Int, Int, Int)]
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 16, 2014

QA tests have started for PR 1983 at commit 645f4ef.

  • This patch merges cleanly.
@witgo witgo changed the title [WIP][SPARK-1405]Collapsed Gibbs sampling based Latent Dirichlet Allocation [WIP][SPARK-1405][MLLIB]Collapsed Gibbs sampling based Latent Dirichlet Allocation Aug 16, 2014
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 16, 2014

QA tests have finished for PR 1983 at commit 645f4ef.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Document(docId: Int, content: Iterable[Int], var topics: Iterable[Int] = null,
    • class LDAParamsAccumulableParam extends AccumulableParam[LDAParams, (Int, Int, Int, Int)]
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 17, 2014

QA tests have started for PR 1983 at commit 016bd7b.

  • This patch merges cleanly.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 17, 2014

QA tests have finished for PR 1983 at commit 016bd7b.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 19, 2014

QA tests have started for PR 1983 at commit 2bdce26.

  • This patch merges cleanly.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 19, 2014

QA tests have finished for PR 1983 at commit 2bdce26.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@witgo witgo force-pushed the witgo:cgs_lda branch Aug 25, 2014
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 25, 2014

QA tests have started for PR 1983 at commit b8db245.

  • This patch merges cleanly.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 25, 2014

QA tests have finished for PR 1983 at commit b8db245.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Document(docId: Int, content: Iterable[Int], var topics: Iterable[Int] = null,
@witgo witgo force-pushed the witgo:cgs_lda branch Aug 26, 2014
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 26, 2014

QA tests have started for PR 1983 at commit 05a1c79.

  • This patch merges cleanly.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 26, 2014

QA tests have finished for PR 1983 at commit 05a1c79.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Document(docId: Int, content: Iterable[Int], var topics: Iterable[Int] = null,
@mengxr

This comment has been minimized.

Copy link
Contributor

mengxr commented Aug 27, 2014

@witgo Thanks for working on LDA! Could you briefly describe what you changed in this PR? The major feedback of #476 is how we store the model, which may be worth more discussion before we update the implementation. Let's talk about this in the JIRA page.

@witgo witgo force-pushed the witgo:cgs_lda branch Aug 28, 2014
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 28, 2014

QA tests have started for PR 1983 at commit 46cf160.

  • This patch merges cleanly.
@witgo witgo force-pushed the witgo:cgs_lda branch Aug 28, 2014
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 28, 2014

QA tests have started for PR 1983 at commit 9bb1931.

  • This patch merges cleanly.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 28, 2014

QA tests have finished for PR 1983 at commit 9bb1931.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Document(docId: Int, content: Iterable[Int], var topics: Iterable[Int] = null,
    • class TopicCounters(val topicCounts: BV[Double],
@witgo

This comment has been minimized.

Copy link
Contributor Author

witgo commented Aug 28, 2014

@mengxr This patch removed the accumulable operation . repair formula errors in dropOneDistSampler method and some of the performance optimization. About how I store model, I have not yet mature ideas.

@witgo witgo force-pushed the witgo:cgs_lda branch Aug 28, 2014
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 28, 2014

QA tests have started for PR 1983 at commit c575afd.

  • This patch merges cleanly.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 28, 2014

QA tests have finished for PR 1983 at commit 46cf160.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 28, 2014

QA tests have finished for PR 1983 at commit c575afd.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Document(docId: Int, content: Iterable[Int], var topics: Iterable[Int] = null,
    • class TopicCounters(val topicCounts: BV[Double],
@witgo witgo force-pushed the witgo:cgs_lda branch Aug 29, 2014
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 29, 2014

QA tests have started for PR 1983 at commit 5a207c7.

  • This patch merges cleanly.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Aug 29, 2014

Tests timed out after a configured wait of 120m.

@witgo witgo force-pushed the witgo:cgs_lda branch to b21b9c1 Sep 1, 2014
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Sep 11, 2014

QA tests have started for PR 1983 at commit 41b03c2.

  • This patch merges cleanly.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Sep 11, 2014

QA tests have finished for PR 1983 at commit 41b03c2.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Document(docId: Int, content: Array[Int])
@allwefantasy

This comment has been minimized.

Copy link

allwefantasy commented Sep 11, 2014

@witgo 下面这一段代码可以多线程化么?

   for (i <- 0 until content.length) {
        val term = content(i)
        val topic = topics(i)
        val chosenTopic = topicModel.dropOneDistSampler(topicsDist, topicThisTerm,
          rand, term, topic)
        if (topic != chosenTopic) {
          topics(i) = chosenTopic
          topicsDist(topic) += -1
          topicsDist(chosenTopic) += 1
          topicModel.update(term, topic, -1)
          topicModel.update(term, chosenTopic, 1)
        }
      }

将此代码改成

 content.zipWithIndex.map(f=>f._2).toList.par.foreach{i=>
        val term = content(i)
        val topic = topics(i)
        val chosenTopic = topicModel.dropOneDistSampler(topicsDist, topicThisTerm,
          rand, term, topic)
        if (topic != chosenTopic) {
          topics(i) = chosenTopic
          topicsDist(topic) += -1
          topicsDist(chosenTopic) += 1
          topicModel.update(term, topic, -1)
          topicModel.update(term, chosenTopic, 1)
        }
      }

我目前的情况是集群中单机CPU核多,24核,但内存有限,所以无法充分利用cpu资源。希望多线程化一部分代码。

@witgo

This comment has been minimized.

Copy link
Contributor Author

witgo commented Sep 11, 2014

@allwefantasy Spark是可以调整executor同时运行的task数量的.
如果你想让每个executor同时可以运行17个task. 可以在conf/spark-defaults.conf 文件添加如下配置

 spark.executor.cores 17
@allwefantasy

This comment has been minimized.

Copy link

allwefantasy commented Sep 11, 2014

@witgo 感谢这个技巧的分享。 我目前还遇到一个问题。昨天你问我这边24w文档的words是多少,我统计了下,是 2400w words 计算方式是(parsedData.map(f:Document=>f.content.size).sum()),term 数是8w。 初始化非常快,只要分钟左右就跑完。但进行第一轮迭代时候,每个task 大概需要序列化26m的数据。然后到Cleaned broadcast 后 spark-shell 就没有反应了。 进入类似 http://csdn-hdp-nn-01:4040/stages/stage/?id=11 这种url 后task 显示都是running,然后我看了下每个worker 老年代什么的都是正常的。但是cpu很空闲,感觉人物都没有在跑的样子。你有遇到这个问题么?

qq20140911-3

之后就一直卡在这了 没反应。

qq20140911-2
qq20140911-1

@witgo

This comment has been minimized.

Copy link
Contributor Author

witgo commented Sep 11, 2014

@allwefantasy 现有的代码在迭代计算过程中创建了太多的TopicModel实例, 我现在正在尝试解决这个问题.
感谢你的反馈.

@allwefantasy

This comment has been minimized.

Copy link

allwefantasy commented Sep 11, 2014

@witgo 好的。如果有更新后请通知我。我这里也可以第一时间进行测试。

@witgo witgo force-pushed the witgo:cgs_lda branch Sep 12, 2014
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Sep 12, 2014

QA tests have started for PR 1983 at commit 14284f1.

  • This patch merges cleanly.
@mengxr

This comment has been minimized.

Copy link
Contributor

mengxr commented Sep 12, 2014

@witgo @allwefantasy

English 自动翻译的中文
Let's try to keep the comments in English as much as possible. 让我们尽量保持意见的英文尽可能。
If you feel hard or impossible to express something in English, maybe we can try putting Chinese and auto-translated English side-by-side. 如果你觉得很难或无法表达的东西用英文,也许我们可以尝试把中国和自动翻译英文并排侧。
This is definitely better than writing nothing and also help non-Chinese developers understand what is going on at a high level. 这绝对是比写什么好,也有利于非中国开发商明白什么是在一个较高的水平怎么回事。
I'm trying to demo this by putting Google-translated Chinese on the right side. 我试图通过将谷歌翻译的中国右侧来演示这一点。
It is definitely not high-quality but better than nothing. 这绝对不是高品质的,但总比没有好。
@mengxr

This comment has been minimized.

Copy link
Contributor

mengxr commented Sep 12, 2014

@witgo @allwefantasy We had an offline discussion about LDA's implementation. Please check the JIRA page for the notes.

我们有大约LDA的实现脱机讨论。请检查JIRA页的注释。

@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Sep 12, 2014

QA tests have finished for PR 1983 at commit 14284f1.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Document(docId: Int, content: Array[Int])
@witgo

This comment has been minimized.

Copy link
Contributor Author

witgo commented Sep 12, 2014

@mengxr @allwefantasy

The current broadcast-based implementation, especially in the corpus is large, the performance loss is more serious. Next week I will submit a graphx based implementation.
当前的基于broadcast的实现,在语料库特别大的情况下,性能损耗比较严重.下周我会提交一个基于graphx实现.

@witgo
witgo reviewed Sep 12, 2014
View changes
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala Outdated
logInfo("Start Gibbs sampling (Iteration %d/%d)".format(iter, totalIter))
val broadcastModel = data.context.broadcast(topicModel)
val previousCorpus = corpus
corpus = corpus.mapPartitions { docs =>

This comment has been minimized.

Copy link
@witgo

witgo Sep 12, 2014

Author Contributor

@rxin @mengxr
mapPartitions 方法的closure似乎没有正确清理. 序列化后的corpusRDD和序列化后topicModel broadcast 差不多一样大.
mapPartitions method seems to be no correct cleaning. The serialized corpus RDD and serialized topicModel broadcast almost as big.
cat spark.log | grep 'stored as values in memory' =>

14/09/13 00:47:59 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 218.2 KB, free 2.8 GB)
14/09/13 00:48:04 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.1 KB, free 2.8 GB)
14/09/13 00:48:08 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.7 KB, free 2.8 GB)
14/09/13 00:48:20 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.4 KB, free 2.8 GB)
14/09/13 00:48:23 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.6 KB, free 2.8 GB)
14/09/13 00:48:25 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 2.6 KB, free 2.8 GB)
14/09/13 00:48:25 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 3.1 KB, free 2.8 GB)
14/09/13 00:48:30 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 2.9 KB, free 2.8 GB)
14/09/13 00:48:35 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 3.2 KB, free 2.8 GB)
14/09/13 00:48:44 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 68.6 KB, free 2.8 GB)
14/09/13 00:48:45 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 41.7 KB, free 2.8 GB)
14/09/13 00:49:21 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 197.5 MB, free 2.6 GB)
14/09/13 00:49:24 INFO MemoryStore: Block broadcast_12 stored as values in memory (estimated size 197.7 MB, free 2.3 GB)
14/09/13 00:53:25 INFO MemoryStore: Block broadcast_13 stored as values in memory (estimated size 163.9 MB, free 2.1 GB)
14/09/13 00:53:28 INFO MemoryStore: Block broadcast_14 stored as values in memory (estimated size 164.0 MB, free 1878.0 MB)
14/09/13 00:57:34 INFO MemoryStore: Block broadcast_15 stored as values in memory (estimated size 149.7 MB, free 1658.5 MB)
14/09/13 00:57:36 INFO MemoryStore: Block broadcast_16 stored as values in memory (estimated size 150.0 MB, free 1444.0 MB)
14/09/13 01:01:34 INFO MemoryStore: Block broadcast_17 stored as values in memory (estimated size 141.1 MB, free 1238.3 MB)
14/09/13 01:01:36 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 141.2 MB, free 1036.2 MB)
14/09/13 01:05:12 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 134.5 MB, free 840.7 MB)
14/09/13 01:05:14 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 134.7 MB, free 647.8 MB)
14/09/13 01:08:39 INFO MemoryStore: Block broadcast_21 stored as values in memory (estimated size 218.3 KB, free 589.5 MB)
14/09/13 01:08:39 INFO MemoryStore: Block broadcast_22 stored as values in memory (estimated size 218.3 KB, free 589.2 MB)
14/09/13 01:08:40 INFO MemoryStore: Block broadcast_23 stored as values in memory (estimated size 134.6 MB, free 454.6 MB)
14/09/13 01:08:53 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 129.3 MB, free 267.1 MB)
14/09/13 01:08:55 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 129.4 MB, free 82.0 MB)
@witgo witgo force-pushed the witgo:cgs_lda branch Sep 13, 2014
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Sep 13, 2014

QA tests have started for PR 1983 at commit 9eba369.

  • This patch merges cleanly.
@witgo

This comment has been minimized.

Copy link
Contributor Author

witgo commented Sep 13, 2014

@allwefantasy
I have updated the code, you can try the latest code.

@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Sep 13, 2014

QA tests have finished for PR 1983 at commit 9eba369.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Document(docId: Int, content: Array[Int])
    • class Dummy(object):
@witgo witgo force-pushed the witgo:cgs_lda branch Sep 14, 2014
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Sep 14, 2014

QA tests have started for PR 1983 at commit 34082b9.

  • This patch merges cleanly.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Sep 14, 2014

QA tests have finished for PR 1983 at commit 34082b9.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Document(docId: Int, content: Array[Int])
@allwefantasy

This comment has been minimized.

Copy link

allwefantasy commented Sep 15, 2014

@witgo i have saw ur spark configuration for new performance test。 I will try your latest code and test in my data today

@allwefantasy

This comment has been minimized.

Copy link

allwefantasy commented Sep 15, 2014

@witgo i have try ur latest code in my corpus 。 it will not Stuck in broadcasting . However ,some exception are throw。
qq20140915-1

@witgo witgo force-pushed the witgo:cgs_lda branch to c22e8c2 Sep 15, 2014
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Sep 15, 2014

QA tests have started for PR 1983 at commit c22e8c2.

  • This patch merges cleanly.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Sep 15, 2014

QA tests have finished for PR 1983 at commit c22e8c2.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Document(docId: Int, content: Array[Int])
@witgo witgo changed the title [WIP][SPARK-1405][MLLIB]Collapsed Gibbs sampling based Latent Dirichlet Allocation [WIP][SPARK-1405][MLLIB]collapsed Gibbs sampling based latent Dirichlet allocation Sep 16, 2014
@mengxr

This comment has been minimized.

Copy link
Contributor

mengxr commented Sep 26, 2014

@witgo Since we are converging on a GraphX-based implementation and distributed representation of the topic model, do you mind closing this PR? Thanks!

@witgo witgo closed this Sep 27, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants
You can’t perform that action at this time.