From 9f4f9ea39a99627380cc58cd8f7862893b8191f2 Mon Sep 17 00:00:00 2001 From: Alok Singh Date: Sat, 27 Jun 2015 21:15:39 -0700 Subject: [PATCH 1/9] [SPARK-5562][MLlib] LDA should handle empty document. --- .../spark/mllib/clustering/LDAOptimizer.scala | 16 ++++++++++++++- .../spark/mllib/clustering/LDASuite.scala | 20 ++++++++++++++++--- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index 8e5154b902d1d..e5260c4ec6391 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -119,6 +119,18 @@ final class EMLDAOptimizer extends LDAOptimizer { } } + // create term vertices for empty docs + val emptyDocTermVertices: RDD[(VertexId, TopicCounts)] = { + //empty documents + val emptyDocs = docs.filter { case (d: Long, tc: Vector) => + tc.toBreeze.reduce(_+_) != BDV(0.0) + } + emptyDocs.map { case(docID: Long, termCounts: Vector) => + val tc: TopicCounts = BDV.fill[Double](termCounts.size)(0.0) + (docID, tc) + } + } + // Create vertices. // Initially, we use random soft assignments of tokens to topics (random gamma). val docTermVertices: RDD[(VertexId, TopicCounts)] = { @@ -134,8 +146,10 @@ final class EMLDAOptimizer extends LDAOptimizer { verticesTMP.reduceByKey(_ + _) } + // vertices are non empty docs and empty docs + val docAllTermVertices = docTermVertices union emptyDocTermVertices // Partition such that edges are grouped by document - this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D) + this.graph = Graph(docAllTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D) this.k = k this.vocabSize = docs.take(1).head._2.size this.checkpointInterval = lda.getCheckpointInterval diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 406affa25539d..639f79f6ee82f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.clustering -import breeze.linalg.{DenseMatrix => BDM} +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV} import org.apache.spark.SparkFunSuite import org.apache.spark.mllib.linalg.{Vector, DenseMatrix, Matrix, Vectors} @@ -99,9 +99,15 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext { // Check: per-doc topic distributions val topicDistributions = model.topicDistributions.collect() + // Ensure all documents are covered. - assert(topicDistributions.length === tinyCorpus.length) - assert(tinyCorpus.map(_._1).toSet === topicDistributions.map(_._1).toSet) + // SPARK-5562. since the topicDistribution returns the distribution of docsI over topics . 
+ // for empty docs, since the distribution of topic won't sum to 1 (and hence it is not a pdf) + // So the output will not contain the empty docs and hence we modify the unittest to + // compare against nonEmptyTinyCorpus + val nonEmptyTinyCorpus = getNonEmptyDoc(tinyCorpus) + assert(topicDistributions.length === nonEmptyTinyCorpus.length) + assert(nonEmptyTinyCorpus.map(_._1).toSet === topicDistributions.map(_._1).toSet) // Ensure we have proper distributions topicDistributions.foreach { case (docId, topicDistribution) => assert(topicDistribution.size === tinyK) @@ -232,12 +238,20 @@ private[clustering] object LDASuite { } def tinyCorpus: Array[(Long, Vector)] = Array( + Vectors.dense(0, 0, 0, 0, 0), // empty doc Vectors.dense(1, 3, 0, 2, 8), Vectors.dense(0, 2, 1, 0, 4), Vectors.dense(2, 3, 12, 3, 1), + Vectors.dense(0, 0, 0, 0, 0), // empty doc Vectors.dense(0, 3, 1, 9, 8), Vectors.dense(1, 1, 4, 2, 6) ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } assert(tinyCorpus.forall(_._2.size == tinyVocabSize)) // sanity check for test data + def getNonEmptyDoc(corpus:Array[(Long, Vector)]): Array[(Long, Vector)] = { + corpus.filter { case (docId: Long, wc: Vector) => + wc.toBreeze.reduce(_+_) != BDV(0.0) + } + + } } From ab55fbf72e37369e78c0ca8edbb0cf3e33658319 Mon Sep 17 00:00:00 2001 From: Alok Singh Date: Sun, 28 Jun 2015 00:42:19 -0700 Subject: [PATCH 2/9] [SPARK-5562][MLlib] Change as per Sean Owen's comments https://github.com/apache/spark/pull/7064/files#diff-9236d23975e6f5a5608ffc81dfd79146 --- .../spark/mllib/clustering/LDAOptimizer.scala | 20 ++++++++----------- .../spark/mllib/clustering/LDASuite.scala | 9 +++------ 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index e5260c4ec6391..d767cc2501364 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -27,7 +27,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.graphx._ import org.apache.spark.graphx.impl.GraphImpl import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer -import org.apache.spark.mllib.linalg.{Matrices, SparseVector, DenseVector, Vector} +import org.apache.spark.mllib.linalg._ import org.apache.spark.rdd.RDD /** @@ -120,16 +120,12 @@ final class EMLDAOptimizer extends LDAOptimizer { } // create term vertices for empty docs - val emptyDocTermVertices: RDD[(VertexId, TopicCounts)] = { - //empty documents - val emptyDocs = docs.filter { case (d: Long, tc: Vector) => - tc.toBreeze.reduce(_+_) != BDV(0.0) + val emptyDocTermVertices: RDD[(VertexId, TopicCounts)] = + docs.filter { case (_, termCounts: Vector) => + Vectors.norm(termCounts, p = 1.0) == 0.0 // empty docs has all zero termCounts + }.mapValues { case(termCounts: Vector) => + Vectors.zeros(termCounts.size).toBreeze.asInstanceOf[TopicCounts] } - emptyDocs.map { case(docID: Long, termCounts: Vector) => - val tc: TopicCounts = BDV.fill[Double](termCounts.size)(0.0) - (docID, tc) - } - } // Create vertices. // Initially, we use random soft assignments of tokens to topics (random gamma). 
@@ -146,8 +142,8 @@ final class EMLDAOptimizer extends LDAOptimizer { verticesTMP.reduceByKey(_ + _) } - // vertices are non empty docs and empty docs - val docAllTermVertices = docTermVertices union emptyDocTermVertices + // doc vertices are non empty docs and empty docs + val docAllTermVertices = docTermVertices.union(emptyDocTermVertices) // Partition such that edges are grouped by document this.graph = Graph(docAllTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D) this.k = k diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 639f79f6ee82f..920109728de57 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.clustering -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV} +import breeze.linalg.{DenseMatrix => BDM} import org.apache.spark.SparkFunSuite import org.apache.spark.mllib.linalg.{Vector, DenseMatrix, Matrix, Vectors} @@ -248,10 +248,7 @@ private[clustering] object LDASuite { ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } assert(tinyCorpus.forall(_._2.size == tinyVocabSize)) // sanity check for test data - def getNonEmptyDoc(corpus:Array[(Long, Vector)]): Array[(Long, Vector)] = { - corpus.filter { case (docId: Long, wc: Vector) => - wc.toBreeze.reduce(_+_) != BDV(0.0) - } - + def getNonEmptyDoc(corpus:Array[(Long, Vector)]): Array[(Long, Vector)] = corpus.filter { + case (_, wc: Vector) => Vectors.norm(wc, p = 1.0) != 0.0 } } From 2572a082b5f5ad3e4fe181e7f585e104c3a54992 Mon Sep 17 00:00:00 2001 From: Alok Singh Date: Sun, 28 Jun 2015 00:54:50 -0700 Subject: [PATCH 3/9] [SPARK-5562][MLlib] change the import xyz._ to the import xyz.{c1, c2} .. 
--- .../scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index d767cc2501364..5b9f7312b8d2d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -27,7 +27,8 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.graphx._ import org.apache.spark.graphx.impl.GraphImpl import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer -import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.linalg.{Matrices, SparseVector, DenseVector, Vector} +import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.rdd.RDD /** From c710cb68953bd6a659c8b7abdb0ebd7fde1ac57b Mon Sep 17 00:00:00 2001 From: Alok Singh Date: Tue, 30 Jun 2015 19:44:42 -0700 Subject: [PATCH 4/9] fix the scala code style to have space after : --- .../test/scala/org/apache/spark/mllib/clustering/LDASuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 920109728de57..663aa1555d489 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -248,7 +248,7 @@ private[clustering] object LDASuite { ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } assert(tinyCorpus.forall(_._2.size == tinyVocabSize)) // sanity check for test data - def getNonEmptyDoc(corpus:Array[(Long, Vector)]): Array[(Long, Vector)] = corpus.filter { + def getNonEmptyDoc(corpus: Array[(Long, Vector)]): Array[(Long, Vector)] = corpus.filter { case (_, wc: Vector) => Vectors.norm(wc, p = 1.0) != 0.0 } } From 7c06251ecf72cd7c1b839f6fb4fc890839fd372f Mon Sep 17 00:00:00 2001 From: Alok Singh Date: Wed, 1 Jul 2015 13:46:53 -0700 Subject: [PATCH 5/9] [SPARK-5562][MLlib] modified the JavaLDASuite for test passing --- .../spark/mllib/clustering/JavaLDASuite.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index 581c033f08ebe..289305fd06a13 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -33,7 +33,8 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.mllib.linalg.Matrix; import org.apache.spark.mllib.linalg.Vector; - +import org.apache.spark.api.java.function.Function; +import org.apache.spark.mllib.linalg.Vectors; public class JavaLDASuite implements Serializable { private transient JavaSparkContext sc; @@ -110,7 +111,18 @@ public void distributedLDAModel() { // Check: topic distributions JavaPairRDD topicDistributions = model.javaTopicDistributions(); - assertEquals(topicDistributions.count(), corpus.count()); + // SPARK-5562. since the topicDistribution returns the distribution of docsI over topics . 
+ // for empty docs, since the distribution of topic won't sum to 1 (and hence it is not a pdf) + // So the output will not contain the empty docs and hence we modify the unittest to + // compare against nonEmptyCorpus + JavaPairRDD nonEmptyCorpus = corpus.filter( + new Function, Boolean>() { + public Boolean call(Tuple2 tuple2) { + return Vectors.norm(tuple2._2(), 1.0) != 0.0; + } + + }); + assertEquals(topicDistributions.count(), nonEmptyCorpus.count()); } @Test From b271c8a5f5937aca791379cab3d6fab59225b615 Mon Sep 17 00:00:00 2001 From: Alok Singh Date: Wed, 1 Jul 2015 17:35:56 -0700 Subject: [PATCH 6/9] [SPARK-5562][Mllib] As per github discussion with jkbradley. We would like to simply things. Here is the discussion text topicDistributions does not include the empty documents because those vertices are dropped during learning. Look at this line: [https://github.com/apache/spark/blob/1ce6428907b4ddcf52dbf0c86196d82ab7392442/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala#L186] I'd propose 1 of these 2 options: (1) We simplify things by simply making sure all empty documents are dropped before learning (tested in a unit test). We should also add documentation in the Scala doc to make it very clear. The downside is that users who want to join the topicDistributions with the original data will need to filter the original data---but I feel like they should anyways. (2) Alternatively, we could add the empty document vertices in at the end when creating the DistributedLDAModel. This might be easier than modifying learning itself. I'd prefer option (1). What do you think? --- docs/mllib-clustering.md | 3 ++- .../apache/spark/mllib/clustering/LDAOptimizer.scala | 12 +----------- .../apache/spark/mllib/clustering/JavaLDASuite.java | 6 ++---- .../org/apache/spark/mllib/clustering/LDASuite.scala | 6 ++---- 4 files changed, 7 insertions(+), 20 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index dcaa3784be874..677955515047c 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -401,7 +401,8 @@ It supports different inference algorithms via `setOptimizer` function. EMLDAOpt on the likelihood function and yields comprehensive results, while OnlineLDAOptimizer uses iterative mini-batch sampling for [online variational inference](https://www.cs.princeton.edu/~blei/papers/HoffmanBleiBach2010b.pdf) and is generally memory friendly. After fitting on the documents, LDA provides: * Topics: Inferred topics, each of which is a probability distribution over terms (words). -* Topic distributions for documents: For each document in the training set, LDA gives a probability distribution over topics. (EM only) +* Topic distributions for documents: For each non empty document in the training set, LDA gives a probability distribution over topics. (EM only). Note that for empty documents, we don't +create the topic distributions. 
(EM only) LDA takes the following parameters: diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index 5b9f7312b8d2d..42b4841c9f58e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -28,7 +28,6 @@ import org.apache.spark.graphx._ import org.apache.spark.graphx.impl.GraphImpl import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer import org.apache.spark.mllib.linalg.{Matrices, SparseVector, DenseVector, Vector} -import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.rdd.RDD /** @@ -120,14 +119,6 @@ final class EMLDAOptimizer extends LDAOptimizer { } } - // create term vertices for empty docs - val emptyDocTermVertices: RDD[(VertexId, TopicCounts)] = - docs.filter { case (_, termCounts: Vector) => - Vectors.norm(termCounts, p = 1.0) == 0.0 // empty docs has all zero termCounts - }.mapValues { case(termCounts: Vector) => - Vectors.zeros(termCounts.size).toBreeze.asInstanceOf[TopicCounts] - } - // Create vertices. // Initially, we use random soft assignments of tokens to topics (random gamma). val docTermVertices: RDD[(VertexId, TopicCounts)] = { @@ -143,8 +134,7 @@ final class EMLDAOptimizer extends LDAOptimizer { verticesTMP.reduceByKey(_ + _) } - // doc vertices are non empty docs and empty docs - val docAllTermVertices = docTermVertices.union(emptyDocTermVertices) + val docAllTermVertices = docTermVertices // Partition such that edges are grouped by document this.graph = Graph(docAllTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D) this.k = k diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index 289305fd06a13..3dc63acdc514b 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -111,10 +111,8 @@ public void distributedLDAModel() { // Check: topic distributions JavaPairRDD topicDistributions = model.javaTopicDistributions(); - // SPARK-5562. since the topicDistribution returns the distribution of docsI over topics . - // for empty docs, since the distribution of topic won't sum to 1 (and hence it is not a pdf) - // So the output will not contain the empty docs and hence we modify the unittest to - // compare against nonEmptyCorpus + // SPARK-5562. since the topicDistribution returns the distribution of the non empty docs + // over topics. Compare it against nonEmptyCorpus instead of corpus JavaPairRDD nonEmptyCorpus = corpus.filter( new Function, Boolean>() { public Boolean call(Tuple2 tuple2) { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 663aa1555d489..03a8a2538b464 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -101,10 +101,8 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext { val topicDistributions = model.topicDistributions.collect() // Ensure all documents are covered. - // SPARK-5562. since the topicDistribution returns the distribution of docsI over topics . 
- // for empty docs, since the distribution of topic won't sum to 1 (and hence it is not a pdf) - // So the output will not contain the empty docs and hence we modify the unittest to - // compare against nonEmptyTinyCorpus + // SPARK-5562. since the topicDistribution returns the distribution of the non empty docs + // over topics. Compare it against nonEmptyTinyCorpus instead of tinyCorpus val nonEmptyTinyCorpus = getNonEmptyDoc(tinyCorpus) assert(topicDistributions.length === nonEmptyTinyCorpus.length) assert(nonEmptyTinyCorpus.map(_._1).toSet === topicDistributions.map(_._1).toSet) From c01311b9f6839caa8386294456abb075385553eb Mon Sep 17 00:00:00 2001 From: Alok Singh Date: Wed, 1 Jul 2015 17:39:41 -0700 Subject: [PATCH 7/9] [SPARK-5562][MLlib] fix the newline typo --- docs/mllib-clustering.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 677955515047c..f1203ee7149fe 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -401,8 +401,7 @@ It supports different inference algorithms via `setOptimizer` function. EMLDAOpt on the likelihood function and yields comprehensive results, while OnlineLDAOptimizer uses iterative mini-batch sampling for [online variational inference](https://www.cs.princeton.edu/~blei/papers/HoffmanBleiBach2010b.pdf) and is generally memory friendly. After fitting on the documents, LDA provides: * Topics: Inferred topics, each of which is a probability distribution over terms (words). -* Topic distributions for documents: For each non empty document in the training set, LDA gives a probability distribution over topics. (EM only). Note that for empty documents, we don't -create the topic distributions. (EM only) +* Topic distributions for documents: For each non empty document in the training set, LDA gives a probability distribution over topics. (EM only). Note that for empty documents, we don't create the topic distributions. 
(EM only) LDA takes the following parameters: From be48491b51dc5738ff666d1fdd4b1527998946cb Mon Sep 17 00:00:00 2001 From: Alok Singh Date: Thu, 2 Jul 2015 13:03:51 -0700 Subject: [PATCH 8/9] [SPARK-5562][MLlib] re-order import in alphabhetical order --- .../java/org/apache/spark/mllib/clustering/JavaLDASuite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index 3dc63acdc514b..3ecb372b7c2f0 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -28,12 +28,12 @@ import org.junit.Before; import org.junit.Test; +import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.mllib.linalg.Matrix; import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.linalg.Vectors; public class JavaLDASuite implements Serializable { From 259a0a7a612004a6a61128ee7611314becb0f0a4 Mon Sep 17 00:00:00 2001 From: Alok Singh <“singhal@us.ibm.com”> Date: Thu, 2 Jul 2015 21:20:42 -0700 Subject: [PATCH 9/9] change as per the comments by @jkbradley --- .../org/apache/spark/mllib/clustering/LDAOptimizer.scala | 3 +-- .../org/apache/spark/mllib/clustering/JavaLDASuite.java | 9 ++++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index 42b4841c9f58e..8e5154b902d1d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -134,9 +134,8 @@ final class EMLDAOptimizer extends LDAOptimizer { verticesTMP.reduceByKey(_ + _) } - val docAllTermVertices = docTermVertices // Partition such that edges are grouped by document - this.graph = Graph(docAllTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D) + this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D) this.k = k this.vocabSize = docs.take(1).head._2.size this.checkpointInterval = lda.getCheckpointInterval diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index 3ecb372b7c2f0..b48f190f599a2 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -114,11 +114,10 @@ public void distributedLDAModel() { // SPARK-5562. since the topicDistribution returns the distribution of the non empty docs // over topics. Compare it against nonEmptyCorpus instead of corpus JavaPairRDD nonEmptyCorpus = corpus.filter( - new Function, Boolean>() { - public Boolean call(Tuple2 tuple2) { - return Vectors.norm(tuple2._2(), 1.0) != 0.0; - } - + new Function, Boolean>() { + public Boolean call(Tuple2 tuple2) { + return Vectors.norm(tuple2._2(), 1.0) != 0.0; + } }); assertEquals(topicDistributions.count(), nonEmptyCorpus.count()); }
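Taken together, the series lands on option (1) from the discussion in patch 6: the optimizer's graph construction is left as it was, empty documents simply get no entry in topicDistributions, and callers are expected to drop all-zero documents before training. The sketch below illustrates that pre-filtering pattern against the public MLlib API, using the same L1-norm test the patched suites use. It is a minimal, hypothetical example, not part of the patch series: the object name, toy corpus, and parameter values are illustrative only.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

object EmptyDocFilterExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "EmptyDocFilterExample")

    // Toy corpus of (docId, termCounts); the all-zero vector is an empty document.
    val corpus: RDD[(Long, Vector)] = sc.parallelize(Seq(
      (0L, Vectors.dense(1, 3, 0, 2, 8)),
      (1L, Vectors.dense(0, 0, 0, 0, 0)), // empty doc
      (2L, Vectors.dense(2, 3, 12, 3, 1))
    ))

    // Drop empty documents before training: an empty document has an
    // all-zero term-count vector, i.e. its L1 norm is 0.
    val nonEmptyCorpus = corpus.filter { case (_, termCounts) =>
      Vectors.norm(termCounts, p = 1.0) != 0.0
    }

    // Train with the (default) EM optimizer on the filtered corpus.
    val model = new LDA()
      .setK(3)
      .setMaxIterations(10)
      .run(nonEmptyCorpus)
      .asInstanceOf[DistributedLDAModel]

    // Every remaining document gets a per-document topic distribution,
    // so the counts line up and joining back to the input is straightforward.
    assert(model.topicDistributions.count() == nonEmptyCorpus.count())

    sc.stop()
  }
}
```

If per-document distributions are needed for every input document, empty ones included, the caller would have to join placeholder rows back in afterwards (option (2) from the same discussion), since after this change the optimizer intentionally does not carry empty-document vertices through learning.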