@@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Objects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.DoubleMath;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.IntMath;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +78,55 @@ public static HistogramData linear(double start, double width, int numBuckets) {
return new HistogramData(LinearBuckets.of(start, width, numBuckets));
}

/**
* Returns a histogram object with exponential boundaries. The input parameter {@code scale}
* determines a coefficient 'base' which specifies the bucket boundaries.
*
* <pre>
* base = 2**(2**(-scale)) e.g.
* scale=1 => base=2**(1/2)=sqrt(2)
* scale=0 => base=2**(1)=2
* scale=-1 => base=2**(2)=4
* </pre>
*
* This bucketing strategy makes it simple and numerically stable to compute the bucket index
* for a given datapoint.
*
* <pre>
* Bucket boundaries are given by the following table where n=numBuckets.
* | Bucket Index  | Bucket Boundaries    |
* |---------------|----------------------|
* | Underflow     | (-inf, 0)            |
* | 0             | [0, base)            |
* | 1             | [base, base^2)       |
* | 2             | [base^2, base^3)     |
* | i             | [base^i, base^(i+1)) |
* | n-1           | [base^(n-1), base^n) |
* | Overflow      | [base^n, inf)        |
* </pre>
*
* <pre>
* Example scale/boundaries:
* When scale=1, buckets 0,1,2...i have lowerbounds 0, 2^(1/2), 2^(2/2), ... 2^(i/2).
* When scale=0, buckets 0,1,2...i have lowerbounds 0, 2, 2^2, ... 2^(i).
* When scale=-1, buckets 0,1,2...i have lowerbounds 0, 4, 4^2, ... 4^(i).
* </pre>
*
* The scale parameter is similar to <a
* href="https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram">
* OpenTelemetry's notion of ExponentialHistogram</a>. Bucket boundaries are modified to make them
* compatible with GCP's exponential histogram.
*
* @param numBuckets The number of buckets. Clipped so that the largest bucket's lower bound is
* not greater than 2^32-1 (uint32 max).
* @param scale Integer between [-3, 3] which determines bucket boundaries. Larger values imply
* more fine grained buckets.
* @return a new Histogram instance.
*/
public static HistogramData exponential(int scale, int numBuckets) {
return new HistogramData(ExponentialBuckets.of(scale, numBuckets));
}
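// Illustrative usage sketch (not part of this diff): with scale=1 the base is sqrt(2), so
// bucket i covers [2^(i/2), 2^((i+1)/2)). For example:
//   HistogramData histogram = HistogramData.exponential(1, 40);
//   histogram.record(3.0, 100.0); // 3.0 falls in bucket 3 = [2^1.5, 2^2); 100.0 in bucket 13.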

public void record(double... values) {
for (double value : values) {
record(value);
@@ -227,6 +277,150 @@ public interface BucketType extends Serializable {
double getAccumulatedBucketSize(int endIndex);
}

@AutoValue
public abstract static class ExponentialBuckets implements BucketType {

// Minimum scale factor. Bucket boundaries can grow at a rate of at most: 2^(2^3)=2^8=256
private static final int MINIMUM_SCALE = -3;

// Maximum scale factor. Bucket boundaries must grow at a rate of at least 2^(2^-3)=2^(1/8)
private static final int MAXIMUM_SCALE = 3;

// Maximum number of buckets that is supported when 'scale' is zero.
private static final int ZERO_SCALE_MAX_NUM_BUCKETS = 32;

public abstract double getBase();

public abstract int getScale();

/**
* Set to 2**scale which is equivalent to 1/log_2(base). Precomputed to use in {@code
* getBucketIndexPositiveScale}
*/
public abstract double getInvLog2GrowthFactor();

@Override
public abstract int getNumBuckets();

/* Precomputed since this value is used every time a datapoint is recorded. */
@Override
public abstract double getRangeTo();

public static ExponentialBuckets of(int scale, int numBuckets) {
if (scale < MINIMUM_SCALE) {
throw new IllegalArgumentException(
String.format("Scale should be greater than %d: %d", MINIMUM_SCALE, scale));
}

if (scale > MAXIMUM_SCALE) {
throw new IllegalArgumentException(
String.format("Scale should be less than %d: %d", MAXIMUM_SCALE, scale));
}
if (numBuckets <= 0) {
throw new IllegalArgumentException(
String.format("numBuckets should be positive: %d", numBuckets));
}

double invLog2GrowthFactor = Math.pow(2, scale);
double base = Math.pow(2, Math.pow(2, -scale));
int clippedNumBuckets = ExponentialBuckets.computeNumberOfBuckets(scale, numBuckets);
double rangeTo = Math.pow(base, clippedNumBuckets);
return new AutoValue_HistogramData_ExponentialBuckets(
base, scale, invLog2GrowthFactor, clippedNumBuckets, rangeTo);
}

/**
* numBuckets is clipped so that the largest bucket's lower bound is not greater than 2^32-1
* (uint32 max). This value is log_base(2^32) which simplifies as follows:
*
* <pre>
* log_base(2^32)
* = log_2(2^32)/log_2(base)
* = 32/(2**-scale)
* = 32*(2**scale)
* </pre>
*/
private static int computeNumberOfBuckets(int scale, int inputNumBuckets) {
if (scale == 0) {
// When base=2 then the bucket at index 31 contains [2^31, 2^32).
return Math.min(ZERO_SCALE_MAX_NUM_BUCKETS, inputNumBuckets);
} else if (scale > 0) {
// When scale is positive 32*(2**scale) is equivalent to a right bit-shift.
return Math.min(inputNumBuckets, ZERO_SCALE_MAX_NUM_BUCKETS << scale);
} else {
// When scale is negative 32*(2**scale) is equivalent to a left bit-shift.
return Math.min(inputNumBuckets, ZERO_SCALE_MAX_NUM_BUCKETS >> -scale);
}
}
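// Worked examples of the clipping arithmetic: scale=3 caps numBuckets at 32 << 3 = 256,
// whose top bucket's lower bound is base^255 = 2^(255/8) < 2^32; scale=-3 caps it at
// 32 >> 3 = 4, whose top bucket's lower bound is 256^3 = 2^24 < 2^32.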

@Override
public int getBucketIndex(double value) {
if (value < getBase()) {
return 0;
}

// When scale is non-positive, 'base' and 'bucket boundaries' will be integers.
// In this scenario `value` and `floor(value)` will belong to the same bucket.
int index;
if (getScale() > 0) {
index = getBucketIndexPositiveScale(value);
} else if (getScale() < 0) {
index = getBucketIndexNegativeScale(DoubleMath.roundToInt(value, RoundingMode.FLOOR));
} else {
index = getBucketIndexZeroScale(DoubleMath.roundToInt(value, RoundingMode.FLOOR));
}
// Ensure that a valid index is returned on the off chance of a numerical instability error.
return Math.max(Math.min(index, getNumBuckets() - 1), 0);
}

private int getBucketIndexZeroScale(int value) {
return IntMath.log2(value, RoundingMode.FLOOR);
}

private int getBucketIndexNegativeScale(int value) {
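// For scale < 0, base = 2^(2^-scale), so log_base(value) = log_2(value) / 2^(-scale).
// Since bucket boundaries are integer powers of two, floor(log_2(value)) >> (-scale)
// yields the same floor.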
return getBucketIndexZeroScale(value) >> (-getScale());
}

// This method is valid for all 'scale' values but we fall back to more efficient methods for
// non-positive scales.
// For a value>base we would like to find an i s.t. :
// base^i <= value < base^(i+1)
// i <= log_base(value) < i+1
// i = floor(log_base(value))
// i = floor(log_2(value)/log_2(base))
private int getBucketIndexPositiveScale(double value) {
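// Worked example: with scale=1, invLog2GrowthFactor=2, so value=32 maps to
// floor(2 * log_2(32)) = 10, i.e. bucket 10 = [2^5, 2^5.5).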
return DoubleMath.roundToInt(
getInvLog2GrowthFactor() * DoubleMath.log2(value), RoundingMode.FLOOR);
}

@Override
public double getBucketSize(int index) {
if (index < 0) {
return 0;
}
if (index == 0) {
return getBase();
}

// bucketSize = (base)^(i+1) - (base)^i
// = (base)^i(base - 1)
return Math.pow(getBase(), index) * (getBase() - 1);
}

@Override
public double getAccumulatedBucketSize(int endIndex) {
if (endIndex < 0) {
return 0;
}
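// Buckets 0..endIndex cover [0, base^(endIndex+1)): the bucket widths telescope,
// base + sum_{i=1..endIndex} base^i * (base - 1) = base^(endIndex+1).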
return Math.pow(getBase(), endIndex + 1);
}

@Override
public double getRangeFrom() {
return 0;
}
}

@AutoValue
public abstract static class LinearBuckets implements BucketType {
public abstract double getStart();
@@ -18,8 +18,10 @@
package org.apache.beam.sdk.util;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;

import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.IntMath;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -200,4 +202,134 @@ public void testIncrementBucketCountByIndex() {
assertThat(data.getTopBucketCount(), equalTo(4L));
assertThat(data.getTotalCount(), equalTo(10L));
}

// The following tests cover exponential buckets.
@Test
public void testExponentialBuckets_PositiveScaleRecord() {
// Buckets will be:
// Index Range
// Underflow (-inf, 0)
// 0 [0, sqrt(2))
// 1 [sqrt(2), 2)
// i [2^(i/2), 2^((i+1)/2))
HistogramData data = HistogramData.exponential(1, 40);

data.record(-1);
assertThat(data.getBottomBucketCount(), equalTo(1L));

data.record(0, 1);
assertThat(data.getCount(0), equalTo(2L));

data.record(2);
assertThat(data.getTotalCount(), equalTo(4L));
assertThat(data.getCount(2), equalTo(1L));

// 10th bucket contains range [2^5, 2^5.5) ~= [32, 45.25)
for (int i = 32; i <= 45; i++) {
data.record(i);
}
assertThat(data.getCount(10), equalTo(14L));

// 30th bucket contains range [2^15, 2^15.5) ~= [32768, 46340.9)
for (int i = 32768; i < 32768 + 100; i++) {
data.record(i);
}
assertThat(data.getCount(30), equalTo(100L));
for (int i = 46340; i > 46340 - 100; i--) {
data.record(i);
}
assertThat(data.getCount(30), equalTo(200L));
}

@Test
public void testExponentialBuckets_ZeroScaleRecord() {
// Buckets will be:
// Index Range
// Underflow (-inf, 0)
// 0 [0, 2)
// 1 [2, 2^2)
// i [2^i, 2^(i+1))
HistogramData data = HistogramData.exponential(0, 20);

data.record(-1);
assertThat(data.getBottomBucketCount(), equalTo(1L));

data.record(0, 1);
assertThat(data.getCount(0), equalTo(2L));

data.record(4, 5, 6, 7);
assertThat(data.getCount(2), equalTo(4L));

for (int i = 32; i < 64; i++) {
data.record(i);
}
assertThat(data.getCount(5), equalTo(32L));

for (int i = IntMath.pow(2, 16); i < IntMath.pow(2, 16) + 100; i++) {
data.record(i);
}
assertThat(data.getCount(16), equalTo(100L));

Long expectedTotalCount = Long.valueOf(100 + 32 + 4 + 2 + 1);
assertThat(data.getTotalCount(), equalTo(expectedTotalCount));
}

@Test
public void testExponentialBuckets_NegativeScalesRecord() {
// Buckets will be:
// Index Range
// Underflow (-inf, 0)
// 0 [0, 4)
// 1 [4, 4^2)
// i [4^i, 4^(i+1))
HistogramData data = HistogramData.exponential(-1, 20);

data.record(-1);
assertThat(data.getBottomBucketCount(), equalTo(1L));

data.record(0, 1, 2);
assertThat(data.getCount(0), equalTo(3L));

data.record(16, 17, 32, 33, 62, 63);
assertThat(data.getCount(2), equalTo(6L));

for (int i = IntMath.pow(4, 5); i < IntMath.pow(4, 5) + 20; i++) {
data.record(i);
}
assertThat(data.getCount(5), equalTo(20L));

Long expectedTotalCount = Long.valueOf(20 + 6 + 3 + 1);
assertThat(data.getTotalCount(), equalTo(expectedTotalCount));
}

@Test
public void testExponentialBuckets_BucketSize() {
HistogramData zeroScaleBucket = HistogramData.exponential(0, 20);
assertThat(zeroScaleBucket.getBucketType().getBucketSize(0), equalTo(2.0));
// 10th bucket contains [2^10, 2^11).
assertThat(zeroScaleBucket.getBucketType().getBucketSize(10), equalTo(1024.0));

HistogramData positiveScaleBucket = HistogramData.exponential(1, 20);
assertThat(positiveScaleBucket.getBucketType().getBucketSize(0), equalTo(Math.sqrt(2)));
// 10th bucket contains [2^5, 2^5.5).
assertThat(positiveScaleBucket.getBucketType().getBucketSize(10), closeTo(13.2, .1));

HistogramData negativeScaleBucket = HistogramData.exponential(-1, 20);
assertThat(negativeScaleBucket.getBucketType().getBucketSize(0), equalTo(4.0));
// 10th bucket contains [2^20, 2^22).
assertThat(negativeScaleBucket.getBucketType().getBucketSize(10), equalTo(3145728.0));
}

@Test
public void testExponentialBuckets_NumBuckets() {
// Validate that numBuckets clipping works as intended.
HistogramData zeroScaleBucket = HistogramData.exponential(0, 200);
assertThat(zeroScaleBucket.getBucketType().getNumBuckets(), equalTo(32));

HistogramData positiveScaleBucket = HistogramData.exponential(3, 500);
assertThat(positiveScaleBucket.getBucketType().getNumBuckets(), equalTo(32 * 8));

HistogramData negativeScaleBucket = HistogramData.exponential(-3, 500);
assertThat(negativeScaleBucket.getBucketType().getNumBuckets(), equalTo(4));
}
}