From d4284fa14bdfd9d7f15e98d4600de2e2f7d822d2 Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Wed, 8 Jul 2015 18:31:24 -0700
Subject: [PATCH 1/5] Generalize OnlineLDA to asymmetric priors, no tests

---
 .../apache/spark/mllib/clustering/LDA.scala   | 48 ++++++++++++++-----
 .../spark/mllib/clustering/LDAOptimizer.scala | 21 +++++---
 .../spark/mllib/clustering/LDASuite.scala     |  6 +--
 3 files changed, 53 insertions(+), 22 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index a410547a72fda..2767504a06b0e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -23,7 +23,7 @@ import org.apache.spark.Logging
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaPairRDD
 import org.apache.spark.graphx._
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
@@ -49,12 +49,24 @@ import org.apache.spark.util.Utils
 class LDA private (
     private var k: Int,
     private var maxIterations: Int,
-    private var docConcentration: Double,
+    private var docConcentration: Vector,
     private var topicConcentration: Double,
     private var seed: Long,
     private var checkpointInterval: Int,
     private var ldaOptimizer: LDAOptimizer) extends Logging {
 
+  def this(
+      k: Int,
+      maxIterations: Int,
+      docConcentration: Double,
+      topicConcentration: Double,
+      seed: Long,
+      checkpointInterval: Int,
+      ldaOptimizer: LDAOptimizer) = {
+    this(k, maxIterations, Vectors.dense(Array.fill(k)(docConcentration)), topicConcentration,
+      seed, checkpointInterval, ldaOptimizer)
+  }
+
   def this() = this(k = 10, maxIterations = 20, docConcentration = -1, topicConcentration = -1,
     seed = Utils.random.nextLong(), checkpointInterval = 10, ldaOptimizer = new EMLDAOptimizer)
 
@@ -77,37 +89,47 @@ class LDA private (
    * Concentration parameter (commonly named "alpha") for the prior placed on documents'
    * distributions over topics ("theta").
    *
-   * This is the parameter to a symmetric Dirichlet distribution.
+   * This is the parameter to a Dirichlet distribution.
    */
-  def getDocConcentration: Double = this.docConcentration
+  def getDocConcentration: Vector = this.docConcentration
 
   /**
    * Concentration parameter (commonly named "alpha") for the prior placed on documents'
    * distributions over topics ("theta").
    *
-   * This is the parameter to a symmetric Dirichlet distribution, where larger values
-   * mean more smoothing (more regularization).
+   * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing
+   * (more regularization).
    *
-   * If set to -1, then docConcentration is set automatically.
-   * (default = -1 = automatic)
+   * If set to a vector of -1, then docConcentration is set automatically.
+   * (default = a vector of -1 = automatic)
    *
    * Optimizer-specific parameter settings:
    *  - EM
+   *     - Currently only supports symmetric distributions, so values in the vector must be the same
    *     - Value should be > 1.0
    *     - default = (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
    *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
   *  - Online
-   *     - Value should be >= 0
-   *     - default = (1.0 / k), following the implementation from
+   *     - Values should be >= 0
+   *     - default = uniformly (1.0 / k), following the implementation from
    *       [[https://github.com/Blei-Lab/onlineldavb]].
    */
-  def setDocConcentration(docConcentration: Double): this.type = {
+  def setDocConcentration(docConcentration: Vector): this.type = {
     this.docConcentration = docConcentration
     this
   }
 
+  /** Replicates Double to create a symmetric prior */
+  def setDocConcentration(docConcentration: Double): this.type = {
+    this.docConcentration = Vectors.dense(Array.fill(k)(docConcentration))
+    this
+  }
+
   /** Alias for [[getDocConcentration]] */
-  def getAlpha: Double = getDocConcentration
+  def getAlpha: Vector = getDocConcentration
+
+  /** Alias for [[setDocConcentration()]] */
+  def setAlpha(alpha: Vector): this.type = setDocConcentration(alpha)
 
   /** Alias for [[setDocConcentration()]] */
   def setAlpha(alpha: Double): this.type = setDocConcentration(alpha)
@@ -242,6 +264,8 @@ class LDA private (
    * @return  Inferred LDA model
    */
   def run(documents: RDD[(Long, Vector)]): LDAModel = {
+    require(docConcentration.size == this.k, "Dimension of alpha and k must be equal.")
+
     val state = ldaOptimizer.initialize(documents, this)
     var iter = 0
     val iterationTimes = Array.fill[Double](maxIterations)(0)
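
The LDA.scala diff above replaces the scalar docConcentration with a Vector while keeping a Double
overload that replicates its argument into a symmetric prior. A minimal usage sketch of the
resulting API (the k = 3 configuration and concentration values are illustrative, not from the
patch):

import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors

// Symmetric prior, as before: the Double overload replicates 2.0 across all k topics,
// so setK must be called before setDocConcentration(Double).
val symmetric = new LDA().setK(3).setDocConcentration(2.0)

// Asymmetric prior, new in this patch: one concentration per topic; run() requires length == k.
val asymmetric = new LDA().setK(3).setDocConcentration(Vectors.dense(0.1, 1.0, 5.0))

// setAlpha aliases setDocConcentration in both the Vector and Double forms.
val aliased = new LDA().setK(3).setAlpha(Vectors.dense(0.1, 1.0, 5.0))
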
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 8e5154b902d1d..8178e5a9efd90 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.impl.GraphImpl
 import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
-import org.apache.spark.mllib.linalg.{Matrices, SparseVector, DenseVector, Vector}
+import org.apache.spark.mllib.linalg._
 import org.apache.spark.rdd.RDD
 
 /**
@@ -95,8 +95,11 @@ final class EMLDAOptimizer extends LDAOptimizer {
    * Compute bipartite term/doc graph.
    */
   override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
+    val docConcentration = breeze.stats.mean(lda.getDocConcentration.toBreeze)
+    require({
+      lda.getDocConcentration.toArray.forall(x => math.abs(x - docConcentration) < 1E-3)
+    }, "EMLDAOptimizer currently only supports symmetric document-topic priors")
 
-    val docConcentration = lda.getDocConcentration
     val topicConcentration = lda.getTopicConcentration
     val k = lda.getK
 
@@ -229,10 +232,10 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   private var vocabSize: Int = 0
 
   /** alias for docConcentration */
-  private var alpha: Double = 0
+  private var alpha: Vector = Vectors.dense(0)
 
   /** (private[clustering] for debugging) Get docConcentration */
-  private[clustering] def getAlpha: Double = alpha
+  private[clustering] def getAlpha: Vector = alpha
 
   /** alias for topicConcentration */
   private var eta: Double = 0
@@ -343,7 +346,11 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     this.k = lda.getK
     this.corpusSize = docs.count()
     this.vocabSize = docs.first()._2.size
-    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
+    this.alpha = if (lda.getDocConcentration.toArray.forall(_ == -1.0)) {
+      Vectors.dense(Array.fill(k)(1.0 / k))
+    } else {
+      lda.getDocConcentration
+    }
     this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
     this.randomGenerator = new Random(lda.getSeed)
 
@@ -372,7 +379,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val vocabSize = this.vocabSize
     val Elogbeta = dirichletExpectation(lambda)
     val expElogbeta = exp(Elogbeta)
-    val alpha = this.alpha
+    val alpha = this.alpha.toBreeze
     val gammaShape = this.gammaShape
 
     val stats: RDD[BDM[Double]] = batch.mapPartitions { docs =>
@@ -400,7 +407,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         while (meanchange > 1e-3) {
           val lastgamma = gammad
           // 1*K          1 * ids               ids * k
-          gammad = (expElogthetad :* ((ctsVector / phinorm) * expElogbetad.t)) + alpha
+          gammad = (expElogthetad :* ((ctsVector / phinorm) * expElogbetad.t)) :+ alpha.t
           Elogthetad = digamma(gammad) - digamma(sum(gammad))
           expElogthetad = exp(Elogthetad)
           phinorm = expElogthetad * expElogbetad + 1e-100
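
In the optimizer diff above, alpha becomes a Breeze vector inside the minibatch update, so the
variational step switches from scalar broadcast (+ alpha) to elementwise addition (:+ alpha.t)
against the 1 x K row vector gammad. A small self-contained Breeze sketch of that shape
discipline (the numbers are illustrative, not from the patch):

import breeze.linalg.{DenseVector => BDV}

// gammad is kept as a 1 x K row vector (a transposed DenseVector) in OnlineLDAOptimizer.
val gammad = BDV(0.9, 1.1, 1.0).t

// A per-topic alpha must be transposed to the same 1 x K shape and added elementwise,
// which is what `:+ alpha.t` does; a scalar alpha could simply broadcast with `+`.
val alpha = BDV(0.05, 0.30, 0.65)
val updated = gammad :+ alpha.t // (0.95, 1.40, 1.65)
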
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 721a065658951..3ac6e8c40fcfa 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -132,8 +132,8 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
   test("setter alias") {
     val lda = new LDA().setAlpha(2.0).setBeta(3.0)
-    assert(lda.getAlpha === 2.0)
-    assert(lda.getDocConcentration === 2.0)
+    assert(lda.getAlpha.toArray.forall(_ === 2.0))
+    assert(lda.getDocConcentration.toArray.forall(_ === 2.0))
     assert(lda.getBeta === 3.0)
     assert(lda.getTopicConcentration === 3.0)
   }
@@ -143,7 +143,7 @@
     val corpus = sc.parallelize(tinyCorpus, 2)
     val op = new OnlineLDAOptimizer().initialize(corpus, lda)
     op.setKappa(0.9876).setMiniBatchFraction(0.123).setTau0(567)
-    assert(op.getAlpha == 0.5) // default 1.0 / k
+    assert(op.getAlpha.toArray.forall(_ == 0.5)) // default 1.0 / k
     assert(op.getEta == 0.5) // default 1.0 / k
     assert(op.getKappa == 0.9876)
     assert(op.getMiniBatchFraction == 0.123)

From 72038ff88e7aad5798cae815530ec4e6217938c2 Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Thu, 9 Jul 2015 16:04:01 -0700
Subject: [PATCH 2/5] Add tests referenced against gensim

---
 .../apache/spark/mllib/clustering/LDA.scala   |  1 +
 .../spark/mllib/clustering/LDASuite.scala     | 50 +++++++++++++++++++
 2 files changed, 51 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index 2767504a06b0e..0a234e5b92c61 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -115,6 +115,7 @@ class LDA private (
    *       [[https://github.com/Blei-Lab/onlineldavb]].
    */
   def setDocConcentration(docConcentration: Vector): this.type = {
+    docConcentration.toArray.iterator
     this.docConcentration = docConcentration
     this
   }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 3ac6e8c40fcfa..295060d017dc9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -218,6 +218,56 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
     }
   }
 
+  test("OnlineLDAOptimizer with asymmetric prior") {
+    def toydata: Array[(Long, Vector)] = Array(
+      Vectors.sparse(6, Array(0, 1), Array(1, 1)),
+      Vectors.sparse(6, Array(1, 2), Array(1, 1)),
+      Vectors.sparse(6, Array(0, 2), Array(1, 1)),
+      Vectors.sparse(6, Array(3, 4), Array(1, 1)),
+      Vectors.sparse(6, Array(3, 5), Array(1, 1)),
+      Vectors.sparse(6, Array(4, 5), Array(1, 1))
+    ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
+
+    val docs = sc.parallelize(toydata)
+    val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau0(1024).setKappa(0.51)
+      .setGammaShape(1e10)
+    val lda = new LDA().setK(2)
+      .setDocConcentration(Vectors.dense(0.00001, 0.1))
+      .setTopicConcentration(0.01)
+      .setMaxIterations(100)
+      .setOptimizer(op)
+      .setSeed(12345)
+
+    val ldaModel = lda.run(docs)
+    val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
+    val topics = topicIndices.map { case (terms, termWeights) =>
+      terms.zip(termWeights)
+    }
+
+    /* Verify results with Python:
+
+       import numpy as np
+       from gensim import models
+       corpus = [
+        [(0, 1.0), (1, 1.0)],
+        [(1, 1.0), (2, 1.0)],
+        [(0, 1.0), (2, 1.0)],
+        [(3, 1.0), (4, 1.0)],
+        [(3, 1.0), (5, 1.0)],
+        [(4, 1.0), (5, 1.0)]]
+       np.random.seed(10)
+       lda = models.ldamodel.LdaModel(
+         corpus=corpus, alpha=np.array([0.00001, 0.1]), num_topics=2, update_every=0, passes=100)
+       lda.print_topics()
+
+       > ['0.167*0 + 0.167*1 + 0.167*2 + 0.167*3 + 0.167*4 + 0.167*5',
+          '0.167*0 + 0.167*1 + 0.167*2 + 0.167*4 + 0.167*3 + 0.167*5']
+     */
+    topics.foreach { topic =>
+      assert(topic.forall { case (_, p) => p ~= 0.167 absTol 0.05 })
+    }
+  }
+
   test("model save/load") {
     // Test for LocalLDAModel.
     val localModel = new LocalLDAModel(tinyTopics)

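The test above pins Spark's results against the gensim run embedded in the comment. For reading
the Spark side of that comparison: describeTopics returns, per topic, parallel arrays of term
indices and weights, which can be formatted like gensim's print_topics output. An illustrative
consumption sketch (assumes the ldaModel from the test above):

val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
topicIndices.zipWithIndex.foreach { case ((terms, weights), i) =>
  val formatted = terms.zip(weights)
    .map { case (term, w) => f"$w%1.3f*$term" }
    .mkString(" + ")
  println(s"topic $i: $formatted") // e.g. "topic 0: 0.167*0 + 0.167*1 + ..."
}
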
From a6dcf706ac6f43b746d55e369bae098a66ad414e Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Thu, 9 Jul 2015 19:31:15 -0700
Subject: [PATCH 3/5] Change docConcentration interface and move LDAOptimizer
 validation to initialize, add sad path tests

---
 .../apache/spark/mllib/clustering/LDA.scala   | 40 +++++++------------
 .../spark/mllib/clustering/LDAOptimizer.scala | 18 ++++++---
 .../spark/mllib/clustering/LDASuite.scala     | 28 ++++++++++---
 3 files changed, 49 insertions(+), 37 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index 0a234e5b92c61..ab124e6d77c5e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -23,11 +23,10 @@ import org.apache.spark.Logging
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaPairRDD
 import org.apache.spark.graphx._
-import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
-
 /**
  * :: Experimental ::
  *
@@ -55,20 +54,9 @@ class LDA private (
     private var checkpointInterval: Int,
     private var ldaOptimizer: LDAOptimizer) extends Logging {
 
-  def this(
-      k: Int,
-      maxIterations: Int,
-      docConcentration: Double,
-      topicConcentration: Double,
-      seed: Long,
-      checkpointInterval: Int,
-      ldaOptimizer: LDAOptimizer) = {
-    this(k, maxIterations, Vectors.dense(Array.fill(k)(docConcentration)), topicConcentration,
-      seed, checkpointInterval, ldaOptimizer)
-  }
-
-  def this() = this(k = 10, maxIterations = 20, docConcentration = -1, topicConcentration = -1,
-    seed = Utils.random.nextLong(), checkpointInterval = 10, ldaOptimizer = new EMLDAOptimizer)
+  def this() = this(k = 10, maxIterations = 20, docConcentration = Vectors.dense(-1),
+    topicConcentration = -1, seed = Utils.random.nextLong(), checkpointInterval = 10,
+    ldaOptimizer = new EMLDAOptimizer)
 
   /**
    * Number of topics to infer. I.e., the number of soft cluster centers.
@@ -100,29 +88,31 @@ class LDA private (
    * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing
    * (more regularization).
    *
-   * If set to a vector of -1, then docConcentration is set automatically.
-   * (default = a vector of -1 = automatic)
+   * If set to a singleton vector Vector(-1), then docConcentration is set automatically. If set to
+   * singleton vector Vector(t) where t != -1, then t is replicated to a vector of length k during
+   * [[LDAOptimizer.initialize()]]. Otherwise, the [[docConcentration]] vector must be length k.
+   * (default = Vector(-1) = automatic)
    *
    * Optimizer-specific parameter settings:
    *  - EM
-   *     - Currently only supports symmetric distributions, so values in the vector must be the same
-   *     - Value should be > 1.0
-   *     - default = (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
-   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+   *     - Currently only supports symmetric distributions, so all values in the vector should be
+   *       the same.
+   *     - Values should be > 1.0
+   *     - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
+   *       from Asuncion et al. (2009), who recommend a +1 adjustment for EM.
    *  - Online
    *     - Values should be >= 0
    *     - default = uniformly (1.0 / k), following the implementation from
    *       [[https://github.com/Blei-Lab/onlineldavb]].
    */
   def setDocConcentration(docConcentration: Vector): this.type = {
-    docConcentration.toArray.iterator
     this.docConcentration = docConcentration
     this
   }
 
   /** Replicates Double to create a symmetric prior */
   def setDocConcentration(docConcentration: Double): this.type = {
-    this.docConcentration = Vectors.dense(Array.fill(k)(docConcentration))
+    this.docConcentration = Vectors.dense(docConcentration)
     this
   }
 
@@ -265,8 +255,6 @@ class LDA private (
    * @return  Inferred LDA model
    */
   def run(documents: RDD[(Long, Vector)]): LDAModel = {
-    require(docConcentration.size == this.k, "Dimension of alpha and k must be equal.")
-
     val state = ldaOptimizer.initialize(documents, this)
     var iter = 0
     val iterationTimes = Array.fill[Double](maxIterations)(0)
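
With this interface change the no-arg constructor stores Vectors.dense(-1) and the Double setter
stores a singleton, so three forms of docConcentration can reach the optimizer. A sketch of what
each form means under the rules documented above (k = 4 is illustrative):

import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors

val k = 4

// 1. Singleton Vector(-1): resolved automatically (the online default is 1.0 / k per topic).
val auto = new LDA().setK(k).setDocConcentration(Vectors.dense(-1))

// 2. Singleton Vector(t), t != -1: replicated to length k during LDAOptimizer.initialize().
val replicated = new LDA().setK(k).setDocConcentration(3.0) // stored as Vectors.dense(3.0)

// 3. Full-length vector: used as-is; must have length k with nonnegative entries.
val explicit = new LDA().setK(k).setDocConcentration(Vectors.dense(0.5, 0.5, 1.0, 2.0))
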
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 8178e5a9efd90..bc20f75133509 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -19,8 +19,8 @@ package org.apache.spark.mllib.clustering
 
 import java.util.Random
 
-import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, sum, normalize, kron}
-import breeze.numerics.{digamma, exp, abs}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, normalize, sum}
+import breeze.numerics.{abs, digamma, exp}
 import breeze.stats.distributions.{Gamma, RandBasis}
 
 import org.apache.spark.annotation.DeveloperApi
@@ -97,7 +97,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
   override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
     val docConcentration = breeze.stats.mean(lda.getDocConcentration.toBreeze)
     require({
-      lda.getDocConcentration.toArray.forall(x => math.abs(x - docConcentration) < 1E-3)
+      lda.getDocConcentration.toArray.forall(_ == docConcentration)
     }, "EMLDAOptimizer currently only supports symmetric document-topic priors")
 
     val topicConcentration = lda.getTopicConcentration
@@ -346,9 +346,17 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     this.k = lda.getK
     this.corpusSize = docs.count()
     this.vocabSize = docs.first()._2.size
-    this.alpha = if (lda.getDocConcentration.toArray.forall(_ == -1.0)) {
-      Vectors.dense(Array.fill(k)(1.0 / k))
+    this.alpha = if (lda.getDocConcentration.size == 1) {
+      if (lda.getDocConcentration(0) == -1) Vectors.dense(Array.fill(k)(1.0 / k))
+      else {
+        require(lda.getDocConcentration(0) >= 0, "all entries in alpha must be >=0")
+        Vectors.dense(Array.fill(k)(lda.getDocConcentration(0)))
+      }
     } else {
+      require(lda.getDocConcentration.size == k, "alpha must have length k")
+      lda.getDocConcentration.foreachActive { case (_, x) =>
+        require(x >= 0, "all entries in alpha must be >= 0")
+      }
       lda.getDocConcentration
     }
     this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
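
The branching added to OnlineLDAOptimizer.initialize above is the heart of the validation move.
Restated as a standalone function for readability (an illustrative paraphrase, not code from the
patch):

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Mirrors the docConcentration resolution rules in OnlineLDAOptimizer.initialize.
def resolveAlpha(docConcentration: Vector, k: Int): Vector = {
  if (docConcentration.size == 1) {
    if (docConcentration(0) == -1) {
      Vectors.dense(Array.fill(k)(1.0 / k)) // automatic symmetric default
    } else {
      require(docConcentration(0) >= 0, "all entries in alpha must be >= 0")
      Vectors.dense(Array.fill(k)(docConcentration(0))) // replicate the singleton
    }
  } else {
    require(docConcentration.size == k, "alpha must have length k")
    docConcentration.foreachActive { case (_, x) =>
      require(x >= 0, "all entries in alpha must be >= 0")
    }
    docConcentration // already length k: use as-is
  }
}
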
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 295060d017dc9..c6f6d2a73c4b0 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering
 import breeze.linalg.{DenseMatrix => BDM}
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.mllib.linalg.{Vector, DenseMatrix, Matrix, Vectors}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix, Vector, Vectors}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.util.Utils
@@ -138,16 +138,32 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(lda.getTopicConcentration === 3.0)
   }
 
+  test("initializing with alpha length != k or 1 fails") {
+    intercept[IllegalArgumentException] {
+      val lda = new LDA().setK(2).setAlpha(Vectors.dense(1, 2, 3, 4))
+      val corpus = sc.parallelize(tinyCorpus, 2)
+      lda.run(corpus)
+    }
+  }
+
+  test("initializing with elements in alpha < 0 fails") {
+    intercept[IllegalArgumentException] {
+      val lda = new LDA().setK(2).setAlpha(Vectors.dense(-1, 2, 3, 4))
+      val corpus = sc.parallelize(tinyCorpus, 2)
+      lda.run(corpus)
+    }
+  }
+
   test("OnlineLDAOptimizer initialization") {
     val lda = new LDA().setK(2)
     val corpus = sc.parallelize(tinyCorpus, 2)
     val op = new OnlineLDAOptimizer().initialize(corpus, lda)
     op.setKappa(0.9876).setMiniBatchFraction(0.123).setTau0(567)
-    assert(op.getAlpha.toArray.forall(_ == 0.5)) // default 1.0 / k
-    assert(op.getEta == 0.5) // default 1.0 / k
-    assert(op.getKappa == 0.9876)
-    assert(op.getMiniBatchFraction == 0.123)
-    assert(op.getTau0 == 567)
+    assert(op.getAlpha.toArray.forall(_ === 0.5)) // default 1.0 / k
+    assert(op.getEta === 0.5) // default 1.0 / k
+    assert(op.getKappa === 0.9876)
+    assert(op.getMiniBatchFraction === 0.123)
+    assert(op.getTau0 === 567)
   }
 
   test("OnlineLDAOptimizer one iteration") {

From 58f1d7b6f2632e00451cf4becf2486cbc9650d03 Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Tue, 21 Jul 2015 11:16:31 -0700
Subject: [PATCH 4/5] Fix from review feedback

---
 .../org/apache/spark/mllib/clustering/LDAOptimizer.scala | 8 ++++----
 .../org/apache/spark/mllib/clustering/LDASuite.scala     | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index bc20f75133509..b2f3a2538d0b0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -95,7 +95,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
    * Compute bipartite term/doc graph.
    */
   override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
-    val docConcentration = breeze.stats.mean(lda.getDocConcentration.toBreeze)
+    val docConcentration = lda.getDocConcentration(0)
     require({
       lda.getDocConcentration.toArray.forall(_ == docConcentration)
     }, "EMLDAOptimizer currently only supports symmetric document-topic priors")
@@ -349,13 +349,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     this.alpha = if (lda.getDocConcentration.size == 1) {
       if (lda.getDocConcentration(0) == -1) Vectors.dense(Array.fill(k)(1.0 / k))
       else {
-        require(lda.getDocConcentration(0) >= 0, "all entries in alpha must be >=0")
+        require(lda.getDocConcentration(0) >= 0, s"all entries in alpha must be >= 0, got: ${lda.getDocConcentration(0)}")
         Vectors.dense(Array.fill(k)(lda.getDocConcentration(0)))
       }
     } else {
-      require(lda.getDocConcentration.size == k, "alpha must have length k")
+      require(lda.getDocConcentration.size == k, s"alpha must have length k, got: ${lda.getDocConcentration.size}")
       lda.getDocConcentration.foreachActive { case (_, x) =>
-        require(x >= 0, "all entries in alpha must be >= 0")
+        require(x >= 0, s"all entries in alpha must be >= 0, got: $x")
       }
       lda.getDocConcentration
     }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index c6f6d2a73c4b0..da70d9bd7c790 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -148,7 +148,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
 
   test("initializing with elements in alpha < 0 fails") {
     intercept[IllegalArgumentException] {
-      val lda = new LDA().setK(2).setAlpha(Vectors.dense(-1, 2, 3, 4))
+      val lda = new LDA().setK(4).setAlpha(Vectors.dense(-1, 2, 3, 4))
       val corpus = sc.parallelize(tinyCorpus, 2)
      lda.run(corpus)
     }

From af8fbb71bee49526ade85ec52076f726d7b45ad1 Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Wed, 22 Jul 2015 14:18:06 -0700
Subject: [PATCH 5/5] Fix merge errors

---
 .../scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 35486e2c6b3d4..f4170a3d98dd8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.impl.GraphImpl
 import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
-import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector}
+import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
 import org.apache.spark.rdd.RDD
 
 /**
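
Taken together, the five patches let the online optimizer accept a per-topic document-topic prior
end to end. A closing usage sketch, assuming a live SparkContext named sc and mirroring the
gensim-checked test above (the corpus and settings are illustrative):

import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Tiny corpus over a 6-term vocabulary: (docId, termCounts).
val corpus: RDD[(Long, Vector)] = sc.parallelize(Seq(
  (0L, Vectors.sparse(6, Array(0, 1), Array(1.0, 1.0))),
  (1L, Vectors.sparse(6, Array(1, 2), Array(1.0, 1.0))),
  (2L, Vectors.sparse(6, Array(3, 4), Array(1.0, 1.0))),
  (3L, Vectors.sparse(6, Array(4, 5), Array(1.0, 1.0)))))

// Asymmetric document-topic prior: topic 1 is favored a priori over topic 0.
val model = new LDA()
  .setK(2)
  .setDocConcentration(Vectors.dense(1e-5, 0.1))
  .setTopicConcentration(0.01)
  .setMaxIterations(100)
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(1.0))
  .setSeed(12345)
  .run(corpus)

model.describeTopics(maxTermsPerTopic = 6).foreach { case (terms, weights) =>
  println(terms.zip(weights).mkString(", "))
}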