Skip to content

Commit

Permalink
use last timestamp to determine the value of throttled metric for 1mi…
Browse files Browse the repository at this point in the history
…n measurement window (allowes more accuracy)

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Sep 23, 2021
1 parent e4ec681 commit 596ff34
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ static DefaultConnectionMetricsCounter create(
MeasurementWindow.ONE_HOUR_WITH_ONE_MINUTE_RESOLUTION,
MeasurementWindow.ONE_DAY_WITH_ONE_MINUTE_RESOLUTION)
.maximumPerSlot(1L)
.useLastTimestampForWindow(MeasurementWindow.ONE_MINUTE_WITH_ONE_MINUTE_RESOLUTION, 1L)
.build();
} else {
final MetricsAlert metricsAlert = resolveOptionalAlert(metricDirection, address, metricType, connectionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ static Optional<Key> from(final MetricDirection metricDirection, final MetricTyp
.findAny();
}

private Key(final MetricDirection metricDirection, final MetricType metricType) {
Key(final MetricDirection metricDirection, final MetricType metricType) {
this.metricDirection = metricDirection;
this.metricType = metricType;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -53,6 +54,10 @@ public final class SlidingWindowCounter {
private final long maximumPerSlot;
private final boolean cleanUpEnabled;

// allows to override the reported value with a fixed value by checking the last modified timestamp instead of
// calculating from the measurement maps (allows more accuracy for the shortest window)
private final Map<MeasurementWindow, Long> lastTimestampOverrides;

private SlidingWindowCounter(final SlidingWindowCounterBuilder builder) {
this.metricsCounter = builder.metricsCounter;
this.clock = builder.clock;
Expand All @@ -61,6 +66,7 @@ private SlidingWindowCounter(final SlidingWindowCounterBuilder builder) {
this.windowsForRecording = builder.recordingMeasurementWindows;
this.windowsForReporting = builder.reportingMeasurementWindows;
this.maximumPerSlot = builder.maximumPerSlot;
this.lastTimestampOverrides = builder.lastTimestampOverrides;

minResolution = Stream.of(windowsForRecording)
.map(MeasurementWindow::getResolution)
Expand Down Expand Up @@ -179,9 +185,9 @@ private boolean isOld(final long slot) {
*/
Map<Duration, Long> getCounts(final boolean success) {
if (success) {
return getCounts(successMeasurements);
return getCounts(successMeasurements, lastSuccessTimestamp.get());
}
return getCounts(failureMeasurements);
return getCounts(failureMeasurements, lastFailureTimestamp.get());
}

/**
Expand All @@ -190,21 +196,26 @@ Map<Duration, Long> getCounts(final boolean success) {
* @param measurements the measurements map to use
* @return the counts for all windows
*/
private Map<Duration, Long> getCounts(final Map<Long, Long> measurements) {
private Map<Duration, Long> getCounts(final Map<Long, Long> measurements,
final long lastTimestamp) {
final Map<Duration, Long> result = new HashMap<>();
final long now = clock.instant().toEpochMilli();
for (final MeasurementWindow window : windowsForReporting) {
// min is where we start to sum up the slots
final long windowInMs = window.getWindow().toMillis();
final long resolutionInMs = window.getResolution().toMillis();
final long min = getSlot(now - windowInMs, resolutionInMs);
// max is the current active time slot
final long max = getSlot(now, resolutionInMs);
long sum = 0;
for (final Map.Entry<Long, Long> e : measurements.entrySet()) {
final long slot = e.getKey();
if (slot > min && slot <= max) {
sum += Math.min(maximumPerSlot, e.getValue());
if (lastTimestampOverrides.containsKey(window) && now - window.getWindow().toMillis() < lastTimestamp) {
sum = lastTimestampOverrides.get(window);
} else {
// min is where we start to sum up the slots
final long windowInMs = window.getWindow().toMillis();
final long resolutionInMs = window.getResolution().toMillis();
final long min = getSlot(now - windowInMs, resolutionInMs);
// max is the current active time slot
final long max = getSlot(now, resolutionInMs);
for (final Map.Entry<Long, Long> e : measurements.entrySet()) {
final long slot = e.getKey();
if (slot > min && slot <= max) {
sum += Math.min(maximumPerSlot, e.getValue());
}
}
}
result.put(window.getWindow(), sum);
Expand Down Expand Up @@ -240,51 +251,58 @@ static final class SlidingWindowCounterBuilder {
private MeasurementWindow[] recordingMeasurementWindows;
private MeasurementWindow[] reportingMeasurementWindows;
private long maximumPerSlot = Long.MAX_VALUE;
private final Map<MeasurementWindow, Long> lastTimestampOverrides = new EnumMap<>(MeasurementWindow.class);

public SlidingWindowCounterBuilder(final Counter metricsCounter) {
private SlidingWindowCounterBuilder(final Counter metricsCounter) {
this.metricsCounter = metricsCounter;
}

public SlidingWindowCounterBuilder clock(final Clock clock) {
SlidingWindowCounterBuilder clock(final Clock clock) {
this.clock = clock;
return this;
}

public SlidingWindowCounterBuilder metricsAlert(@Nullable final MetricsAlert metricsAlert) {
SlidingWindowCounterBuilder metricsAlert(@Nullable final MetricsAlert metricsAlert) {
this.metricsAlert = metricsAlert;
return this;
}

public SlidingWindowCounterBuilder cleanUpEnabled(final boolean cleanUpEnabled) {
SlidingWindowCounterBuilder cleanUpEnabled(final boolean cleanUpEnabled) {
this.cleanUpEnabled = cleanUpEnabled;
return this;
}

public SlidingWindowCounterBuilder recordingMeasurementWindows(
SlidingWindowCounterBuilder recordingMeasurementWindows(
final MeasurementWindow... recordingMeasurementWindows) {
this.recordingMeasurementWindows = recordingMeasurementWindows;
return this;
}

public SlidingWindowCounterBuilder reportingMeasurementWindows(
SlidingWindowCounterBuilder reportingMeasurementWindows(
final MeasurementWindow... reportingMeasurementWindows) {
this.reportingMeasurementWindows = reportingMeasurementWindows;
return this;
}

public SlidingWindowCounterBuilder measurementWindows(
SlidingWindowCounterBuilder measurementWindows(
final MeasurementWindow... measurementWindows) {
this.reportingMeasurementWindows = measurementWindows;
this.recordingMeasurementWindows = measurementWindows;
return this;
}

public SlidingWindowCounterBuilder maximumPerSlot(final long maximumPerSlot) {
SlidingWindowCounterBuilder maximumPerSlot(final long maximumPerSlot) {
this.maximumPerSlot = maximumPerSlot;
return this;
}

public SlidingWindowCounter build() {
SlidingWindowCounterBuilder useLastTimestampForWindow(final MeasurementWindow window,
final Long fixedValue) {
this.lastTimestampOverrides.put(window, fixedValue);
return this;
}

SlidingWindowCounter build() {
return new SlidingWindowCounter(this);
}
}
Expand All @@ -304,6 +322,7 @@ public String toString() {
", metricsAlert=" + metricsAlert +
", maximumPerSlot=" + maximumPerSlot +
", cleanUpEnabled=" + cleanUpEnabled +
", lastTimestampOverrides=" + lastTimestampOverrides +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ final class ThrottledMetricsAlert implements MetricsAlert {

@Override
public boolean evaluateCondition(final MeasurementWindow window, final long timestamp, final long value) {
return targetMeasurementWindow == window && value >= threshold;
return targetMeasurementWindow == window && value > threshold;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private static ConnectivityConfig mockConnectivityConfig() {
when(connectionConfig.getAmqp10Config()).thenReturn(amqp10Config);
when(amqp10Config.getConsumerConfig()).thenReturn(consumerConfig);
when(consumerConfig.getThrottlingConfig()).thenReturn(throttlingConfig);
when(throttlingConfig.getLimit()).thenReturn(12);
when(throttlingConfig.getLimit()).thenReturn(10);
when(throttlingConfig.getInterval()).thenReturn(Duration.ofMinutes(1));
return connectivityConfig;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics;

import static org.assertj.core.api.Assertions.*;
import static org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.MeasurementWindow.ONE_MINUTE_WITH_ONE_MINUTE_RESOLUTION;
import static org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.MeasurementWindow.ONE_MINUTE_WITH_TEN_SECONDS_RESOLUTION;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.junit.Before;
import org.junit.Test;

/**
* Tests ThrottledMetricsAlert.
*/
public class ThrottledMetricsAlertTest {

private static final long NOW = System.currentTimeMillis();
private ConnectionMetricsCounter metricsCounter;
private MetricsAlert underTest;

@Before
public void setUp() throws Exception {
metricsCounter = mock(ConnectionMetricsCounter.class);
underTest = new ThrottledMetricsAlert(ONE_MINUTE_WITH_TEN_SECONDS_RESOLUTION, 10,
() -> metricsCounter);
}

@Test
public void testCondition() {


assertThat(underTest.evaluateCondition(ONE_MINUTE_WITH_TEN_SECONDS_RESOLUTION, NOW, 0))
.isFalse();
assertThat(underTest.evaluateCondition(ONE_MINUTE_WITH_TEN_SECONDS_RESOLUTION, NOW, 10))
.isFalse();
assertThat(underTest.evaluateCondition(ONE_MINUTE_WITH_TEN_SECONDS_RESOLUTION, NOW, 11))
.isTrue();
assertThat(underTest.evaluateCondition(ONE_MINUTE_WITH_TEN_SECONDS_RESOLUTION, NOW, 100))
.isTrue();

// wrong window
assertThat(underTest.evaluateCondition(ONE_MINUTE_WITH_ONE_MINUTE_RESOLUTION, NOW, 1000))
.isFalse();
}

@Test
public void testAction() {
underTest.triggerAction(NOW, 100);
verify(metricsCounter).recordFailure(NOW);
}

@Test
public void testLookupReturnsNull() {
assertThatNoException().isThrownBy(() -> new ThrottledMetricsAlert(ONE_MINUTE_WITH_TEN_SECONDS_RESOLUTION, 10,
() -> metricsCounter).triggerAction(NOW, 100));

}
}

0 comments on commit 596ff34

Please sign in to comment.