From d26777eb02faf36ea237a4ad2e81641e6f3693c8 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Fri, 1 Sep 2017 22:17:14 +0800 Subject: [PATCH 1/7] init pr --- .../spark/ml/stat/KolmogorovSmirnovTest.scala | 104 ++++++++++++++ .../ml/stat/KolmogorovSmirnovTestSuite.scala | 133 ++++++++++++++++++ 2 files changed, 237 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala create mode 100644 mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala new file mode 100644 index 0000000000000..62caae8211aa3 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.stat + +import scala.annotation.varargs + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.SchemaUtils +import org.apache.spark.mllib.stat.{Statistics => OldStatistics} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.functions.col + +/** + * :: Experimental :: + * + * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * null hypothesis that the sample data comes from that theoretical distribution. + * For more information on KS Test: + * @see + * Kolmogorov-Smirnov test (Wikipedia) + */ +@Experimental +@Since("2.3.0") +object KolmogorovSmirnovTest { + + /** Used to construct output schema of test */ + private case class KolmogorovSmirnovTestResult( + pValues: Double, + statistics: Double) + + private def getSampleRDD(dataset: DataFrame, sampleCol: String): RDD[Double] = { + SchemaUtils.checkNumericType(dataset.schema, sampleCol) + dataset.select(col(sampleCol).cast("double")).rdd.map { + case Row(sample: Double) => sample + } + } + + /** + * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * null hypothesis that the sample data comes from that theoretical distribution. 
+ * + * @param dataset a `DataFrame` containing the sample of data to test + * @param sampleCol Name of sample column in dataset, of any numerical type + * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value + * @return DataFrame containing the test result for every feature against the label. + * This DataFrame will contain a single Row with the following fields: + * - `pValue: Double` + * - `statistic: Double` + */ + @Since("2.3.0") + def test(dataset: DataFrame, sampleCol: String, cdf: Double => Double): DataFrame = { + val spark = dataset.sparkSession + + val rdd = getSampleRDD(dataset, sampleCol) + val testResult = OldStatistics.kolmogorovSmirnovTest(rdd, cdf) + spark.createDataFrame(Seq(KolmogorovSmirnovTestResult( + testResult.pValue, testResult.statistic))) + } + + /** + * Convenience function to conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability + * distribution equality. Currently supports the normal distribution, taking as parameters + * the mean and standard deviation. + * + * @param dataset a `DataFrame` containing the sample of data to test + * @param sampleCol Name of sample column in dataset, of any numerical type + * @param distName a `String` name for a theoretical distribution, currently only support "norm". + * @param params `Double*` specifying the parameters to be used for the theoretical distribution + * @return DataFrame containing the test result for every feature against the label. 
+ * This DataFrame will contain a single Row with the following fields: + * - `pValue: Double` + * - `statistic: Double` + */ + @Since("2.3.0") + @varargs + def test(dataset: DataFrame, sampleCol: String, distName: String, params: Double*): DataFrame = { + val spark = dataset.sparkSession + + val rdd = getSampleRDD(dataset, sampleCol) + val testResult = OldStatistics.kolmogorovSmirnovTest(rdd, distName, params: _*) + spark.createDataFrame(Seq(KolmogorovSmirnovTestResult( + testResult.pValue, testResult.statistic))) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala new file mode 100644 index 0000000000000..4ef44c950559f --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.stat + +import org.apache.commons.math3.distribution.{ExponentialDistribution, NormalDistribution} +import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => Math3KSTest} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.util.DefaultReadWriteTest +import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.Row + +class KolmogorovSmirnovTestSuite + extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + + import testImplicits._ + + test("1 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") { + // Create theoretical distributions + val stdNormalDist = new NormalDistribution(0, 1) + val expDist = new ExponentialDistribution(0.6) + + // set seeds + val seed = 10L + stdNormalDist.reseedRandomGenerator(seed) + expDist.reseedRandomGenerator(seed) + + // Sample data from the distributions and parallelize it + val n = 100000 + val sampledNormArray = stdNormalDist.sample(n) + val sampledNormDF = sc.parallelize(sampledNormArray, 10).toDF("sample") + val sampledExpArray = expDist.sample(n) + val sampledExpDF = sc.parallelize(sampledExpArray, 10).toDF("sample") + + // Use a apache math commons local KS test to verify calculations + val ksTest = new Math3KSTest() + val pThreshold = 0.05 + + // Comparing a standard normal sample to a standard normal distribution + val Row(pValue1: Double, statistic1: Double) = KolmogorovSmirnovTest + .test(sampledNormDF, "sample", "norm", 0.0, 1.0).head() + val referenceStat1 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, sampledNormArray) + val referencePVal1 = 1 - ksTest.cdf(referenceStat1, n) + // Verify vs apache math commons ks test + assert(statistic1 ~== referenceStat1 relTol 1e-4) + assert(pValue1 ~== referencePVal1 relTol 1e-4) + // Cannot reject null hypothesis + assert(pValue1 > pThreshold) + + // Comparing an exponential sample to a 
standard normal distribution + val Row(pValue2: Double, statistic2: Double) = KolmogorovSmirnovTest + .test(sampledExpDF, "sample", "norm", 0, 1).head() + val referenceStat2 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, sampledExpArray) + val referencePVal2 = 1 - ksTest.cdf(referenceStat2, n) + // verify vs apache math commons ks test + assert(statistic2 ~== referenceStat2 relTol 1e-4) + assert(pValue2 ~== referencePVal2 relTol 1e-4) + // reject null hypothesis + assert(pValue2 < pThreshold) + + // Testing the use of a user provided CDF function + // Distribution is not serializable, so will have to create in the lambda + val expCDF = (x: Double) => new ExponentialDistribution(0.2).cumulativeProbability(x) + + // Comparing an exponential sample with mean X to an exponential distribution with mean Y + // Where X != Y + val Row(pValue3: Double, statistic3: Double) = KolmogorovSmirnovTest + .test(sampledExpDF, "sample", expCDF).head() + val referenceStat3 = ksTest.kolmogorovSmirnovStatistic(new ExponentialDistribution(0.2), + sampledExpArray) + val referencePVal3 = 1 - ksTest.cdf(referenceStat3, sampledExpArray.length) + // verify vs apache math commons ks test + assert(statistic3 ~== referenceStat3 relTol 1e-4) + assert(pValue3 ~== referencePVal3 relTol 1e-4) + // reject null hypothesis + assert(pValue3 < pThreshold) + } + + test("1 sample Kolmogorov-Smirnov test: R implementation equivalence") { + /* + Comparing results with R's implementation of Kolmogorov-Smirnov for 1 sample + > sessionInfo() + R version 3.2.0 (2015-04-16) + Platform: x86_64-apple-darwin13.4.0 (64-bit) + > set.seed(20) + > v <- rnorm(20) + > v + [1] 1.16268529 -0.58592447 1.78546500 -1.33259371 -0.44656677 0.56960612 + [7] -2.88971761 -0.86901834 -0.46170268 -0.55554091 -0.02013537 -0.15038222 + [13] -0.62812676 1.32322085 -1.52135057 -0.43742787 0.97057758 0.02822264 + [19] -0.08578219 0.38921440 + > ks.test(v, pnorm, alternative = "two.sided") + + One-sample Kolmogorov-Smirnov test + + 
data: v + D = 0.18874, p-value = 0.4223 + alternative hypothesis: two-sided + */ + + val rKSStat = 0.18874 + val rKSPVal = 0.4223 + val rData = sc.parallelize( + Array( + 1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501, + -0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555, + -0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063, + -0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691, + 0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942 + ) + ).toDF("sample") + val Row(pValue: Double, statistic: Double) = KolmogorovSmirnovTest + .test(rData, "sample", "norm", 0, 1).head() + assert(statistic ~== rKSStat relTol 1e-4) + assert(pValue ~== rKSPVal relTol 1e-4) + } +} From 283e75a53c3eef73d665f928097af64d6b666f50 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Thu, 1 Mar 2018 18:12:31 +0800 Subject: [PATCH 2/7] update --- .../apache/spark/ml/stat/KolmogorovSmirnovTest.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala index 62caae8211aa3..91a4eef704099 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.functions.col * Kolmogorov-Smirnov test (Wikipedia) */ @Experimental -@Since("2.3.0") +@Since("2.4.0") object KolmogorovSmirnovTest { /** Used to construct output schema of test */ @@ -48,9 +48,7 @@ object KolmogorovSmirnovTest { private def getSampleRDD(dataset: DataFrame, sampleCol: String): RDD[Double] = { SchemaUtils.checkNumericType(dataset.schema, sampleCol) - dataset.select(col(sampleCol).cast("double")).rdd.map { - case Row(sample: Double) => sample - } + 
dataset.select(col(sampleCol).cast("double")).as[Double].rdd } /** @@ -62,7 +60,7 @@ object KolmogorovSmirnovTest { * @param dataset a `DataFrame` containing the sample of data to test * @param sampleCol Name of sample column in dataset, of any numerical type * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value - * @return DataFrame containing the test result for every feature against the label. + * @return DataFrame containing the test result for the input sampled data. * This DataFrame will contain a single Row with the following fields: * - `pValue: Double` * - `statistic: Double` @@ -86,7 +84,7 @@ object KolmogorovSmirnovTest { * @param sampleCol Name of sample column in dataset, of any numerical type * @param distName a `String` name for a theoretical distribution, currently only support "norm". * @param params `Double*` specifying the parameters to be used for the theoretical distribution - * @return DataFrame containing the test result for every feature against the label. + * @return DataFrame containing the test result for the input sampled data. 
* This DataFrame will contain a single Row with the following fields: * - `pValue: Double` * - `statistic: Double` From 4d1076e5aeea5fe8382c4fbd198d69fb0ce681b1 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Thu, 1 Mar 2018 20:57:21 +0800 Subject: [PATCH 3/7] fix build error --- .../scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala index 91a4eef704099..eeb573c1175fe 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala @@ -48,6 +48,7 @@ object KolmogorovSmirnovTest { private def getSampleRDD(dataset: DataFrame, sampleCol: String): RDD[Double] = { SchemaUtils.checkNumericType(dataset.schema, sampleCol) + import dataset.sparkSession.implicits._ dataset.select(col(sampleCol).cast("double")).as[Double].rdd } From aa9772e227a6406596d9cec97d73ef285204f785 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Fri, 2 Mar 2018 10:19:26 +0800 Subject: [PATCH 4/7] update --- .../org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala index eeb573c1175fe..f602bb10b7eef 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala @@ -66,7 +66,7 @@ object KolmogorovSmirnovTest { * - `pValue: Double` * - `statistic: Double` */ - @Since("2.3.0") + @Since("2.4.0") def test(dataset: DataFrame, sampleCol: String, cdf: Double => Double): DataFrame = { val spark = dataset.sparkSession @@ -90,7 +90,7 @@ object KolmogorovSmirnovTest { * - `pValue: Double` * - 
`statistic: Double` */ - @Since("2.3.0") + @Since("2.4.0") @varargs def test(dataset: DataFrame, sampleCol: String, distName: String, params: Double*): DataFrame = { val spark = dataset.sparkSession From ccd22f553a37ba166dd2881cb965edc19ff653fc Mon Sep 17 00:00:00 2001 From: jkbradley Date: Mon, 19 Mar 2018 18:51:36 -0700 Subject: [PATCH 5/7] Java test for KSTest (#4) --- .../stat/JavaKolmogorovSmirnovTestSuite.java | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java diff --git a/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java b/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java new file mode 100644 index 0000000000000..bc6d7fbba2cad --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.stat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.math3.distribution.NormalDistribution; +import org.junit.Test; + +import org.apache.spark.SharedSparkSession; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + + +public class JavaKolmogorovSmirnovTestSuite extends SharedSparkSession { + + private transient Dataset dataset; + + @Override + public void setUp() throws IOException { + super.setUp(); + List points = new ArrayList<>(); + points.add(0.1); + points.add(1.1); + points.add(10.1); + points.add(-1.1); + + dataset = + spark.createDataFrame(jsc.parallelize(points, 2), Double.class).toDF("sample"); + } + + /* + @Test + public void testKSTestCDF() { + // Create theoretical distributions + NormalDistribution stdNormalDist = new NormalDistribution(0, 1); + + // set seeds + Long seed = 10L; + stdNormalDist.reseedRandomGenerator(seed); + Function stdNormalCDF = (x) -> stdNormalDist.cumulativeProbability(x); + + double pThreshold = 0.05; + + // Comparing a standard normal sample to a standard normal distribution + Row results = KolmogorovSmirnovTest + .test(dataset, "sample", stdNormalCDF).head(); + double pValue1 = results.getDouble(0); + // Cannot reject null hypothesis + assert(pValue1 > pThreshold); + } + */ + + @Test + public void testKSTestNamedDistribution() { + double pThreshold = 0.05; + + // Comparing a standard normal sample to a standard normal distribution + Row results = KolmogorovSmirnovTest + .test(dataset, "sample", "norm", 0.0, 1.0).head(); + double pValue1 = results.getDouble(0); + // Cannot reject null hypothesis + assert(pValue1 > pThreshold); + } +} From 48c077bba48b87846806a5ac023f12440a449d7c Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Tue, 20 Mar 2018 17:32:59 +0800 Subject: [PATCH 6/7] test refactor & add uniform dist test --- 
.../spark/ml/stat/KolmogorovSmirnovTest.scala | 4 +- .../ml/stat/KolmogorovSmirnovTestSuite.scala | 95 ++++++++++--------- 2 files changed, 53 insertions(+), 46 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala index f602bb10b7eef..fdd5d465efa2d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala @@ -43,8 +43,8 @@ object KolmogorovSmirnovTest { /** Used to construct output schema of test */ private case class KolmogorovSmirnovTestResult( - pValues: Double, - statistics: Double) + pValue: Double, + statistic: Double) private def getSampleRDD(dataset: DataFrame, sampleCol: String): RDD[Double] = { SchemaUtils.checkNumericType(dataset.schema, sampleCol) diff --git a/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala index 4ef44c950559f..1312de3a1b522 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.ml.stat -import org.apache.commons.math3.distribution.{ExponentialDistribution, NormalDistribution} +import org.apache.commons.math3.distribution.{ExponentialDistribution, NormalDistribution, + RealDistribution, UniformRealDistribution} import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => Math3KSTest} import org.apache.spark.SparkFunSuite @@ -31,65 +32,71 @@ class KolmogorovSmirnovTestSuite import testImplicits._ - test("1 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") { - // Create theoretical distributions - val stdNormalDist = new NormalDistribution(0, 1) - val expDist = new ExponentialDistribution(0.6) + 
def apacheCommonMath3EquivalenceTest( + sampleDist: RealDistribution, + theoreticalDist: RealDistribution, + theoreticalDistByName: (String, Array[Double]), + rejectNullHypothesis: Boolean): Unit = { // set seeds val seed = 10L - stdNormalDist.reseedRandomGenerator(seed) - expDist.reseedRandomGenerator(seed) + sampleDist.reseedRandomGenerator(seed) + if (theoreticalDist != null) { + theoreticalDist.reseedRandomGenerator(seed) + } // Sample data from the distributions and parallelize it val n = 100000 - val sampledNormArray = stdNormalDist.sample(n) - val sampledNormDF = sc.parallelize(sampledNormArray, 10).toDF("sample") - val sampledExpArray = expDist.sample(n) - val sampledExpDF = sc.parallelize(sampledExpArray, 10).toDF("sample") + val sampledArray = sampleDist.sample(n) + val sampledDF = sc.parallelize(sampledArray, 10).toDF("sample") // Use a apache math commons local KS test to verify calculations val ksTest = new Math3KSTest() val pThreshold = 0.05 // Comparing a standard normal sample to a standard normal distribution - val Row(pValue1: Double, statistic1: Double) = KolmogorovSmirnovTest - .test(sampledNormDF, "sample", "norm", 0.0, 1.0).head() - val referenceStat1 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, sampledNormArray) + val Row(pValue1: Double, statistic1: Double) = + if (theoreticalDist != null) { + val cdf = (x: Double) => theoreticalDist.cumulativeProbability(x) + KolmogorovSmirnovTest.test(sampledDF, "sample", cdf).head() + } else { + KolmogorovSmirnovTest.test(sampledDF, "sample", + theoreticalDistByName._1, + theoreticalDistByName._2: _* + ).head() + } + val theoreticalDistMath3 = if (theoreticalDist == null) { + assert(theoreticalDistByName._1 == "norm") + val params = theoreticalDistByName._2 + new NormalDistribution(params(0), params(1)) + } else { + theoreticalDist + } + val referenceStat1 = ksTest.kolmogorovSmirnovStatistic(theoreticalDistMath3, sampledArray) val referencePVal1 = 1 - ksTest.cdf(referenceStat1, n) // Verify vs 
apache math commons ks test assert(statistic1 ~== referenceStat1 relTol 1e-4) assert(pValue1 ~== referencePVal1 relTol 1e-4) - // Cannot reject null hypothesis - assert(pValue1 > pThreshold) - - // Comparing an exponential sample to a standard normal distribution - val Row(pValue2: Double, statistic2: Double) = KolmogorovSmirnovTest - .test(sampledExpDF, "sample", "norm", 0, 1).head() - val referenceStat2 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, sampledExpArray) - val referencePVal2 = 1 - ksTest.cdf(referenceStat2, n) - // verify vs apache math commons ks test - assert(statistic2 ~== referenceStat2 relTol 1e-4) - assert(pValue2 ~== referencePVal2 relTol 1e-4) - // reject null hypothesis - assert(pValue2 < pThreshold) - - // Testing the use of a user provided CDF function - // Distribution is not serializable, so will have to create in the lambda - val expCDF = (x: Double) => new ExponentialDistribution(0.2).cumulativeProbability(x) - - // Comparing an exponential sample with mean X to an exponential distribution with mean Y - // Where X != Y - val Row(pValue3: Double, statistic3: Double) = KolmogorovSmirnovTest - .test(sampledExpDF, "sample", expCDF).head() - val referenceStat3 = ksTest.kolmogorovSmirnovStatistic(new ExponentialDistribution(0.2), - sampledExpArray) - val referencePVal3 = 1 - ksTest.cdf(referenceStat3, sampledExpArray.length) - // verify vs apache math commons ks test - assert(statistic3 ~== referenceStat3 relTol 1e-4) - assert(pValue3 ~== referencePVal3 relTol 1e-4) - // reject null hypothesis - assert(pValue3 < pThreshold) + + if (rejectNullHypothesis) { + assert(pValue1 < pThreshold) + } else { + assert(pValue1 > pThreshold) + } + } + + test("1 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") { + // Create theoretical distributions + val stdNormalDist = new NormalDistribution(0.0, 1.0) + val expDist = new ExponentialDistribution(0.6) + val uniformDist = new UniformRealDistribution(0.0, 1.0) + val 
expDist2 = new ExponentialDistribution(0.2) + val stdNormByName = Tuple2("norm", Array(0.0, 1.0)) + + apacheCommonMath3EquivalenceTest(stdNormalDist, null, stdNormByName, false) + apacheCommonMath3EquivalenceTest(expDist, null, stdNormByName, true) + apacheCommonMath3EquivalenceTest(uniformDist, null, stdNormByName, true) + apacheCommonMath3EquivalenceTest(expDist, expDist2, null, true) } test("1 sample Kolmogorov-Smirnov test: R implementation equivalence") { From 6187d8893405afc3e488de55fe36d7f736b16cc3 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Tue, 20 Mar 2018 18:29:31 +0800 Subject: [PATCH 7/7] add java-friendly api --- .../spark/ml/stat/KolmogorovSmirnovTest.scala | 10 ++++++++++ .../stat/JavaKolmogorovSmirnovTestSuite.java | 19 ++++++++++--------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala index fdd5d465efa2d..8d80e7768cb6e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala @@ -20,6 +20,7 @@ package org.apache.spark.ml.stat import scala.annotation.varargs import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.api.java.function.Function import org.apache.spark.ml.util.SchemaUtils import org.apache.spark.mllib.stat.{Statistics => OldStatistics} import org.apache.spark.rdd.RDD @@ -76,6 +77,15 @@ object KolmogorovSmirnovTest { testResult.pValue, testResult.statistic))) } + /** + * Java-friendly version of `test(dataset: DataFrame, sampleCol: String, cdf: Double => Double)` + */ + @Since("2.4.0") + def test(dataset: DataFrame, sampleCol: String, + cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = { + test(dataset, sampleCol, (x: Double) => cdf.call(x)) + } + /** * Convenience function to conduct a one-sample, two-sided Kolmogorov-Smirnov 
test for probability * distribution equality. Currently supports the normal distribution, taking as parameters diff --git a/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java b/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java index bc6d7fbba2cad..021272dd5a40c 100644 --- a/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java @@ -19,9 +19,17 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.commons.math3.distribution.NormalDistribution; +import org.apache.spark.ml.linalg.VectorUDT; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; import org.junit.Test; import org.apache.spark.SharedSparkSession; @@ -37,17 +45,11 @@ public class JavaKolmogorovSmirnovTestSuite extends SharedSparkSession { @Override public void setUp() throws IOException { super.setUp(); - List points = new ArrayList<>(); - points.add(0.1); - points.add(1.1); - points.add(10.1); - points.add(-1.1); + List points = Arrays.asList(0.1, 1.1, 10.1, -1.1); - dataset = - spark.createDataFrame(jsc.parallelize(points, 2), Double.class).toDF("sample"); + dataset = spark.createDataset(points, Encoders.DOUBLE()).toDF("sample"); } - /* @Test public void testKSTestCDF() { // Create theoretical distributions @@ -67,7 +69,6 @@ public void testKSTestCDF() { // Cannot reject null hypothesis assert(pValue1 > pThreshold); } - */ @Test public void testKSTestNamedDistribution() {