From a933cb7f902815114355fde66d46242227f0f9fd Mon Sep 17 00:00:00 2001 From: MechCoder Date: Wed, 12 Aug 2015 02:15:01 +0530 Subject: [PATCH 1/5] [SPARK-9661] Java compatibility --- .../spark/mllib/clustering/LDAModel.scala | 28 +++++++++++++++++-- .../mllib/optimization/GradientDescent.scala | 17 +++++++++++ .../apache/spark/mllib/stat/Statistics.scala | 12 ++++++++ 3 files changed, 55 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index 5dc637ebdc133..c462fc1a9955a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -26,7 +26,8 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkContext import org.apache.spark.annotation.Experimental -import org.apache.spark.api.java.JavaPairRDD +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD} +import org.apache.spark.broadcast.Broadcast import org.apache.spark.graphx.{Edge, EdgeContext, Graph, VertexId} import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors} import org.apache.spark.mllib.util.{Loader, Saveable} @@ -228,6 +229,11 @@ class LocalLDAModel private[clustering] ( docConcentration, topicConcentration, topicsMatrix.toBreeze.toDenseMatrix, gammaShape, k, vocabSize) + /** Java-friendly version of [[logLikelihood]] */ + def logLikelihood(documents: JavaRDD[(java.lang.Long, Vector)]): Double = { + logLikelihood(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) + } + /** * Calculate an upper bound bound on perplexity. (Lower is better.) * See Equation (16) in original Online LDA paper. @@ -242,6 +248,11 @@ class LocalLDAModel private[clustering] ( -logLikelihood(documents) / corpusTokenCount } + /** Java-friendly version of [[logPerplexity]] */ + def logPerplexity(documents: JavaRDD[(java.lang.Long, Vector)]): Double = { + logPerplexity(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) + } + /** * Estimate the variational likelihood bound of from `documents`: * log p(documents) >= E_q[log p(documents)] - E_q[log q(documents)] @@ -341,8 +352,14 @@ class LocalLDAModel private[clustering] ( } } -} + /** Java-friendly version of [[topicDistributions]] */ + def topicDistributions( + documents: JavaRDD[(java.lang.Long, Vector)]): JavaRDD[(java.lang.Long, Vector)] = { + val distributions = topicDistributions(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) + distributions.asInstanceOf[JavaRDD[(java.lang.Long, Vector)]] + } +} @Experimental object LocalLDAModel extends Loader[LocalLDAModel] { @@ -657,6 +674,13 @@ class DistributedLDAModel private[clustering] ( } } + /** Java-friendly version of [[topTopicsPerDocument]] */ + def javaTopTopicsPerDocument( + k: Int): JavaRDD[(java.lang.Long, Array[Int], Array[java.lang.Double])] = { + val topics = topTopicsPerDocument(k) + topics.asInstanceOf[JavaRDD[(java.lang.Long, Array[Int], Array[java.lang.Double])]] + } + // TODO: // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ??? diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index 8f0d1e4aa010a..20c841a32d976 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -17,11 +17,13 @@ package org.apache.spark.mllib.optimization +import collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import breeze.linalg.{DenseVector => BDV, norm} import org.apache.spark.annotation.{Experimental, DeveloperApi} +import org.apache.spark.api.java.JavaRDD import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Vectors, Vector} @@ -144,6 +146,21 @@ class GradientDescent private[spark] (private var gradient: Gradient, private va */ @DeveloperApi object GradientDescent extends Logging { + /** Java-friendly version of [[runMiniBatchSGD()]] */ + def runMiniBatchSGD( + data: JavaRDD[(java.lang.Double, Vector)], + gradient: Gradient, + updater: Updater, + stepSize: Double, + numIterations: Int, + regParam: Double, + miniBatchFraction: Double, + initialWeights: Vector, + convergenceTol: Double): (Vector, Array[Double]) = { + runMiniBatchSGD(data.rdd.asInstanceOf[RDD[(Double, Vector)]], gradient, updater, stepSize, + numIterations, regParam, miniBatchFraction, initialWeights, convergenceTol) + } + /** * Run stochastic gradient descent (SGD) in parallel using mini batches. * In each iteration, we sample a subset (fraction miniBatchFraction) of the total data diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index f84502919e381..bf7b6ddbf4270 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.stat import scala.annotation.varargs +import scala.collection.JavaConverters._ import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD @@ -212,4 +213,15 @@ object Statistics { : KolmogorovSmirnovTestResult = { KolmogorovSmirnovTest.testOneSample(data, distName, params: _*) } + + /** Java-friendly version of [[kolmogorovSmirnovTest()]] */ + @varargs + def kolmogorovSmirnovTest( + data: JavaRDD[java.lang.Double], + distName: String, + params: java.lang.Double*): KolmogorovSmirnovTestResult = { + val javaParams = params.toSeq.asInstanceOf[Seq[Double]] + KolmogorovSmirnovTest.testOneSample(data.rdd.asInstanceOf[RDD[Double]], + distName, javaParams: _*) + } } From f35799d3ff97ec8accda6576a629c99d838d3e30 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Wed, 12 Aug 2015 16:37:11 +0530 Subject: [PATCH 2/5] minor --- .../scala/org/apache/spark/mllib/clustering/LDAModel.scala | 1 - .../org/apache/spark/mllib/optimization/GradientDescent.scala | 1 - .../main/scala/org/apache/spark/mllib/stat/Statistics.scala | 3 +-- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index c462fc1a9955a..03c27fb62ff0e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -27,7 +27,6 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkContext import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.{JavaPairRDD, JavaRDD} -import org.apache.spark.broadcast.Broadcast import org.apache.spark.graphx.{Edge, EdgeContext, Graph, VertexId} import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors} import org.apache.spark.mllib.util.{Loader, Saveable} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index 20c841a32d976..d258371ff3515 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -17,7 +17,6 @@ package org.apache.spark.mllib.optimization -import collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import breeze.linalg.{DenseVector => BDV, norm} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index bf7b6ddbf4270..d4d0a0332a1c8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -18,7 +18,6 @@ package org.apache.spark.mllib.stat import scala.annotation.varargs -import scala.collection.JavaConverters._ import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD @@ -220,7 +219,7 @@ object Statistics { data: JavaRDD[java.lang.Double], distName: String, params: java.lang.Double*): KolmogorovSmirnovTestResult = { - val javaParams = params.toSeq.asInstanceOf[Seq[Double]] + val javaParams = params.asInstanceOf[Seq[Double]] KolmogorovSmirnovTest.testOneSample(data.rdd.asInstanceOf[RDD[Double]], distName, javaParams: _*) } From 171379300440b5a5e159f11111980019fda4c740 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Wed, 12 Aug 2015 22:49:31 +0530 Subject: [PATCH 3/5] Add tests --- .../spark/mllib/clustering/LDAModel.scala | 8 ++--- .../mllib/optimization/GradientDescent.scala | 4 +-- .../spark/mllib/clustering/JavaLDASuite.java | 31 +++++++++++++++++++ .../spark/mllib/stat/JavaStatisticsSuite.java | 9 ++++++ .../spark/mllib/clustering/LDASuite.scala | 13 ++++++++ 5 files changed, 59 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index 03c27fb62ff0e..b86ba71206019 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -229,7 +229,7 @@ class LocalLDAModel private[clustering] ( vocabSize) /** Java-friendly version of [[logLikelihood]] */ - def logLikelihood(documents: JavaRDD[(java.lang.Long, Vector)]): Double = { + def logLikelihood(documents: JavaPairRDD[java.lang.Long, Vector]): Double = { logLikelihood(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) } @@ -248,7 +248,7 @@ class LocalLDAModel private[clustering] ( } /** Java-friendly version of [[logPerplexity]] */ - def logPerplexity(documents: JavaRDD[(java.lang.Long, Vector)]): Double = { + def logPerplexity(documents: JavaPairRDD[java.lang.Long, Vector]): Double = { logPerplexity(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) } @@ -353,9 +353,9 @@ class LocalLDAModel private[clustering] ( /** Java-friendly version of [[topicDistributions]] */ def topicDistributions( - documents: JavaRDD[(java.lang.Long, Vector)]): JavaRDD[(java.lang.Long, Vector)] = { + documents: JavaPairRDD[java.lang.Long, Vector]): JavaPairRDD[java.lang.Long, Vector] = { val distributions = topicDistributions(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) - distributions.asInstanceOf[JavaRDD[(java.lang.Long, Vector)]] + JavaPairRDD.fromRDD(distributions.asInstanceOf[RDD[(java.lang.Long, Vector)]]) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index d258371ff3515..6be5d0d502970 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import breeze.linalg.{DenseVector => BDV, norm} import org.apache.spark.annotation.{Experimental, DeveloperApi} -import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.java.{JavaRDD, JavaPairRDD} import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Vectors, Vector} @@ -147,7 +147,7 @@ class GradientDescent private[spark] (private var gradient: Gradient, private va object GradientDescent extends Logging { /** Java-friendly version of [[runMiniBatchSGD()]] */ def runMiniBatchSGD( - data: JavaRDD[(java.lang.Double, Vector)], + data: JavaPairRDD[java.lang.Double, Vector], gradient: Gradient, updater: Updater, stepSize: Double, diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index d272a42c8576f..befdf61e1ac31 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -160,11 +160,42 @@ public void OnlineOptimizerCompatibility() { assertEquals(roundedLocalTopicSummary.length, k); } + @Test + public void logPerplexity(){ + // Blindly copied from LDASuite. + JavaRDD> docs = sc.parallelize(toyData, 2); + assertEquals(toyModel.logPerplexity(JavaPairRDD.fromJavaRDD(docs)), 3.690, 1e-3); + } + + @Test + public void logLikelihood() { + // Blindly copied from LDASuite. + ArrayList> docsSingleWord = new ArrayList>(); + ArrayList> docsRepeatWord = new ArrayList>(); + docsSingleWord.add(new Tuple2( + Long.valueOf(0), Vectors.dense(1.0, 0.0, 0.0, 0.0, 0.0, 0.0))); + docsRepeatWord.add(new Tuple2( + Long.valueOf(0), Vectors.dense(5.0, 0.0, 0.0, 0.0, 0.0, 0.0))); + JavaPairRDD single = JavaPairRDD.fromJavaRDD(sc.parallelize(docsSingleWord)); + JavaPairRDD repeat = JavaPairRDD.fromJavaRDD(sc.parallelize(docsRepeatWord)); + assertEquals(toyModel.logLikelihood(single), -25.971, 1e-3); + assertEquals(toyModel.logLikelihood(repeat), -31.441, 1e-3); + } + + @Test + public void topicDistributions(){ + JavaRDD> docs = sc.parallelize(toyData, 2); + JavaPairRDD pairedDocs = JavaPairRDD.fromJavaRDD(docs); + assertEquals(toyModel.topicDistributions(pairedDocs).count(), pairedDocs.count()); + } + private static int tinyK = LDASuite$.MODULE$.tinyK(); private static int tinyVocabSize = LDASuite$.MODULE$.tinyVocabSize(); private static Matrix tinyTopics = LDASuite$.MODULE$.tinyTopics(); private static Tuple2[] tinyTopicDescription = LDASuite$.MODULE$.tinyTopicDescription(); private JavaPairRDD corpus; + private LocalLDAModel toyModel = LDASuite$.MODULE$.toyModel(); + private ArrayList> toyData = LDASuite$.MODULE$.javaToyData(); } diff --git a/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java index 62f7f26b7c98f..342f57a6ea4a6 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java @@ -28,6 +28,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult; public class JavaStatisticsSuite implements Serializable { private transient JavaSparkContext sc; @@ -53,4 +54,12 @@ public void testCorr() { // Check default method assertEquals(corr1, corr2); } + + @Test + public void kolmogorovSmirnovTest() { + JavaRDD data = sc.parallelize(Lists.newArrayList(0.2, 1.0, -1.0, 2.0)); + KolmogorovSmirnovTestResult testResult1 = Statistics.kolmogorovSmirnovTest(data, "norm"); + KolmogorovSmirnovTestResult testResult2 = Statistics.kolmogorovSmirnovTest( + data, "norm", 0.0, 1.0); + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index ce6a8eb8e8c46..926185e90bcf9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.mllib.clustering +import java.util.{ArrayList => JArrayList} + import breeze.linalg.{DenseMatrix => BDM, argtopk, max, argmax} import org.apache.spark.SparkFunSuite @@ -575,6 +577,17 @@ private[clustering] object LDASuite { Vectors.sparse(6, Array(4, 5), Array(1, 1)) ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } + /** Used in the Java Test Suite */ + def javaToyData: JArrayList[(java.lang.Long, Vector)] = { + val javaData = new JArrayList[(java.lang.Long, Vector)] + var i = 0 + while (i < toyData.size) { + javaData.add((toyData(i)._1, toyData(i)._2)) + i += 1 + } + javaData + } + def toyModel: LocalLDAModel = { val k = 2 val vocabSize = 6 From 5fa5f7aafbd547d50eaa17f7f657ba29751babfa Mon Sep 17 00:00:00 2001 From: MechCoder Date: Thu, 13 Aug 2015 13:07:49 +0530 Subject: [PATCH 4/5] removed unnecessary tests --- .../spark/mllib/clustering/LDAModel.scala | 2 +- .../mllib/optimization/GradientDescent.scala | 16 -------- .../apache/spark/mllib/stat/Statistics.scala | 7 +++- .../spark/mllib/clustering/JavaLDASuite.java | 37 ++++++++----------- .../spark/mllib/stat/JavaStatisticsSuite.java | 15 +++++++- 5 files changed, 35 insertions(+), 42 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index b86ba71206019..f31949f13a4cf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -677,7 +677,7 @@ class DistributedLDAModel private[clustering] ( def javaTopTopicsPerDocument( k: Int): JavaRDD[(java.lang.Long, Array[Int], Array[java.lang.Double])] = { val topics = topTopicsPerDocument(k) - topics.asInstanceOf[JavaRDD[(java.lang.Long, Array[Int], Array[java.lang.Double])]] + topics.asInstanceOf[RDD[(java.lang.Long, Array[Int], Array[java.lang.Double])]].toJavaRDD() } // TODO: diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index 6be5d0d502970..8f0d1e4aa010a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -22,7 +22,6 @@ import scala.collection.mutable.ArrayBuffer import breeze.linalg.{DenseVector => BDV, norm} import org.apache.spark.annotation.{Experimental, DeveloperApi} -import org.apache.spark.api.java.{JavaRDD, JavaPairRDD} import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Vectors, Vector} @@ -145,21 +144,6 @@ class GradientDescent private[spark] (private var gradient: Gradient, private va */ @DeveloperApi object GradientDescent extends Logging { - /** Java-friendly version of [[runMiniBatchSGD()]] */ - def runMiniBatchSGD( - data: JavaPairRDD[java.lang.Double, Vector], - gradient: Gradient, - updater: Updater, - stepSize: Double, - numIterations: Int, - regParam: Double, - miniBatchFraction: Double, - initialWeights: Vector, - convergenceTol: Double): (Vector, Array[Double]) = { - runMiniBatchSGD(data.rdd.asInstanceOf[RDD[(Double, Vector)]], gradient, updater, stepSize, - numIterations, regParam, miniBatchFraction, initialWeights, convergenceTol) - } - /** * Run stochastic gradient descent (SGD) in parallel using mini batches. * In each iteration, we sample a subset (fraction miniBatchFraction) of the total data diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index d4d0a0332a1c8..24fe48cb8f71f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -20,7 +20,7 @@ package org.apache.spark.mllib.stat import scala.annotation.varargs import org.apache.spark.annotation.Experimental -import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.java.{JavaRDD, JavaDoubleRDD} import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.linalg.{Matrix, Vector} import org.apache.spark.mllib.regression.LabeledPoint @@ -178,6 +178,9 @@ object Statistics { ChiSqTest.chiSquaredFeatures(data) } + /** Java-friendly version of [[chiSqTest()]] */ + def chiSqTest(data: JavaRDD[LabeledPoint]): Array[ChiSqTestResult] = chiSqTest(data.rdd) + /** * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a * continuous distribution. By comparing the largest difference between the empirical cumulative @@ -216,7 +219,7 @@ object Statistics { /** Java-friendly version of [[kolmogorovSmirnovTest()]] */ @varargs def kolmogorovSmirnovTest( - data: JavaRDD[java.lang.Double], + data: JavaDoubleRDD, distName: String, params: java.lang.Double*): KolmogorovSmirnovTestResult = { val javaParams = params.asInstanceOf[Seq[Double]] diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index befdf61e1ac31..427be9430d820 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -124,6 +124,10 @@ public Boolean call(Tuple2 tuple2) { } }); assertEquals(topicDistributions.count(), nonEmptyCorpus.count()); + + // Check: javaTopTopicsPerDocuments + JavaRDD> topTopics = + model.javaTopTopicsPerDocument(3); } @Test @@ -161,32 +165,21 @@ public void OnlineOptimizerCompatibility() { } @Test - public void logPerplexity(){ - // Blindly copied from LDASuite. + public void localLdaMethods() { JavaRDD> docs = sc.parallelize(toyData, 2); - assertEquals(toyModel.logPerplexity(JavaPairRDD.fromJavaRDD(docs)), 3.690, 1e-3); - } + JavaPairRDD pairedDocs = JavaPairRDD.fromJavaRDD(docs); - @Test - public void logLikelihood() { - // Blindly copied from LDASuite. + // check: topicDistributions + assertEquals(toyModel.topicDistributions(pairedDocs).count(), pairedDocs.count()); + + // check: logPerplexity + double logPerplexity = toyModel.logPerplexity(pairedDocs); + + // check: logLikelihood. ArrayList> docsSingleWord = new ArrayList>(); - ArrayList> docsRepeatWord = new ArrayList>(); - docsSingleWord.add(new Tuple2( - Long.valueOf(0), Vectors.dense(1.0, 0.0, 0.0, 0.0, 0.0, 0.0))); - docsRepeatWord.add(new Tuple2( - Long.valueOf(0), Vectors.dense(5.0, 0.0, 0.0, 0.0, 0.0, 0.0))); + docsSingleWord.add(new Tuple2(Long.valueOf(0), Vectors.dense(1.0, 0.0, 0.0))); JavaPairRDD single = JavaPairRDD.fromJavaRDD(sc.parallelize(docsSingleWord)); - JavaPairRDD repeat = JavaPairRDD.fromJavaRDD(sc.parallelize(docsRepeatWord)); - assertEquals(toyModel.logLikelihood(single), -25.971, 1e-3); - assertEquals(toyModel.logLikelihood(repeat), -31.441, 1e-3); - } - - @Test - public void topicDistributions(){ - JavaRDD> docs = sc.parallelize(toyData, 2); - JavaPairRDD pairedDocs = JavaPairRDD.fromJavaRDD(docs); - assertEquals(toyModel.topicDistributions(pairedDocs).count(), pairedDocs.count()); + double logLikelihood = toyModel.logLikelihood(single); } private static int tinyK = LDASuite$.MODULE$.tinyK(); diff --git a/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java index 342f57a6ea4a6..9cd85bca77439 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java @@ -27,7 +27,11 @@ import static org.junit.Assert.assertEquals; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaDoubleRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.stat.test.ChiSqTestResult; import org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult; public class JavaStatisticsSuite implements Serializable { @@ -57,9 +61,18 @@ public void testCorr() { @Test public void kolmogorovSmirnovTest() { - JavaRDD data = sc.parallelize(Lists.newArrayList(0.2, 1.0, -1.0, 2.0)); + JavaDoubleRDD data = sc.parallelizeDoubles(Lists.newArrayList(0.2, 1.0, -1.0, 2.0)); KolmogorovSmirnovTestResult testResult1 = Statistics.kolmogorovSmirnovTest(data, "norm"); KolmogorovSmirnovTestResult testResult2 = Statistics.kolmogorovSmirnovTest( data, "norm", 0.0, 1.0); } + + @Test + public void chiSqTest() { + JavaRDD data = sc.parallelize(Lists.newArrayList( + new LabeledPoint(0.0, Vectors.dense(0.1, 2.3)), + new LabeledPoint(1.0, Vectors.dense(1.5, 5.1)), + new LabeledPoint(0.0, Vectors.dense(2.4, 8.1)))); + ChiSqTestResult[] testResults = Statistics.chiSqTest(data); + } } From 302cdb16caaa8f6afb95daa0f157d693bc49f004 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Thu, 13 Aug 2015 21:47:38 +0530 Subject: [PATCH 5/5] indent --- .../java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java index 9cd85bca77439..eb4e3698624bc 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java @@ -73,6 +73,6 @@ public void chiSqTest() { new LabeledPoint(0.0, Vectors.dense(0.1, 2.3)), new LabeledPoint(1.0, Vectors.dense(1.5, 5.1)), new LabeledPoint(0.0, Vectors.dense(2.4, 8.1)))); - ChiSqTestResult[] testResults = Statistics.chiSqTest(data); + ChiSqTestResult[] testResults = Statistics.chiSqTest(data); } }