Merge pull request #384 from apache/FixKllComments
Fix kll comments
leerho committed Jan 29, 2022
2 parents 1cb37fb + d465d28 commit 1e14bf14120d66a71189972f4f57b16a5b8014e1
Showing 6 changed files with 63 additions and 33 deletions.
@@ -41,7 +41,7 @@ public static long convertToPrecedingCummulative(final long[] array) {
}

/**
- * Returns the zero-based index (position) of a value in the hypothetical sorted stream of
+ * Returns the linear zero-based index (position) of a value in the hypothetical sorted stream of
* values of size n.
* @param phi the fractional position where: 0 ≤ φ ≤ 1.0.
* @param n the size of the stream
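The Javadoc above maps a fractional rank phi in [0, 1] to a position in a sorted stream of n values. The method body is outside this diff; a minimal sketch of one such linear mapping, where the floor-and-clamp choice is an assumption rather than the committed code:

// Illustration only; the exact rounding rule is not shown in this diff.
static long posOfPhiSketch(final double phi, final long n) {
  final long pos = (long) Math.floor(phi * n); // linear map of [0, 1] onto [0, n]
  return (pos == n) ? n - 1 : pos;             // clamp so phi == 1.0 stays in range
}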
@@ -54,7 +54,7 @@ public static long posOfPhi(final double phi, final long n) {

/**
* This is written in terms of a plain array to facilitate testing.
- * @param wtArr the cumlative weights array consisting of chunks
+ * @param wtArr the cumulative weights array consisting of chunks
* @param pos the position
* @return the index of the chunk containing the position
*/
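The body of chunkContainingPos is also outside this diff; given the cumulative-weights array described above, a plain binary-search sketch (hypothetical helper name, assuming wtArr is non-decreasing, starts at 0, and brackets pos):

// Sketch only: locate the chunk whose cumulative-weight range covers pos.
static int chunkContainingPosSketch(final long[] wtArr, final long pos) {
  int lo = 0;
  int hi = wtArr.length - 1;
  while (hi - lo > 1) {                 // invariant: wtArr[lo] <= pos < wtArr[hi]
    final int mid = (lo + hi) >>> 1;
    if (wtArr[mid] <= pos) { lo = mid; } else { hi = mid; }
  }
  return lo;                            // index of the chunk containing pos
}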
@@ -54,8 +54,8 @@ final class KllFloatsQuantileCalculator {
n_ = n;
items_ = items;
weights_ = weights; //must be size of items + 1
- levels_ = null; //not used
- numLevels_ = 0; //not used
+ levels_ = null; //not used by test
+ numLevels_ = 0; //not used by test
}

float getQuantile(final double phi) { //phi is normalized rank [0,1].
@@ -80,7 +80,7 @@ private void populateFromSketch(final float[] srcItems, final int[] srcLevels,
while (srcLevel < numLevels) {
final int fromIndex = srcLevels[srcLevel] - offset;
final int toIndex = srcLevels[srcLevel + 1] - offset; // exclusive
- if (fromIndex < toIndex) { // skip empty levels
+ if (fromIndex < toIndex) { // if equal, skip empty level
Arrays.fill(weights_, fromIndex, toIndex, weight);
levels_[dstLevel] = fromIndex;
levels_[dstLevel + 1] = toIndex;
@@ -104,9 +104,10 @@ private static void blockyTandemMergeSort(final float[] items, final long[] weig
blockyTandemMergeSortRecursion(itemsTmp, weightsTmp, items, weights, levels, 0, numLevels);
}

- private static void blockyTandemMergeSortRecursion(final float[] itemsSrc, final long[] weightsSrc,
- final float[] itemsDst, final long[] weightsDst, final int[] levels, final int startingLevel,
- final int numLevels) {
+ private static void blockyTandemMergeSortRecursion(
+ final float[] itemsSrc, final long[] weightsSrc,
+ final float[] itemsDst, final long[] weightsDst,
+ final int[] levels, final int startingLevel, final int numLevels) {
if (numLevels == 1) { return; }
final int numLevels1 = numLevels / 2;
final int numLevels2 = numLevels - numLevels1;
@@ -115,17 +116,27 @@ private static void blockyTandemMergeSortRecursion(final float[] itemsSrc, final
final int startingLevel1 = startingLevel;
final int startingLevel2 = startingLevel + numLevels1;
// swap roles of src and dst
- blockyTandemMergeSortRecursion(itemsDst, weightsDst, itemsSrc, weightsSrc, levels,
- startingLevel1, numLevels1);
- blockyTandemMergeSortRecursion(itemsDst, weightsDst, itemsSrc, weightsSrc, levels,
- startingLevel2, numLevels2);
- tandemMerge(itemsSrc, weightsSrc, itemsDst, weightsDst, levels, startingLevel1, numLevels1,
+ blockyTandemMergeSortRecursion(
+ itemsDst, weightsDst,
+ itemsSrc, weightsSrc,
+ levels, startingLevel1, numLevels1);
+ blockyTandemMergeSortRecursion(
+ itemsDst, weightsDst,
+ itemsSrc, weightsSrc,
+ levels, startingLevel2, numLevels2);
+ tandemMerge(
+ itemsSrc, weightsSrc,
+ itemsDst, weightsDst,
+ levels,
+ startingLevel1, numLevels1,
startingLevel2, numLevels2);
}

- private static void tandemMerge(final float[] itemsSrc, final long[] weightsSrc,
+ private static void tandemMerge(
+ final float[] itemsSrc, final long[] weightsSrc,
final float[] itemsDst, final long[] weightsDst,
- final int[] levelStarts, final int startingLevel1, final int numLevels1,
+ final int[] levelStarts,
+ final int startingLevel1, final int numLevels1,
final int startingLevel2, final int numLevels2) {
final int fromIndex1 = levelStarts[startingLevel1];
final int toIndex1 = levelStarts[startingLevel1 + numLevels1]; // exclusive
@@ -251,7 +251,7 @@ private enum Flags { IS_EMPTY, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM }
private float maxValue_;
private final boolean compatible; //compatible with quantiles sketch
private static final Random random = new Random();

/**
* Heap constructor with the default <em>k = 200</em>, which has a rank error of about 1.65%.
*/
@@ -300,7 +300,7 @@ private KllFloatsSketch(final int k, final int m, final boolean compatible) {

/**
* Off-heap constructor.
- * @param mem Memory object that contains data serilized by this sketch.
+ * @param mem Memory object that contains data serialized by this sketch.
*/
private KllFloatsSketch(final Memory mem) {
m_ = DEFAULT_M;
@@ -498,7 +498,7 @@ public double getNormalizedRankError(final boolean pmf) {
/**
* Gets the normalized rank error given k and pmf.
* Static method version of the {@link #getNormalizedRankError(boolean)}.
- * @param k the configuation parameter
+ * @param k the configuration parameter
* @param pmf if true, returns the "double-sided" normalized rank error for the getPMF() function.
* Otherwise, it is the "single-sided" normalized rank error for all the other queries.
* @return if pmf is true, the normalized rank error for the getPMF() function.
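Usage of the static form, for context (assuming the org.apache.datasketches.kll package of this repository; the ~1.65% figure for the default k = 200 comes from the class Javadoc earlier in this diff):

import org.apache.datasketches.kll.KllFloatsSketch;

public class RankErrorDemo {
  public static void main(final String[] args) {
    // A-priori error bounds for a given k, queried without building a sketch.
    final double singleSided = KllFloatsSketch.getNormalizedRankError(200, false); // ~0.0165 (1.65%)
    final double doubleSided = KllFloatsSketch.getNormalizedRankError(200, true);  // double-sided bound used by getPMF()
    System.out.println(singleSided + ", " + doubleSided);
  }
}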
@@ -832,7 +832,7 @@ public String toString(final boolean withLevels, final boolean withData) {
sb.append(" Empty : ").append(isEmpty()).append(Util.LS);
sb.append(" Estimation Mode : ").append(isEstimationMode()).append(Util.LS);
sb.append(" Levels : ").append(numLevels_).append(Util.LS);
sb.append(" Sorted : ").append(isLevelZeroSorted_).append(Util.LS);
sb.append(" Level 0 Sorted : ").append(isLevelZeroSorted_).append(Util.LS);
sb.append(" Buffer Capacity Items: ").append(items_.length).append(Util.LS);
sb.append(" Retained Items : ").append(getNumRetained()).append(Util.LS);
sb.append(" Storage Bytes : ").append(getSerializedSizeBytes()).append(Util.LS);
@@ -842,7 +842,7 @@ public String toString(final boolean withLevels, final boolean withData) {

if (withLevels) {
sb.append("### KLL sketch levels:").append(Util.LS)
.append(" level, offset: nominal capacity, actual size").append(Util.LS);
.append(" level, offset: nominal capacity, actual size").append(Util.LS);
for (int i = 0; i < numLevels_; i++) {
sb.append(" ").append(i).append(", ").append(levels_[i]).append(": ")
.append(KllHelper.levelCapacity(k_, numLevels_, i, m_))
@@ -852,19 +852,29 @@ public String toString(final boolean withLevels, final boolean withData) {
}

if (withData) {
sb.append("### KLL sketch data:").append(Util.LS);
sb.append("### KLL sketch data {index, item}:").append(Util.LS);
if (levels_[0] > 0) {
sb.append(" Garbage:" + Util.LS);
for (int i = 0; i < levels_[0]; i++) {
if (items_[i] == 0.0f) { continue; }
sb.append(" ").append(i + ", ").append(items_[i]).append(Util.LS);
}
}
int level = 0;
while (level < numLevels_) {
final int fromIndex = levels_[level];
final int toIndex = levels_[level + 1]; // exclusive
if (fromIndex < toIndex) {
sb.append(" level ").append(level).append(":").append(Util.LS);
sb.append(" level[").append(level).append("]: offset: " + levels_[level] + " wt: " + (1 << level));
sb.append(Util.LS);
}
for (int i = fromIndex; i < toIndex; i++) {
sb.append(" ").append(items_[i]).append(Util.LS);
sb.append(" ").append(i + ", ").append(items_[i]).append(Util.LS);
}
level++;
}
sb.append(" level[" + level + "]: offset: " + levels_[level] + " (Exclusive)");
sb.append(Util.LS);
sb.append("### End sketch data").append(Util.LS);
}

@@ -1009,7 +1019,9 @@ private void compressWhileUpdating() {
KllHelper.randomlyHalveUp(items_, adjBeg, adjPop, random);
} else {
KllHelper.randomlyHalveDown(items_, adjBeg, adjPop, random);
- KllHelper.mergeSortedArrays(items_, adjBeg, halfAdjPop, items_, rawLim, popAbove,
+ KllHelper.mergeSortedArrays(
+ items_, adjBeg, halfAdjPop,
+ items_, rawLim, popAbove,
items_, adjBeg + halfAdjPop);
}
levels_[level + 1] -= halfAdjPop; // adjust boundaries of the level above
@@ -1034,7 +1046,11 @@ private void compressWhileUpdating() {
}
}

- private int findLevelToCompact() {
+ /**
+ * Finds the first level starting with level 0 that exceeds its nominal capacity
+ * @return level to compact
+ */
+ private int findLevelToCompact() { //
int level = 0;
while (true) {
assert level < numLevels_;
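The diff truncates the method after the assert; a plausible reconstruction of the loop the new Javadoc describes, reusing the KllHelper.levelCapacity(k_, numLevels_, level, m_) call pattern visible elsewhere in this changeset (not the committed body):

// Hypothetical reconstruction, not the committed code:
private int findLevelToCompact() {
  int level = 0;
  while (true) {
    assert level < numLevels_;
    final int pop = levels_[level + 1] - levels_[level];                // items currently in this level
    final int cap = KllHelper.levelCapacity(k_, numLevels_, level, m_); // its nominal capacity
    if (pop >= cap) { return level; }                                   // first over-full level wins
    level++;
  }
}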
@@ -1051,7 +1067,7 @@ private void addEmptyTopLevelToCompletelyFullSketch() {
final int curTotalCap = levels_[numLevels_];

// make sure that we are following a certain growth scheme
- assert levels_[0] == 0;
+ assert levels_[0] == 0; //definition of full
assert items_.length == curTotalCap;

// note that merging MIGHT over-grow levels_, in which case we might not have to grow it here
@@ -41,6 +41,7 @@ static boolean isOdd(final int value) {

/**
* Copy the old array into a new larger array.
+ * The extra space is at the top.
* @param oldArr the given old array with data
* @param newLen the new length larger than the oldArr.length.
* @return the new array
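In JDK terms the documented behavior ("extra space is at the top") is just a grow-only copy that keeps the old data at the bottom; a sketch, not the library's method body:

import java.util.Arrays;

// Sketch of the documented behavior: old data stays at the bottom,
// the zero-filled extra space ends up at the top of the new array.
static float[] growFloatArraySketch(final float[] oldArr, final int newLen) {
  assert newLen > oldArr.length;
  return Arrays.copyOf(oldArr, newLen);
}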
@@ -106,7 +107,7 @@ static int levelCapacity(final int k, final int numLevels, final int height, fin
* @return the actual capacity of a given level given its depth index.
*/
private static long intCapAux(final int k, final int depth) {
- if (depth <= 30) { return (int) intCapAuxAux(k, depth); }
+ if (depth <= 30) { return intCapAuxAux(k, depth); }
final int half = depth / 2;
final int rest = depth - half;
final long tmp = intCapAuxAux(k, half);
@@ -120,9 +121,9 @@ private static long intCapAux(final int k, final int depth) {
* @return the actual capacity of a given level given its depth index.
*/
private static long intCapAuxAux(final long k, final int depth) {
- final long twok = k << 1; // for rounding, we pre-multiply by 2
+ final long twok = k << 1; // for rounding pre-multiply by 2
final long tmp = ((twok << depth) / powersOfThree[depth]);
- final long result = ((tmp + 1) >> 1); // then here we add 1 and divide by 2
+ final long result = ((tmp + 1L) >>> 1); // add 1 and divide by 2
assert (result <= k);
return result;
}
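For context on the arithmetic: (2k << depth) / 3^depth is 2k * (2/3)^depth, and adding 1 before the shift right rounds the halving, so the result is approximately k * (2/3)^depth, the geometric decay of KLL level capacities. A floating-point cross-check (illustrative only; the integer truncation above can differ by one near ties):

// Illustrative cross-check, not library code.
static long intCapApproxSketch(final long k, final int depth) {
  return Math.round(k * Math.pow(2.0 / 3.0, depth)); // ~ ((2k << depth) / 3^depth + 1) >>> 1
}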
@@ -147,8 +148,10 @@ static long sumTheSampleWeights(final int num_levels, final int[] levels) {
return total;
}

- static void mergeSortedArrays(final float[] bufA, final int startA, final int lenA,
- final float[] bufB, final int startB, final int lenB, final float[] bufC, final int startC) {
+ static void mergeSortedArrays(
+ final float[] bufA, final int startA, final int lenA,
+ final float[] bufB, final int startB, final int lenB,
+ final float[] bufC, final int startC) {
final int lenC = lenA + lenB;
final int limA = startA + lenA;
final int limB = startB + lenB;
@@ -84,7 +84,7 @@ final class DoublesAuxiliary {
* item would appear in position 0 &le; pos &lt; n of a hypothetical sorted
* version of that stream.
*
- * <p>Note that since that since the true stream is unavailable,
+ * <p>Note that since the true stream is unavailable,
* we don't actually answer the question for that stream, but rather for
* a <i>different</i> stream of the same length, that could hypothetically
* be reconstructed from the weighted samples in our sketch.
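A toy illustration of the idea in that comment: each retained sample, expanded by its weight, occupies a run of positions in the hypothetical sorted stream (values and weights below are made up):

// Toy example, not library code: samples with weights 1, 2, 1 stand in
// for a hypothetical sorted stream of length 4.
static float sampleAtPositionSketch(final long pos) {
  final float[] samples = { 10f, 20f, 30f };
  final long[] cumWts = { 0, 1, 3, 4 };               // preceding-cumulative weights; last entry is n
  for (int i = 0; i < samples.length; i++) {
    if (pos < cumWts[i + 1]) { return samples[i]; }   // positions 0 -> 10f, 1..2 -> 20f, 3 -> 30f
  }
  throw new IllegalArgumentException("pos out of range: " + pos);
}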
@@ -39,7 +39,7 @@

/**
* Constructs the Auxiliary structure from the ItemsSketch
- * @param qs an Itemsketch
+ * @param qs an ItemsSketch
*/
@SuppressWarnings("unchecked")
ItemsAuxiliary(final ItemsSketch<T> qs) {
