Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions src/main/java/com/arpnetworking/metrics/mad/Aggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,7 @@ public Optional<ImmutableSet<Statistic>> load(final String metric) throws Except
@Override
public Optional<ImmutableSet<Statistic>> load(final String metric) throws Exception {
final Optional<ImmutableSet<Statistic>> statistics = _cachedSpecifiedStatistics.get(metric);
if (statistics.isPresent()) {
return Optional.of(computeDependentStatistics(statistics.get()));
} else {
return Optional.empty();
}
return statistics.map(statisticImmutableSet -> computeDependentStatistics(statisticImmutableSet));
}
});
}
Expand Down
22 changes: 9 additions & 13 deletions src/main/java/com/arpnetworking/metrics/mad/Bucket.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.arpnetworking.tsdcore.model.PeriodicData;
import com.arpnetworking.tsdcore.sinks.Sink;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -76,16 +77,6 @@ public void close() {
computeStatistics(_gaugeMetricCalculators, _specifiedGaugeStatistics, data);
computeStatistics(_timerMetricCalculators, _specifiedTimerStatistics, data);
computeStatistics(_explicitMetricCalculators, _specifiedStatisticsCache, data);
// TODO(vkoskela): Perform expression evaluation here. [NEXT]
// -> This still requires realizing and indexing the computed aggregated data
// in order to feed the expression evaluation. Once the filtering is consolidated
// we can probably just build a map here and then do one copy into immutable form
// in the PeriodicData. This becomes feasible with consolidated filtering because
// fewer copies (e.g. none) are made downstream.
// TODO(vkoskela): Perform alert evaluation here. [NEXT]
// -> This requires expressions. Otherwise, it's just a matter of changing the
// alerts abstraction from a Sink to something more appropriate and hooking it in
// here.
final PeriodicData periodicData = ThreadLocalBuilder.build(
PeriodicData.Builder.class,
b -> b.setData(data.build())
Expand Down Expand Up @@ -114,10 +105,10 @@ public void add(final Record record) {
final String name = entry.getKey();
final Metric metric = entry.getValue();

if (metric.getValues().isEmpty()) {
if (metric.getValues().isEmpty() && metric.getStatistics().isEmpty()) {
LOGGER.debug()
.setMessage("Discarding metric")
.addData("reason", "no samples")
.addData("reason", "no samples or statistics")
.addData("name", name)
.addData("metric", metric)
.log();
Expand Down Expand Up @@ -320,14 +311,18 @@ private void addMetric(
return;
}

// Add the value to any accumulators
// Add the metric data to any accumulators
for (final Calculator<?> calculator : calculators) {
final Statistic statistic = calculator.getStatistic();
if (calculator instanceof Accumulator) {
final Accumulator<?> accumulator = (Accumulator<?>) calculator;
synchronized (accumulator) {
for (final Quantity quantity : metric.getValues()) {
accumulator.accumulate(quantity);
}
for (final CalculatedValue<?> value : metric.getStatistics().getOrDefault(statistic, ImmutableList.of())) {
accumulator.accumulateAny(value);
}
}
}
}
Expand All @@ -351,6 +346,7 @@ private Collection<Calculator<?>> getOrCreateCalculators(
newCalculators.add(statistic.createCalculator());
}
newCalculators.add(COUNT_STATISTIC.createCalculator());

calculators = calculatorsByMetric.putIfAbsent(name, newCalculators);
if (calculators == null) {
calculators = newCalculators;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

import com.arpnetworking.commons.builder.ThreadLocalBuilder;
import com.arpnetworking.logback.annotations.LogValue;
import com.arpnetworking.metrics.mad.model.statistics.Statistic;
import com.arpnetworking.steno.LogValueMapFactory;
import com.arpnetworking.tsdcore.model.CalculatedValue;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import net.sf.oval.constraint.NotNull;

import java.util.List;
Expand All @@ -44,7 +47,7 @@ public List<Quantity> getValues() {
}

@Override
public List<AggregatedData> getStatistics() {
public ImmutableMap<Statistic, ImmutableList<CalculatedValue<?>>> getStatistics() {
return _statistics;
}

Expand Down Expand Up @@ -106,7 +109,7 @@ private DefaultMetric(final Builder builder) {

private final MetricType _type;
private final ImmutableList<Quantity> _values;
private final ImmutableList<AggregatedData> _statistics;
private final ImmutableMap<Statistic, ImmutableList<CalculatedValue<?>>> _statistics;

/**
* Implementation of builder pattern for {@link DefaultMetric}.
Expand All @@ -123,12 +126,12 @@ public Builder() {
}

/**
* The statistics {@code List}. Cannot be null.
* The statistics {@code Map}. Cannot be null.
*
* @param value The values {@code List}.
* @return This instance of {@link Builder}.
*/
public Builder setStatistics(final ImmutableList<AggregatedData> value) {
public Builder setStatistics(final ImmutableMap<Statistic, ImmutableList<CalculatedValue<?>>> value) {
_statistics = value;
return this;
}
Expand Down Expand Up @@ -162,7 +165,7 @@ protected void reset() {
}

@NotNull
private ImmutableList<AggregatedData> _statistics = ImmutableList.of();
private ImmutableMap<Statistic, ImmutableList<CalculatedValue<?>>> _statistics = ImmutableMap.of();
@NotNull
private ImmutableList<Quantity> _values = ImmutableList.of();
@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
*/
package com.arpnetworking.metrics.mad.model;

import com.arpnetworking.metrics.mad.model.statistics.Statistic;
import com.arpnetworking.tsdcore.model.CalculatedValue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.List;

/**
Expand Down Expand Up @@ -43,5 +48,5 @@ public interface Metric {
*
* @return The collected statistical data.
*/
List<AggregatedData> getStatistics();
ImmutableMap<Statistic, ImmutableList<CalculatedValue<?>>> getStatistics();
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,19 @@ public interface Accumulator<T> extends Calculator<T> {
* @return This <code>Accumulator</code>.
*/
Accumulator<T> accumulate(CalculatedValue<T> calculatedValue);

/**
* Add the specified <code>CalculatedValue</code> to the accumulated value. The
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we using @link now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In MAD-2.0; haven't back ported this to 1.0

* <code>CalculatedValue</code> was produced by this <code>Accumulator</code> in
* a different context. For example, for a different time period or a different
* host. It is permissible to mix calls to accumulate with <code>Quantity</code>
* and <code>CalculatedValue</code>.
*
* If the <code>CalculatedValue</code>'s supporting data is of an unsupported
* type then an <code>IllegaglArgumentException</code> will be thrown.
*
* @param calculatedValue The <code>CalculatedValue</code> to include in the accumulated value.
* @return This <code>Accumulator</code>.
*/
Accumulator<T> accumulateAny(CalculatedValue<?> calculatedValue);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public Accumulator<Void> accumulate(final Quantity quantity) {

@Override
public Accumulator<Void> accumulate(final CalculatedValue<Void> calculatedValue) {
return accumulateAny(calculatedValue);
}

@Override
public Accumulator<Void> accumulateAny(final CalculatedValue<?> calculatedValue) {
_count += calculatedValue.getValue().getValue();
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,27 @@ public Accumulator<HistogramSupportingData> accumulate(final CalculatedValue<His
return this;
}

@Override
public Accumulator<HistogramSupportingData> accumulateAny(final CalculatedValue<?> calculatedValue) {
if (calculatedValue.getData() == null) {
throw new IllegalArgumentException(
String.format(
"Null calculated value data for %s",
this.getClass()));
}
if (!(calculatedValue.getData() instanceof HistogramSupportingData)) {
throw new IllegalArgumentException(
String.format(
"Unsupported calculated value data type %s for %s",
calculatedValue.getData().getClass(),
this.getClass()));
}
@SuppressWarnings("unchecked")
final CalculatedValue<HistogramSupportingData> checkedCalculatedValue =
(CalculatedValue<HistogramSupportingData>) calculatedValue;
return accumulate(checkedCalculatedValue);
}

@Override
public CalculatedValue<HistogramSupportingData> calculate(final Map<Statistic, Calculator<?>> dependencies) {
return ThreadLocalBuilder.<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public Accumulator<Void> accumulate(final Quantity quantity) {

@Override
public Accumulator<Void> accumulate(final CalculatedValue<Void> calculatedValue) {
return accumulateAny(calculatedValue);
}

@Override
public Accumulator<Void> accumulateAny(final CalculatedValue<?> calculatedValue) {
return accumulate(calculatedValue.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public Accumulator<Void> accumulate(final Quantity quantity) {

@Override
public Accumulator<Void> accumulate(final CalculatedValue<Void> calculatedValue) {
return accumulateAny(calculatedValue);
}

@Override
public Accumulator<Void> accumulateAny(final CalculatedValue<?> calculatedValue) {
return accumulate(calculatedValue.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public Accumulator<Void> accumulate(final Quantity quantity) {

@Override
public Accumulator<Void> accumulate(final CalculatedValue<Void> calculatedValue) {
return accumulateAny(calculatedValue);
}

@Override
public Accumulator<Void> accumulateAny(final CalculatedValue<?> calculatedValue) {
return accumulate(calculatedValue.getValue());
}

Expand Down
Loading