From 8ed070810b5670b6ff7381e459e84c328caebb78 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Mon, 11 Dec 2017 14:45:25 +0100 Subject: [PATCH] [proxima-core] #39 replace ApproxPercentileMetric with T-Digest --- core/pom.xml | 6 + .../metrics/ApproxPercentileMetric.java | 147 ++++++++---------- .../metrics/ApproxPercentileMetricTest.java | 33 ++-- .../cz/o2/proxima/server/metrics/Metrics.java | 4 +- 4 files changed, 96 insertions(+), 94 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 36a2b78f1..5867e4c7e 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -117,6 +117,12 @@ euphoria-core ${euphoria.version} + + + com.tdunning + t-digest + 3.2 + cz.seznam.euphoria diff --git a/core/src/main/java/cz/o2/proxima/metrics/ApproxPercentileMetric.java b/core/src/main/java/cz/o2/proxima/metrics/ApproxPercentileMetric.java index fa9a95be3..344cd0af2 100644 --- a/core/src/main/java/cz/o2/proxima/metrics/ApproxPercentileMetric.java +++ b/core/src/main/java/cz/o2/proxima/metrics/ApproxPercentileMetric.java @@ -16,8 +16,10 @@ package cz.o2.proxima.metrics; +import com.google.common.base.Preconditions; +import com.tdunning.math.stats.TDigest; +import cz.o2.proxima.util.Pair; import java.util.Arrays; -import java.util.Random; /** * An approximation of 1st, 10th, 30th, 50th, 70th, 90th and 99th percentile. @@ -26,103 +28,92 @@ public class ApproxPercentileMetric extends Metric implements ApproxPercentileMetricMXBean { - static final int SUBSAMPLE_SIZE = 500; - public static ApproxPercentileMetric of(String group, String name) { - return new ApproxPercentileMetric(group, name); + + /** + * Construct the metric. + * @param group group name + * @param name metric name + * @param duration total duration of the statistic in ms + * @param window windowNs size in ms + * @return + */ + public static ApproxPercentileMetric of( + String group, String name, long duration, long window) { + + return new ApproxPercentileMetric(group, name, duration, window); } - final Random random = new Random(); - final double[] capture = new double[SUBSAMPLE_SIZE]; - final int[] generations = new int[SUBSAMPLE_SIZE]; - int subsamples = 0; + Pair[] digests; + final long windowNs; + final int maxDigests; + int current = 0; - ApproxPercentileMetric(String group, String name) { + /** + * Construct the metric. + * @param group group name + * @param name metric name + * @param duration total duration of the statistic in ms + * @param window window size in ms + */ + ApproxPercentileMetric(String group, String name, long duration, long window) { super(group, name); + Preconditions.checkArgument(window > 0, "Window must be non-zero length"); + this.maxDigests = (int) (duration / window); + Preconditions.checkArgument(maxDigests > 0, "Duration must be at least of length of the window"); + this.windowNs = window * 1_000_000L; + _reset(); + } + + private TDigest createDigest() { + return TDigest.createMergingDigest(100.0); } @Override - public void increment(double d) { - if (subsamples < SUBSAMPLE_SIZE) { - // capture all data elements, until the array is full - synchronized (this) { - generations[subsamples] = 1; - capture[subsamples++] = d; - if (subsamples == SUBSAMPLE_SIZE) { - Arrays.sort(capture); - } - } - } else { - synchronized (this) { - int remove = random.nextInt(capture.length); - double removed = capture[remove]; - double w = 1.0 / generations[remove]; - d = (d + removed * w) / (1 + w); - int search = Arrays.binarySearch(capture, d); - int insert = search; - if (search < 0) { - insert = - (search + 1); - } - if (insert < remove) { - System.arraycopy(capture, insert, capture, insert + 1, remove - insert); - } else if (insert != remove) { - insert--; - int toMove = insert - remove; - if (toMove > 0) { - System.arraycopy(capture, remove + 1, capture, remove, toMove); - } - } - capture[insert] = d; - generations[insert] = 0; - } - for (int i = 0; i < SUBSAMPLE_SIZE; i++) { - generations[i]++; - } + public synchronized void increment(double d) { + if (System.nanoTime() - digests[current].getSecond() > windowNs) { + addDigest(); } + digests[current].getFirst().add(d); } @Override public synchronized Stats getValue() { - // calculate the stastics from the capture array - if (subsamples >= SUBSAMPLE_SIZE) { - return new Stats(new double[] { - capture[5], - capture[50], - capture[150], - capture[250], - capture[350], - capture[450], - capture[495] - }); - } - // not yet properly sampled - // we need to copy the array and sort it - if (subsamples > 50) { - double[] sorted = Arrays.copyOf(capture, subsamples); - Arrays.sort(sorted); - int ratio = sorted.length / 10; - return new Stats(new double[] { - sorted[ratio / 10], - sorted[ratio], - sorted[3 * ratio], - sorted[5 * ratio], - sorted[7 * ratio], - sorted[9 * ratio], - sorted[99 * ratio / 10] - }); - } - // zeros, don't mess the stats with too fuzzy data - return new Stats(new double[7]); + TDigest result = createDigest(); + Arrays.stream(digests).forEach(p -> result.add(p.getFirst())); + return new Stats(new double[] { + result.quantile(0.01), + result.quantile(0.1), + result.quantile(0.3), + result.quantile(0.5), + result.quantile(0.7), + result.quantile(0.9), + result.quantile(0.99), + }); } @Override public synchronized void reset() { - subsamples = 0; - for (int i = 0; i < capture.length; i++) { - capture[i] = 0; + _reset(); + } + + private void addDigest() { + if (current + 1 < maxDigests) { + digests[++current] = Pair.of(createDigest(), System.nanoTime()); + } else { + // move the array + System.arraycopy(digests, 1, digests, 0, maxDigests - 1); + digests[current] = Pair.of(createDigest(), System.nanoTime()); } } + @SuppressWarnings("unchecked") + private synchronized void _reset() { + this.digests = new Pair[maxDigests]; + this.digests[0] = Pair.of(createDigest(), System.nanoTime()); + this.current = 0; + } + } diff --git a/core/src/test/java/cz/o2/proxima/metrics/ApproxPercentileMetricTest.java b/core/src/test/java/cz/o2/proxima/metrics/ApproxPercentileMetricTest.java index afaf210e8..dbfa804f0 100644 --- a/core/src/test/java/cz/o2/proxima/metrics/ApproxPercentileMetricTest.java +++ b/core/src/test/java/cz/o2/proxima/metrics/ApproxPercentileMetricTest.java @@ -18,6 +18,7 @@ package cz.o2.proxima.metrics; import java.util.Random; +import java.util.concurrent.TimeUnit; import org.junit.Test; import static org.junit.Assert.*; @@ -26,19 +27,16 @@ */ public class ApproxPercentileMetricTest { - private Random random = new Random(); + private final Random random = new Random(); @Test public void testMetric() { - ApproxPercentileMetric m = new ApproxPercentileMetric("test", "test"); + ApproxPercentileMetric m = new ApproxPercentileMetric("test", "test", 60000, 60000); // put in some uniformly distributed values in range 0..999 - int count = 10000000; + int count = 100000; for (int i = 0; i < count; i++) { int r = random.nextInt(1000); m.increment(r); - if (m.subsamples == ApproxPercentileMetric.SUBSAMPLE_SIZE && i % 1000 == 0) { - checkSorted(m.capture); - } } double[] result = m.getValue().getRaw(); assertTrue("Invalid value, got " + result[0], result[0] < 30); @@ -52,18 +50,15 @@ public void testMetric() { @Test public void testMetricWhenDistributionUpdates() { - ApproxPercentileMetric m = new ApproxPercentileMetric("test", "test"); + ApproxPercentileMetric m = new ApproxPercentileMetric("test", "test", 60000, 60000); // put in some uniformly distributed values in range 0..999 final int k = 2000; for (int i = 0; i < k; i++) { m.increment(random.nextInt(1000)); } // change the distribution and put in values distributed in 0..99 - for (int i = 0; i < 50000 * k; i++) { + for (int i = 0; i < 1000 * k; i++) { m.increment(random.nextInt(100)); - if (m.subsamples == ApproxPercentileMetric.SUBSAMPLE_SIZE && i % 1000 == 0) { - checkSorted(m.capture); - } } double[] result = m.getValue().getRaw(); assertTrue("Invalid value, got " + result[0], result[0] < 3); @@ -75,12 +70,20 @@ public void testMetricWhenDistributionUpdates() { assertTrue("Invalid value, got " + result[6], 95 < result[6]); } - private void checkSorted(double[] capture) { - for (int i = 1; i < capture.length - 1; i++) { - if (capture[i - 1] > capture[i]) { - fail("Array is not properly sorted"); + @Test + public void testWindowing() throws InterruptedException { + ApproxPercentileMetric m = new ApproxPercentileMetric("test", "test", 1000, 100); + // put 20-times 10 values and wait for 100 ms + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + m.increment(10 * i + j); } + TimeUnit.MILLISECONDS.sleep(100); } + assertEquals( + 150.0, + m.getValue().get50(), + 1.0); } } diff --git a/server/src/main/java/cz/o2/proxima/server/metrics/Metrics.java b/server/src/main/java/cz/o2/proxima/server/metrics/Metrics.java index 8a03a3c6f..e713aa464 100644 --- a/server/src/main/java/cz/o2/proxima/server/metrics/Metrics.java +++ b/server/src/main/java/cz/o2/proxima/server/metrics/Metrics.java @@ -20,6 +20,7 @@ import cz.o2.proxima.metrics.Metric; import cz.o2.proxima.metrics.TimeAveragingMetric; import java.lang.management.ManagementFactory; +import java.time.Duration; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -37,7 +38,8 @@ public class Metrics { GROUP, "ingest-bulk", 1_000); public static final ApproxPercentileMetric BULK_SIZE = ApproxPercentileMetric.of( - GROUP, "bulk-size"); + GROUP, "bulk-size", Duration.ofHours(1).toMillis(), + Duration.ofMinutes(5).toMillis()); public static final Metric INGESTS = TimeAveragingMetric.of( GROUP, "ingests", 1_000);