diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/ConnectionThrottlingConfig.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/ConnectionThrottlingConfig.java index 6b6d3e3eaf..7175941a6e 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/ConnectionThrottlingConfig.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/ConnectionThrottlingConfig.java @@ -39,6 +39,12 @@ default int getMaxInFlight() { return (int) (getLimit() * getMaxInFlightFactor()); } + /** + * Returns the tolerance in percent of the actual throttling limit when a source is considered throttled. + * + * @return the throttling detection tolerance + */ + double getThrottlingDetectionTolerance(); /** * Returns an instance of {@code ConnectionThrottlingConfig} based on the settings of the specified Config. @@ -62,7 +68,12 @@ enum ConfigValue implements KnownConfigValue { * The factor of maximum number of messages waiting for an acknowledgement per consumer in relation to the * {@code limit}. */ - MAX_IN_FLIGHT_FACTOR("max-in-flight-factor", 2.0); + MAX_IN_FLIGHT_FACTOR("max-in-flight-factor", 2.0), + + /** + * The tolerance in percent of the actual throttling limit when a source is considered throttled. + */ + THROTTLING_DETECTION_TOLERANCE("throttling-detection-tolerance", 0.05); private final String path; private final Object defaultValue; diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/DefaultConnectionThrottlingConfig.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/DefaultConnectionThrottlingConfig.java index 461b90bfdf..06ae8d6c8c 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/DefaultConnectionThrottlingConfig.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/config/DefaultConnectionThrottlingConfig.java @@ -33,15 +33,22 @@ final class DefaultConnectionThrottlingConfig implements ConnectionThrottlingCon private final ThrottlingConfig throttlingConfig; private final double maxInFlightFactor; + private final double throttlingDetectionTolerance; private DefaultConnectionThrottlingConfig(final ScopedConfig config, final ThrottlingConfig throttlingConfig) { maxInFlightFactor = config.getPositiveDoubleOrThrow(ConfigValue.MAX_IN_FLIGHT_FACTOR); + throttlingDetectionTolerance = config.getPositiveDoubleOrThrow(ConfigValue.THROTTLING_DETECTION_TOLERANCE); this.throttlingConfig = throttlingConfig; if (maxInFlightFactor < 1.0) { throw new DittoConfigError(MessageFormat.format( "The double value at <{0}> must be >= 1.0 but it was <{1}>!", ConfigValue.MAX_IN_FLIGHT_FACTOR.getConfigPath(), maxInFlightFactor)); } + if (throttlingDetectionTolerance > 1.0) { + throw new DittoConfigError(MessageFormat.format( + "The double value at <{0}> must be <= 1.0 but it was <{1}>!", + ConfigValue.THROTTLING_DETECTION_TOLERANCE.getConfigPath(), maxInFlightFactor)); + } } static ConnectionThrottlingConfig of(final Config config) { @@ -65,24 +72,31 @@ public double getMaxInFlightFactor() { return maxInFlightFactor; } + @Override + public double getThrottlingDetectionTolerance() { + return throttlingDetectionTolerance; + } + @Override public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final DefaultConnectionThrottlingConfig that = (DefaultConnectionThrottlingConfig) o; - return Double.compare(that.maxInFlightFactor, maxInFlightFactor) == 0 && - Objects.equals(throttlingConfig, that.throttlingConfig); + return Double.compare(that.maxInFlightFactor, maxInFlightFactor) == 0 + && Double.compare(that.throttlingDetectionTolerance, throttlingDetectionTolerance) == 0 + && Objects.equals(throttlingConfig, that.throttlingConfig); } @Override public int hashCode() { - return Objects.hash(maxInFlightFactor, throttlingConfig); + return Objects.hash(maxInFlightFactor, throttlingConfig, throttlingDetectionTolerance); } @Override public String toString() { return getClass().getSimpleName() + " [" + "maxInFlightFactor=" + maxInFlightFactor + + "throttlingDetectionTolerance=" + throttlingDetectionTolerance + ", throttlingConfig=" + throttlingConfig + "]"; } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/ConnectionMonitorRegistry.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/ConnectionMonitorRegistry.java index 7d70b50679..4dce028af7 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/ConnectionMonitorRegistry.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/ConnectionMonitorRegistry.java @@ -14,7 +14,6 @@ package org.eclipse.ditto.connectivity.service.messaging.monitoring; import org.eclipse.ditto.connectivity.model.Connection; -import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; /** * Registry that provides monitors for the different use cases inside a connection. diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java index 4592beb43b..5f102712f3 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/logs/ConnectionLoggerRegistry.java @@ -41,7 +41,6 @@ import org.eclipse.ditto.connectivity.model.LogType; import org.eclipse.ditto.connectivity.model.Source; import org.eclipse.ditto.connectivity.model.Target; -import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; import org.eclipse.ditto.connectivity.service.config.MonitoringLoggerConfig; import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitorRegistry; import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey; diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ConnectionMetricsCounter.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ConnectionMetricsCounter.java index 415d69bc66..492e72943e 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ConnectionMetricsCounter.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ConnectionMetricsCounter.java @@ -31,12 +31,12 @@ public interface ConnectionMetricsCounter { void recordFailure(); /** - * Record a successful operation. + * Record a successful operation at the given timestamp. */ void recordSuccess(long ts); /** - * Record a failed operation. + * Record a failed operation at the given timestamp. */ void recordFailure(long ts); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ConnectionMetricsCounterFactory.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ConnectionMetricsCounterFactory.java index 00872d95e3..cd55bf624f 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ConnectionMetricsCounterFactory.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ConnectionMetricsCounterFactory.java @@ -18,7 +18,6 @@ import javax.annotation.Nullable; -import org.eclipse.ditto.base.service.config.ThrottlingConfig; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.ConnectionType; import org.eclipse.ditto.connectivity.model.MetricDirection; @@ -49,7 +48,7 @@ final class ConnectionMetricsCounterFactory { /** * An alert can be registered for a combination of MetricType and MetricDirection e.g. CONSUMED + INBOUND. These - * alerts will be instantiated using the registered Creator and used with newly created SlidingWindowCounters. + * alerts will be instantiated using the registered Creator and passed to created SlidingWindowCounters. */ private static final Map alerts = Map.of( MetricsAlert.Key.CONSUMED_INBOUND, @@ -63,18 +62,30 @@ final class ConnectionMetricsCounterFactory { } ); - private ConnectionMetricsCounterFactory() {} - + /** + * Create a new instance of a DefaultConnectionMetricsCounter from the given arguments. + * + * @param metricType the metric type + * @param metricDirection the metric direction + * @param connectionId the connection id required to build a metrics counter + * @param connectionType the connection type required to build a metrics counter + * @param address the monitored address + * @param clock the clock to be used + * @param connectivityConfig the connectivity config required to read throttling limits from the config + */ static DefaultConnectionMetricsCounter create( final MetricType metricType, - final MetricDirection metricDirection, final ConnectionId connectionId, + final MetricDirection metricDirection, + final ConnectionId connectionId, final ConnectionType connectionType, final String address, - final Clock clock, final ConnectivityConfig connectivityConfig) { + final Clock clock, + final ConnectivityConfig connectivityConfig) { final Counter metricsCounter = metricsCounter(connectionId, connectionType, metricType, metricDirection); final SlidingWindowCounter counter; if (MetricType.THROTTLED == metricType) { - counter = SlidingWindowCounter.newBuilder(metricsCounter).clock(clock) + counter = SlidingWindowCounter.newBuilder(metricsCounter) + .clock(clock) // we need to record for every minute of the last 24h if throttling occurred .recordingMeasurementWindows(MeasurementWindow.ONE_DAY_WITH_ONE_MINUTE_RESOLUTION) // reporting windows are the same as for the other metrics (1m, 1h, 1d) @@ -117,7 +128,7 @@ private static MetricsAlert resolveOptionalAlert(final MetricDirection metricDir .orElse(null); } - private static int calculateThrottlingLimitFromConfig(final ConnectionType connectionType, + private static long calculateThrottlingLimitFromConfig(final ConnectionType connectionType, final ConnectivityConfig config) { switch (connectionType) { case AMQP_10: @@ -138,13 +149,19 @@ private static int calculateThrottlingLimitFromConfig(final ConnectionType conne } } - private static int perInterval(final ThrottlingConfig throttlingConfig, final Duration resolution) { + private static long perInterval(final ConnectionThrottlingConfig throttlingConfig, final Duration resolution) { + final double tolerance = throttlingConfig.getThrottlingDetectionTolerance(); final Duration interval = throttlingConfig.getInterval(); - final float factor = (float) resolution.toMillis() / interval.toMillis(); + // calculate factor to adjust the limit to the given resolution + final double factor = (double) resolution.toMillis() / interval.toMillis(); final int limit = throttlingConfig.getLimit(); - return Math.round(limit * factor); + final double limitAdjustedToResolution = limit * factor; + // apply the configured tolerance to the resulting limit + return (long) (limitAdjustedToResolution * (1 - tolerance)); } + private ConnectionMetricsCounterFactory() {} + /** * Creator interface for MetricsAlerts which are stored in the map of existing {@link #alerts}. */ diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ThrottledMetricsAlert.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ThrottledMetricsAlert.java index bcd205fd0e..2b13b3f557 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ThrottledMetricsAlert.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/ThrottledMetricsAlert.java @@ -37,7 +37,7 @@ final class ThrottledMetricsAlert implements MetricsAlert { * @param lookup a supplier to lookup the counter to record a possible threshold exceedance (the counter may not * exist at the time when the alert is created) */ - ThrottledMetricsAlert(final MeasurementWindow targetMeasurementWindow, final int threshold, + ThrottledMetricsAlert(final MeasurementWindow targetMeasurementWindow, final long threshold, final Supplier lookup) { this.lookup = lookup; this.threshold = threshold; diff --git a/connectivity/service/src/main/resources/connectivity.conf b/connectivity/service/src/main/resources/connectivity.conf index c773a96609..4ecd1de277 100644 --- a/connectivity/service/src/main/resources/connectivity.conf +++ b/connectivity/service/src/main/resources/connectivity.conf @@ -170,8 +170,13 @@ ditto { limit = 100 limit = ${?AMQP10_CONSUMER_THROTTLING_LIMIT} + # Throttling detection tolerance in percent e.g. for a tolerance of 5% if consumed messages are above + # limit - 5% per interval the source is considered throttled. + throttling-detection-tolerance = 0.05 + throttling-detection-tolerance = ${?AMQP_10_CONSUMER_THROTTLING_DETECTION_TOLERANCE} + # How many unacknowledged messages are allowed at any time as factor of ${limit} - must be >= 1.0 - # This limit couples latency with throughput (long lagency before ack -> lower throughput) + # This limit couples latency with throughput (long latency before ack -> lower throughput) max-in-flight-factor = 2.0 max-in-flight-factor = ${?AMQP10_CONSUMER_THROTTLING_MAX_IN_FLIGHT_FACTOR} } @@ -327,8 +332,13 @@ ditto { limit = 100 limit = ${?KAFKA_CONSUMER_THROTTLING_LIMIT} + # Throttling detection tolerance in percent e.g. for a tolerance of 5% if consumed messages are above + # limit - 5% per interval the source is considered throttled. + throttling-detection-tolerance = 0.05 + throttling-detection-tolerance = ${?KAFKA_CONSUMER_THROTTLING_DETECTION_TOLERANCE} + # How many unacknowledged messages are allowed at any time as factor of ${limit} - must be >= 1.0 - # This limit couples latency with throughput (long lagency before ack -> lower throughput) + # This limit couples latency with throughput (long latency before ack -> lower throughput) max-in-flight-factor = 2.0 max-in-flight-factor = ${?KAFKA_CONSUMER_THROTTLING_MAX_IN_FLIGHT_FACTOR} } diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/config/DefaultConnectionThrottlingConfigTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/config/DefaultConnectionThrottlingConfigTest.java index a5c8df41d5..3fe9170b40 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/config/DefaultConnectionThrottlingConfigTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/config/DefaultConnectionThrottlingConfigTest.java @@ -64,6 +64,9 @@ public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() { softly.assertThat(underTest.getMaxInFlightFactor()) .as(ConnectionThrottlingConfig.ConfigValue.MAX_IN_FLIGHT_FACTOR.getConfigPath()) .isEqualTo(ConnectionThrottlingConfig.ConfigValue.MAX_IN_FLIGHT_FACTOR.getDefaultValue()); + softly.assertThat(underTest.getThrottlingDetectionTolerance()) + .as(ConnectionThrottlingConfig.ConfigValue.THROTTLING_DETECTION_TOLERANCE.getConfigPath()) + .isEqualTo(ConnectionThrottlingConfig.ConfigValue.THROTTLING_DETECTION_TOLERANCE.getDefaultValue()); softly.assertThat(underTest.getInterval()) .as(ThrottlingConfig.ConfigValue.INTERVAL.getConfigPath()) .isEqualTo(ThrottlingConfig.ConfigValue.INTERVAL.getDefaultValue()); @@ -78,6 +81,9 @@ public void underTestReturnsValuesOfConfigFile() { softly.assertThat(underTest.getMaxInFlightFactor()) .as(ConnectionThrottlingConfig.ConfigValue.MAX_IN_FLIGHT_FACTOR.getConfigPath()) .isEqualTo(2.0); + softly.assertThat(underTest.getThrottlingDetectionTolerance()) + .as(ConnectionThrottlingConfig.ConfigValue.THROTTLING_DETECTION_TOLERANCE.getConfigPath()) + .isEqualTo(0.05); softly.assertThat(underTest.getLimit()) .as(ThrottlingConfig.ConfigValue.LIMIT.getConfigPath()) .isEqualTo(4711); diff --git a/connectivity/service/src/test/resources/connection-throttling-test.conf b/connectivity/service/src/test/resources/connection-throttling-test.conf index db7f3ab465..b061459df5 100644 --- a/connectivity/service/src/test/resources/connection-throttling-test.conf +++ b/connectivity/service/src/test/resources/connection-throttling-test.conf @@ -1,6 +1,7 @@ throttling { interval = 42s limit = 4711 + throttling-detection-tolerance = 0.05 max-in-flight = 1337 }