[query] Add reservoir sample aggregator#12812
Conversation
CHANGELOG: Fixed bug where Table/MT._calculate_new_partitions returned unbalanced intervals with whole-stage code generation runtime.
|
Suggestion from Patrick: https://en.wikipedia.org/wiki/Combinatorial_number_system could be used to transform the sample into a uniform single integer in |
|
|
||
| cb.assign(garbage, other.garbage) | ||
| cb.assign(seenSoFar, other.seenSoFar) | ||
| cb.assign(garbage, other.garbage) |
| cb.assign(j, 0) | ||
| cb.whileLoop(j < maxSize, { | ||
| val x = cb.memoize(rand.invoke[Double]("nextDouble")) | ||
| cb.ifx(x * (totalWeightLeft + totalWeightRight) <= totalWeightLeft, { |
There was a problem hiding this comment.
I think the probabilities need to change as you start pulling items out of the two sides. I think it should be
if (x * (leftSize * totalWeightLeft + rightSize * totalWeightRight) <= leftSize * totalWeightLeft)
Another possibility is to modify the left builder in place, using a weighted generalization of the seqOp:
weightSoFar = totalWeightLeft
rightWeight = totalWeightRight / rightSize
for (j in 0..right.size)
weightSoFar += rightWeight
if (left.size < maxSize)
left.append(right[j])
else
if (randDouble() * weightSoFar < rightWeight * maxSize)
swap right[j] into random position in left
The unweighted sampler maintains the invariant that at any time, the probability any item seen so far is in the sample (P(x in S)) is maxSize / seenSoFar. The weighted generalization makes that maxSize * weight(x) / weightSoFar, where weightSoFar is the sum of the weights of all items seen so far.
For the combOp, if we just union the two samples together, but give each item from the left the weight totalWeightLeft / leftSize, and similarly for the right, then after the weighted sampler runs, the probability any item from the left is in the result is
(leftSize / totalWeightLeft) * (maxSize * (totalWeightLeft / leftSize) / totalWeight)
=
maxSize / totalWeight
I'm pretty sure this handles all cases where one or both sides aren't full as well.
CHANGELOG: Fixed bug where Table/MT._calculate_new_partitions returned unbalanced intervals with whole-stage code generation runtime.