[SPARK-29336][SQL] Fix the implementation of QuantileSummaries.merge (guarantee that the relativeError will be respected)

### What changes were proposed in this pull request?

Reimplement `org.apache.spark.sql.catalyst.util.QuantileSummaries#merge` and add a test case showing the previous bug.

### Why are the changes needed?

The original Greenwald-Khanna paper, from which the algorithm behind `approxQuantile` was taken, does not cover how to merge the results of multiple parallel QuantileSummaries. The current implementation violates some of the algorithm's invariants, and therefore the effective error can be larger than the one specified.
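
For reference, the key invariant at stake (restated in the new code comments below) bounds each retained sample's uncertainty by the summary's error budget. A minimal sketch of the check, where `Sample` is a hypothetical stand-in for Spark's private `Stats` class:

```scala
// Sketch only: `Sample(value, g, delta)` stands in for the private
// QuantileSummaries.Stats. The GK invariant requires, for every sample,
//   g + delta <= floor(2 * relativeError * count)
// The old merge concatenated and compressed the two sample lists without
// adjusting delta, so this bound could be exceeded after a merge.
case class Sample(value: Double, g: Long, delta: Long)

def invariantHolds(samples: Seq[Sample], relativeError: Double, count: Long): Boolean =
  samples.forall(s => s.g + s.delta <= math.floor(2 * relativeError * count).toLong)
```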

### Does this PR introduce any user-facing change?

Yes, in some cases the results from `approxQuantile` (`percentile_approx` in SQL) will now be within the expected error margin. For example:

```scala
// Run in spark-shell, where spark.implicits._ is already in scope.
val values = (1 to 100).toArray
val all_quantiles = values.indices.map(i => (i + 1).toDouble / values.length).toArray
for (n <- 0 until 5) {
  val df = spark.sparkContext.makeRDD(values).toDF("value").repartition(5)
  val all_answers = df.stat.approxQuantile("value", all_quantiles, 0.1)
  val all_answered_ranks = all_answers.map(ans => values.indexOf(ans))
  val error = all_answered_ranks.zipWithIndex.map { case (answer, expected) => Math.abs(expected - answer) }
  val max_error = error.max
  println(max_error)
}
```

In the current build it returns:

```
16
12
10
11
17
```

I couldn't run the code with this patch applied to double-check the implementation. Since `relativeError = 0.1` over 100 values bounds the rank error by 0.1 * 100 = 10, can someone please confirm that it now prints at most `10`?

### How was this patch tested?

A new unit test was added to uncover the previous bug.

Closes #26029 from sitegui/SPARK-29336.

Authored-by: Guilherme <sitegui@sitegui.com.br>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
sitegui authored and srowen committed Oct 8, 2019
1 parent 4e6d31f commit de360e9
Showing 3 changed files with 97 additions and 18 deletions.
```diff
@@ -159,14 +159,72 @@ class QuantileSummaries(
       other.shallowCopy
     } else {
       // Merge the two buffers.
-      // The GK algorithm is a bit unclear about it, but it seems there is no need to adjust the
-      // statistics during the merging: the invariants are still respected after the merge.
-      // TODO: could replace full sort by ordered merge, the two lists are known to be sorted
-      // already.
-      val res = (sampled ++ other.sampled).sortBy(_.value)
-      val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
-      new QuantileSummaries(
-        other.compressThreshold, other.relativeError, comp, other.count + count, true)
+      // The GK algorithm is a bit unclear about it, but we need to adjust the statistics during
+      // the merging. The main idea is that samples that come from one side will suffer from the
+      // lack of precision of the other.
+      // As a concrete example, take two QuantileSummaries whose samples (value, g, delta) are:
+      // `a = [(0, 1, 0), (20, 99, 0)]` and `b = [(10, 1, 0), (30, 49, 0)]`
+      // This means `a` has 100 values, whose minimum is 0 and maximum is 20,
+      // while `b` has 50 values, between 10 and 30.
+      // The resulting samples of the merge will be:
+      // a+b = [(0, 1, 0), (10, 1, ??), (20, 99, ??), (30, 49, 0)]
+      // The values of `g` do not change, as they represent the minimum number of values between
+      // two consecutive samples. The values of `delta` should be adjusted, however.
+      // Take the case of the sample `10` from `b`. In the original stream, it could have appeared
+      // right after `0` (as expressed by `g=1`) or right before `20`, so `delta=99+0-1=98`.
+      // In the GK algorithm's style of working in terms of maximum bounds, one can observe that
+      // the maximum additional uncertainty over samples coming from `b` is
+      // `max(g_a + delta_a) = floor(2 * eps_a * n_a)`. Likewise, the additional uncertainty over
+      // samples from `a` is `floor(2 * eps_b * n_b)`.
+      // Only samples that interleave with the other side are affected. That means that samples
+      // from one side that are lesser (or greater) than all samples from the other side are just
+      // copied unmodified.
+      // If the merging instances have different `relativeError`, the resulting instance will
+      // carry the largest one: `eps_ab = max(eps_a, eps_b)`.
+      // The main invariant of the GK algorithm is kept:
+      // `max(g_ab + delta_ab) <= floor(2 * eps_ab * (n_a + n_b))`, since
+      // `max(g_ab + delta_ab) <= floor(2 * eps_a * n_a) + floor(2 * eps_b * n_b)`.
+      // Finally, one can see how the `insert(x)` operation can be expressed as
+      // `merge([(x, 1, 0)])`.
+
+      val mergedSampled = new ArrayBuffer[Stats]()
+      val mergedRelativeError = math.max(relativeError, other.relativeError)
+      val mergedCount = count + other.count
+      val additionalSelfDelta = math.floor(2 * other.relativeError * other.count).toLong
+      val additionalOtherDelta = math.floor(2 * relativeError * count).toLong
+
+      // Do a merge of two sorted lists until one of the lists is fully consumed
+      var selfIdx = 0
+      var otherIdx = 0
+      while (selfIdx < sampled.length && otherIdx < other.sampled.length) {
+        val selfSample = sampled(selfIdx)
+        val otherSample = other.sampled(otherIdx)
+
+        // Detect next sample
+        val (nextSample, additionalDelta) = if (selfSample.value < otherSample.value) {
+          selfIdx += 1
+          (selfSample, if (otherIdx > 0) additionalSelfDelta else 0)
+        } else {
+          otherIdx += 1
+          (otherSample, if (selfIdx > 0) additionalOtherDelta else 0)
+        }
+
+        // Insert it
+        mergedSampled += nextSample.copy(delta = nextSample.delta + additionalDelta)
+      }
+
+      // Copy the remaining samples from the other list
+      // (by construction, at most one `while` loop will run)
+      while (selfIdx < sampled.length) {
+        mergedSampled += sampled(selfIdx)
+        selfIdx += 1
+      }
+      while (otherIdx < other.sampled.length) {
+        mergedSampled += other.sampled(otherIdx)
+        otherIdx += 1
+      }
+
+      val comp = compressImmut(mergedSampled, 2 * mergedRelativeError * mergedCount)
+      new QuantileSummaries(other.compressThreshold, mergedRelativeError, comp, mergedCount, true)
     }
   }
```
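
To make the delta adjustment concrete, here is a minimal, self-contained sketch of the merge rule described in the comments above. It is not Spark's private API (`Sample` and `mergeSamples` are hypothetical names); it replays the `a`/`b` example with `eps_a = eps_b = 0.5`, the loosest error consistent with such sparse samples:

```scala
import scala.collection.mutable.ArrayBuffer

case class Sample(value: Double, g: Long, delta: Long)

def mergeSamples(a: Seq[Sample], epsA: Double, nA: Long,
                 b: Seq[Sample], epsB: Double, nB: Long): Seq[Sample] = {
  // Extra uncertainty each side inflicts on the other's interleaved samples.
  val extraForA = math.floor(2 * epsB * nB).toLong // added to samples from `a`
  val extraForB = math.floor(2 * epsA * nA).toLong // added to samples from `b`
  val merged = ArrayBuffer[Sample]()
  var (i, j) = (0, 0)
  while (i < a.length && j < b.length) {
    if (a(i).value < b(j).value) {
      // While no sample of `b` has been consumed (j == 0), this `a` sample
      // precedes all of `b` and keeps its delta unchanged.
      merged += a(i).copy(delta = a(i).delta + (if (j > 0) extraForA else 0L))
      i += 1
    } else {
      merged += b(j).copy(delta = b(j).delta + (if (i > 0) extraForB else 0L))
      j += 1
    }
  }
  // At most one of these tails is non-empty; its samples are copied unmodified.
  merged ++= a.drop(i)
  merged ++= b.drop(j)
  merged.toSeq
}

// `a` summarizes 100 values in [0, 20]; `b` summarizes 50 values in [10, 30].
val a = Seq(Sample(0, 1, 0), Sample(20, 99, 0))
val b = Seq(Sample(10, 1, 0), Sample(30, 49, 0))
mergeSamples(a, 0.5, 100, b, 0.5, 50).foreach(println)
// Sample(0.0,1,0), Sample(10.0,1,100), Sample(20.0,99,50), Sample(30.0,49,0)
// max(g + delta) = 149 <= floor(2 * 0.5 * 150) = 150, so the invariant holds.
```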
```diff
@@ -169,5 +169,22 @@ class QuantileSummariesSuite extends SparkFunSuite {
       checkQuantile(0.1, data, s)
       checkQuantile(0.001, data, s)
     }
+
+    // length of data21 is 4 * length of data22
+    val data21 = data.zipWithIndex.filter(_._2 % 5 != 0).map(_._1).toSeq
+    val data22 = data.zipWithIndex.filter(_._2 % 5 == 0).map(_._1).toSeq
+
+    test(
+      s"Merging unbalanced interleaved lists with epsi=$epsi and seq=$seq_name, " +
+        s"compression=$compression") {
+      val s1 = buildSummary(data21, epsi, compression)
+      val s2 = buildSummary(data22, epsi, compression)
+      val s = s1.merge(s2)
+      // Check all quantiles
+      for (queryRank <- 1 to n) {
+        val queryQuantile = queryRank.toDouble / n.toDouble
+        checkQuantile(queryQuantile, data, s)
+      }
+    }
   }
 }
```
```diff
@@ -124,20 +124,24 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSparkSession
   test("percentile_approx, with different accuracies") {

     withTempView(table) {
-      (1 to 1000).toDF("col").createOrReplaceTempView(table)
+      val tableCount = 1000
+      (1 to tableCount).toDF("col").createOrReplaceTempView(table)

       // With different accuracies
-      val expectedPercentile = 250D
       val accuracies = Array(1, 10, 100, 1000, 10000)
-      val errors = accuracies.map { accuracy =>
-        val df = spark.sql(s"SELECT percentile_approx(col, 0.25, $accuracy) FROM $table")
-        val approximatePercentile = df.collect().head.getInt(0)
-        val error = Math.abs(approximatePercentile - expectedPercentile)
-        error
+      val expectedPercentiles = Array(100D, 200D, 250D, 314D, 777D)
+      for (accuracy <- accuracies) {
+        for (expectedPercentile <- expectedPercentiles) {
+          val df = spark.sql(
+            s"""SELECT
+               | percentile_approx(col, $expectedPercentile/$tableCount, $accuracy)
+               |FROM $table
+             """.stripMargin)
+          val approximatePercentile = df.collect().head.getInt(0)
+          val error = Math.abs(approximatePercentile - expectedPercentile)
+          assert(error <= math.floor(tableCount.toDouble / accuracy.toDouble))
+        }
       }
-
-      // The larger accuracy value we use, the smaller error we get
-      assert(errors.sorted.sameElements(errors.reverse))
     }
   }
```
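
The new assertion relies on how the SQL `accuracy` argument maps to the summary's relative error. Assuming, as in `ApproximatePercentile`, that `relativeError = 1.0 / accuracy`, the GK guarantee bounds the rank error by `eps * n = tableCount / accuracy`; a quick sketch of that arithmetic:

```scala
// Sketch of the bound behind the new assertion, assuming
// relativeError = 1.0 / accuracy (as ApproximatePercentile derives it).
// GK then guarantees |estimatedRank - trueRank| <= eps * n = n / accuracy.
val tableCount = 1000
for (accuracy <- Seq(1, 10, 100, 1000, 10000)) {
  val maxRankError = math.floor(tableCount.toDouble / accuracy.toDouble)
  println(s"accuracy=$accuracy -> rank error at most $maxRankError")
}
// e.g. accuracy=100 -> rank error at most 10.0
```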
