Skip to content

Commit

Permalink
Merge pull request #8 from datadrivencz/39-replace-approx-metric-with…
Browse files Browse the repository at this point in the history
…-tdigest

 [proxima-core] #39 replace ApproxPercentileMetric with T-Digest
  • Loading branch information
je-ik committed Dec 11, 2017
2 parents a6088b1 + 8ed0708 commit c2959cb
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 94 deletions.
6 changes: 6 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@
<artifactId>euphoria-core</artifactId>
<version>${euphoria.version}</version>
</dependency>

<dependency>
<groupId>com.tdunning</groupId>
<artifactId>t-digest</artifactId>
<version>3.2</version>
</dependency>

<dependency>
<groupId>cz.seznam.euphoria</groupId>
Expand Down
147 changes: 69 additions & 78 deletions core/src/main/java/cz/o2/proxima/metrics/ApproxPercentileMetric.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package cz.o2.proxima.metrics;

import com.google.common.base.Preconditions;
import com.tdunning.math.stats.TDigest;
import cz.o2.proxima.util.Pair;
import java.util.Arrays;
import java.util.Random;

/**
* An approximation of 1st, 10th, 30th, 50th, 70th, 90th and 99th percentile.
Expand All @@ -26,103 +28,92 @@ public class ApproxPercentileMetric
extends Metric<Stats>
implements ApproxPercentileMetricMXBean {

static final int SUBSAMPLE_SIZE = 500;

public static ApproxPercentileMetric of(String group, String name) {
return new ApproxPercentileMetric(group, name);

/**
 * Create a new percentile metric; delegates directly to the constructor.
 * @param group group name
 * @param name metric name
 * @param duration total duration of the statistic in ms
 * @param window window size in ms
 * @return the newly constructed metric
 * @throws IllegalArgumentException if {@code window} is not positive or
 *         {@code duration} is shorter than one {@code window}
 */
public static ApproxPercentileMetric of(
String group, String name, long duration, long window) {

return new ApproxPercentileMetric(group, name, duration, window);
}

final Random random = new Random();
final double[] capture = new double[SUBSAMPLE_SIZE];
final int[] generations = new int[SUBSAMPLE_SIZE];
int subsamples = 0;
Pair<TDigest, Long>[] digests;
final long windowNs;
final int maxDigests;
int current = 0;

ApproxPercentileMetric(String group, String name) {
/**
* Construct the metric.
* @param group group name
* @param name metric name
* @param duration total duration of the statistic in ms
* @param window window size in ms
*/
ApproxPercentileMetric(String group, String name, long duration, long window) {
super(group, name);
Preconditions.checkArgument(window > 0, "Window must be non-zero length");
this.maxDigests = (int) (duration / window);
Preconditions.checkArgument(maxDigests > 0, "Duration must be at least of length of the window");
this.windowNs = window * 1_000_000L;
_reset();
}

/** Create a new, empty merging t-digest with compression 100. */
private TDigest createDigest() {
  final double compression = 100.0;
  return TDigest.createMergingDigest(compression);
}


@Override
public void increment(double d) {
if (subsamples < SUBSAMPLE_SIZE) {
// capture all data elements, until the array is full
synchronized (this) {
generations[subsamples] = 1;
capture[subsamples++] = d;
if (subsamples == SUBSAMPLE_SIZE) {
Arrays.sort(capture);
}
}
} else {
synchronized (this) {
int remove = random.nextInt(capture.length);
double removed = capture[remove];
double w = 1.0 / generations[remove];
d = (d + removed * w) / (1 + w);
int search = Arrays.binarySearch(capture, d);
int insert = search;
if (search < 0) {
insert = - (search + 1);
}
if (insert < remove) {
System.arraycopy(capture, insert, capture, insert + 1, remove - insert);
} else if (insert != remove) {
insert--;
int toMove = insert - remove;
if (toMove > 0) {
System.arraycopy(capture, remove + 1, capture, remove, toMove);
}
}
capture[insert] = d;
generations[insert] = 0;
}
for (int i = 0; i < SUBSAMPLE_SIZE; i++) {
generations[i]++;
}
/**
 * Add a single observation to the digest of the current window, opening
 * a new window first when the current one is older than the window length.
 * @param d the observed value
 */
public synchronized void increment(double d) {
  final long now = System.nanoTime();
  final long openedAt = digests[current].getSecond();
  if (now - openedAt > windowNs) {
    // current window has expired, start a fresh one
    addDigest();
  }
  final TDigest active = digests[current].getFirst();
  active.add(d);
}


/**
 * Merge the digests of all windows opened so far and read the percentiles
 * from the merged digest.
 *
 * @return approximations of the 1st, 10th, 30th, 50th, 70th, 90th and
 *         99th percentile, in this order
 */
@Override
public synchronized Stats getValue() {
  TDigest merged = createDigest();
  // Only slots 0..current hold a digest; the remaining slots stay null
  // until enough windows have elapsed, so they must not be dereferenced.
  Arrays.stream(digests, 0, current + 1)
      .forEach(p -> merged.add(p.getFirst()));
  return new Stats(new double[] {
    merged.quantile(0.01),
    merged.quantile(0.1),
    merged.quantile(0.3),
    merged.quantile(0.5),
    merged.quantile(0.7),
    merged.quantile(0.9),
    merged.quantile(0.99)
  });
}

@Override
public synchronized void reset() {
subsamples = 0;
for (int i = 0; i < capture.length; i++) {
capture[i] = 0;
_reset();
}

/**
 * Open a new window. While there are free slots the next slot is used;
 * once all slots are taken, the oldest digest is dropped by shifting the
 * array one position to the left and the last slot is reused.
 */
private void addDigest() {
  final boolean full = current >= maxDigests - 1;
  if (full) {
    // discard slot 0 and make room at the end
    System.arraycopy(digests, 1, digests, 0, maxDigests - 1);
  } else {
    current++;
  }
  digests[current] = Pair.of(createDigest(), System.nanoTime());
}

/**
 * Throw away all windows and start over with a single empty digest
 * stamped with the current time.
 */
@SuppressWarnings("unchecked")
private synchronized void _reset() {
  // generic array creation is not possible, hence the raw array and the suppression
  final Pair<TDigest, Long>[] fresh = new Pair[maxDigests];
  fresh[0] = Pair.of(createDigest(), System.nanoTime());
  digests = fresh;
  current = 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cz.o2.proxima.metrics;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import static org.junit.Assert.*;

Expand All @@ -26,19 +27,16 @@
*/
public class ApproxPercentileMetricTest {

private Random random = new Random();
private final Random random = new Random();

@Test
public void testMetric() {
ApproxPercentileMetric m = new ApproxPercentileMetric("test", "test");
ApproxPercentileMetric m = new ApproxPercentileMetric("test", "test", 60000, 60000);
// put in some uniformly distributed values in range 0..999
int count = 10000000;
int count = 100000;
for (int i = 0; i < count; i++) {
int r = random.nextInt(1000);
m.increment(r);
if (m.subsamples == ApproxPercentileMetric.SUBSAMPLE_SIZE && i % 1000 == 0) {
checkSorted(m.capture);
}
}
double[] result = m.getValue().getRaw();
assertTrue("Invalid value, got " + result[0], result[0] < 30);
Expand All @@ -52,18 +50,15 @@ public void testMetric() {

@Test
public void testMetricWhenDistributionUpdates() {
ApproxPercentileMetric m = new ApproxPercentileMetric("test", "test");
ApproxPercentileMetric m = new ApproxPercentileMetric("test", "test", 60000, 60000);
// put in some uniformly distributed values in range 0..999
final int k = 2000;
for (int i = 0; i < k; i++) {
m.increment(random.nextInt(1000));
}
// change the distribution and put in values distributed in 0..99
for (int i = 0; i < 50000 * k; i++) {
for (int i = 0; i < 1000 * k; i++) {
m.increment(random.nextInt(100));
if (m.subsamples == ApproxPercentileMetric.SUBSAMPLE_SIZE && i % 1000 == 0) {
checkSorted(m.capture);
}
}
double[] result = m.getValue().getRaw();
assertTrue("Invalid value, got " + result[0], result[0] < 3);
Expand All @@ -75,12 +70,20 @@ public void testMetricWhenDistributionUpdates() {
assertTrue("Invalid value, got " + result[6], 95 < result[6]);
}

private void checkSorted(double[] capture) {
for (int i = 1; i < capture.length - 1; i++) {
if (capture[i - 1] > capture[i]) {
fail("Array is not properly sorted");
/**
 * Verify that old windows are dropped once the configured duration is exceeded.
 * The metric is created with duration 1000 ms and window 100 ms, i.e. ten windows.
 * NOTE(review): the test relies on wall-clock sleeps, so it is presumably
 * sensitive to scheduling delays - confirm it is stable on CI.
 */
@Test
public void testWindowing() throws InterruptedException {
ApproxPercentileMetric m = new ApproxPercentileMetric("test", "test", 1000, 100);
// insert 20 batches of 10 values (0..199 in total), sleeping 100 ms after each batch
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
m.increment(10 * i + j);
}
TimeUnit.MILLISECONDS.sleep(100);
}
// presumably only the last ten batches (values 100..199) are retained,
// which puts the median at roughly 150
assertEquals(
150.0,
m.getValue().get50(),
1.0);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import cz.o2.proxima.metrics.Metric;
import cz.o2.proxima.metrics.TimeAveragingMetric;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import javax.management.MBeanServer;
import javax.management.ObjectName;

Expand All @@ -37,7 +38,8 @@ public class Metrics {
GROUP, "ingest-bulk", 1_000);

public static final ApproxPercentileMetric BULK_SIZE = ApproxPercentileMetric.of(
GROUP, "bulk-size");
GROUP, "bulk-size", Duration.ofHours(1).toMillis(),
Duration.ofMinutes(5).toMillis());

public static final Metric<Double> INGESTS = TimeAveragingMetric.of(
GROUP, "ingests", 1_000);
Expand Down

0 comments on commit c2959cb

Please sign in to comment.