From 10865378c3aba5e639c352bded61a616933a5f1c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 10 May 2015 19:19:20 +0800 Subject: [PATCH 01/15] Add support for calculating approximate quantile. --- .../spark/sql/DataFrameStatFunctions.scala | 12 +- .../sql/execution/stat/StatFunctions.scala | 129 +++++++++++++++++- .../apache/spark/sql/DataFrameStatSuite.scala | 21 +++ 3 files changed, 160 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index a1e74470afc89..71f89a7bdbc41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -26,7 +26,17 @@ import org.apache.spark.sql.execution.stat._ */ @Experimental final class DataFrameStatFunctions private[sql](df: DataFrame) { - + + /** + * Calculate the approximate quantile of numerical column of a DataFrame. + * @param col the name of the column + * @param quantile the quantile number + * @return the approximate quantile + */ + def approxQuantile(col: String, quantile: Double, epsilon: Double): Double = { + StatFunctions.approxQuantile(df, col, quantile, epsilon) + } + /** * Calculate the sample covariance of two numerical columns of a DataFrame. * @param col1 the name of the first column diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 71b7f6c2a6756..697b301f31852 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.stat +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.Logging import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Cast} @@ -25,7 +27,132 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ private[sql] object StatFunctions extends Logging { - + + /** Calculate the approximate quantile for the given column */ + private[sql] def approxQuantile( + df: DataFrame, + col: String, + quantile: Double, + epsilon: Double = 0.05): Double = { + require(quantile > 0.0 && quantile < 1.0, "Quantile must be in the range of (0.0, 1.0).") + val summeries = collectQuantileSummaries(df, col, epsilon) + summeries.query(quantile) + } + + private def collectQuantileSummaries( + df: DataFrame, + col: String, + epsilon: Double): QuantileSummaries = { + val data = df.schema.fields.find(_.name == col) + require(data.nonEmpty, s"Couldn't find column with name $col") + require(data.get.dataType.isInstanceOf[NumericType], "Quantile calculation for column " + + s"with dataType ${data.get.dataType} not supported.") + + val column = Column(Cast(Column(col).expr, DoubleType)) + df.select(column).rdd.aggregate(new QuantileSummaries(epsilon = epsilon))( + seqOp = (summeries, row) => { + summeries.insert(row.getDouble(0)) + }, + combOp = (baseSummeries, other) => { + baseSummeries.merge(other) + }) + } + + /** + * Helper class to compute approximate quantile summary. + * This implementation is based on the algorithm proposed in the paper: + * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael + * and Khanna, Sanjeev. 
(http://dl.acm.org/citation.cfm?id=375670) + * + */ + private class QuantileSummaries( + compress_threshold: Int = 1000, + epsilon: Double = 0.05) extends Serializable { + var sampled = new ArrayBuffer[(Double, Int, Int)]() // sampled examples + var count = 0L // count of observed examples + + def getConstant(): Double = 2 * epsilon * count + + def insert(x: Double): this.type = { + var idx = sampled.indexWhere(_._1 > x) + if (idx == -1) { + idx = sampled.size + } + val delta = if (idx == 0 || idx == sampled.size) { + 0 + } else { + math.floor(getConstant()).toInt + } + val tuple = (x, 1, delta) + sampled.insert(idx, tuple) + count += 1 + + if (sampled.size > compress_threshold) { + compress() + } + this + } + + def compress(): Unit = { + var i = 0 + while (i < sampled.size - 1) { + val sample1 = sampled(i) + val sample2 = sampled(i + 1) + if (sample1._2 + sample2._2 + sample2._3 < math.floor(getConstant())) { + sampled.update(i + 1, (sample2._1, sample1._2 + sample2._2, sample2._3)) + sampled.remove(i) + } + i += 1 + } + } + + def merge(other: QuantileSummaries): QuantileSummaries = { + if (other.count > 0 && count > 0) { + other.sampled.foreach { sample => + val idx = sampled.indexWhere(s => s._1 > sample._1) + if (idx == 0) { + val new_sampled = (sampled(0)._1, sampled(0)._2, sampled(1)._3 / 2) + sampled.update(0, new_sampled) + val new_sample = (sample._1, sample._2, 0) + sampled.insert(0, new_sample) + } else if (idx == -1) { + val new_sampled = (sampled(sampled.size - 1)._1, sampled(sampled.size - 1)._2, + (sampled(sampled.size - 2)._3 * 2 * epsilon).toInt) + sampled.update(sampled.size - 1, new_sampled) + val new_sample = (sample._1, sample._2, 0) + sampled.insert(sampled.size, new_sample) + } else { + val new_sample = (sample._1, sample._2, (sampled(idx - 1)._3 + sampled(idx)._3) / 2) + sampled.insert(idx, new_sample) + } + } + count += other.count + compress() + this + } else if (other.count > 0) { + other + } else { + this + } + } + + def query(quantile: Double): Double = { + val rank = (quantile * count).toInt + var minRank = 0 + var i = 1 + while (i < sampled.size) { + val curSample = sampled(i) + val prevSample = sampled(i - 1) + minRank += prevSample._2 + if (minRank + curSample._2 + curSample._3 > rank + getConstant()) { + return prevSample._1 + } + i += 1 + } + return sampled.last._1 + } + } + /** Calculate the Pearson Correlation Coefficient for the given columns */ private[sql] def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = { val counts = collectStatisticalData(df, cols) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 46b1845a9180c..b53a18141f586 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -64,6 +64,27 @@ class DataFrameStatSuite extends FunSuite { assert(math.abs(decimalRes) < 1e-12) } + test("approximate quantile") { + val df = Seq.tabulate(1000)(i => (i, 2.0 * i)).toDF("singles", "doubles") + + val expected_1 = 500.0 + val expected_2 = 1600.0 + + val epsilons = List(0.1, 0.05, 0.001) + + for (i <- 0 to 2) { + val epsilon = epsilons(i) + val result1 = df.stat.approxQuantile("singles", 0.5, epsilon) + val result2 = df.stat.approxQuantile("doubles", 0.8, epsilon) + + val error_1 = 2 * 1000 * epsilon + val error_2 = 2 * 2000 * epsilon + + assert(math.abs(result1 - expected_1) < error_1) + assert(math.abs(result2 - expected_2) 
< error_2) + } + } + test("crosstab") { val df = Seq((0, 0), (2, 1), (1, 0), (2, 0), (0, 0), (2, 0)).toDF("a", "b") val crosstab = df.stat.crosstab("a", "b") From e1e6d943736e7b5bb86e90cba9c6ee6614e6ef4c Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Thu, 11 Feb 2016 09:19:15 -0800 Subject: [PATCH 02/15] adding more tests --- .../org/apache/spark/sql/execution/stat/StatFunctions.scala | 4 ++-- .../spark/sql/execution/stat/ApproxQuantileSuite.scala | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 2687642c2cf09..03a504ed8cddb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -29,7 +29,7 @@ import org.apache.spark.unsafe.types.UTF8String private[sql] object StatFunctions extends Logging { /** Calculate the approximate quantile for the given column */ - private[sql] def approxQuantile( + def approxQuantile( df: DataFrame, col: String, quantile: Double, @@ -65,7 +65,7 @@ private[sql] object StatFunctions extends Logging { * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670) * */ - private class QuantileSummaries( + case class QuantileSummaries( compress_threshold: Int = 1000, epsilon: Double = 0.05) extends Serializable { var sampled = new ArrayBuffer[(Double, Int, Int)]() // sampled examples diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala new file mode 100644 index 0000000000000..6e5df2fd8b845 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala @@ -0,0 +1,5 @@ +package org.apache.spark.sql.execution.stat + +class ApproxQuantileSuite { + +} From 1ce0bfc057f15a62161478769b1346d11e7c679a Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Thu, 11 Feb 2016 09:54:01 -0800 Subject: [PATCH 03/15] adding tests --- .../sql/execution/stat/StatFunctions.scala | 35 ++++++++---- .../execution/stat/ApproxQuantileSuite.scala | 56 ++++++++++++++++++- 2 files changed, 79 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 03a504ed8cddb..f51ce9d702c7d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -33,10 +33,11 @@ private[sql] object StatFunctions extends Logging { df: DataFrame, col: String, quantile: Double, - epsilon: Double = 0.05): Double = { + epsilon: Double = QuantileSummaries.defaultEpsilon): Double = { + // TODO: allow the extrema, they are stored in the statistics require(quantile > 0.0 && quantile < 1.0, "Quantile must be in the range of (0.0, 1.0).") - val summeries = collectQuantileSummaries(df, col, epsilon) - summeries.query(quantile) + val summaries = collectQuantileSummaries(df, col, epsilon) + summaries.query(quantile) } private def collectQuantileSummaries( @@ -49,12 +50,12 @@ private[sql] object StatFunctions extends Logging { s"with dataType ${data.get.dataType} not 
supported.") val column = Column(Cast(Column(col).expr, DoubleType)) - df.select(column).rdd.aggregate(new QuantileSummaries(epsilon = epsilon))( - seqOp = (summeries, row) => { - summeries.insert(row.getDouble(0)) + df.select(column).rdd.aggregate(QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon))( + seqOp = (summaries, row) => { + summaries.insert(row.getDouble(0)) }, - combOp = (baseSummeries, other) => { - baseSummeries.merge(other) + combOp = (baseSummaries, other) => { + baseSummaries.merge(other) }) } @@ -66,8 +67,8 @@ private[sql] object StatFunctions extends Logging { * */ case class QuantileSummaries( - compress_threshold: Int = 1000, - epsilon: Double = 0.05) extends Serializable { + compressThreshold: Int, + epsilon: Double) extends Serializable { var sampled = new ArrayBuffer[(Double, Int, Int)]() // sampled examples var count = 0L // count of observed examples @@ -87,7 +88,7 @@ private[sql] object StatFunctions extends Logging { sampled.insert(idx, tuple) count += 1 - if (sampled.size > compress_threshold) { + if (sampled.size > compressThreshold) { compress() } this @@ -153,6 +154,18 @@ private[sql] object StatFunctions extends Logging { } } + object QuantileSummaries { + /** + * The default value for the compression threshold. + */ + val defaultCompressThreshold: Int = 1000 + + /** + * The default value for epsilon. + */ + val defaultEpsilon: Double = 0.01 + } + /** Calculate the Pearson Correlation Coefficient for the given columns */ private[sql] def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = { val counts = collectStatisticalData(df, cols, "correlation") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala index 6e5df2fd8b845..0b7f1a08fc091 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala @@ -1,5 +1,59 @@ package org.apache.spark.sql.execution.stat -class ApproxQuantileSuite { +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.execution.stat.StatFunctions.QuantileSummaries +class ApproxQuantileSuite extends SparkFunSuite { + + private val n = 10000 + private val increasing = (0 to n).map(_.toDouble) + private val decreasing = (n to 0 by -1).map(_.toDouble) + private val random = Seq.fill(n)(math.random) + + private def buildSummary( + data: Seq[Double], + epsi: Double, + threshold: Int = QuantileSummaries.defaultCompressThreshold): QuantileSummaries = { + val summary = QuantileSummaries(threshold, epsi) + data.foreach(summary.insert) + summary + } + + // The naive implementation of quantile calculations, for reference checks. + def naiveQuantile(quant: Double, sorted_data: Seq[Double]): Double = { + val n = sorted_data.size + sorted_data.take(math.floor(quant * n).toInt).last + } + + private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = { + val approx = summary.query(quant) + val approx_rank = data.count(_ <= approx) + val lower = (quant - summary.epsilon) * data.size + assert(approx_rank >= lower, + s"approx_rank: $approx_rank ! >= $lower") + val upper = (quant + summary.epsilon) * data.size + assert(approx_rank <= upper, + s"approx_rank: $approx_rank ! 
<= $upper") + } + + for { + (data, seq_name) <- Seq(increasing, decreasing, random).zip(Seq("increasing", "decreasing", "random")) + epsi <- Seq(0.1, 0.01) + } { + + test(s"Extremas with epsi=$epsi and seq=$seq_name") { + val s = buildSummary(data, epsi) + val min_approx = s.query(0.0) + assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") + val max_approx = s.query(0.0) + assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx") + } + + test(s"Some quantile values with epsi=$epsi and seq=$seq_name") { + val s = buildSummary(data, epsi) + checkQuantile(0.9, data, s) + checkQuantile(0.5, data, s) + checkQuantile(0.1, data, s) + } + } } From 21e81ef00e54e920db3b1f4566b65fd923c82e31 Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Thu, 11 Feb 2016 14:23:51 -0800 Subject: [PATCH 04/15] started checking merging --- .../sql/execution/stat/StatFunctions.scala | 156 +++++++++++++++--- .../execution/stat/ApproxQuantileSuite.scala | 59 ++++--- 2 files changed, 164 insertions(+), 51 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index f51ce9d702c7d..77dadb48c214e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.stat +import scala.annotation.tailrec import scala.collection.mutable.ArrayBuffer import org.apache.spark.Logging @@ -28,6 +29,9 @@ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String private[sql] object StatFunctions extends Logging { + + import QuantileSummaries.Stats + /** Calculate the approximate quantile for the given column */ def approxQuantile( df: DataFrame, @@ -50,7 +54,7 @@ private[sql] object StatFunctions extends Logging { s"with dataType ${data.get.dataType} not supported.") val column = Column(Cast(Column(col).expr, DoubleType)) - df.select(column).rdd.aggregate(QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon))( + df.select(column).rdd.aggregate(new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon))( seqOp = (summaries, row) => { summaries.insert(row.getDouble(0)) }, @@ -66,16 +70,20 @@ private[sql] object StatFunctions extends Logging { * and Khanna, Sanjeev. 
(http://dl.acm.org/citation.cfm?id=375670) * */ - case class QuantileSummaries( - compressThreshold: Int, - epsilon: Double) extends Serializable { - var sampled = new ArrayBuffer[(Double, Int, Int)]() // sampled examples - var count = 0L // count of observed examples + class QuantileSummaries( + val compressThreshold: Int, + val epsilon: Double, + val sampled: ArrayBuffer[Stats] = ArrayBuffer.empty, + private var count: Long = 0L) extends Serializable { + +// +// val sampled = new ArrayBuffer[Stats]() // sampled examples +// var count = 0L // count of observed examples - def getConstant(): Double = 2 * epsilon * count + private def getConstant(): Double = 2 * epsilon * count def insert(x: Double): this.type = { - var idx = sampled.indexWhere(_._1 > x) + var idx = sampled.indexWhere(_.value > x) if (idx == -1) { idx = sampled.size } @@ -84,7 +92,7 @@ private[sql] object StatFunctions extends Logging { } else { math.floor(getConstant()).toInt } - val tuple = (x, 1, delta) + val tuple = Stats(x, 1, delta) sampled.insert(idx, tuple) count += 1 @@ -95,35 +103,78 @@ private[sql] object StatFunctions extends Logging { } def compress(): Unit = { + val compressed = compressImmut(sampled) + sampled.clear() + sampled.appendAll(compressed) + return var i = 0 while (i < sampled.size - 1) { val sample1 = sampled(i) val sample2 = sampled(i + 1) - if (sample1._2 + sample2._2 + sample2._3 < math.floor(getConstant())) { - sampled.update(i + 1, (sample2._1, sample1._2 + sample2._2, sample2._3)) + if (sample1.g + sample2.g + sample2.delta < math.floor(getConstant())) { + sampled.update(i + 1, Stats(sample2.value, sample1.g + sample2.g, sample2.delta)) sampled.remove(i) } i += 1 } } + def printBuffer(buff: Seq[Stats]): String = { + var rankMin = 0 + buff.map { s => + rankMin += s.g +// (s.value, rankMin, rankMin + rankMin + s.delta, s.g, s.delta) + s"${s.value}:$rankMin" + }.mkString(" ") + } + + def compressImmut(currentSamples: IndexedSeq[Stats]): ArrayBuffer[Stats] = { + val res: ArrayBuffer[Stats] = ArrayBuffer.empty + val mergeThreshold = 2 * epsilon * count + // Start for the last element, which is always part of the set. + // The head contains the current new head, that may be merged with the current element. + var head = currentSamples.last + var i = currentSamples.size - 2 + // Do not compress the last element + while (i >= 1) { + // The current sample: + val sample1 = currentSamples(i) + // Do we need to compress? + if (sample1.g + head.g + head.delta < mergeThreshold) { + // Do not insert yet, just merge the current element into the head. + head = head.copy(g = head.g + sample1.g) + } else { + // Prepend the current head, and keep the current sample as target for merging. 
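+          // The guard above is the G-K delete condition: tuples are merged only
+          // while g_i + g_head + head.delta stays below 2 * epsilon * count, so
+          // the merged tuple's rank uncertainty never exceeds the target error.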
+ res.prepend(head) + head = sample1 + } + i -= 1 + } + res.prepend(head) + // If necessary, add the minimum element: + res.prepend(currentSamples.head) + println(s"compressImmut: threshold=$mergeThreshold, \nbefore=${printBuffer(currentSamples)}\nafter=${printBuffer(res)}") + res + } + def merge(other: QuantileSummaries): QuantileSummaries = { + return mergeImmutable(other) if (other.count > 0 && count > 0) { other.sampled.foreach { sample => - val idx = sampled.indexWhere(s => s._1 > sample._1) + val idx = sampled.indexWhere(s => s.value > sample.value) if (idx == 0) { - val new_sampled = (sampled(0)._1, sampled(0)._2, sampled(1)._3 / 2) + val new_sampled = Stats(sampled(0).value, sampled(0).g, sampled(1).delta / 2) sampled.update(0, new_sampled) - val new_sample = (sample._1, sample._2, 0) + val new_sample = Stats(sample.value, sample.g, 0) sampled.insert(0, new_sample) } else if (idx == -1) { - val new_sampled = (sampled(sampled.size - 1)._1, sampled(sampled.size - 1)._2, - (sampled(sampled.size - 2)._3 * 2 * epsilon).toInt) + val new_sampled = Stats(sampled(sampled.size - 1).value, sampled(sampled.size - 1).g, + (sampled(sampled.size - 2).delta * 2 * epsilon).toInt) sampled.update(sampled.size - 1, new_sampled) - val new_sample = (sample._1, sample._2, 0) + val new_sample = Stats(sample.value, sample.g, 0) sampled.insert(sampled.size, new_sample) } else { - val new_sample = (sample._1, sample._2, (sampled(idx - 1)._3 + sampled(idx)._3) / 2) + val new_sample = Stats(sample.value, sample.g, (sampled(idx - 1).delta + sampled(idx).delta) / 2) sampled.insert(idx, new_sample) } } @@ -137,20 +188,67 @@ private[sql] object StatFunctions extends Logging { } } + def mergeImmutable(other: QuantileSummaries): QuantileSummaries = { + if (other.count == 0) { + this + } else if (count == 0) { + other + } else { + // We rely on the fact that they are ordered to efficiently interleave them. + val thisSampled = sampled.toList + val otherSampled = other.sampled.toList + val res: ArrayBuffer[Stats] = ArrayBuffer.empty + + @tailrec + def mergeCurrent(thisList: List[Stats], otherList: List[Stats]): Unit = (thisList, otherList) match { + case (Nil, l) => + res.appendAll(l) + case (l, Nil) => + res.appendAll(l) + case (h1 :: t1, h2 :: t2) if h1.value > h2.value => + mergeCurrent(otherList, thisList) + case (h1 :: t1, l) => + // We know that h1.value <= all values in l + // TODO(thunterdb) do we need to adjust g and delta? 
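+            // (A note on the TODO, not resolved by this patch: h1's delta only
+            // bounds its rank uncertainty within its own summary, so a
+            // conservative merge might widen delta by the other summary's
+            // 2 * epsilon * count budget. The element is appended unchanged here.)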
+ res.append(h1) + mergeCurrent(t1, l) + } + + mergeCurrent(thisSampled, otherSampled) + val comp = compressImmut(res) + new QuantileSummaries(other.compressThreshold, other.epsilon, comp, other.count + count) + } + } + def query(quantile: Double): Double = { - val rank = (quantile * count).toInt + require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]") + + if (quantile <= epsilon) { + return sampled.head.value + } + + if (quantile >= 1 - epsilon) { + return sampled.last.value + } + + // Target rank + val rank = math.ceil(quantile * count).toInt + val targetError = math.ceil(epsilon * count) + println(s"query: quantile=$quantile, rank=$rank, targetError=$targetError") + // Minimum rank at current sample var minRank = 0 var i = 1 - while (i < sampled.size) { + while (i < sampled.size - 1) { val curSample = sampled(i) - val prevSample = sampled(i - 1) - minRank += prevSample._2 - if (minRank + curSample._2 + curSample._3 > rank + getConstant()) { - return prevSample._1 + minRank += curSample.g + val maxRank = minRank + curSample.delta + println(s"bracket $i: minRank=$minRank maxRank=$maxRank") + if (maxRank - targetError <= rank && rank <= minRank + targetError) { + return curSample.value } i += 1 } - return sampled.last._1 + sampled.last.value } } @@ -164,6 +262,14 @@ private[sql] object StatFunctions extends Logging { * The default value for epsilon. */ val defaultEpsilon: Double = 0.01 + + /** + * Statisttics from the Greenwald-Khanna paper. + * @param value the sampled value + * @param g the minimum rank jump from the previous value's minimum rank + * @param delta the maximum span of the rank. + */ + case class Stats(value: Double, g: Int, delta: Int) } /** Calculate the Pearson Correlation Coefficient for the given columns */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala index 0b7f1a08fc091..3622c4c30255c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala @@ -1,59 +1,66 @@ package org.apache.spark.sql.execution.stat +import scala.util.Random + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.execution.stat.StatFunctions.QuantileSummaries + class ApproxQuantileSuite extends SparkFunSuite { - private val n = 10000 - private val increasing = (0 to n).map(_.toDouble) - private val decreasing = (n to 0 by -1).map(_.toDouble) - private val random = Seq.fill(n)(math.random) + private val r = new Random(1) + private val n = 100 + private val increasing = "increasing" -> (0 to n).map(_.toDouble) + private val decreasing = "decreasing" -> (n to 0 by -1).map(_.toDouble) + private val random = "random" -> Seq.fill(n)(math.ceil(r.nextDouble() * 1000)) private def buildSummary( data: Seq[Double], epsi: Double, - threshold: Int = QuantileSummaries.defaultCompressThreshold): QuantileSummaries = { - val summary = QuantileSummaries(threshold, epsi) + threshold: Int): QuantileSummaries = { + val summary = new QuantileSummaries(threshold, epsi) data.foreach(summary.insert) summary } - // The naive implementation of quantile calculations, for reference checks. 
- def naiveQuantile(quant: Double, sorted_data: Seq[Double]): Double = { - val n = sorted_data.size - sorted_data.take(math.floor(quant * n).toInt).last - } - private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = { val approx = summary.query(quant) - val approx_rank = data.count(_ <= approx) - val lower = (quant - summary.epsilon) * data.size - assert(approx_rank >= lower, - s"approx_rank: $approx_rank ! >= $lower") - val upper = (quant + summary.epsilon) * data.size - assert(approx_rank <= upper, - s"approx_rank: $approx_rank ! <= $upper") + // The rank of the approximation. + val rank = data.count(_ < approx) // has to be <, not <= to be exact + val lower = math.floor((quant - summary.epsilon) * data.size) + assert(rank >= lower, + s"approx_rank: $rank ! >= $lower, requested quantile = $quant") + val upper = math.ceil((quant + summary.epsilon) * data.size) + assert(rank <= upper, + s"approx_rank: $rank ! <= $upper, requested quantile = $quant") } for { - (data, seq_name) <- Seq(increasing, decreasing, random).zip(Seq("increasing", "decreasing", "random")) - epsi <- Seq(0.1, 0.01) + (seq_name, data) <- Seq(increasing, decreasing, random) + epsi <- Seq(0.1, 0.0001) + compression <- Seq(1000, 10) } { - test(s"Extremas with epsi=$epsi and seq=$seq_name") { - val s = buildSummary(data, epsi) + test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") { + val s = buildSummary(data, epsi, compression) +// println(s"samples: ${s.sampled}") val min_approx = s.query(0.0) assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") - val max_approx = s.query(0.0) + val max_approx = s.query(1.0) assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx") } - test(s"Some quantile values with epsi=$epsi and seq=$seq_name") { - val s = buildSummary(data, epsi) + test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression") { + val s = buildSummary(data, epsi, compression) + println(s"samples: ${s.sampled}") + println(s"ranks: ${s.printBuffer(s.sampled)}") + checkQuantile(0.9999, data, s) checkQuantile(0.9, data, s) checkQuantile(0.5, data, s) checkQuantile(0.1, data, s) + checkQuantile(0.001, data, s) } } + + // Tests for merging procedure } From ac4bc97b2cfcfc9823f33ae2e35e65b223d952dd Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 12 Feb 2016 12:41:41 +0000 Subject: [PATCH 05/15] Fix scala style. --- .../org/apache/spark/sql/DataFrameStatFunctions.scala | 6 +++--- .../apache/spark/sql/execution/stat/StatFunctions.scala | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 9b872eb48fd51..7f110c4e7f34b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -35,17 +35,17 @@ import org.apache.spark.util.sketch.{BloomFilter, CountMinSketch} */ @Experimental final class DataFrameStatFunctions private[sql](df: DataFrame) { - + /** * Calculate the approximate quantile of numerical column of a DataFrame. 
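   * An illustrative call (the DataFrame `df` and its "age" column are assumed
   * for the example and are not part of this patch):
   * {{{
   *   // Approximate median: the exact rank of the result is within
   *   // epsilon * n ranks of 0.5 * n, for n rows.
   *   val median: Double = df.stat.approxQuantile("age", 0.5, 0.05)
   * }}}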
* @param col the name of the column * @param quantile the quantile number - * @return the approximate quantile + * @return the approximate quantile */ def approxQuantile(col: String, quantile: Double, epsilon: Double): Double = { StatFunctions.approxQuantile(df, col, quantile, epsilon) } - + /** * Calculate the sample covariance of two numerical columns of a DataFrame. * @param col1 the name of the first column diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 2687642c2cf09..e98e579ee2328 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -36,7 +36,7 @@ private[sql] object StatFunctions extends Logging { epsilon: Double = 0.05): Double = { require(quantile > 0.0 && quantile < 1.0, "Quantile must be in the range of (0.0, 1.0).") val summeries = collectQuantileSummaries(df, col, epsilon) - summeries.query(quantile) + summeries.query(quantile) } private def collectQuantileSummaries( @@ -63,7 +63,7 @@ private[sql] object StatFunctions extends Logging { * This implementation is based on the algorithm proposed in the paper: * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670) - * + * */ private class QuantileSummaries( compress_threshold: Int = 1000, @@ -77,12 +77,12 @@ private[sql] object StatFunctions extends Logging { var idx = sampled.indexWhere(_._1 > x) if (idx == -1) { idx = sampled.size - } + } val delta = if (idx == 0 || idx == sampled.size) { 0 } else { math.floor(getConstant()).toInt - } + } val tuple = (x, 1, delta) sampled.insert(idx, tuple) count += 1 From d320fd20afa9873f13794cb0976c7602d6278ee0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 12 Feb 2016 13:00:33 +0000 Subject: [PATCH 06/15] Fix scala style. 
--- .../scala/org/apache/spark/sql/DataFrameStatSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index ea6af410afee0..6cf823efb91e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -138,9 +138,9 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext { val error_1 = 2 * 1000 * epsilon val error_2 = 2 * 2000 * epsilon - - assert(math.abs(result1 - expected_1) < error_1) - assert(math.abs(result2 - expected_2) < error_2) + + assert(math.abs(result1 - expected_1) < error_1) + assert(math.abs(result2 - expected_2) < error_2) } } From 253f488f642322cd9964aa25ca675219d47cab8e Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Wed, 17 Feb 2016 14:51:25 -0800 Subject: [PATCH 07/15] branched off to work on a simpler batch merging code --- .../apache/spark/sql/execution/stat/StatFunctions.scala | 7 ++++--- .../spark/sql/execution/stat/ApproxQuantileSuite.scala | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 77dadb48c214e..97d2ccf261b90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -82,7 +82,7 @@ private[sql] object StatFunctions extends Logging { private def getConstant(): Double = 2 * epsilon * count - def insert(x: Double): this.type = { + def insert(x: Double): QuantileSummaries = { var idx = sampled.indexWhere(_.value > x) if (idx == -1) { idx = sampled.size @@ -102,11 +102,11 @@ private[sql] object StatFunctions extends Logging { this } - def compress(): Unit = { + def compress(): QuantileSummaries = { val compressed = compressImmut(sampled) sampled.clear() sampled.appendAll(compressed) - return + return this var i = 0 while (i < sampled.size - 1) { val sample1 = sampled(i) @@ -117,6 +117,7 @@ private[sql] object StatFunctions extends Logging { } i += 1 } + this } def printBuffer(buff: Seq[Stats]): String = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala index 3622c4c30255c..de7469feb904d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala @@ -18,8 +18,8 @@ class ApproxQuantileSuite extends SparkFunSuite { data: Seq[Double], epsi: Double, threshold: Int): QuantileSummaries = { - val summary = new QuantileSummaries(threshold, epsi) - data.foreach(summary.insert) + var summary = new QuantileSummaries(threshold, epsi) + data.foreach(x => summary = summary.insert(x)) summary } From e48baddbc530ea52bce4e2aa96800b4e658a3c47 Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Thu, 18 Feb 2016 09:40:04 -0800 Subject: [PATCH 08/15] insert tests --- .../sql/execution/stat/StatFunctions.scala | 45 ++++++++++++++++- .../execution/stat/ApproxQuantileSuite.scala | 48 ++++++++++++++++++- 2 files changed, 90 insertions(+), 3 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 97d2ccf261b90..d7df9e87e0435 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -74,7 +74,8 @@ private[sql] object StatFunctions extends Logging { val compressThreshold: Int, val epsilon: Double, val sampled: ArrayBuffer[Stats] = ArrayBuffer.empty, - private var count: Long = 0L) extends Serializable { + private var count: Long = 0L, + val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty) extends Serializable { // // val sampled = new ArrayBuffer[Stats]() // sampled examples @@ -83,6 +84,7 @@ private[sql] object StatFunctions extends Logging { private def getConstant(): Double = 2 * epsilon * count def insert(x: Double): QuantileSummaries = { + headSampled.append(x) var idx = sampled.indexWhere(_.value > x) if (idx == -1) { idx = sampled.size @@ -98,12 +100,51 @@ private[sql] object StatFunctions extends Logging { if (sampled.size > compressThreshold) { compress() + } else { + this } - this + } + + /** + * Inserts an array of (unsorted samples) in a batch, sorting the array first to traverse the summary statistics in + * a single batch. + * @param array + * @return a new quantile summary object. + */ + def insertBatch(array: Array[Double]): QuantileSummaries = { + var currentCount = count + val sorted = array.sorted + val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]() + // The index of the next element to insert + var sampleIdx = 0 + // The index of the sample currently being inserted. + var opsIdx: Int = 0 + while(opsIdx < sorted.length) { + val currentSample = sorted(opsIdx) + // Add all the samples before the next observation. 
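+      // (Both `sorted` and `sampled` are in nondecreasing order, so a single
+      // linear pass interleaves them without re-scanning from the start.)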
+ while(sampleIdx < sampled.size && sampled(sampleIdx).value <= currentSample) { + newSamples.append(sampled(sampleIdx)) + sampleIdx += 1 + } + + // If it is the first one to insert, of if it is the last one + currentCount += 1 + val delta = if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) { + 0 + } else { + math.floor(2 * epsilon * currentCount).toInt + } + val tuple = Stats(currentSample, 1, delta) + newSamples.append(tuple) + + opsIdx += 1 + } + new QuantileSummaries(compressThreshold, epsilon, newSamples, currentCount) } def compress(): QuantileSummaries = { val compressed = compressImmut(sampled) + return new QuantileSummaries(compressThreshold, epsilon, compressed, count) sampled.clear() sampled.appendAll(compressed) return this diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala index de7469feb904d..c6e1b0d772bc1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala @@ -20,7 +20,7 @@ class ApproxQuantileSuite extends SparkFunSuite { threshold: Int): QuantileSummaries = { var summary = new QuantileSummaries(threshold, epsi) data.foreach(x => summary = summary.insert(x)) - summary + summary.compress() } private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = { @@ -63,4 +63,50 @@ class ApproxQuantileSuite extends SparkFunSuite { } // Tests for merging procedure + for { + (seq_name, data) <- Seq(increasing, decreasing, random) + epsi <- Seq(0.1, 0.0001) + compression <- Seq(1000, 10) + } { + + val (data1, data2) = { + val l = data.size + data.take(l / 2) -> data.drop(l / 2) + } + + test(s"Merging ordered lists with epsi=$epsi and seq=$seq_name, compression=$compression") { + val s1 = buildSummary(data1, epsi, compression) + val s2 = buildSummary(data2, epsi, compression) + val s = s1.merge(s2) + val min_approx = s.query(0.0) + assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") + val max_approx = s.query(1.0) + assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx") + checkQuantile(0.9999, data, s) + checkQuantile(0.9, data, s) + checkQuantile(0.5, data, s) + checkQuantile(0.1, data, s) + checkQuantile(0.001, data, s) + } + + val (data11, data12) = { + data.sliding(2).map(_.head).toSeq -> data.sliding(2).map(_.last).toSeq + } + + test(s"Merging interleaved lists with epsi=$epsi and seq=$seq_name, compression=$compression") { + val s1 = buildSummary(data11, epsi, compression) + val s2 = buildSummary(data12, epsi, compression) + val s = s1.merge(s2) + val min_approx = s.query(0.0) + assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") + val max_approx = s.query(1.0) + assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx") + checkQuantile(0.9999, data, s) + checkQuantile(0.9, data, s) + checkQuantile(0.5, data, s) + checkQuantile(0.1, data, s) + checkQuantile(0.001, data, s) + } + } + } From 2cba6c1b6ab0e08d5479f2148fd404cecd1152be Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Thu, 18 Feb 2016 09:43:07 -0800 Subject: [PATCH 09/15] tentative batch algorithm --- .../apache/spark/sql/execution/stat/StatFunctions.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index d7df9e87e0435..5b3fe9be330c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -77,6 +77,7 @@ private[sql] object StatFunctions extends Logging { private var count: Long = 0L, val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty) extends Serializable { + import QuantileSummaries._ // // val sampled = new ArrayBuffer[Stats]() // sampled examples // var count = 0L // count of observed examples @@ -85,6 +86,9 @@ private[sql] object StatFunctions extends Logging { def insert(x: Double): QuantileSummaries = { headSampled.append(x) + if (headSampled.size >= defaultHeadSize) { + return insertBatch(headSampled.toArray) + } var idx = sampled.indexWhere(_.value > x) if (idx == -1) { idx = sampled.size @@ -300,6 +304,11 @@ private[sql] object StatFunctions extends Logging { */ val defaultCompressThreshold: Int = 1000 + /** + * The size of the head buffer. + */ + val defaultHeadSize: Int = 50 + /** * The default value for epsilon. */ From cbb1bb5e33f2242033ddd80e820e581808e870fc Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Thu, 18 Feb 2016 10:45:40 -0800 Subject: [PATCH 10/15] finally batch sampling is working --- .../sql/execution/stat/StatFunctions.scala | 147 ++++++++++++------ .../execution/stat/ApproxQuantileSuite.scala | 24 ++- 2 files changed, 117 insertions(+), 54 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 5b3fe9be330c5..d48ea9900cf7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -74,7 +74,7 @@ private[sql] object StatFunctions extends Logging { val compressThreshold: Int, val epsilon: Double, val sampled: ArrayBuffer[Stats] = ArrayBuffer.empty, - private var count: Long = 0L, + private[stat] var count: Long = 0L, val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty) extends Serializable { import QuantileSummaries._ @@ -87,7 +87,9 @@ private[sql] object StatFunctions extends Logging { def insert(x: Double): QuantileSummaries = { headSampled.append(x) if (headSampled.size >= defaultHeadSize) { - return insertBatch(headSampled.toArray) + return this.withHeadInserted + } else { + return this } var idx = sampled.indexWhere(_.value > x) if (idx == -1) { @@ -112,12 +114,19 @@ private[sql] object StatFunctions extends Logging { /** * Inserts an array of (unsorted samples) in a batch, sorting the array first to traverse the summary statistics in * a single batch. - * @param array + * + * This method does not modify the current object and returns if necessary a new copy. + * * @return a new quantile summary object. 
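+   *
+   * (Invariants, matching the assertions in `compress()` below: the returned
+   * summary carries an empty head buffer, and its count has grown by the size
+   * of the drained `headSampled` buffer.)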
*/ - def insertBatch(array: Array[Double]): QuantileSummaries = { + def withHeadInserted: QuantileSummaries = { + if (headSampled.isEmpty) { + return this + } + println(s"insertBatch: samples before = ${printBuffer(sampled)}") + println(s"insertBatch: head before = ${headSampled}") var currentCount = count - val sorted = array.sorted + val sorted = headSampled.toArray.sorted val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]() // The index of the next element to insert var sampleIdx = 0 @@ -140,15 +149,29 @@ private[sql] object StatFunctions extends Logging { } val tuple = Stats(currentSample, 1, delta) newSamples.append(tuple) - opsIdx += 1 } + + // Add all the remaining existing samples + while(sampleIdx < sampled.size) { + newSamples.append(sampled(sampleIdx)) + sampleIdx += 1 + } + + + println(s"insertBatch: samples after = ${printBuffer(newSamples)}") new QuantileSummaries(compressThreshold, epsilon, newSamples, currentCount) } def compress(): QuantileSummaries = { - val compressed = compressImmut(sampled) - return new QuantileSummaries(compressThreshold, epsilon, compressed, count) + // Inserts all the elements first + val inserted = this.withHeadInserted + println(s"compress: inserted samples = ${printBuffer(inserted.sampled)}") + assert(inserted.headSampled.isEmpty) + assert(inserted.count == count + headSampled.size) + val compressed = compressImmut(inserted.sampled, mergeThreshold = 2 * epsilon * inserted.count) + println(s"compress: compressed samples = ${printBuffer(compressed)}") + return new QuantileSummaries(compressThreshold, epsilon, compressed, inserted.count) sampled.clear() sampled.appendAll(compressed) return this @@ -165,43 +188,34 @@ private[sql] object StatFunctions extends Logging { this } - def printBuffer(buff: Seq[Stats]): String = { - var rankMin = 0 - buff.map { s => - rankMin += s.g -// (s.value, rankMin, rankMin + rankMin + s.delta, s.g, s.delta) - s"${s.value}:$rankMin" - }.mkString(" ") - } - - def compressImmut(currentSamples: IndexedSeq[Stats]): ArrayBuffer[Stats] = { - val res: ArrayBuffer[Stats] = ArrayBuffer.empty - val mergeThreshold = 2 * epsilon * count - // Start for the last element, which is always part of the set. - // The head contains the current new head, that may be merged with the current element. - var head = currentSamples.last - var i = currentSamples.size - 2 - // Do not compress the last element - while (i >= 1) { - // The current sample: - val sample1 = currentSamples(i) - // Do we need to compress? - if (sample1.g + head.g + head.delta < mergeThreshold) { - // Do not insert yet, just merge the current element into the head. - head = head.copy(g = head.g + sample1.g) - } else { - // Prepend the current head, and keep the current sample as target for merging. - res.prepend(head) - head = sample1 - } - i -= 1 - } - res.prepend(head) - // If necessary, add the minimum element: - res.prepend(currentSamples.head) - println(s"compressImmut: threshold=$mergeThreshold, \nbefore=${printBuffer(currentSamples)}\nafter=${printBuffer(res)}") - res - } +// def compressImmut(currentSamples: IndexedSeq[Stats]): ArrayBuffer[Stats] = { +// val res: ArrayBuffer[Stats] = ArrayBuffer.empty +// val mergeThreshold = 2 * epsilon * count +// // Start for the last element, which is always part of the set. +// // The head contains the current new head, that may be merged with the current element. 
+// var head = currentSamples.last +// var i = currentSamples.size - 2 +// // Do not compress the last element +// while (i >= 1) { +// // The current sample: +// val sample1 = currentSamples(i) +// // Do we need to compress? +// if (sample1.g + head.g + head.delta < mergeThreshold) { +// // Do not insert yet, just merge the current element into the head. +// head = head.copy(g = head.g + sample1.g) +// } else { +// // Prepend the current head, and keep the current sample as target for merging. +// res.prepend(head) +// head = sample1 +// } +// i -= 1 +// } +// res.prepend(head) +// // If necessary, add the minimum element: +// res.prepend(currentSamples.head) +// println(s"compressImmut: threshold=$mergeThreshold, \nbefore=${printBuffer(currentSamples)}\nafter=${printBuffer(res)}") +// res +// } def merge(other: QuantileSummaries): QuantileSummaries = { return mergeImmutable(other) @@ -261,7 +275,7 @@ private[sql] object StatFunctions extends Logging { } mergeCurrent(thisSampled, otherSampled) - val comp = compressImmut(res) + val comp = compressImmut(res, mergeThreshold = 2 * epsilon * count) new QuantileSummaries(other.compressThreshold, other.epsilon, comp, other.count + count) } } @@ -307,7 +321,7 @@ private[sql] object StatFunctions extends Logging { /** * The size of the head buffer. */ - val defaultHeadSize: Int = 50 + val defaultHeadSize: Int = 500000 /** * The default value for epsilon. @@ -321,6 +335,45 @@ private[sql] object StatFunctions extends Logging { * @param delta the maximum span of the rank. */ case class Stats(value: Double, g: Int, delta: Int) + + def printBuffer(buff: Seq[Stats]): String = { + var rankMin = 0 + buff.map { s => + rankMin += s.g + // (s.value, rankMin, rankMin + rankMin + s.delta, s.g, s.delta) + s"${s.value}:$rankMin" + }.mkString(" ") + } + + def compressImmut(currentSamples: IndexedSeq[Stats], mergeThreshold: Double): ArrayBuffer[Stats] = { + val res: ArrayBuffer[Stats] = ArrayBuffer.empty +// val mergeThreshold = 2 * epsilon * count + // Start for the last element, which is always part of the set. + // The head contains the current new head, that may be merged with the current element. + var head = currentSamples.last + var i = currentSamples.size - 2 + // Do not compress the last element + while (i >= 1) { + // The current sample: + val sample1 = currentSamples(i) + // Do we need to compress? + if (sample1.g + head.g + head.delta < mergeThreshold) { + // Do not insert yet, just merge the current element into the head. + head = head.copy(g = head.g + sample1.g) + } else { + // Prepend the current head, and keep the current sample as target for merging. 
+ res.prepend(head) + head = sample1 + } + i -= 1 + } + res.prepend(head) + // If necessary, add the minimum element: + res.prepend(currentSamples.head) + println(s"compressImmut: threshold=$mergeThreshold, \nbefore=${printBuffer(currentSamples)}\nafter=${printBuffer(res)}") + res + } + } /** Calculate the Pearson Correlation Coefficient for the given columns */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala index c6e1b0d772bc1..d4804915eee1f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala @@ -10,8 +10,8 @@ class ApproxQuantileSuite extends SparkFunSuite { private val r = new Random(1) private val n = 100 - private val increasing = "increasing" -> (0 to n).map(_.toDouble) - private val decreasing = "decreasing" -> (n to 0 by -1).map(_.toDouble) + private val increasing = "increasing" -> (0 until n).map(_.toDouble) + private val decreasing = "decreasing" -> (n until 0 by -1).map(_.toDouble) private val random = "random" -> Seq.fill(n)(math.ceil(r.nextDouble() * 1000)) private def buildSummary( @@ -19,7 +19,10 @@ class ApproxQuantileSuite extends SparkFunSuite { epsi: Double, threshold: Int): QuantileSummaries = { var summary = new QuantileSummaries(threshold, epsi) - data.foreach(x => summary = summary.insert(x)) + data.foreach { x => + println(s"buildSummary: $x ${summary.hashCode()}") + summary = summary.insert(x) + } summary.compress() } @@ -36,9 +39,12 @@ class ApproxQuantileSuite extends SparkFunSuite { } for { - (seq_name, data) <- Seq(increasing, decreasing, random) - epsi <- Seq(0.1, 0.0001) - compression <- Seq(1000, 10) +// (seq_name, data) <- Seq(increasing, decreasing, random) +// epsi <- Seq(0.1, 0.0001) +// compression <- Seq(1000, 10) + (seq_name, data) <- Seq(increasing, decreasing, random).slice(0,3) + epsi <- Seq(0.1, 0.0001) //.headOption + compression <- Seq(1000, 10) //.drop(1) } { test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") { @@ -52,8 +58,9 @@ class ApproxQuantileSuite extends SparkFunSuite { test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression") { val s = buildSummary(data, epsi, compression) + assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}") println(s"samples: ${s.sampled}") - println(s"ranks: ${s.printBuffer(s.sampled)}") + println(s"ranks: ${QuantileSummaries.printBuffer(s.sampled)}") checkQuantile(0.9999, data, s) checkQuantile(0.9, data, s) checkQuantile(0.5, data, s) @@ -67,6 +74,9 @@ class ApproxQuantileSuite extends SparkFunSuite { (seq_name, data) <- Seq(increasing, decreasing, random) epsi <- Seq(0.1, 0.0001) compression <- Seq(1000, 10) +// (seq_name, data) <- Seq(increasing, decreasing, random) +// epsi <- Seq(0.1, 0.0001) +// compression <- Seq(1000, 10) } { val (data1, data2) = { From 773b20f88bd9a7cdcd3b3ab8db69f7a786b9780f Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Thu, 18 Feb 2016 11:35:32 -0800 Subject: [PATCH 11/15] cleanups --- .../sql/execution/stat/StatFunctions.scala | 132 ++++++++++++------ .../apache/spark/sql/DataFrameStatSuite.scala | 37 +++++ 2 files changed, 130 insertions(+), 39 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index d48ea9900cf7e..b6fd29934a1cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -32,18 +32,85 @@ private[sql] object StatFunctions extends Logging { import QuantileSummaries.Stats - /** Calculate the approximate quantile for the given column */ + /** + * Calculates the approximate quantile for the given column. + * + * If you need to compute multiple quantiles at once, you should use [[multipleApproxQuantiles]] + * + * Note on the target error. + * + * The result of this algorithm has the following deterministic bound: if the DataFrame has N elements and if we + * request the quantile `phi` up to error `epsi`, then the algorithm will return a sample `x` from the DataFrame so + * that the *exact* rank of `x` close to (phi * N). More precisely: + * + * floor((phi - epsi) * N) <= rank(x) <= ceil((phi + epsi) * N) + * + * Note on the algorithm used. + * + * This method implements a variation of the Greenwald-Khanna algorithm (with some speed optimizations). The + * algorithm was first present in the following article: + * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael + * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670) + * + * The performance optimizations are detailed in the comments of the implementation. + * + * @param df the dataframe to estimate quantiles on + * @param col the name of the column + * @param quantile the target quantile of interest + * @param epsilon the target error. Should be >= 0. + * */ def approxQuantile( df: DataFrame, col: String, quantile: Double, epsilon: Double = QuantileSummaries.defaultEpsilon): Double = { - // TODO: allow the extrema, they are stored in the statistics - require(quantile > 0.0 && quantile < 1.0, "Quantile must be in the range of (0.0, 1.0).") + require(quantile >= 0.0 && quantile <= 1.0, "Quantile must be in the range of (0.0, 1.0).") val summaries = collectQuantileSummaries(df, col, epsilon) summaries.query(quantile) } + /** + * Runs multiple quantile computations in a single pass, with the same target error. + * + * See [[approxQuantile)]] for more details on the approximation guarantees. 
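+   * (Concretely, per the bound documented on `approxQuantile` above: with
+   * N = 1000 rows, phi = 0.5 and epsi = 0.01, the returned sample's exact rank
+   * lies in [490, 510].)
+   *
+   * An illustrative sketch; `df` and the column names are assumptions for the
+   * example, not part of this patch:
+   * {{{
+   *   // A single pass over df yields three quantiles for each of two columns.
+   *   val Seq(aQuantiles, bQuantiles) = StatFunctions.multipleApproxQuantiles(
+   *     df, Seq("a", "b"), Seq(0.25, 0.5, 0.75), 0.01)
+   * }}}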
+ * + * @param df the dataframe + * @param cols columns of the dataframe + * @param quantiles target quantiles to compute + * @param epsilon the precision to achieve + * @return for each column, returns the requested approximations + */ + def multipleApproxQuantiles( + df: DataFrame, + cols: Seq[String], + quantiles: Seq[Double], + epsilon: Double): Seq[Seq[Double]] = { + val columns: Seq[Column] = cols.map { colName => + val field = df.schema(colName) + require(field.dataType.isInstanceOf[NumericType], + s"Quantile calculation for column $colName with data type ${field.dataType} is not supported.") + Column(Cast(Column(colName).expr, DoubleType)) + } + val emptySummaries = Array.fill(cols.size)( + new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon)) + + def apply(summaries: Array[QuantileSummaries], row: Row): Array[QuantileSummaries] = { + var i = 0 + while (i < summaries.length) { + summaries(i) = summaries(i).insert(row.getDouble(i)) + i += 1 + } + summaries + } + + def merge(sum1: Array[QuantileSummaries], sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = { + sum1.zip(sum2).map { case (s1, s2) => s1.compress().merge(s2.compress()) } + } + val summaries = df.select(columns: _*).rdd.aggregate(emptySummaries)(apply, merge) + + summaries.map { summary => quantiles.map(summary.query) } + } + private def collectQuantileSummaries( df: DataFrame, col: String, @@ -51,7 +118,7 @@ private[sql] object StatFunctions extends Logging { val data = df.schema.fields.find(_.name == col) require(data.nonEmpty, s"Couldn't find column with name $col") require(data.get.dataType.isInstanceOf[NumericType], "Quantile calculation for column " + - s"with dataType ${data.get.dataType} not supported.") + s"with dataType ${data.get.dataType} not supported.") val column = Column(Cast(Column(col).expr, DoubleType)) df.select(column).rdd.aggregate(new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon))( @@ -60,7 +127,7 @@ private[sql] object StatFunctions extends Logging { }, combOp = (baseSummaries, other) => { baseSummaries.merge(other) - }) + }) } /** @@ -68,7 +135,17 @@ private[sql] object StatFunctions extends Logging { * This implementation is based on the algorithm proposed in the paper: * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670) - * + * + * In order to optimize for speed, it maintains an internal buffer of the last seen samples, and only inserts them + * after crossing a certain size threshold. This guarantees a near-constant runtime complexity compared to the + * original algorithm. + * + * @param compressThreshold the compression threshold: after the internal buffer of statistics crosses this size, it + * attempts to compress the statistics together + * @param epsilon the target precision + * @param sampled a buffer of quantile statistics. 
See the G-K article for more details + * @param count the count of all the elements *inserted in the sampled buffer* (excluding the head buffer) + * @param headSampled a buffer of latest samples seen so far */ class QuantileSummaries( val compressThreshold: Int, @@ -78,34 +155,13 @@ private[sql] object StatFunctions extends Logging { val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty) extends Serializable { import QuantileSummaries._ -// -// val sampled = new ArrayBuffer[Stats]() // sampled examples -// var count = 0L // count of observed examples private def getConstant(): Double = 2 * epsilon * count def insert(x: Double): QuantileSummaries = { headSampled.append(x) if (headSampled.size >= defaultHeadSize) { - return this.withHeadInserted - } else { - return this - } - var idx = sampled.indexWhere(_.value > x) - if (idx == -1) { - idx = sampled.size - } - val delta = if (idx == 0 || idx == sampled.size) { - 0 - } else { - math.floor(getConstant()).toInt - } - val tuple = Stats(x, 1, delta) - sampled.insert(idx, tuple) - count += 1 - - if (sampled.size > compressThreshold) { - compress() + this.withHeadInserted } else { this } @@ -119,12 +175,10 @@ private[sql] object StatFunctions extends Logging { * * @return a new quantile summary object. */ - def withHeadInserted: QuantileSummaries = { + private def withHeadInserted: QuantileSummaries = { if (headSampled.isEmpty) { return this } - println(s"insertBatch: samples before = ${printBuffer(sampled)}") - println(s"insertBatch: head before = ${headSampled}") var currentCount = count val sorted = headSampled.toArray.sorted val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]() @@ -157,20 +211,17 @@ private[sql] object StatFunctions extends Logging { newSamples.append(sampled(sampleIdx)) sampleIdx += 1 } - - - println(s"insertBatch: samples after = ${printBuffer(newSamples)}") new QuantileSummaries(compressThreshold, epsilon, newSamples, currentCount) } def compress(): QuantileSummaries = { // Inserts all the elements first val inserted = this.withHeadInserted - println(s"compress: inserted samples = ${printBuffer(inserted.sampled)}") +// println(s"compress: inserted samples = ${printBuffer(inserted.sampled)}") assert(inserted.headSampled.isEmpty) assert(inserted.count == count + headSampled.size) val compressed = compressImmut(inserted.sampled, mergeThreshold = 2 * epsilon * inserted.count) - println(s"compress: compressed samples = ${printBuffer(compressed)}") +// println(s"compress: compressed samples = ${printBuffer(compressed)}") return new QuantileSummaries(compressThreshold, epsilon, compressed, inserted.count) sampled.clear() sampled.appendAll(compressed) @@ -294,7 +345,7 @@ private[sql] object StatFunctions extends Logging { // Target rank val rank = math.ceil(quantile * count).toInt val targetError = math.ceil(epsilon * count) - println(s"query: quantile=$quantile, rank=$rank, targetError=$targetError") +// println(s"query: quantile=$quantile, rank=$rank, targetError=$targetError") // Minimum rank at current sample var minRank = 0 var i = 1 @@ -302,7 +353,7 @@ private[sql] object StatFunctions extends Logging { val curSample = sampled(i) minRank += curSample.g val maxRank = minRank + curSample.delta - println(s"bracket $i: minRank=$minRank maxRank=$maxRank") +// println(s"bracket $i: minRank=$minRank maxRank=$maxRank") if (maxRank - targetError <= rank && rank <= minRank + targetError) { return curSample.value } @@ -347,6 +398,9 @@ private[sql] object StatFunctions extends Logging { def compressImmut(currentSamples: 
IndexedSeq[Stats], mergeThreshold: Double): ArrayBuffer[Stats] = {
     val res: ArrayBuffer[Stats] = ArrayBuffer.empty
+    if (currentSamples.isEmpty) {
+      return res
+    }
 //    val mergeThreshold = 2 * epsilon * count
     // Start for the last element, which is always part of the set.
     // The head contains the current new head, that may be merged with the current element.
@@ -370,7 +424,7 @@ private[sql] object StatFunctions extends Logging {
     res.prepend(head)
     // If necessary, add the minimum element:
     res.prepend(currentSamples.head)
-    println(s"compressImmut: threshold=$mergeThreshold, \nbefore=${printBuffer(currentSamples)}\nafter=${printBuffer(res)}")
+//    println(s"compressImmut: threshold=$mergeThreshold, \nbefore=${printBuffer(currentSamples)}\nafter=${printBuffer(res)}")
     res
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index ea6af410afee0..d04342993db8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
 
 import java.util.Random
 
+import org.apache.spark.Logging
+import org.apache.spark.sql.execution.stat.StatFunctions
 import org.scalatest.Matchers._
 
 import org.apache.spark.sql.functions.col
@@ -290,3 +292,38 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
     assert(0.until(1000).forall(i => filter4.mightContain(i * 3)))
   }
 }
+
+
+class DataFrameStatPerfSuite extends QueryTest with SharedSQLContext with Logging {
+
+  // Turn on this test if you want to test the performance of approximate quantiles.
+  ignore("describe() should not be slowed down too much by quantiles") {
+    val df = sqlContext.range(5000000L).toDF("col1").cache()
+    def millis(f: => Any): Double = {
+      // Do some warmup
+      logDebug("warmup...")
+      for (i <- 1 to 10) {
+        f
+      }
+      logDebug("execute...")
+      // Do it 10 times and report the average
+      val times = (1 to 10).map { i =>
+        val start = System.nanoTime()
+        f
+        val end = System.nanoTime()
+        (end - start) / 1e9
+      }
+      logDebug("execute done")
+      times.sum.toDouble / times.length.toDouble
+
+    }
+
+    logDebug("*** Normal describe ***")
+    val t1 = millis { df.describe() }
+    logDebug(s"T1 = $t1")
+    logDebug("*** Just quantiles ***")
+    val t2 = millis { StatFunctions.multipleApproxQuantiles(df, Seq("col1"), Seq(0.1, 0.25, 0.5, 0.75, 0.9), 0.01) }
+    logDebug(s"T1 = $t1, T2 = $t2")
+  }
+
+}

From 5167bad639885dfb3ec1a78e8bc9383fa0f62be0 Mon Sep 17 00:00:00 2001
From: Timothy Hunter
Date: Thu, 18 Feb 2016 12:01:17 -0800
Subject: [PATCH 12/15] finished cleanups

---
 .../sql/execution/stat/StatFunctions.scala    | 127 ++----------------
 .../apache/spark/sql/DataFrameStatSuite.scala |  42 +++---
 .../execution/stat/ApproxQuantileSuite.scala  |  16 +--
 3 files changed, 34 insertions(+), 151 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index b6fd29934a1cf..a7e9e92cc71ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -65,8 +65,8 @@ private[sql] object StatFunctions extends Logging {
       quantile: Double,
       epsilon: Double = QuantileSummaries.defaultEpsilon): Double = {
     require(quantile >= 0.0 && quantile <= 1.0, "Quantile must be in the range [0.0, 1.0].")
-    val
summaries = collectQuantileSummaries(df, col, epsilon) - summaries.query(quantile) + val Seq(Seq(res)) = multipleApproxQuantiles(df, Seq(col), Seq(quantile), epsilon) + res } /** @@ -94,6 +94,8 @@ private[sql] object StatFunctions extends Logging { val emptySummaries = Array.fill(cols.size)( new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon)) + // Note that it works more or less by accident as `rdd.aggregate` is not a pure function: this function returns + // the same array as given in the input (because `aggregate` reuses the same argument). def apply(summaries: Array[QuantileSummaries], row: Row): Array[QuantileSummaries] = { var i = 0 while (i < summaries.length) { @@ -111,25 +113,6 @@ private[sql] object StatFunctions extends Logging { summaries.map { summary => quantiles.map(summary.query) } } - private def collectQuantileSummaries( - df: DataFrame, - col: String, - epsilon: Double): QuantileSummaries = { - val data = df.schema.fields.find(_.name == col) - require(data.nonEmpty, s"Couldn't find column with name $col") - require(data.get.dataType.isInstanceOf[NumericType], "Quantile calculation for column " + - s"with dataType ${data.get.dataType} not supported.") - - val column = Column(Cast(Column(col).expr, DoubleType)) - df.select(column).rdd.aggregate(new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon))( - seqOp = (summaries, row) => { - summaries.insert(row.getDouble(0)) - }, - combOp = (baseSummaries, other) => { - baseSummaries.merge(other) - }) - } - /** * Helper class to compute approximate quantile summary. * This implementation is based on the algorithm proposed in the paper: @@ -156,8 +139,6 @@ private[sql] object StatFunctions extends Logging { import QuantileSummaries._ - private def getConstant(): Double = 2 * epsilon * count - def insert(x: Double): QuantileSummaries = { headSampled.append(x) if (headSampled.size >= defaultHeadSize) { @@ -217,89 +198,13 @@ private[sql] object StatFunctions extends Logging { def compress(): QuantileSummaries = { // Inserts all the elements first val inserted = this.withHeadInserted -// println(s"compress: inserted samples = ${printBuffer(inserted.sampled)}") assert(inserted.headSampled.isEmpty) assert(inserted.count == count + headSampled.size) val compressed = compressImmut(inserted.sampled, mergeThreshold = 2 * epsilon * inserted.count) -// println(s"compress: compressed samples = ${printBuffer(compressed)}") - return new QuantileSummaries(compressThreshold, epsilon, compressed, inserted.count) - sampled.clear() - sampled.appendAll(compressed) - return this - var i = 0 - while (i < sampled.size - 1) { - val sample1 = sampled(i) - val sample2 = sampled(i + 1) - if (sample1.g + sample2.g + sample2.delta < math.floor(getConstant())) { - sampled.update(i + 1, Stats(sample2.value, sample1.g + sample2.g, sample2.delta)) - sampled.remove(i) - } - i += 1 - } - this + new QuantileSummaries(compressThreshold, epsilon, compressed, inserted.count) } -// def compressImmut(currentSamples: IndexedSeq[Stats]): ArrayBuffer[Stats] = { -// val res: ArrayBuffer[Stats] = ArrayBuffer.empty -// val mergeThreshold = 2 * epsilon * count -// // Start for the last element, which is always part of the set. -// // The head contains the current new head, that may be merged with the current element. 
-// var head = currentSamples.last -// var i = currentSamples.size - 2 -// // Do not compress the last element -// while (i >= 1) { -// // The current sample: -// val sample1 = currentSamples(i) -// // Do we need to compress? -// if (sample1.g + head.g + head.delta < mergeThreshold) { -// // Do not insert yet, just merge the current element into the head. -// head = head.copy(g = head.g + sample1.g) -// } else { -// // Prepend the current head, and keep the current sample as target for merging. -// res.prepend(head) -// head = sample1 -// } -// i -= 1 -// } -// res.prepend(head) -// // If necessary, add the minimum element: -// res.prepend(currentSamples.head) -// println(s"compressImmut: threshold=$mergeThreshold, \nbefore=${printBuffer(currentSamples)}\nafter=${printBuffer(res)}") -// res -// } - def merge(other: QuantileSummaries): QuantileSummaries = { - return mergeImmutable(other) - if (other.count > 0 && count > 0) { - other.sampled.foreach { sample => - val idx = sampled.indexWhere(s => s.value > sample.value) - if (idx == 0) { - val new_sampled = Stats(sampled(0).value, sampled(0).g, sampled(1).delta / 2) - sampled.update(0, new_sampled) - val new_sample = Stats(sample.value, sample.g, 0) - sampled.insert(0, new_sample) - } else if (idx == -1) { - val new_sampled = Stats(sampled(sampled.size - 1).value, sampled(sampled.size - 1).g, - (sampled(sampled.size - 2).delta * 2 * epsilon).toInt) - sampled.update(sampled.size - 1, new_sampled) - val new_sample = Stats(sample.value, sample.g, 0) - sampled.insert(sampled.size, new_sample) - } else { - val new_sample = Stats(sample.value, sample.g, (sampled(idx - 1).delta + sampled(idx).delta) / 2) - sampled.insert(idx, new_sample) - } - } - count += other.count - compress() - this - } else if (other.count > 0) { - other - } else { - this - } - } - - def mergeImmutable(other: QuantileSummaries): QuantileSummaries = { if (other.count == 0) { this } else if (count == 0) { @@ -345,7 +250,6 @@ private[sql] object StatFunctions extends Logging { // Target rank val rank = math.ceil(quantile * count).toInt val targetError = math.ceil(epsilon * count) -// println(s"query: quantile=$quantile, rank=$rank, targetError=$targetError") // Minimum rank at current sample var minRank = 0 var i = 1 @@ -353,7 +257,6 @@ private[sql] object StatFunctions extends Logging { val curSample = sampled(i) minRank += curSample.g val maxRank = minRank + curSample.delta -// println(s"bracket $i: minRank=$minRank maxRank=$maxRank") if (maxRank - targetError <= rank && rank <= minRank + targetError) { return curSample.value } @@ -364,15 +267,17 @@ private[sql] object StatFunctions extends Logging { } object QuantileSummaries { + // TODO(tjhunter) more tuning could be done one the constants here, but for now the main cost of the algorithm is + // accessing the data in SQL. /** * The default value for the compression threshold. */ - val defaultCompressThreshold: Int = 1000 + val defaultCompressThreshold: Int = 10000 /** * The size of the head buffer. */ - val defaultHeadSize: Int = 500000 + val defaultHeadSize: Int = 50000 /** * The default value for epsilon. 
@@ -387,21 +292,11 @@ private[sql] object StatFunctions extends Logging { */ case class Stats(value: Double, g: Int, delta: Int) - def printBuffer(buff: Seq[Stats]): String = { - var rankMin = 0 - buff.map { s => - rankMin += s.g - // (s.value, rankMin, rankMin + rankMin + s.delta, s.g, s.delta) - s"${s.value}:$rankMin" - }.mkString(" ") - } - - def compressImmut(currentSamples: IndexedSeq[Stats], mergeThreshold: Double): ArrayBuffer[Stats] = { + private def compressImmut(currentSamples: IndexedSeq[Stats], mergeThreshold: Double): ArrayBuffer[Stats] = { val res: ArrayBuffer[Stats] = ArrayBuffer.empty if (currentSamples.isEmpty) { return res } -// val mergeThreshold = 2 * epsilon * count // Start for the last element, which is always part of the set. // The head contains the current new head, that may be merged with the current element. var head = currentSamples.last @@ -424,10 +319,8 @@ private[sql] object StatFunctions extends Logging { res.prepend(head) // If necessary, add the minimum element: res.prepend(currentSamples.head) -// println(s"compressImmut: threshold=$mergeThreshold, \nbefore=${printBuffer(currentSamples)}\nafter=${printBuffer(res)}") res } - } /** Calculate the Pearson Correlation Coefficient for the given columns */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index d04342993db8b..260fda823a30d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -89,6 +89,26 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext { assert(firstRun == secondRun) } + test("approximate quantile") { + val df = Seq.tabulate(1000)(i => (i, 2.0 * i)).toDF("singles", "doubles") + + val expected_1 = 500.0 + val expected_2 = 1600.0 + + val epsilons = List(0.1, 0.05, 0.001) + + for (epsilon <- epsilons) { + val result1 = df.stat.approxQuantile("singles", 0.5, epsilon) + val result2 = df.stat.approxQuantile("doubles", 0.8, epsilon) + + val error_1 = 2 * 1000 * epsilon + val error_2 = 2 * 2000 * epsilon + + assert(math.abs(result1 - expected_1) < error_1) + assert(math.abs(result2 - expected_2) < error_2) + } + } + test("pearson correlation") { val df = Seq.tabulate(10)(i => (i, 2 * i, i * -1.0)).toDF("a", "b", "c") val corr1 = df.stat.corr("a", "b", "pearson") @@ -125,27 +145,6 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext { assert(math.abs(decimalRes) < 1e-12) } - test("approximate quantile") { - val df = Seq.tabulate(1000)(i => (i, 2.0 * i)).toDF("singles", "doubles") - - val expected_1 = 500.0 - val expected_2 = 1600.0 - - val epsilons = List(0.1, 0.05, 0.001) - - for (i <- 0 to 2) { - val epsilon = epsilons(i) - val result1 = df.stat.approxQuantile("singles", 0.5, epsilon) - val result2 = df.stat.approxQuantile("doubles", 0.8, epsilon) - - val error_1 = 2 * 1000 * epsilon - val error_2 = 2 * 2000 * epsilon - - assert(math.abs(result1 - expected_1) < error_1) - assert(math.abs(result2 - expected_2) < error_2) - } - } - test("crosstab") { val rng = new Random() val data = Seq.tabulate(25)(i => (rng.nextInt(5), rng.nextInt(10))) @@ -303,6 +302,7 @@ class DataFrameStatPerfSuite extends QueryTest with SharedSQLContext with Loggin // Do some warmup logDebug("warmup...") for (i <- 1 to 10) { + df.count() f } logDebug("execute...") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala index d4804915eee1f..9206847a40f12 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala @@ -20,7 +20,6 @@ class ApproxQuantileSuite extends SparkFunSuite { threshold: Int): QuantileSummaries = { var summary = new QuantileSummaries(threshold, epsi) data.foreach { x => - println(s"buildSummary: $x ${summary.hashCode()}") summary = summary.insert(x) } summary.compress() @@ -39,17 +38,13 @@ class ApproxQuantileSuite extends SparkFunSuite { } for { -// (seq_name, data) <- Seq(increasing, decreasing, random) -// epsi <- Seq(0.1, 0.0001) -// compression <- Seq(1000, 10) - (seq_name, data) <- Seq(increasing, decreasing, random).slice(0,3) - epsi <- Seq(0.1, 0.0001) //.headOption - compression <- Seq(1000, 10) //.drop(1) + (seq_name, data) <- Seq(increasing, decreasing, random) + epsi <- Seq(0.1, 0.0001) + compression <- Seq(1000, 10) } { test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") { val s = buildSummary(data, epsi, compression) -// println(s"samples: ${s.sampled}") val min_approx = s.query(0.0) assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") val max_approx = s.query(1.0) @@ -59,8 +54,6 @@ class ApproxQuantileSuite extends SparkFunSuite { test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression") { val s = buildSummary(data, epsi, compression) assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}") - println(s"samples: ${s.sampled}") - println(s"ranks: ${QuantileSummaries.printBuffer(s.sampled)}") checkQuantile(0.9999, data, s) checkQuantile(0.9, data, s) checkQuantile(0.5, data, s) @@ -74,9 +67,6 @@ class ApproxQuantileSuite extends SparkFunSuite { (seq_name, data) <- Seq(increasing, decreasing, random) epsi <- Seq(0.1, 0.0001) compression <- Seq(1000, 10) -// (seq_name, data) <- Seq(increasing, decreasing, random) -// epsi <- Seq(0.1, 0.0001) -// compression <- Seq(1000, 10) } { val (data1, data2) = { From d607fda404abe8d6175dd151793358069937305d Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Fri, 19 Feb 2016 09:56:44 -0800 Subject: [PATCH 13/15] fix import order --- .../test/scala/org/apache/spark/sql/DataFrameStatSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 9b30058e96dc8..12b2685fbdb88 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -19,10 +19,10 @@ package org.apache.spark.sql import java.util.Random -import org.apache.spark.Logging -import org.apache.spark.sql.execution.stat.StatFunctions import org.scalatest.Matchers._ +import org.apache.spark.Logging +import org.apache.spark.sql.execution.stat.StatFunctions import org.apache.spark.sql.functions.col import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.DoubleType From 47cde0551a11d266c2856e78d7060f5bf26f149f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 21 Feb 2016 12:58:54 +0000 Subject: [PATCH 14/15] Add Apache license headers. 
---
 .../execution/stat/ApproxQuantileSuite.scala  | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
index 9206847a40f12..6992b4c7235ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.sql.execution.stat
 
 import scala.util.Random

From a36891babc21b1b1ba26854aad10aa9af7c4ab89 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sun, 21 Feb 2016 13:41:12 +0000
Subject: [PATCH 15/15] Fix scala style.

---
 .../sql/execution/stat/StatFunctions.scala    | 67 +++++++++++--------
 .../apache/spark/sql/DataFrameStatSuite.scala |  4 +-
 2 files changed, 43 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index a7e9e92cc71ab..eb056d555bbdb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -39,16 +39,17 @@ private[sql] object StatFunctions extends Logging {
    *
    * Note on the target error.
    *
-   * The result of this algorithm has the following deterministic bound: if the DataFrame has N elements and if we
-   * request the quantile `phi` up to error `epsi`, then the algorithm will return a sample `x` from the DataFrame so
-   * that the *exact* rank of `x` is close to (phi * N). More precisely:
+   * The result of this algorithm has the following deterministic bound:
+   * if the DataFrame has N elements and if we request the quantile `phi` up to error `epsi`,
+   * then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank
+   * of `x` is close to (phi * N). More precisely:
    *
    *   floor((phi - epsi) * N) <= rank(x) <= ceil((phi + epsi) * N)
    *
    * Note on the algorithm used.
    *
-   * This method implements a variation of the Greenwald-Khanna algorithm (with some speed optimizations). The
-   * algorithm was first presented in the following article:
+   * This method implements a variation of the Greenwald-Khanna algorithm
+   * (with some speed optimizations). The algorithm was first presented in the following article:
    * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
    * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
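+   *
+   * A hypothetical call for illustration (the column name is an assumption, not part of
+   * this patch):
+   * {{{
+   *   // Median of a numeric column with a 1% target error: on 1000 rows, the returned
+   *   // sample has an exact rank between floor(0.49 * 1000) = 490 and ceil(0.51 * 1000) = 510.
+   *   val median = approxQuantile(df, "value", 0.5, 0.01)
+   * }}}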
    *
@@ -88,14 +89,16 @@ private[sql] object StatFunctions extends Logging {
     val columns: Seq[Column] = cols.map { colName =>
       val field = df.schema(colName)
       require(field.dataType.isInstanceOf[NumericType],
-        s"Quantile calculation for column $colName with data type ${field.dataType} is not supported.")
+        s"Quantile calculation for column $colName with data type ${field.dataType}" +
+          " is not supported.")
       Column(Cast(Column(colName).expr, DoubleType))
     }
     val emptySummaries = Array.fill(cols.size)(
       new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon))
 
-    // Note that it works more or less by accident as `rdd.aggregate` is not a pure function: this function returns
-    // the same array as given in the input (because `aggregate` reuses the same argument).
+    // Note that it works more or less by accident as `rdd.aggregate` is not a pure function:
+    // this function returns the same array as given in the input (because `aggregate` reuses
+    // the same argument).
     def apply(summaries: Array[QuantileSummaries], row: Row): Array[QuantileSummaries] = {
       var i = 0
       while (i < summaries.length) {
@@ -105,7 +108,9 @@ private[sql] object StatFunctions extends Logging {
       summaries
     }
 
-    def merge(sum1: Array[QuantileSummaries], sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = {
+    def merge(
+        sum1: Array[QuantileSummaries],
+        sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = {
       sum1.zip(sum2).map { case (s1, s2) => s1.compress().merge(s2.compress()) }
     }
     val summaries = df.select(columns: _*).rdd.aggregate(emptySummaries)(apply, merge)
@@ -119,15 +124,16 @@ private[sql] object StatFunctions extends Logging {
    * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
    * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
    *
-   * In order to optimize for speed, it maintains an internal buffer of the last seen samples, and only inserts them
-   * after crossing a certain size threshold. This guarantees a near-constant runtime complexity compared to the
-   * original algorithm.
+   * In order to optimize for speed, it maintains an internal buffer of the last seen samples,
+   * and only inserts them after crossing a certain size threshold. This keeps the amortized
+   * cost of insertion near-constant compared to the original algorithm.
   *
-   * @param compressThreshold the compression threshold: after the internal buffer of statistics crosses this size, it
-   *                          attempts to compress the statistics together
+   * @param compressThreshold the compression threshold: after the internal buffer of statistics
+   *                          crosses this size, it attempts to compress the statistics together
    * @param epsilon the target precision
    * @param sampled a buffer of quantile statistics. See the G-K article for more details
-   * @param count the count of all the elements *inserted in the sampled buffer* (excluding the head buffer)
+   * @param count the count of all the elements *inserted in the sampled buffer*
+   *              (excluding the head buffer)
    * @param headSampled a buffer of latest samples seen so far
    */
   class QuantileSummaries(
@@ -149,8 +155,8 @@ private[sql] object StatFunctions extends Logging {
     }
 
     /**
-     * Inserts an array of (unsorted samples) in a batch, sorting the array first to traverse the summary statistics in
-     * a single batch.
+     * Inserts an array of (unsorted) samples in a batch, sorting the array first to traverse
+     * the summary statistics in a single batch.
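+     *
+     * An illustrative sketch of how the batching surfaces to callers (it mirrors the pattern
+     * used in ApproxQuantileSuite; the names come from this patch):
+     * {{{
+     *   var summary = new QuantileSummaries(compressThreshold, epsilon)
+     *   data.foreach { x => summary = summary.insert(x) }  // cheap appends to headSampled
+     *   summary = summary.compress()  // flushes the pending head buffer via withHeadInserted
+     * }}}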
     *
     * This method does not modify the current object and returns if necessary a new copy.
     *
@@ -177,11 +183,13 @@ private[sql] object StatFunctions extends Logging {
         // If it is the first one to insert, or if it is the last one
         currentCount += 1
-        val delta = if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) {
-          0
-        } else {
-          math.floor(2 * epsilon * currentCount).toInt
-        }
+        val delta =
+          if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) {
+            0
+          } else {
+            math.floor(2 * epsilon * currentCount).toInt
+          }
+
         val tuple = Stats(currentSample, 1, delta)
         newSamples.append(tuple)
         opsIdx += 1
@@ -200,7 +208,8 @@ private[sql] object StatFunctions extends Logging {
       val inserted = this.withHeadInserted
       assert(inserted.headSampled.isEmpty)
       assert(inserted.count == count + headSampled.size)
-      val compressed = compressImmut(inserted.sampled, mergeThreshold = 2 * epsilon * inserted.count)
+      val compressed =
+        compressImmut(inserted.sampled, mergeThreshold = 2 * epsilon * inserted.count)
       new QuantileSummaries(compressThreshold, epsilon, compressed, inserted.count)
     }
 
@@ -216,7 +225,9 @@ private[sql] object StatFunctions extends Logging {
       val res: ArrayBuffer[Stats] = ArrayBuffer.empty
 
       @tailrec
-      def mergeCurrent(thisList: List[Stats], otherList: List[Stats]): Unit = (thisList, otherList) match {
+      def mergeCurrent(
+          thisList: List[Stats],
+          otherList: List[Stats]): Unit = (thisList, otherList) match {
         case (Nil, l) =>
           res.appendAll(l)
         case (l, Nil) =>
@@ -267,8 +278,8 @@ private[sql] object StatFunctions extends Logging {
   }
 
   object QuantileSummaries {
-    // TODO(tjhunter) more tuning could be done one the constants here, but for now the main cost of the algorithm is
-    // accessing the data in SQL.
+    // TODO(tjhunter) more tuning could be done on the constants here, but for now
+    // the main cost of the algorithm is accessing the data in SQL.
     /**
      * The default value for the compression threshold.
     */
@@ -292,7 +303,9 @@ private[sql] object StatFunctions extends Logging {
      */
     case class Stats(value: Double, g: Int, delta: Int)
 
-    private def compressImmut(currentSamples: IndexedSeq[Stats], mergeThreshold: Double): ArrayBuffer[Stats] = {
+    private def compressImmut(
+        currentSamples: IndexedSeq[Stats],
+        mergeThreshold: Double): ArrayBuffer[Stats] = {
       val res: ArrayBuffer[Stats] = ArrayBuffer.empty
       if (currentSamples.isEmpty) {
         return res
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 12b2685fbdb88..7f9229244b583 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -322,7 +322,9 @@ class DataFrameStatPerfSuite extends QueryTest with SharedSQLContext with Loggin
     val t1 = millis { df.describe() }
     logDebug(s"T1 = $t1")
     logDebug("*** Just quantiles ***")
-    val t2 = millis { StatFunctions.multipleApproxQuantiles(df, Seq("col1"), Seq(0.1, 0.25, 0.5, 0.75, 0.9), 0.01) }
+    val t2 = millis {
+      StatFunctions.multipleApproxQuantiles(df, Seq("col1"), Seq(0.1, 0.25, 0.5, 0.75, 0.9), 0.01)
+    }
     logDebug(s"T1 = $t1, T2 = $t2")
   }