From 451447aca8c4717214314d6e4d777b30ea364a94 Mon Sep 17 00:00:00 2001 From: Ville Koskela Date: Wed, 11 Sep 2019 11:24:32 -0700 Subject: [PATCH 1/2] Add support for aggregation pre-aggregated values (e.g. from the ISM V3 protocol). --- .../arpnetworking/metrics/mad/Aggregator.java | 6 +- .../com/arpnetworking/metrics/mad/Bucket.java | 22 +-- .../metrics/mad/model/DefaultMetric.java | 13 +- .../metrics/mad/model/Metric.java | 7 +- .../mad/model/statistics/Accumulator.java | 15 ++ .../mad/model/statistics/CountStatistic.java | 5 + .../model/statistics/HistogramStatistic.java | 21 ++ .../mad/model/statistics/MaxStatistic.java | 5 + .../mad/model/statistics/MinStatistic.java | 5 + .../mad/model/statistics/SumStatistic.java | 5 + .../mad/parsers/ProtobufV3ToRecordParser.java | 134 +++++++------ .../metrics/mad/sources/MappingSource.java | 19 +- .../tsdcore/model/CalculatedValue.java | 2 +- .../arpnetworking/metrics/mad/BucketTest.java | 116 ++++++++++- .../model/statistics/CountStatisticTest.java | 34 +++- .../model/statistics/MaxStatisticTest.java | 34 +++- .../model/statistics/MinStatisticTest.java | 32 +++ .../model/statistics/SumStatisticTest.java | 34 +++- .../parsers/ProtobufV3ToRecordParserTest.java | 182 ++++++------------ 19 files changed, 474 insertions(+), 217 deletions(-) diff --git a/src/main/java/com/arpnetworking/metrics/mad/Aggregator.java b/src/main/java/com/arpnetworking/metrics/mad/Aggregator.java index 54db7dd9..d83c4b83 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/Aggregator.java +++ b/src/main/java/com/arpnetworking/metrics/mad/Aggregator.java @@ -220,11 +220,7 @@ public Optional> load(final String metric) throws Except @Override public Optional> load(final String metric) throws Exception { final Optional> statistics = _cachedSpecifiedStatistics.get(metric); - if (statistics.isPresent()) { - return Optional.of(computeDependentStatistics(statistics.get())); - } else { - return Optional.empty(); - } + return statistics.map(statisticImmutableSet -> computeDependentStatistics(statisticImmutableSet)); } }); } diff --git a/src/main/java/com/arpnetworking/metrics/mad/Bucket.java b/src/main/java/com/arpnetworking/metrics/mad/Bucket.java index b96ac780..79aa9553 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/Bucket.java +++ b/src/main/java/com/arpnetworking/metrics/mad/Bucket.java @@ -35,6 +35,7 @@ import com.arpnetworking.tsdcore.model.PeriodicData; import com.arpnetworking.tsdcore.sinks.Sink; import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; @@ -76,16 +77,6 @@ public void close() { computeStatistics(_gaugeMetricCalculators, _specifiedGaugeStatistics, data); computeStatistics(_timerMetricCalculators, _specifiedTimerStatistics, data); computeStatistics(_explicitMetricCalculators, _specifiedStatisticsCache, data); - // TODO(vkoskela): Perform expression evaluation here. [NEXT] - // -> This still requires realizing and indexing the computed aggregated data - // in order to feed the expression evaluation. Once the filtering is consolidated - // we can probably just build a map here and then do one copy into immutable form - // in the PeriodicData. This becomes feasible with consolidated filtering because - // fewer copies (e.g. none) are made downstream. - // TODO(vkoskela): Perform alert evaluation here. [NEXT] - // -> This requires expressions. Otherwise, it's just a matter of changing the - // alerts abstraction from a Sink to something more appropriate and hooking it in - // here. final PeriodicData periodicData = ThreadLocalBuilder.build( PeriodicData.Builder.class, b -> b.setData(data.build()) @@ -114,10 +105,10 @@ public void add(final Record record) { final String name = entry.getKey(); final Metric metric = entry.getValue(); - if (metric.getValues().isEmpty()) { + if (metric.getValues().isEmpty() && metric.getStatistics().isEmpty()) { LOGGER.debug() .setMessage("Discarding metric") - .addData("reason", "no samples") + .addData("reason", "no samples or statistics") .addData("name", name) .addData("metric", metric) .log(); @@ -320,14 +311,18 @@ private void addMetric( return; } - // Add the value to any accumulators + // Add the metric data to any accumulators for (final Calculator calculator : calculators) { + final Statistic statistic = calculator.getStatistic(); if (calculator instanceof Accumulator) { final Accumulator accumulator = (Accumulator) calculator; synchronized (accumulator) { for (final Quantity quantity : metric.getValues()) { accumulator.accumulate(quantity); } + for (final CalculatedValue value : metric.getStatistics().getOrDefault(statistic, ImmutableList.of())) { + accumulator.accumulateAny(value); + } } } } @@ -351,6 +346,7 @@ private Collection> getOrCreateCalculators( newCalculators.add(statistic.createCalculator()); } newCalculators.add(COUNT_STATISTIC.createCalculator()); + calculators = calculatorsByMetric.putIfAbsent(name, newCalculators); if (calculators == null) { calculators = newCalculators; diff --git a/src/main/java/com/arpnetworking/metrics/mad/model/DefaultMetric.java b/src/main/java/com/arpnetworking/metrics/mad/model/DefaultMetric.java index b60fdfed..8c8f28a8 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/model/DefaultMetric.java +++ b/src/main/java/com/arpnetworking/metrics/mad/model/DefaultMetric.java @@ -17,10 +17,13 @@ import com.arpnetworking.commons.builder.ThreadLocalBuilder; import com.arpnetworking.logback.annotations.LogValue; +import com.arpnetworking.metrics.mad.model.statistics.Statistic; import com.arpnetworking.steno.LogValueMapFactory; +import com.arpnetworking.tsdcore.model.CalculatedValue; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import net.sf.oval.constraint.NotNull; import java.util.List; @@ -44,7 +47,7 @@ public List getValues() { } @Override - public List getStatistics() { + public ImmutableMap>> getStatistics() { return _statistics; } @@ -106,7 +109,7 @@ private DefaultMetric(final Builder builder) { private final MetricType _type; private final ImmutableList _values; - private final ImmutableList _statistics; + private final ImmutableMap>> _statistics; /** * Implementation of builder pattern for {@link DefaultMetric}. @@ -123,12 +126,12 @@ public Builder() { } /** - * The statistics {@code List}. Cannot be null. + * The statistics {@code Map}. Cannot be null. * * @param value The values {@code List}. * @return This instance of {@link Builder}. */ - public Builder setStatistics(final ImmutableList value) { + public Builder setStatistics(final ImmutableMap>> value) { _statistics = value; return this; } @@ -162,7 +165,7 @@ protected void reset() { } @NotNull - private ImmutableList _statistics = ImmutableList.of(); + private ImmutableMap>> _statistics = ImmutableMap.of(); @NotNull private ImmutableList _values = ImmutableList.of(); @NotNull diff --git a/src/main/java/com/arpnetworking/metrics/mad/model/Metric.java b/src/main/java/com/arpnetworking/metrics/mad/model/Metric.java index 7c87f148..2ecd2ea4 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/model/Metric.java +++ b/src/main/java/com/arpnetworking/metrics/mad/model/Metric.java @@ -15,6 +15,11 @@ */ package com.arpnetworking.metrics.mad.model; +import com.arpnetworking.metrics.mad.model.statistics.Statistic; +import com.arpnetworking.tsdcore.model.CalculatedValue; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + import java.util.List; /** @@ -43,5 +48,5 @@ public interface Metric { * * @return The collected statistical data. */ - List getStatistics(); + ImmutableMap>> getStatistics(); } diff --git a/src/main/java/com/arpnetworking/metrics/mad/model/statistics/Accumulator.java b/src/main/java/com/arpnetworking/metrics/mad/model/statistics/Accumulator.java index 725647fa..712caf46 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/model/statistics/Accumulator.java +++ b/src/main/java/com/arpnetworking/metrics/mad/model/statistics/Accumulator.java @@ -49,4 +49,19 @@ public interface Accumulator extends Calculator { * @return This Accumulator. */ Accumulator accumulate(CalculatedValue calculatedValue); + + /** + * Add the specified CalculatedValue to the accumulated value. The + * CalculatedValue was produced by this Accumulator in + * a different context. For example, for a different time period or a different + * host. It is permissible to mix calls to accumulate with Quantity + * and CalculatedValue. + * + * If the CalculatedValue's supporting data is of an unsupported + * type then an IllegaglArgumentException will be thrown. + * + * @param calculatedValue The CalculatedValue to include in the accumulated value. + * @return This Accumulator. + */ + Accumulator accumulateAny(CalculatedValue calculatedValue); } diff --git a/src/main/java/com/arpnetworking/metrics/mad/model/statistics/CountStatistic.java b/src/main/java/com/arpnetworking/metrics/mad/model/statistics/CountStatistic.java index 7379a723..d85c9186 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/model/statistics/CountStatistic.java +++ b/src/main/java/com/arpnetworking/metrics/mad/model/statistics/CountStatistic.java @@ -76,6 +76,11 @@ public Accumulator accumulate(final Quantity quantity) { @Override public Accumulator accumulate(final CalculatedValue calculatedValue) { + return accumulateAny(calculatedValue); + } + + @Override + public Accumulator accumulateAny(final CalculatedValue calculatedValue) { _count += calculatedValue.getValue().getValue(); return this; } diff --git a/src/main/java/com/arpnetworking/metrics/mad/model/statistics/HistogramStatistic.java b/src/main/java/com/arpnetworking/metrics/mad/model/statistics/HistogramStatistic.java index 356ca35d..86eaa8a2 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/model/statistics/HistogramStatistic.java +++ b/src/main/java/com/arpnetworking/metrics/mad/model/statistics/HistogramStatistic.java @@ -96,6 +96,27 @@ public Accumulator accumulate(final CalculatedValue accumulateAny(final CalculatedValue calculatedValue) { + if (calculatedValue.getData() == null) { + throw new IllegalArgumentException( + String.format( + "Null calculated value data for %s", + this.getClass())); + } + if (!(calculatedValue.getData() instanceof HistogramSupportingData)) { + throw new IllegalArgumentException( + String.format( + "Unsupported calculated value data type %s for %s", + calculatedValue.getData().getClass(), + this.getClass())); + } + @SuppressWarnings("unchecked") + final CalculatedValue checkedCalculatedValue = + (CalculatedValue) calculatedValue; + return accumulate(checkedCalculatedValue); + } + @Override public CalculatedValue calculate(final Map> dependencies) { return ThreadLocalBuilder.< diff --git a/src/main/java/com/arpnetworking/metrics/mad/model/statistics/MaxStatistic.java b/src/main/java/com/arpnetworking/metrics/mad/model/statistics/MaxStatistic.java index 8eb67775..6e0e81d1 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/model/statistics/MaxStatistic.java +++ b/src/main/java/com/arpnetworking/metrics/mad/model/statistics/MaxStatistic.java @@ -94,6 +94,11 @@ public Accumulator accumulate(final Quantity quantity) { @Override public Accumulator accumulate(final CalculatedValue calculatedValue) { + return accumulateAny(calculatedValue); + } + + @Override + public Accumulator accumulateAny(final CalculatedValue calculatedValue) { return accumulate(calculatedValue.getValue()); } diff --git a/src/main/java/com/arpnetworking/metrics/mad/model/statistics/MinStatistic.java b/src/main/java/com/arpnetworking/metrics/mad/model/statistics/MinStatistic.java index ffbb818d..3d62789d 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/model/statistics/MinStatistic.java +++ b/src/main/java/com/arpnetworking/metrics/mad/model/statistics/MinStatistic.java @@ -92,6 +92,11 @@ public Accumulator accumulate(final Quantity quantity) { @Override public Accumulator accumulate(final CalculatedValue calculatedValue) { + return accumulateAny(calculatedValue); + } + + @Override + public Accumulator accumulateAny(final CalculatedValue calculatedValue) { return accumulate(calculatedValue.getValue()); } diff --git a/src/main/java/com/arpnetworking/metrics/mad/model/statistics/SumStatistic.java b/src/main/java/com/arpnetworking/metrics/mad/model/statistics/SumStatistic.java index 9328e6cd..24b75e8b 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/model/statistics/SumStatistic.java +++ b/src/main/java/com/arpnetworking/metrics/mad/model/statistics/SumStatistic.java @@ -77,6 +77,11 @@ public Accumulator accumulate(final Quantity quantity) { @Override public Accumulator accumulate(final CalculatedValue calculatedValue) { + return accumulateAny(calculatedValue); + } + + @Override + public Accumulator accumulateAny(final CalculatedValue calculatedValue) { return accumulate(calculatedValue.getValue()); } diff --git a/src/main/java/com/arpnetworking/metrics/mad/parsers/ProtobufV3ToRecordParser.java b/src/main/java/com/arpnetworking/metrics/mad/parsers/ProtobufV3ToRecordParser.java index f9bd1038..aef31a69 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/parsers/ProtobufV3ToRecordParser.java +++ b/src/main/java/com/arpnetworking/metrics/mad/parsers/ProtobufV3ToRecordParser.java @@ -18,7 +18,6 @@ import com.arpnetworking.commons.builder.ThreadLocalBuilder; import com.arpnetworking.metrics.common.parsers.Parser; import com.arpnetworking.metrics.common.parsers.exceptions.ParsingException; -import com.arpnetworking.metrics.mad.model.AggregatedData; import com.arpnetworking.metrics.mad.model.DefaultMetric; import com.arpnetworking.metrics.mad.model.DefaultQuantity; import com.arpnetworking.metrics.mad.model.DefaultRecord; @@ -28,7 +27,9 @@ import com.arpnetworking.metrics.mad.model.Quantity; import com.arpnetworking.metrics.mad.model.Record; import com.arpnetworking.metrics.mad.model.statistics.HistogramStatistic; +import com.arpnetworking.metrics.mad.model.statistics.Statistic; import com.arpnetworking.metrics.mad.model.statistics.StatisticFactory; +import com.arpnetworking.tsdcore.model.CalculatedValue; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -150,73 +151,72 @@ private void parseNumericalStatistics( metricData.put(metricName, metricDatum); } + final Map> statistics = Maps.newHashMap(); final long populationSize = augmentedHistogram.getEntriesList().stream() .map(e -> (long) e.getCount()) .reduce(Long::sum) .orElse(0L); - final ImmutableList.Builder statistics = ImmutableList.builder(); - statistics.add(ThreadLocalBuilder.build(AggregatedData.Builder.class, bldr -> - bldr.setStatistic(STATISTIC_FACTORY.getStatistic("min")) - .setIsSpecified(false) - .setValue(ThreadLocalBuilder.build( - DefaultQuantity.Builder.class, - b -> b.setValue(augmentedHistogram.getMin()))) - .setPopulationSize(populationSize))); + statistics.put( + STATISTIC_FACTORY.getStatistic("min"), + ThreadLocalBuilder., CalculatedValue.Builder>buildGeneric( + CalculatedValue.Builder.class, + b1 -> b1.setValue( + ThreadLocalBuilder.build( + DefaultQuantity.Builder.class, + b2 -> b2.setValue(augmentedHistogram.getMin()))))); - statistics.add(ThreadLocalBuilder.build(AggregatedData.Builder.class, bldr -> - bldr.setStatistic(STATISTIC_FACTORY.getStatistic("max")) - .setIsSpecified(false) - .setValue(ThreadLocalBuilder.build( - DefaultQuantity.Builder.class, - b -> b.setValue(augmentedHistogram.getMax()))) - .setPopulationSize(populationSize))); + statistics.put( + STATISTIC_FACTORY.getStatistic("max"), + ThreadLocalBuilder., CalculatedValue.Builder>buildGeneric( + CalculatedValue.Builder.class, + b1 -> b1.setValue( + ThreadLocalBuilder.build( + DefaultQuantity.Builder.class, + b2 -> b2.setValue(augmentedHistogram.getMax()))))); - statistics.add(ThreadLocalBuilder.build(AggregatedData.Builder.class, bldr -> - bldr.setStatistic(STATISTIC_FACTORY.getStatistic("count")) - .setIsSpecified(false) - .setValue(ThreadLocalBuilder.build( - DefaultQuantity.Builder.class, - b -> b.setValue((double) populationSize))) - .setPopulationSize(populationSize))); + statistics.put( + STATISTIC_FACTORY.getStatistic("count"), + ThreadLocalBuilder., CalculatedValue.Builder>buildGeneric( + CalculatedValue.Builder.class, + b1 -> b1.setValue( + ThreadLocalBuilder.build( + DefaultQuantity.Builder.class, + b2 -> b2.setValue((double) populationSize))))); - statistics.add(ThreadLocalBuilder.build(AggregatedData.Builder.class, bldr -> - bldr.setStatistic(STATISTIC_FACTORY.getStatistic("sum")) - .setIsSpecified(false) - .setValue(ThreadLocalBuilder.build( - DefaultQuantity.Builder.class, - b -> b.setValue(augmentedHistogram.getSum()))) - .setPopulationSize(populationSize))); + statistics.put( + STATISTIC_FACTORY.getStatistic("sum"), + ThreadLocalBuilder., CalculatedValue.Builder>buildGeneric( + CalculatedValue.Builder.class, + b1 -> b1.setValue( + ThreadLocalBuilder.build( + DefaultQuantity.Builder.class, + b2 -> b2.setValue(augmentedHistogram.getSum()))))); - if (populationSize != 0) { - statistics.add(ThreadLocalBuilder.build(AggregatedData.Builder.class, bldr -> - bldr.setStatistic(STATISTIC_FACTORY.getStatistic("mean")) - .setIsSpecified(false) - .setValue(ThreadLocalBuilder.build( - DefaultQuantity.Builder.class, - b -> b.setValue(augmentedHistogram.getSum() / populationSize))) - .setPopulationSize(populationSize))); - } - - statistics.add(ThreadLocalBuilder.build(AggregatedData.Builder.class, bldr -> - bldr.setStatistic(STATISTIC_FACTORY.getStatistic("histogram")) - .setIsSpecified(false) - .setValue(ThreadLocalBuilder.build( - DefaultQuantity.Builder.class, - b -> b.setValue(1.0))) - .setPopulationSize(populationSize) - .setSupportingData(ThreadLocalBuilder.build( - HistogramStatistic.HistogramSupportingData.Builder.class, - b -> { - final HistogramStatistic.Histogram histogram = new HistogramStatistic.Histogram(); - augmentedHistogram.getEntriesList().forEach( - e -> histogram.recordPacked( - e.getBucket(), - e.getCount())); - b.setHistogramSnapshot(histogram.getSnapshot()); - })))); + statistics.put( + STATISTIC_FACTORY.getStatistic("histogram"), + // CHECKSTYLE.OFF: LineLength - Generic specification required for buildGeneric + ThreadLocalBuilder., CalculatedValue.Builder>buildGeneric( + // CHECKSTYLE.ON: LineLength + CalculatedValue.Builder.class, + b1 -> b1.setValue( + ThreadLocalBuilder.build( + DefaultQuantity.Builder.class, + b2 -> b2.setValue(1.0))) + .setData( + ThreadLocalBuilder.build( + HistogramStatistic.HistogramSupportingData.Builder.class, + b3 -> { + final HistogramStatistic.Histogram histogram = + new HistogramStatistic.Histogram(); + augmentedHistogram.getEntriesList().forEach( + e -> histogram.recordPacked( + e.getBucket(), + e.getCount())); + b3.setHistogramSnapshot(histogram.getSnapshot()); + })))); - metricDatum.addStatistics(statistics.build()); + metricDatum.addStatistics(statistics); } private ImmutableMap buildDimensions(final ClientV3.Record record) { @@ -255,21 +255,29 @@ void addSamples(final Collection samples) { _metricSamples.addAll(samples); } - void addStatistics(final Collection statistics) { - _metricStatistics.addAll(statistics); + void addStatistics(final Map> statistics) { + for (final Map.Entry> entry : statistics.entrySet()) { + final Statistic statistic = entry.getKey(); + final ImmutableList.Builder> calculatedValues = + _metricStatistics.computeIfAbsent(statistic, k -> ImmutableList.builder()); + calculatedValues.add(entry.getValue()); + } } ImmutableList getSamples() { return _metricSamples.build(); } - ImmutableList getStatistics() { - return _metricStatistics.build(); + ImmutableMap>> getStatistics() { + return _metricStatistics.entrySet().stream() + .collect(ImmutableMap.toImmutableMap( + Map.Entry::getKey, + entry -> entry.getValue().build())); } MetricData() {} private final ImmutableList.Builder _metricSamples = ImmutableList.builder(); - private final ImmutableList.Builder _metricStatistics = ImmutableList.builder(); + private final Map>> _metricStatistics = Maps.newHashMap(); } } diff --git a/src/main/java/com/arpnetworking/metrics/mad/sources/MappingSource.java b/src/main/java/com/arpnetworking/metrics/mad/sources/MappingSource.java index 20c93d9f..d3915603 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/sources/MappingSource.java +++ b/src/main/java/com/arpnetworking/metrics/mad/sources/MappingSource.java @@ -21,16 +21,17 @@ import com.arpnetworking.logback.annotations.LogValue; import com.arpnetworking.metrics.common.sources.BaseSource; import com.arpnetworking.metrics.common.sources.Source; -import com.arpnetworking.metrics.mad.model.AggregatedData; import com.arpnetworking.metrics.mad.model.DefaultMetric; import com.arpnetworking.metrics.mad.model.DefaultRecord; import com.arpnetworking.metrics.mad.model.Metric; import com.arpnetworking.metrics.mad.model.MetricType; import com.arpnetworking.metrics.mad.model.Quantity; import com.arpnetworking.metrics.mad.model.Record; +import com.arpnetworking.metrics.mad.model.statistics.Statistic; import com.arpnetworking.steno.LogValueMapFactory; import com.arpnetworking.steno.Logger; import com.arpnetworking.steno.LoggerFactory; +import com.arpnetworking.tsdcore.model.CalculatedValue; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -182,7 +183,9 @@ private void merge(final Metric metric, final String key, final Map>> entry : metric.getStatistics().entrySet()) { + _statistics.computeIfAbsent(entry.getKey(), k-> ImmutableList.builder()).addAll(entry.getValue()); + } } public boolean isMergable(final Metric metric) { @@ -194,6 +197,9 @@ public void merge(final Metric metric) { throw new IllegalArgumentException(String.format("Metric cannot be merged; metric=%s", metric)); } _values.addAll(metric.getValues()); + for (final Map.Entry>> entry : metric.getStatistics().entrySet()) { + _statistics.computeIfAbsent(entry.getKey(), k-> ImmutableList.builder()).addAll(entry.getValue()); + } } @Override @@ -207,8 +213,11 @@ public ImmutableList getValues() { } @Override - public ImmutableList getStatistics() { - return _statistics.build(); + public ImmutableMap>> getStatistics() { + return _statistics.entrySet().stream().collect( + ImmutableMap.toImmutableMap( + Map.Entry::getKey, + entry -> entry.getValue().build())); } @Override @@ -222,7 +231,7 @@ public String toString() { private final MetricType _type; private final ImmutableList.Builder _values = ImmutableList.builder(); - private final ImmutableList.Builder _statistics = ImmutableList.builder(); + private final Map>> _statistics = Maps.newHashMap(); } /** diff --git a/src/main/java/com/arpnetworking/tsdcore/model/CalculatedValue.java b/src/main/java/com/arpnetworking/tsdcore/model/CalculatedValue.java index 01a6f853..c4dee1ae 100644 --- a/src/main/java/com/arpnetworking/tsdcore/model/CalculatedValue.java +++ b/src/main/java/com/arpnetworking/tsdcore/model/CalculatedValue.java @@ -69,7 +69,7 @@ public Builder setValue(final Quantity value) { } /** - * Set the data. Optional. Cannot be null. Defaults to empty list. + * Set the data. Optional. Defaults to null. * * @param data The data. * @return This Builder instance. diff --git a/src/test/java/com/arpnetworking/metrics/mad/BucketTest.java b/src/test/java/com/arpnetworking/metrics/mad/BucketTest.java index 3e5fa4b9..e6dcceba 100644 --- a/src/test/java/com/arpnetworking/metrics/mad/BucketTest.java +++ b/src/test/java/com/arpnetworking/metrics/mad/BucketTest.java @@ -15,6 +15,7 @@ */ package com.arpnetworking.metrics.mad; +import com.arpnetworking.commons.builder.ThreadLocalBuilder; import com.arpnetworking.metrics.mad.model.AggregatedData; import com.arpnetworking.metrics.mad.model.DefaultMetric; import com.arpnetworking.metrics.mad.model.DefaultQuantity; @@ -24,6 +25,7 @@ import com.arpnetworking.metrics.mad.model.Unit; import com.arpnetworking.metrics.mad.model.statistics.Statistic; import com.arpnetworking.metrics.mad.model.statistics.StatisticFactory; +import com.arpnetworking.tsdcore.model.CalculatedValue; import com.arpnetworking.tsdcore.model.DefaultKey; import com.arpnetworking.tsdcore.model.Key; import com.arpnetworking.tsdcore.model.PeriodicData; @@ -206,6 +208,72 @@ public void testTimer() { .build())); } + @Test + public void testCalculatedValues() { + _bucket.add( + new DefaultRecord.Builder() + .setTime(START) + .setDimensions( + ImmutableMap.of( + Key.HOST_DIMENSION_KEY, "MyHost", + Key.SERVICE_DIMENSION_KEY, "MyService", + Key.CLUSTER_DIMENSION_KEY, "MyCluster")) + .setId(UUID.randomUUID().toString()) + .setMetrics(ImmutableMap.of( + "testCalculatedValues/MyMetric", + new DefaultMetric.Builder() + .setType(MetricType.GAUGE) + .setStatistics(ImmutableMap.of( + STATISTIC_FACTORY.getStatistic("min"), cvl(1.0, 2.0), + STATISTIC_FACTORY.getStatistic("max"), cvl(99.0, 100.0), + STATISTIC_FACTORY.getStatistic("count"), cvl(2.0, 3.0), + STATISTIC_FACTORY.getStatistic("sum"), cvl(252.0) + )) + .build())) + .build()); + _bucket.close(); + + final ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(PeriodicData.class); + Mockito.verify(_sink).recordAggregateData(dataCaptor.capture()); + + final ImmutableMultimap data = dataCaptor.getValue().getData(); + Assert.assertEquals(5, data.size()); + + Assert.assertThat( + data.get("testCalculatedValues/MyMetric"), + Matchers.containsInAnyOrder( + new AggregatedData.Builder() + .setIsSpecified(true) + .setPopulationSize(5L) + .setStatistic(COUNT_STATISTIC) + .setValue(q(5)) + .build(), + new AggregatedData.Builder() + .setIsSpecified(false) + .setPopulationSize(5L) + .setStatistic(SUM_STATISTIC) + .setValue(q(252)) + .build(), + new AggregatedData.Builder() + .setIsSpecified(true) + .setPopulationSize(5L) + .setStatistic(MEAN_STATISTIC) + .setValue(q(50.4)) + .build(), + new AggregatedData.Builder() + .setIsSpecified(true) + .setPopulationSize(5L) + .setStatistic(MAX_STATISTIC) + .setValue(q(100)) + .build(), + new AggregatedData.Builder() + .setIsSpecified(true) + .setPopulationSize(5L) + .setStatistic(MIN_STATISTIC) + .setValue(q(1)) + .build())); + } + @Test public void testToString() { final String asString = new Bucket.Builder() @@ -250,13 +318,33 @@ private void addData(final String name, final MetricType type, final Quantity va .build()); } + private static ImmutableList> cvl(final double... valueArray) { + final ImmutableList.Builder> calculatedValues = ImmutableList.builder(); + for (final double value : valueArray) { + calculatedValues.add(cv(value)); + } + return calculatedValues.build(); + } + + private static CalculatedValue cv(final double value) { + return ThreadLocalBuilder., CalculatedValue.Builder>buildGeneric( + CalculatedValue.Builder.class, + b1 -> b1.setValue(q(value))); + } + + private static Quantity q(final double value) { + return ThreadLocalBuilder.build( + DefaultQuantity.Builder.class, + b2 -> b2.setValue(value)); + } + private Bucket _bucket; private LoadingCache>> _specifiedStatsCache = CacheBuilder.newBuilder() - .build(new AbsentStatisticCacheLoader()); + .build(new TestSpecifiedCacheLoader()); private LoadingCache>> _dependentStatsCache = CacheBuilder.newBuilder() - .build(new AbsentStatisticCacheLoader()); + .build(new TestDependentCacheLoader()); @Mock private Sink _sink; @@ -279,9 +367,31 @@ private void addData(final String name, final MetricType type, final Quantity va private static final Statistic SUM_STATISTIC = STATISTIC_FACTORY.getStatistic("sum"); private static final Statistic COUNT_STATISTIC = STATISTIC_FACTORY.getStatistic("count"); - private static final class AbsentStatisticCacheLoader extends CacheLoader>> { + private static final class TestSpecifiedCacheLoader extends CacheLoader>> { + @Override + public Optional> load(@Nullable final String key) { + if ("testCalculatedValues/MyMetric".equals(key)) { + return Optional.of( + ImmutableSet.of( + STATISTIC_FACTORY.getStatistic("min"), + STATISTIC_FACTORY.getStatistic("max"), + STATISTIC_FACTORY.getStatistic("count"), + STATISTIC_FACTORY.getStatistic("mean") + )); + } + return Optional.empty(); + } + } + + private static final class TestDependentCacheLoader extends CacheLoader>> { @Override public Optional> load(@Nullable final String key) { + if ("testCalculatedValues/MyMetric".equals(key)) { + return Optional.of( + ImmutableSet.of( + STATISTIC_FACTORY.getStatistic("sum") + )); + } return Optional.empty(); } } diff --git a/src/test/java/com/arpnetworking/metrics/mad/model/statistics/CountStatisticTest.java b/src/test/java/com/arpnetworking/metrics/mad/model/statistics/CountStatisticTest.java index e72caaa4..be4b6508 100644 --- a/src/test/java/com/arpnetworking/metrics/mad/model/statistics/CountStatisticTest.java +++ b/src/test/java/com/arpnetworking/metrics/mad/model/statistics/CountStatisticTest.java @@ -57,7 +57,7 @@ public void testHashCode() { } @Test - public void testAccumulator() { + public void testAccumulatorWithSamples() { final Accumulator accumulator = (Accumulator) COUNT_STATISTIC.createCalculator(); accumulator.accumulate(new DefaultQuantity.Builder().setValue(12d).build()); accumulator.accumulate(new DefaultQuantity.Builder().setValue(18d).build()); @@ -66,6 +66,38 @@ public void testAccumulator() { Assert.assertEquals(calculated.getValue(), new DefaultQuantity.Builder().setValue(3.0).build()); } + @Test + public void testAccumulatorWithCalculatedValues() { + final Accumulator accumulator = (Accumulator) COUNT_STATISTIC.createCalculator(); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(12d).build()) + .build()); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(18d).build()) + .build()); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(15d).build()) + .build()); + final CalculatedValue calculated = accumulator.calculate(Collections.emptyMap()); + Assert.assertEquals(calculated.getValue(), new DefaultQuantity.Builder().setValue(45.0).build()); + } + + @Test + public void testAccumulatorMixed() { + final Accumulator accumulator = (Accumulator) COUNT_STATISTIC.createCalculator(); + accumulator.accumulate(new DefaultQuantity.Builder().setValue(12d).build()); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(18d).build()) + .build()); + accumulator.accumulate(new DefaultQuantity.Builder().setValue(5d).build()); + final CalculatedValue calculated = accumulator.calculate(Collections.emptyMap()); + Assert.assertEquals(calculated.getValue(), new DefaultQuantity.Builder().setValue(20.0).build()); + } + private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory(); private static final CountStatistic COUNT_STATISTIC = (CountStatistic) STATISTIC_FACTORY.getStatistic("count"); } diff --git a/src/test/java/com/arpnetworking/metrics/mad/model/statistics/MaxStatisticTest.java b/src/test/java/com/arpnetworking/metrics/mad/model/statistics/MaxStatisticTest.java index d01f1b09..ade1bdfd 100644 --- a/src/test/java/com/arpnetworking/metrics/mad/model/statistics/MaxStatisticTest.java +++ b/src/test/java/com/arpnetworking/metrics/mad/model/statistics/MaxStatisticTest.java @@ -47,7 +47,7 @@ public void testHashCode() { } @Test - public void testAccumulator() { + public void testAccumulatorWithSamples() { final Accumulator accumulator = (Accumulator) MAX_STATISTIC.createCalculator(); accumulator.accumulate(new DefaultQuantity.Builder().setValue(12d).build()); accumulator.accumulate(new DefaultQuantity.Builder().setValue(18d).build()); @@ -56,6 +56,38 @@ public void testAccumulator() { Assert.assertEquals(calculated.getValue(), new DefaultQuantity.Builder().setValue(18.0).build()); } + @Test + public void testAccumulatorWithCalculatedValues() { + final Accumulator accumulator = (Accumulator) MAX_STATISTIC.createCalculator(); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(12d).build()) + .build()); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(18d).build()) + .build()); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(5d).build()) + .build()); + final CalculatedValue calculated = accumulator.calculate(Collections.emptyMap()); + Assert.assertEquals(calculated.getValue(), new DefaultQuantity.Builder().setValue(18.0).build()); + } + + @Test + public void testAccumulatorMixed() { + final Accumulator accumulator = (Accumulator) MAX_STATISTIC.createCalculator(); + accumulator.accumulate(new DefaultQuantity.Builder().setValue(12d).build()); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(18d).build()) + .build()); + accumulator.accumulate(new DefaultQuantity.Builder().setValue(5d).build()); + final CalculatedValue calculated = accumulator.calculate(Collections.emptyMap()); + Assert.assertEquals(calculated.getValue(), new DefaultQuantity.Builder().setValue(18.0).build()); + } + private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory(); private static final MaxStatistic MAX_STATISTIC = (MaxStatistic) STATISTIC_FACTORY.getStatistic("max"); } diff --git a/src/test/java/com/arpnetworking/metrics/mad/model/statistics/MinStatisticTest.java b/src/test/java/com/arpnetworking/metrics/mad/model/statistics/MinStatisticTest.java index 0b32d5e1..0c147c79 100644 --- a/src/test/java/com/arpnetworking/metrics/mad/model/statistics/MinStatisticTest.java +++ b/src/test/java/com/arpnetworking/metrics/mad/model/statistics/MinStatisticTest.java @@ -56,6 +56,38 @@ public void testAccumulator() { Assert.assertEquals(calculated.getValue(), new DefaultQuantity.Builder().setValue(5.0).build()); } + @Test + public void testAccumulatorWithCalculatedValues() { + final Accumulator accumulator = (Accumulator) MIN_STATISTIC.createCalculator(); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(12d).build()) + .build()); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(18d).build()) + .build()); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(5d).build()) + .build()); + final CalculatedValue calculated = accumulator.calculate(Collections.emptyMap()); + Assert.assertEquals(calculated.getValue(), new DefaultQuantity.Builder().setValue(5.0).build()); + } + + @Test + public void testAccumulatorMixed() { + final Accumulator accumulator = (Accumulator) MIN_STATISTIC.createCalculator(); + accumulator.accumulate(new DefaultQuantity.Builder().setValue(12d).build()); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(18d).build()) + .build()); + accumulator.accumulate(new DefaultQuantity.Builder().setValue(5d).build()); + final CalculatedValue calculated = accumulator.calculate(Collections.emptyMap()); + Assert.assertEquals(calculated.getValue(), new DefaultQuantity.Builder().setValue(5.0).build()); + } + private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory(); private static final MinStatistic MIN_STATISTIC = (MinStatistic) STATISTIC_FACTORY.getStatistic("min"); } diff --git a/src/test/java/com/arpnetworking/metrics/mad/model/statistics/SumStatisticTest.java b/src/test/java/com/arpnetworking/metrics/mad/model/statistics/SumStatisticTest.java index 42bd644c..ab0e17e6 100644 --- a/src/test/java/com/arpnetworking/metrics/mad/model/statistics/SumStatisticTest.java +++ b/src/test/java/com/arpnetworking/metrics/mad/model/statistics/SumStatisticTest.java @@ -49,7 +49,7 @@ public void testHashCode() { } @Test - public void testAccumulator() { + public void testAccumulatorWithSamples() { final Accumulator accumulator = (Accumulator) SUM_STATISTIC.createCalculator(); accumulator.accumulate(new DefaultQuantity.Builder().setValue(12d).build()); accumulator.accumulate(new DefaultQuantity.Builder().setValue(18d).build()); @@ -58,6 +58,38 @@ public void testAccumulator() { Assert.assertEquals(calculated.getValue(), new DefaultQuantity.Builder().setValue(35.0).build()); } + @Test + public void testAccumulatorWithCalculatedValues() { + final Accumulator accumulator = (Accumulator) SUM_STATISTIC.createCalculator(); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(12d).build()) + .build()); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(18d).build()) + .build()); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(5d).build()) + .build()); + final CalculatedValue calculated = accumulator.calculate(Collections.emptyMap()); + Assert.assertEquals(calculated.getValue(), new DefaultQuantity.Builder().setValue(35.0).build()); + } + + @Test + public void testAccumulatorMixed() { + final Accumulator accumulator = (Accumulator) SUM_STATISTIC.createCalculator(); + accumulator.accumulate(new DefaultQuantity.Builder().setValue(12d).build()); + accumulator.accumulate( + new CalculatedValue.Builder() + .setValue(new DefaultQuantity.Builder().setValue(18d).build()) + .build()); + accumulator.accumulate(new DefaultQuantity.Builder().setValue(5d).build()); + final CalculatedValue calculated = accumulator.calculate(Collections.emptyMap()); + Assert.assertEquals(calculated.getValue(), new DefaultQuantity.Builder().setValue(35.0).build()); + } + private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory(); private static final SumStatistic SUM_STATISTIC = (SumStatistic) STATISTIC_FACTORY.getStatistic("sum"); } diff --git a/src/test/java/com/arpnetworking/metrics/mad/parsers/ProtobufV3ToRecordParserTest.java b/src/test/java/com/arpnetworking/metrics/mad/parsers/ProtobufV3ToRecordParserTest.java index 87833c4e..d7770648 100644 --- a/src/test/java/com/arpnetworking/metrics/mad/parsers/ProtobufV3ToRecordParserTest.java +++ b/src/test/java/com/arpnetworking/metrics/mad/parsers/ProtobufV3ToRecordParserTest.java @@ -18,7 +18,6 @@ import akka.util.ByteString; import com.arpnetworking.metrics.common.parsers.Parser; import com.arpnetworking.metrics.common.parsers.exceptions.ParsingException; -import com.arpnetworking.metrics.mad.model.AggregatedData; import com.arpnetworking.metrics.mad.model.DefaultQuantity; import com.arpnetworking.metrics.mad.model.HttpRequest; import com.arpnetworking.metrics.mad.model.Metric; @@ -26,8 +25,10 @@ import com.arpnetworking.metrics.mad.model.Record; import com.arpnetworking.metrics.mad.model.statistics.HistogramStatistic; import com.arpnetworking.metrics.mad.model.statistics.StatisticFactory; +import com.arpnetworking.tsdcore.model.CalculatedValue; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Iterables; import com.google.common.io.Resources; import org.junit.Assert; import org.junit.Test; @@ -60,7 +61,7 @@ public void testParseEmpty() throws ParsingException, IOException { public void testParseSingleRecord() throws ParsingException, IOException { HistogramStatistic.HistogramSupportingData supportingData; HistogramStatistic.HistogramSnapshot histogramSnapshot; - AggregatedData aggregatedData; + CalculatedValue calculatedValue; final UUID uuid = UUID.fromString("142949d2-c0fc-469e-9958-7d2be2c49fa5"); final ZonedDateTime time = ZonedDateTime.ofInstant(Instant.ofEpochMilli(1513239602974L), ZoneOffset.UTC); @@ -94,50 +95,36 @@ public void testParseSingleRecord() throws ParsingException, IOException { Assert.assertNotNull(histogram); Assert.assertEquals(MetricType.GAUGE, histogram.getType()); Assert.assertEquals(0, histogram.getValues().size()); - Assert.assertEquals(6, histogram.getStatistics().size()); + Assert.assertEquals(5, histogram.getStatistics().size()); + for (final List> values : histogram.getStatistics().values()) { + Assert.assertEquals(1, values.size()); + } // Min - aggregatedData = histogram.getStatistics().get(0); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("min"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(1.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(9, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); + calculatedValue = Iterables.getOnlyElement( + histogram.getStatistics().get(STATISTIC_FACTORY.getStatistic("min"))); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(1.0).build(), calculatedValue.getValue()); + Assert.assertNull(calculatedValue.getData()); // Max - aggregatedData = histogram.getStatistics().get(1); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("max"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(5.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(9, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); + calculatedValue = Iterables.getOnlyElement( + histogram.getStatistics().get(STATISTIC_FACTORY.getStatistic("max"))); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(5.0).build(), calculatedValue.getValue()); + Assert.assertNull(calculatedValue.getData()); // Count - aggregatedData = histogram.getStatistics().get(2); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("count"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(9.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(9, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); + calculatedValue = Iterables.getOnlyElement( + histogram.getStatistics().get(STATISTIC_FACTORY.getStatistic("count"))); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(9.0).build(), calculatedValue.getValue()); + Assert.assertNull(calculatedValue.getData()); // Sum - aggregatedData = histogram.getStatistics().get(3); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("sum"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(27.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(9, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); - // Mean - aggregatedData = histogram.getStatistics().get(4); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("mean"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(3.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(9, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); + calculatedValue = Iterables.getOnlyElement( + histogram.getStatistics().get(STATISTIC_FACTORY.getStatistic("sum"))); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(27.0).build(), calculatedValue.getValue()); + Assert.assertNull(calculatedValue.getData()); // Histogram - aggregatedData = histogram.getStatistics().get(5); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("histogram"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(1.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(9, aggregatedData.getPopulationSize()); - Assert.assertTrue(aggregatedData.getSupportingData() instanceof HistogramStatistic.HistogramSupportingData); - supportingData = (HistogramStatistic.HistogramSupportingData) aggregatedData.getSupportingData(); + calculatedValue = Iterables.getOnlyElement( + histogram.getStatistics().get(STATISTIC_FACTORY.getStatistic("histogram"))); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(1.0).build(), calculatedValue.getValue()); + Assert.assertTrue(calculatedValue.getData() instanceof HistogramStatistic.HistogramSupportingData); + supportingData = (HistogramStatistic.HistogramSupportingData) calculatedValue.getData(); histogramSnapshot = supportingData.getHistogramSnapshot(); Assert.assertFalse(supportingData.getUnit().isPresent()); Assert.assertEquals(9, histogramSnapshot.getEntriesCount()); @@ -156,50 +143,31 @@ public void testParseSingleRecord() throws ParsingException, IOException { Assert.assertEquals(new DefaultQuantity.Builder().setValue(3.0).build(), combined.getValues().get(1)); Assert.assertEquals(new DefaultQuantity.Builder().setValue(2.0).build(), combined.getValues().get(2)); Assert.assertEquals(new DefaultQuantity.Builder().setValue(4.0).build(), combined.getValues().get(3)); - Assert.assertEquals(12, combined.getStatistics().size()); + Assert.assertEquals(5, combined.getStatistics().size()); + for (final List> values : combined.getStatistics().values()) { + Assert.assertEquals(2, values.size()); + } // Min - aggregatedData = combined.getStatistics().get(0); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("min"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(2.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(4, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); + calculatedValue = combined.getStatistics().get(STATISTIC_FACTORY.getStatistic("min")).get(0); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(2.0).build(), calculatedValue.getValue()); + Assert.assertNull(calculatedValue.getData()); // Max - aggregatedData = combined.getStatistics().get(1); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("max"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(5.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(4, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); + calculatedValue = combined.getStatistics().get(STATISTIC_FACTORY.getStatistic("max")).get(0); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(5.0).build(), calculatedValue.getValue()); + Assert.assertNull(calculatedValue.getData()); // Count - aggregatedData = combined.getStatistics().get(2); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("count"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(4.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(4, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); + calculatedValue = combined.getStatistics().get(STATISTIC_FACTORY.getStatistic("count")).get(0); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(4.0).build(), calculatedValue.getValue()); + Assert.assertNull(calculatedValue.getData()); // Sum - aggregatedData = combined.getStatistics().get(3); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("sum"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(14.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(4, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); - // Mean - aggregatedData = combined.getStatistics().get(4); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("mean"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(3.5).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(4, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); + calculatedValue = combined.getStatistics().get(STATISTIC_FACTORY.getStatistic("sum")).get(0); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(14.0).build(), calculatedValue.getValue()); + Assert.assertNull(calculatedValue.getData()); // Histogram - aggregatedData = combined.getStatistics().get(5); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("histogram"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(1.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(4, aggregatedData.getPopulationSize()); - Assert.assertTrue(aggregatedData.getSupportingData() instanceof HistogramStatistic.HistogramSupportingData); - supportingData = (HistogramStatistic.HistogramSupportingData) aggregatedData.getSupportingData(); + calculatedValue = combined.getStatistics().get(STATISTIC_FACTORY.getStatistic("histogram")).get(0); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(1.0).build(), calculatedValue.getValue()); + Assert.assertTrue(calculatedValue.getData() instanceof HistogramStatistic.HistogramSupportingData); + supportingData = (HistogramStatistic.HistogramSupportingData) calculatedValue.getData(); histogramSnapshot = supportingData.getHistogramSnapshot(); Assert.assertFalse(supportingData.getUnit().isPresent()); Assert.assertEquals(4, histogramSnapshot.getEntriesCount()); @@ -209,48 +177,26 @@ public void testParseSingleRecord() throws ParsingException, IOException { Assert.assertEquals(1, histogramSnapshot.getValue(4.0)); Assert.assertEquals(1, histogramSnapshot.getValue(5.0)); // Min - aggregatedData = combined.getStatistics().get(6); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("min"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(3.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(1, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); + calculatedValue = combined.getStatistics().get(STATISTIC_FACTORY.getStatistic("min")).get(1); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(3.0).build(), calculatedValue.getValue()); + Assert.assertNull(calculatedValue.getData()); // Max - aggregatedData = combined.getStatistics().get(7); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("max"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(3.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(1, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); + calculatedValue = combined.getStatistics().get(STATISTIC_FACTORY.getStatistic("max")).get(1); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(3.0).build(), calculatedValue.getValue()); + Assert.assertNull(calculatedValue.getData()); // Count - aggregatedData = combined.getStatistics().get(8); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("count"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(1.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(1, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); + calculatedValue = combined.getStatistics().get(STATISTIC_FACTORY.getStatistic("count")).get(1); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(1.0).build(), calculatedValue.getValue()); + Assert.assertNull(calculatedValue.getData()); // Sum - aggregatedData = combined.getStatistics().get(9); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("sum"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(3.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(1, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); - // Mean - aggregatedData = combined.getStatistics().get(10); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("mean"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(3.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(1, aggregatedData.getPopulationSize()); - Assert.assertNull(aggregatedData.getSupportingData()); + calculatedValue = combined.getStatistics().get(STATISTIC_FACTORY.getStatistic("sum")).get(1); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(3.0).build(), calculatedValue.getValue()); + Assert.assertNull(calculatedValue.getData()); // Histogram - aggregatedData = combined.getStatistics().get(11); - Assert.assertEquals(STATISTIC_FACTORY.getStatistic("histogram"), aggregatedData.getStatistic()); - Assert.assertEquals(new DefaultQuantity.Builder().setValue(1.0).build(), aggregatedData.getValue()); - Assert.assertFalse(aggregatedData.isSpecified()); - Assert.assertEquals(1, aggregatedData.getPopulationSize()); - Assert.assertTrue(aggregatedData.getSupportingData() instanceof HistogramStatistic.HistogramSupportingData); - supportingData = (HistogramStatistic.HistogramSupportingData) aggregatedData.getSupportingData(); + calculatedValue = combined.getStatistics().get(STATISTIC_FACTORY.getStatistic("histogram")).get(1); + Assert.assertEquals(new DefaultQuantity.Builder().setValue(1.0).build(), calculatedValue.getValue()); + Assert.assertTrue(calculatedValue.getData() instanceof HistogramStatistic.HistogramSupportingData); + supportingData = (HistogramStatistic.HistogramSupportingData) calculatedValue.getData(); histogramSnapshot = supportingData.getHistogramSnapshot(); Assert.assertFalse(supportingData.getUnit().isPresent()); Assert.assertEquals(1, histogramSnapshot.getEntriesCount()); From 8bdfa7b99f67010598cdf002e7b8285961cd746b Mon Sep 17 00:00:00 2001 From: Ville Koskela Date: Mon, 23 Sep 2019 16:02:49 -0700 Subject: [PATCH 2/2] Pre-aggregation integration test. --- .../metrics/mad/integration/TelemetryIT.java | 158 ++++++++++++++++-- 1 file changed, 148 insertions(+), 10 deletions(-) diff --git a/src/test/java/com/arpnetworking/metrics/mad/integration/TelemetryIT.java b/src/test/java/com/arpnetworking/metrics/mad/integration/TelemetryIT.java index 3df28bc4..35a9eea7 100644 --- a/src/test/java/com/arpnetworking/metrics/mad/integration/TelemetryIT.java +++ b/src/test/java/com/arpnetworking/metrics/mad/integration/TelemetryIT.java @@ -17,10 +17,13 @@ import com.arpnetworking.metrics.Metrics; import com.arpnetworking.metrics.MetricsFactory; +import com.arpnetworking.metrics.impl.AugmentedHistogram; +import com.arpnetworking.metrics.impl.TsdMetrics; import com.arpnetworking.metrics.impl.TsdMetricsFactory; import com.arpnetworking.metrics.mad.model.statistics.Statistic; import com.arpnetworking.metrics.mad.model.statistics.StatisticFactory; import com.arpnetworking.test.TelemetryClient; +import com.google.common.collect.ImmutableMap; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -43,8 +46,8 @@ @RunWith(Parameterized.class) public final class TelemetryIT { - public TelemetryIT(final Statistic statistic, final double expectedResult) { - _statistic = statistic; + public TelemetryIT(final String statisticName, final double expectedResult) { + _statistic = STATISTIC_FACTORY.getStatistic(statisticName); _expectedResult = expectedResult; _telemetryClient = TelemetryClient.getInstance(); } @@ -57,15 +60,15 @@ public void setUp() { @Parameterized.Parameters(name = "{0}") public static Collection createParameters() { return Arrays.asList( - new Object[]{STATISTIC_FACTORY.getStatistic("count"), 55.0d}, - new Object[]{STATISTIC_FACTORY.getStatistic("sum"), 423.5d}, - new Object[]{STATISTIC_FACTORY.getStatistic("mean"), 7.7d}, - new Object[]{STATISTIC_FACTORY.getStatistic("max"), 11.0d}, - new Object[]{STATISTIC_FACTORY.getStatistic("min"), 1.1d}); + new Object[]{"count", 55.0d}, + new Object[]{"sum", 423.5d}, + new Object[]{"mean", 7.7d}, + new Object[]{"max", 11.0d}, + new Object[]{"min", 1.1d}); // TODO(ville): enable with mad-2.0 - //new Object[]{STATISTIC_FACTORY.getStatistic("median"), 7.7d}, - //new Object[]{STATISTIC_FACTORY.getStatistic("p25"), 5.5d}, - //new Object[]{STATISTIC_FACTORY.getStatistic("p75"), 9.9d}); + //new Object[]{"median", 7.7d}, + //new Object[]{"p25", 5.5d}, + //new Object[]{"p75", 9.9d}); } @Test @@ -96,6 +99,141 @@ public void testFromSamples() throws InterruptedException, ExecutionException, T } } + + @Test + public void testFromAggregatedData() throws InterruptedException, ExecutionException, TimeoutException { + final CompletableFuture future = new CompletableFuture<>(); + + try { + _telemetryClient.subscribe( + "TelemetryIT", + _metricName, + _statistic, + future::complete); + + sleepToBeginningOfSecond(); + + try (Metrics metrics = METRICS_FACTORY.create()) { + final TsdMetrics tsdMetrics = (TsdMetrics) metrics; + tsdMetrics.recordAggregatedData( + _metricName, + new AugmentedHistogram.Builder() + .setMinimum(1.1) + .setMaximum(5.5) + .setSum(60.5) + .setHistogram(ImmutableMap.of( + 1.1, 1, + 2.2, 2, + 3.3, 3, + 4.4, 4, + 5.5, 5)) + .setPrecision(7) + .build()); + } + try (Metrics metrics = METRICS_FACTORY.create()) { + final TsdMetrics tsdMetrics = (TsdMetrics) metrics; + tsdMetrics.recordAggregatedData( + _metricName, + new AugmentedHistogram.Builder() + .setMinimum(6.6) + .setMaximum(11.0) + .setSum(363.0) + .setHistogram(ImmutableMap.of( + 6.6, 6, + 7.7, 7, + 8.8, 8, + 9.9, 9, + 11.0, 10)) + .setPrecision(7) + .build()); + } + + Assert.assertEquals(_expectedResult, future.get(5, TimeUnit.SECONDS), 0.0001); + } finally { + _telemetryClient.unsubscribe( + "TelemetryIT", + _metricName, + _statistic); + } + } + + @Test + public void testFromMixedSamplesAndAggregatedData() throws InterruptedException, ExecutionException, TimeoutException { + final CompletableFuture future = new CompletableFuture<>(); + + try { + _telemetryClient.subscribe( + "TelemetryIT", + _metricName, + _statistic, + future::complete); + + sleepToBeginningOfSecond(); + + // First mixed samples-aggregates unit of work + try (Metrics metrics = METRICS_FACTORY.create()) { + for (int i = 1; i <= 2; ++i) { + for (int j = 1; j <= i; ++j) { + metrics.setGauge(_metricName, i * 1.1d); + } + } + + final TsdMetrics tsdMetrics = (TsdMetrics) metrics; + tsdMetrics.recordAggregatedData( + _metricName, + new AugmentedHistogram.Builder() + .setMinimum(3.3) + .setMaximum(5.5) + .setSum(55.0) + .setHistogram(ImmutableMap.of( + 3.3, 3, + 4.4, 4, + 5.5, 5)) + .setPrecision(7) + .build()); + } + + // Second mixed samples-aggregates unit of work + try (Metrics metrics = METRICS_FACTORY.create()) { + for (int i = 6; i <= 8; ++i) { + for (int j = 1; j <= i; ++j) { + metrics.setGauge(_metricName, i * 1.1d); + } + } + + final TsdMetrics tsdMetrics = (TsdMetrics) metrics; + tsdMetrics.recordAggregatedData( + _metricName, + new AugmentedHistogram.Builder() + .setMinimum(9.9) + .setMaximum(11.0) + .setSum(199.1) + .setHistogram(ImmutableMap.of( + 9.9, 9, + 11.0, 10)) + .setPrecision(7) + .build()); + } + + Assert.assertEquals(_expectedResult, future.get(5, TimeUnit.SECONDS), 0.0001); + } finally { + _telemetryClient.unsubscribe( + "TelemetryIT", + _metricName, + _statistic); + } + } + + private static void sleepToBeginningOfSecond() { + while (System.currentTimeMillis() % 1000 >= 100) { + try { + Thread.sleep(1); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + } + private String _metricName; private final Statistic _statistic;