From a9a59cba0d0ba29c8a483a6bf18426db14d59860 Mon Sep 17 00:00:00 2001
From: Yuhao Yang
Date: Wed, 16 Mar 2016 03:34:45 -0700
Subject: [PATCH 1/3] ad test

---
 .../apache/spark/mllib/stat/Statistics.scala  |  31 +-
 .../mllib/stat/test/AndersonDarlingTest.scala | 278 ++++++++++++++++++
 .../spark/mllib/stat/test/TestResult.scala    |  19 ++
 .../mllib/stat/HypothesisTestSuite.scala      | 174 +++++++++++
 4 files changed, 500 insertions(+), 2 deletions(-)
 create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
index f3159f7e724cc..9f9e56e7f15c8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
@@ -25,8 +25,7 @@ import org.apache.spark.mllib.linalg.{Matrix, Vector}
 import org.apache.spark.mllib.linalg.distributed.RowMatrix
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.stat.correlation.Correlations
-import org.apache.spark.mllib.stat.test.{ChiSqTest, ChiSqTestResult, KolmogorovSmirnovTest,
-  KolmogorovSmirnovTestResult}
+import org.apache.spark.mllib.stat.test._
 import org.apache.spark.rdd.RDD
 
 /**
@@ -226,4 +225,32 @@
       params: Double*): KolmogorovSmirnovTestResult = {
     kolmogorovSmirnovTest(data.rdd.asInstanceOf[RDD[Double]], distName, params: _*)
   }
+
+  /**
+   * Conduct the one-sample Anderson-Darling test for the null hypothesis that the data
+   * comes from a given theoretical distribution. The Anderson-Darling test is an alternative
+   * to the Kolmogorov-Smirnov test, and is better suited to identifying departures from the
+   * theoretical distribution at the tails. The implementation returns an
+   * `AndersonDarlingTestResult`, which includes the statistic, the critical values at varying
+   * significance levels, and the null hypothesis. Note that the critical values are calculated
+   * assuming the parameters have been estimated from the data sample.
+   * @param data the data to be tested
+   * @param distName {"norm", "exp", "gumbel", "logistic" or "weibull"}. Name of the distribution
+   *                 to test against.
+   * @param params provides the parameters for the theoretical distribution.
+   *               The order of parameters is as follows:
+   *               Normal -> [mu, sigma] (location, scale)
+   *               Exponential -> [1 / lambda] (scale)
+   *               Gumbel -> [mu, beta] (location, scale)
+   *               Logistic -> [mu, s] (location, scale)
+   *               Weibull -> [lambda, k] (scale, shape)
+   * @return [[org.apache.spark.mllib.stat.test.AndersonDarlingTestResult]] object containing
+   *         the test statistic, various critical values at different significance levels,
+   *         and a summary of the null hypothesis
+   */
+  @varargs
+  def andersonDarlingTest(data: RDD[Double], distName: String, params: Double*)
+    : AndersonDarlingTestResult = {
+    AndersonDarlingTest.testOneSample(data, distName, params: _*)
+  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala
new file mode 100644
index 0000000000000..152e017d828df
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import scala.annotation.varargs
+
+import collection.immutable.ListMap
+import org.apache.commons.math3.distribution.{ExponentialDistribution, GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson-Darling (AD) test, similarly to the Kolmogorov-Smirnov (KS) test, tests whether the
+ * data follow a given theoretical distribution. It should be used with continuous data and
+ * assumes that no repeated values occur (the presence of ties can affect the validity of the test).
+ * The AD test provides an alternative to the KS test. Namely, it is better
+ * suited to identifying departures from the theoretical distribution at the tails.
+ * It is worth noting that the AD test's critical values depend on the
+ * distribution being tested against. The AD statistic is defined as
+ * {{{
+ *   A^2 = -N - \frac{1}{N}\sum_{i=1}^{N} (2i - 1)(\ln{\Phi(x_i)} + \ln{(1 - \Phi(x_{N+1-i}))})
+ * }}}
+ * where {{{\Phi}}} is the CDF of the given distribution, `N` is the sample size, and
+ * {{{x_1 \le \ldots \le x_N}}} are the ordered sample values.
+ * For more information @see [[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object AndersonDarlingTest extends Logging {
+
+  object NullHypothesis extends Enumeration {
+    type NullHypothesis = Value
+    val OneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * AndersonDarlingTheoreticalDist is a trait that every distribution used in an AD test must
+   * extend. The rationale for this is that the AD test has distribution-dependent critical values,
+   * and by requiring extension of this trait we guarantee that future additional distributions
+   * make sure to add the appropriate critical values (CVs) (or at least acknowledge
+   * that they should be added)
+   */
+  sealed trait AndersonDarlingTheoreticalDist extends Serializable {
+    // parameters used to initialize the distribution
+    val params: Seq[Double]
+    // calculate the cdf under the given distribution for value x
+    def cdf(x: Double): Double
+    // return appropriate CVs, adjusted for sample size
+    def getCriticalValues(n: Double): Map[Double, Double]
+  }
+
+  /**
+   * Critical values and adjustments for distributions sourced from
+   * [[http://civil.colorado.edu/~balajir/CVEN5454/lectures/Ang-n-Tang-Chap7-Goodness-of-fit-PDFs
+   * -test.pdf]]
+   * [[https://github.com/scipy/scipy/blob/v0.15.1/scipy/stats/morestats.py#L1017]], which in turn
+   * references:
+   *
+   * Stephens, M. A. (1974). EDF Statistics for Goodness of Fit and
+   * Some Comparisons, Journal of the American Statistical Association,
+   * Vol. 69, pp. 730-737.
+   *
+   * Stephens, M. A. (1976). Asymptotic Results for Goodness-of-Fit
+   * Statistics with Unknown Parameters, Annals of Statistics, Vol. 4,
+   * pp. 357-369.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit for the Extreme Value
+   * Distribution, Biometrika, Vol. 64, pp. 583-588.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit with Special Reference
+   * to Tests for Exponentiality, Technical Report No. 262,
+   * Department of Statistics, Stanford University, Stanford, CA.
+   *
+   * Stephens, M. A. (1979). Tests of Fit for the Logistic Distribution
+   * Based on the Empirical Distribution Function, Biometrika, Vol. 66,
+   * pp. 591-595.
+   */
+
+  // Exponential distribution
+  class AndersonDarlingExponential(val params: Seq[Double]) extends AndersonDarlingTheoreticalDist {
+    private val theoretical = new ExponentialDistribution(params(0))
+
+    private val rawCriticalValues = ListMap(
+      0.15 -> 0.922, 0.10 -> 1.078, 0.05 -> 1.341, 0.025 -> 1.606, 0.01 -> 1.957
+    )
+
+    def cdf(x: Double): Double = theoretical.cumulativeProbability(x)
+
+    def getCriticalValues(n: Double): Map[Double, Double] = {
+      rawCriticalValues.map { case (sig, cv) => sig -> cv / (1 + 0.6 / n) }
+    }
+  }
+
+  // Normal Distribution
+  class AndersonDarlingNormal(val params: Seq[Double]) extends AndersonDarlingTheoreticalDist {
+    private val theoretical = new NormalDistribution(params(0), params(1))
+
+    private val rawCriticalValues = ListMap(
+      0.15 -> 0.576, 0.10 -> 0.656, 0.05 -> 0.787, 0.025 -> 0.918, 0.01 -> 1.092
+    )
+
+    def cdf(x: Double): Double = theoretical.cumulativeProbability(x)
+
+    def getCriticalValues(n: Double): Map[Double, Double] = {
+      rawCriticalValues.map { case (sig, cv) => sig -> cv / (1 + 4.0 / n - 25.0 / (n * n)) }
+    }
+  }
+
+  // Gumbel distribution
+  class AndersonDarlingGumbel(val params: Seq[Double]) extends AndersonDarlingTheoreticalDist {
+    private val theoretical = new GumbelDistribution(params(0), params(1))
+
+    private val rawCriticalValues = ListMap(
+      0.25 -> 0.474, 0.10 -> 0.637, 0.05 -> 0.757, 0.025 -> 0.877, 0.01 -> 1.038
+    )
+
+    def cdf(x: Double): Double = theoretical.cumulativeProbability(x)
+
+    def getCriticalValues(n: Double): Map[Double, Double] = {
+      rawCriticalValues.map { case (sig, cv) => sig -> cv / (1 + 0.2 / math.sqrt(n)) }
+    }
+  }
+
+  // Logistic distribution
+  class AndersonDarlingLogistic(val params: Seq[Double]) extends AndersonDarlingTheoreticalDist {
+    private val theoretical = new LogisticDistribution(params(0), params(1))
+
+    private val rawCriticalValues = ListMap(
+      0.25 -> 0.426, 0.10 -> 0.563, 0.05 -> 0.660, 0.025 -> 0.769, 0.01 -> 0.906, 0.005 -> 1.010
+    )
+
+    def cdf(x: Double): Double = theoretical.cumulativeProbability(x)
+
+    def getCriticalValues(n: Double): Map[Double, Double] = {
+      rawCriticalValues.map { case (sig, cv) => sig -> cv / (1 + 0.25 / n) }
+    }
+  }
+
+  // Weibull distribution
+  class AndersonDarlingWeibull(val params: Seq[Double]) extends AndersonDarlingTheoreticalDist {
+    private val theoretical = new WeibullDistribution(params(0), params(1))
+
+    private val rawCriticalValues = ListMap(
+      0.25 -> 0.474, 0.10 -> 0.637, 0.05 -> 0.757, 0.025 -> 0.877, 0.01 -> 1.038
+    )
+
+    def cdf(x: Double): Double = theoretical.cumulativeProbability(x)
+
+    def getCriticalValues(n: Double): Map[Double, Double] = {
+      rawCriticalValues.map { case (sig, cv) => sig -> cv / (1 + 0.2 / math.sqrt(n)) }
+    }
+  }
+
+  /**
+   * Perform a one-sample Anderson-Darling test
+   * @param data data to test for a given distribution
+   * @param distName name of theoretical distribution: currently supports normal,
+   *                 exponential, gumbel, logistic, weibull as
+   *                 ['norm', 'exp', 'gumbel', 'logistic', 'weibull']
+   * @param params variable-length argument providing parameters for given distribution. When none
+   *               are provided, default parameters appropriate to each distribution are chosen. In
+   *               either case, critical values reflect adjustments that assume the parameters were
+   *               estimated from the data
+   * @return AndersonDarlingTestResult containing the test statistic, the critical values at
+   *         various significance levels, and the null hypothesis
+   */
+  @varargs
+  def testOneSample(data: RDD[Double], distName: String, params: Double*)
+    : AndersonDarlingTestResult = {
+    val n = data.count()
+    val dist = initDist(distName, params)
+    val localData = data.sortBy(x => x).mapPartitions(calcPartAD(_, dist, n)).collect()
+    val s = localData.foldLeft((0.0, 0.0)) { case ((prevStat, prevCt), (rawStat, adj, ct)) =>
+      val adjVal = 2 * prevCt * adj
+      val adjustedStat = rawStat + adjVal
+      val cumCt = prevCt + ct
+      (prevStat + adjustedStat, cumCt)
+    }._1
+    val ADStat = -1 * n - s / n
+    val criticalVals = dist.getCriticalValues(n)
+    new AndersonDarlingTestResult(ADStat, criticalVals, NullHypothesis.OneSample.toString)
+  }
+
+
+  /**
+   * Calculate a partition's contribution to the Anderson-Darling statistic.
+   * In each partition we calculate three values: an unadjusted value that contributes to the AD
+   * statistic directly, a value that must be scaled by the number of values in the prior
+   * partitions, and a count of the elements in that partition
+   * @param part a partition of the data sample to be analyzed
+   * @param dist a theoretical distribution that extends the AndersonDarlingTheoreticalDist trait,
+   *             used to calculate CDF values and critical values
+   * @param n the total size of the data sample
+   * @return The first element corresponds to the position-independent contribution to the
+   *         statistic, the second is the value that must be scaled by the number of elements in
+   *         prior partitions, and the third is the number of elements in this partition
+   */
+  private def calcPartAD(part: Iterator[Double], dist: AndersonDarlingTheoreticalDist, n: Double)
+    : Iterator[(Double, Double, Double)] = {
+    val initAcc = (0.0, 0.0, 0.0)
+    val pResult = part.zipWithIndex.foldLeft(initAcc) { case ((prevS, prevC, prevCt), (v, i)) =>
+      val y = dist.cdf(v)
+      val a = math.log(y)
+      val b = math.log(1 - y)
+      val unAdjusted = a * (2 * i + 1) + b * (2 * n - 2 * i - 1)
+      val adjConstant = a - b
+      (prevS + unAdjusted, prevC + adjConstant, prevCt + 1)
+    }
+    Array(pResult).iterator
+  }
+
+  /**
+   * Create a theoretical distribution to be used in the one-sample Anderson-Darling test
+   * @param distName name of distribution
+   * @param params initialization parameters for the distribution; if none are provided, default
+   *               values are chosen.
+   * @return distribution object used to calculate CDF values
+   */
+  private def initDist(distName: String, params: Seq[Double]): AndersonDarlingTheoreticalDist = {
+    distName match {
+      case "norm" =>
+        val checkedParams = validateParams(distName, params, 2, Seq(0.0, 1.0))
+        new AndersonDarlingNormal(checkedParams)
+      case "exp" =>
+        val checkedParams = validateParams(distName, params, 1, Seq(1.0))
+        new AndersonDarlingExponential(checkedParams)
+      case "gumbel" =>
+        val checkedParams = validateParams(distName, params, 2, Seq(0.0, 1.0))
+        new AndersonDarlingGumbel(checkedParams)
+      case "logistic" =>
+        val checkedParams = validateParams(distName, params, 2, Seq(0.0, 1.0))
+        new AndersonDarlingLogistic(checkedParams)
+      case "weibull" =>
+        val checkedParams = validateParams(distName, params, 2, Seq(0.0, 1.0))
+        new AndersonDarlingWeibull(checkedParams)
+      case _ => throw new IllegalArgumentException(
+        s"Anderson-Darling does not currently support the $distName distribution; distName" +
+          " must be one of 'norm', 'exp', 'gumbel', 'logistic', or 'weibull'")
+    }
+  }
+
+  /**
+   * Validate the length of the parameters passed in by the user; if none are passed, return
+   * the default values
+   * @param distName name of distribution
+   * @param params parameters passed by user
+   * @param reqLen the required length of the parameter sequence
+   * @param defParams default alternative for the parameter in case `params` is empty
+   * @return parameters that will be used to initialize the distribution
+   */
+  private def validateParams(
+      distName: String,
+      params: Seq[Double],
+      reqLen: Int,
+      defParams: Seq[Double]): Seq[Double] = {
+    if (params.nonEmpty) {
+      require(params.length == reqLen, s"$distName distribution requires $reqLen parameters.")
+      params
+    } else {
+      logWarning(s"No parameters passed for $distName distribution, " +
+        "initialized with " + defParams.mkString(", "))
+      defParams
+    }
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala
index 8a29fd39a9106..8bceeb805ae4d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala
@@ -133,3 +133,22 @@ private[stat] class StreamingTestResult @Since("1.6.0") (
   }
 }
 
+/**
+ * :: Experimental ::
+ * Object containing the test results for the Anderson-Darling test.
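+ *
+ * A minimal usage sketch (illustrative only; assumes `result` comes from
+ * `Statistics.andersonDarlingTest`):
+ * {{{
+ *   // reject the null hypothesis at the 5% significance level when the
+ *   // statistic exceeds the corresponding critical value
+ *   val reject = result.statistic > result.criticalValues(0.05)
+ * }}}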
+ */
+@Experimental
+class AndersonDarlingTestResult private[stat] (
+    val statistic: Double,
+    val criticalValues: Map[Double, Double],
+    val nullHypothesis: String) {
+
+  override def toString: String = {
+    "Anderson-Darling test summary:\n" +
+      s"statistic = $statistic\n" +
+      "critical values for a given significance:\n" +
+      criticalValues.map { case (sig, cv) =>
+        "\t" + sig.toString + " -> " + cv.toString
+      }.mkString("\n") + "\n" +
+      s"Null Hypothesis: $nullHypothesis"
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
index 46fcebe132749..c3722de3c6c9a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
@@ -254,4 +254,178 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(rCompResult.statistic ~== rKSStat relTol 1e-4)
     assert(rCompResult.pValue ~== rKSPVal relTol 1e-4)
   }
+
+  test("one sample Anderson-Darling test: R implementation equivalence") {
+    /*
+      Data generated with R
+      > sessionInfo() #truncated
+      R version 3.2.0 (2015-04-16)
+      Platform: x86_64-apple-darwin13.4.0 (64-bit)
+      Running under: OS X 10.10.2 (Yosemite)
+      > set.seed(10)
+      > dataNorm <- rnorm(20)
+      > dataExp <- rexp(20)
+      > dataUnif <- runif(20)
+      > mean(dataNorm)
+      [1] -0.06053267
+      > sd(dataNorm)
+      [1] 0.7999093
+      > mean(dataExp)
+      [1] 1.044636
+      > sd(dataExp)
+      [1] 0.96727
+      > mean(dataUnif)
+      [1] 0.4420219
+      > sd(dataUnif)
+      [1] 0.2593285
+    */
+
+    val dataNorm = sc.parallelize(
+      Array(0.0187461709418264, -0.184252542069064, -1.37133054992251,
+        -0.599167715783718, 0.294545126567508, 0.389794300700167, -1.20807617542949,
+        -0.363676017470862, -1.62667268170309, -0.256478394123992, 1.10177950308713,
+        0.755781508027337, -0.238233556018718, 0.98744470341339, 0.741390128383824,
+        0.0893472664958216, -0.954943856152377, -0.195150384667239, 0.92552126209408,
+        0.482978524836611)
+    )
+
+    val dataExp = sc.parallelize(
+      Array(0.795082630547595, 1.39629918233218, 1.39810742601556, 1.11045944034578,
+        0.170421596598791, 1.91878133072498, 0.166443939786404, 0.97028998914142, 0.010571192484349,
+        2.79300971312409, 2.35461177957702, 0.667238388210535, 0.522243486717343, 0.146712897811085,
+        0.751234306178963, 2.28856621111248, 0.0688535687513649, 0.282713153399527,
+        0.0514786350540817, 3.02959313971882)
+    )
+
+    val dataUnif = sc.parallelize(
+      Array(0.545859839767218, 0.372763097286224, 0.961302414536476, 0.257341569056734,
+        0.207951683318242, 0.861382439732552, 0.464391982648522, 0.222867433447391,
+        0.623549601528794, 0.203647700604051, 0.0196734135970473, 0.797993005951867,
+        0.274318896699697, 0.166609104024246, 0.170151718193665, 0.4885059366934,
+        0.711409077281132, 0.591934921452776, 0.517156876856461, 0.381627685856074)
+    )
+
+    /* normality test in R
+      > library(nortest)
+      > ad.test(dataNorm)
+      Anderson-Darling normality test
+      data: dataNorm
+      A = 0.27523, p-value = 0.6216
+      > ad.test(dataExp)
+      Anderson-Darling normality test
+      data: dataExp
+      A = 0.79034, p-value = 0.03336
+      > ad.test(dataUnif)
+      Anderson-Darling normality test
+      data: dataUnif
+      A = 0.31831, p-value = 0.5114
+    */
+
+    val rNormADStats = Map("norm" -> 0.27523, "exp" -> 0.79034, "unif" -> 0.31831)
+    val params = Map(
+      "norm" -> (-0.06053267, 0.7999093),
+      "exp" -> (1.044636, 0.96727),
+      "unif" -> (0.4420219, 0.2593285)
+    )
+
+    assert(
+      Statistics.andersonDarlingTest(
+        dataNorm,
+        "norm",
+        params("norm")._1,
+        params("norm")._2
+      ).statistic
+        ~== rNormADStats("norm") relTol 1e-4
+    )
+
+    assert(
+      Statistics.andersonDarlingTest(
+        dataExp,
+        "norm",
+        params("exp")._1,
+        params("exp")._2
+      ).statistic
+        ~== rNormADStats("exp") relTol 1e-4
+    )
+
+    assert(
+      Statistics.andersonDarlingTest(
+        dataUnif,
+        "norm",
+        params("unif")._1,
+        params("unif")._2
+      ).statistic
+        ~== rNormADStats("unif") relTol 1e-4
+    )
+  }
+
+  test("one sample Anderson-Darling test: SciPy implementation equivalence") {
+    val dataNorm = sc.parallelize(
+      Array(0.0187461709418264, -0.184252542069064, -1.37133054992251,
+        -0.599167715783718, 0.294545126567508, 0.389794300700167, -1.20807617542949,
+        -0.363676017470862, -1.62667268170309, -0.256478394123992, 1.10177950308713,
+        0.755781508027337, -0.238233556018718, 0.98744470341339, 0.741390128383824,
+        0.0893472664958216, -0.954943856152377, -0.195150384667239, 0.92552126209408,
+        0.482978524836611)
+    )
+
+    val dataExp = sc.parallelize(
+      Array(0.795082630547595, 1.39629918233218, 1.39810742601556, 1.11045944034578,
+        0.170421596598791, 1.91878133072498, 0.166443939786404, 0.97028998914142, 0.010571192484349,
+        2.79300971312409, 2.35461177957702, 0.667238388210535, 0.522243486717343, 0.146712897811085,
+        0.751234306178963, 2.28856621111248, 0.0688535687513649, 0.282713153399527,
+        0.0514786350540817, 3.02959313971882)
+    )
+
+    val params = Map("norm" -> (-0.06053267, 0.7999093))
+
+    /*
+      normality test in scipy: comparing critical values
+      >>> from scipy.stats import anderson
+      >>> # drop in values as arrays
+      ...
+      >>> anderson(dataNorm, "norm")
+      (0.27523090925717852, array([ 0.506, 0.577, 0.692, 0.807, 0.96 ]),
+      array([ 15. , 10. , 5. , 2.5, 1. ]))
+      >>> anderson(dataExp, "expon")
+      (0.45714575153590431, array([ 0.895, 1.047, 1.302, 1.559, 1.9 ]),
+      array([ 15. , 10. , 5. , 2.5, 1. ]))
+    */
+    val sciPyNormCVs = Array(0.506, 0.577, 0.692, 0.807, 0.96)
+    val adNormCVs = Statistics.andersonDarlingTest(
+      dataNorm,
+      "norm",
+      params("norm")._1,
+      params("norm")._2
+    ).criticalValues.values.toArray
+
+    assert(Vectors.dense(adNormCVs) ~== Vectors.dense(sciPyNormCVs) relTol 1e-3)
+
+    val sciPyExpCVs = Array(0.895, 1.047, 1.302, 1.559, 1.9)
+    val scaleParam = dataExp.mean()
+    val adExpCVs = Statistics.andersonDarlingTest(dataExp, "exp", scaleParam).criticalValues
+    assert(Vectors.dense(adExpCVs.values.toArray) ~== Vectors.dense(sciPyExpCVs) relTol 1e-3)
+
+    /*
+      >>> from scipy.stats.distributions import logistic
+      >>> logistic.fit(dataExp)
+      (0.93858397620886713, 0.55032469036705811)
+      >>> anderson(dataExp, "logistic")
+      (0.72718900969834621, array([ 0.421, 0.556, 0.652, 0.76 , 0.895, 0.998]),
+      array([ 25. , 10. , 5. , 2.5, 1. , 0.5]))
+    */
+    val sciPyLogADStat = 0.72718900969834621
+    val sciPyLogCVs = Array(0.421, 0.556, 0.652, 0.76, 0.895, 0.998)
+    val adTestExpLog = Statistics.andersonDarlingTest(
+      dataExp,
+      "logistic",
+      0.93858397620886713,
+      0.55032469036705811)
+
+    assert(adTestExpLog.statistic ~== sciPyLogADStat relTol 1e-4)
+    assert(
+      Vectors.dense(adTestExpLog.criticalValues.values.toArray)
+        ~== Vectors.dense(sciPyLogCVs) relTol 1e-3
+    )
+  }
 }

From deda4600b92f69d2f9df395007d146d27e8823c8 Mon Sep 17 00:00:00 2001
From: Yuhao Yang
Date: Mon, 21 Mar 2016 17:51:58 +0800
Subject: [PATCH 2/3] simplify implementation and add new ut

---
 .../mllib/stat/test/AndersonDarlingTest.scala | 63 +++++--------------
 .../mllib/stat/HypothesisTestSuite.scala      | 22 +++++++
 2 files changed, 36 insertions(+), 49 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala
index 152e017d828df..afa480bf046f1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala
@@ -49,10 +49,7 @@ private[stat] object AndersonDarlingTest extends Logging {
 
   /**
    * AndersonDarlingTheoreticalDist is a trait that every distribution used in an AD test must
-   * extend. The rationale for this is that the AD test has distribution-dependent critical values,
-   * and by requiring extension of this trait we guarantee that future additional distributions
-   * make sure to add the appropriate critical values (CVs) (or at least acknowledge
-   * that they should be added)
+   * extend. Extensions should add distribution-dependent critical values (CVs).
    */
   sealed trait AndersonDarlingTheoreticalDist extends Serializable {
     // parameters used to initialize the distribution
@@ -168,9 +165,8 @@
   /**
    * Perform a one-sample Anderson-Darling test
    * @param data data to test for a given distribution
-   * @param distName name of theoretical distribution: currently supports normal,
-   *                 exponential, gumbel, logistic, weibull as
-   *                 ['norm', 'exp', 'gumbel', 'logistic', 'weibull']
+   * @param distName name of theoretical distribution: currently supports normal, exponential,
+   *                 gumbel, logistic, weibull as ['norm', 'exp', 'gumbel', 'logistic', 'weibull']
    * @param params variable-length argument providing parameters for given distribution. When none
   *               are provided, default parameters appropriate to each distribution are chosen. In
   *               either case, critical values reflect adjustments that assume the parameters were
@@ -182,46 +178,15 @@
     : AndersonDarlingTestResult = {
     val n = data.count()
     val dist = initDist(distName, params)
-    val localData = data.sortBy(x => x).mapPartitions(calcPartAD(_, dist, n)).collect()
-    val s = localData.foldLeft((0.0, 0.0)) { case ((prevStat, prevCt), (rawStat, adj, ct)) =>
-      val adjVal = 2 * prevCt * adj
-      val adjustedStat = rawStat + adjVal
-      val cumCt = prevCt + ct
-      (prevStat + adjustedStat, cumCt)
-    }._1
-    val ADStat = -1 * n - s / n
+    val interRDD = data.sortBy(x => x).zipWithIndex().map { case (v, i) =>
+      val c = dist.cdf(v)
+      (2 * i + 1) * math.log(c) + (2 * n - 2 * i - 1) * math.log(1 - c)
+    }
+    val ADStat = -n - interRDD.sum() / n
     val criticalVals = dist.getCriticalValues(n)
     new AndersonDarlingTestResult(ADStat, criticalVals, NullHypothesis.OneSample.toString)
   }
 
-
-  /**
-   * Calculate a partition's contribution to the Anderson-Darling statistic.
-   * In each partition we calculate three values: an unadjusted value that contributes to the AD
-   * statistic directly, a value that must be scaled by the number of values in the prior
-   * partitions, and a count of the elements in that partition
-   * @param part a partition of the data sample to be analyzed
-   * @param dist a theoretical distribution that extends the AndersonDarlingTheoreticalDist trait,
-   *             used to calculate CDF values and critical values
-   * @param n the total size of the data sample
-   * @return The first element corresponds to the position-independent contribution to the
-   *         statistic, the second is the value that must be scaled by the number of elements in
-   *         prior partitions, and the third is the number of elements in this partition
-   */
-  private def calcPartAD(part: Iterator[Double], dist: AndersonDarlingTheoreticalDist, n: Double)
-    : Iterator[(Double, Double, Double)] = {
-    val initAcc = (0.0, 0.0, 0.0)
-    val pResult = part.zipWithIndex.foldLeft(initAcc) { case ((prevS, prevC, prevCt), (v, i)) =>
-      val y = dist.cdf(v)
-      val a = math.log(y)
-      val b = math.log(1 - y)
-      val unAdjusted = a * (2 * i + 1) + b * (2 * n - 2 * i - 1)
-      val adjConstant = a - b
-      (prevS + unAdjusted, prevC + adjConstant, prevCt + 1)
-    }
-    Array(pResult).iterator
-  }
-
   /**
    * Create a theoretical distribution to be used in the one-sample Anderson-Darling test
   * @param distName name of distribution
@@ -232,19 +197,19 @@
   private def initDist(distName: String, params: Seq[Double]): AndersonDarlingTheoreticalDist = {
     distName match {
       case "norm" =>
-        val checkedParams = validateParams(distName, params, 2, Seq(0.0, 1.0))
+        val checkedParams = getParams(distName, params, 2, Seq(0.0, 1.0))
         new AndersonDarlingNormal(checkedParams)
       case "exp" =>
-        val checkedParams = validateParams(distName, params, 1, Seq(1.0))
+        val checkedParams = getParams(distName, params, 1, Seq(1.0))
         new AndersonDarlingExponential(checkedParams)
       case "gumbel" =>
-        val checkedParams = validateParams(distName, params, 2, Seq(0.0, 1.0))
+        val checkedParams = getParams(distName, params, 2, Seq(0.0, 1.0))
         new AndersonDarlingGumbel(checkedParams)
       case "logistic" =>
-        val checkedParams = validateParams(distName, params, 2, Seq(0.0, 1.0))
+        val checkedParams = getParams(distName, params, 2, Seq(0.0, 1.0))
         new AndersonDarlingLogistic(checkedParams)
       case "weibull" =>
-        val checkedParams = validateParams(distName, params, 2, Seq(0.0, 1.0))
+        val checkedParams = getParams(distName, params, 2, Seq(0.0, 1.0))
         new AndersonDarlingWeibull(checkedParams)
       case _ => throw new IllegalArgumentException(
         s"Anderson-Darling does not currently support the $distName distribution; distName" +
           " must be one of 'norm', 'exp', 'gumbel', 'logistic', or 'weibull'")
     }
   }
@@ -261,7 +226,7 @@
    * @param defParams default alternative for the parameter in case `params` is empty
    * @return parameters that will be used to initialize the distribution
    */
-  private def validateParams(
+  private def getParams(
       distName: String,
       params: Seq[Double],
       reqLen: Int,
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
index c3722de3c6c9a..19a665f1457f7 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
@@ -428,4 +428,26 @@
         ~== Vectors.dense(sciPyLogCVs) relTol 1e-3
     )
   }
+
+  test("one sample Anderson-Darling test: gumbel and weibull") {
+    val dataGumbel = sc.parallelize(
+      Array(2.566070163268321, 2.797136867663637, 1.301680801603979, 4.968891406617431,
+        2.736825109869105, 1.7156182506225437, 4.849728670708153, 2.2718140406461034,
+        1.7349531624810886, 1.7220364354275257, 2.755987927298529, 1.5140172593783077,
+        1.3178298892942695, 4.031431825825987, 2.062204842676713, 1.3800199221612661,
+        3.4851421470537165, 1.8851564957011937, 3.6773776009634975, 2.5361429810121083)
+    )
+    val gumbelResult = Statistics.andersonDarlingTest(dataGumbel, "gumbel", 2, 1)
+    assert(gumbelResult.statistic < gumbelResult.criticalValues(0.25))
+
+    val dataWeibull = sc.parallelize(
+      Array(0.14611490926735415, 0.7338331105666904, 1.3064941546142883, 0.8848645072661437,
+        0.4041816812459189, 1.1742756450140364, 0.9327038477408471, 0.34414711566818174,
+        1.7224814642768693, 0.9173020785734072, 0.6353718988539053, 1.657112759681775,
+        0.5634766278998253, 1.2568957312640854, 1.7248031467826292, 0.8116808125903335,
+        1.1108852360343489, 0.2983634954239019, 0.7063880887496812, 0.6382180292078736)
+    )
+    val weibullResult = Statistics.andersonDarlingTest(dataWeibull, "weibull", 2, 1)
+    assert(weibullResult.statistic < weibullResult.criticalValues(0.25))
+  }
 }

From e5e5035896f78554c29d271956ebdf034274e5b5 Mon Sep 17 00:00:00 2001
From: Yuhao Yang
Date: Tue, 22 Mar 2016 10:08:29 +0800
Subject: [PATCH 3/3] change logging package

---
 .../org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala
index afa480bf046f1..ad90ce01d90cb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala
@@ -23,7 +23,7 @@
 import collection.immutable.ListMap
 import org.apache.commons.math3.distribution.{ExponentialDistribution, GumbelDistribution,
   LogisticDistribution, NormalDistribution, WeibullDistribution}
 
-import org.apache.spark.Logging
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 
 /**
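---

For reference, a minimal usage sketch of the API added by this series (illustrative only: it
assumes the patches are applied and that `sc` is a live SparkContext):

    import org.apache.spark.mllib.stat.Statistics
    import org.apache.spark.rdd.RDD

    // hypothetical sample, nominally drawn from a standard normal distribution
    val data: RDD[Double] = sc.parallelize(Seq(-1.2, -0.3, 0.1, 0.4, 0.8, 1.5))

    // test against N(0, 1); params are (location, scale)
    val result = Statistics.andersonDarlingTest(data, "norm", 0.0, 1.0)

    // the result carries the statistic and a map from significance level to
    // critical value; reject the null hypothesis at the 5% level when the
    // statistic exceeds the corresponding critical value
    val rejectAtFivePercent = result.statistic > result.criticalValues(0.05)
    println(result)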