Skip to content

Commit

Permalink
add throttling detection tolerance to configuration, some javadoc and…
Browse files Browse the repository at this point in the history
… cleanup

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Sep 23, 2021
1 parent 8592249 commit e4ec681
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ default int getMaxInFlight() {
return (int) (getLimit() * getMaxInFlightFactor());
}

/**
* Returns the tolerance in percent of the actual throttling limit when a source is considered throttled.
*
* @return the throttling detection tolerance
*/
double getThrottlingDetectionTolerance();

/**
* Returns an instance of {@code ConnectionThrottlingConfig} based on the settings of the specified Config.
Expand All @@ -62,7 +68,12 @@ enum ConfigValue implements KnownConfigValue {
* The factor of maximum number of messages waiting for an acknowledgement per consumer in relation to the
* {@code limit}.
*/
MAX_IN_FLIGHT_FACTOR("max-in-flight-factor", 2.0);
MAX_IN_FLIGHT_FACTOR("max-in-flight-factor", 2.0),

/**
* The tolerance in percent of the actual throttling limit when a source is considered throttled.
*/
THROTTLING_DETECTION_TOLERANCE("throttling-detection-tolerance", 0.05);

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,22 @@ final class DefaultConnectionThrottlingConfig implements ConnectionThrottlingCon

private final ThrottlingConfig throttlingConfig;
private final double maxInFlightFactor;
private final double throttlingDetectionTolerance;

private DefaultConnectionThrottlingConfig(final ScopedConfig config, final ThrottlingConfig throttlingConfig) {
maxInFlightFactor = config.getPositiveDoubleOrThrow(ConfigValue.MAX_IN_FLIGHT_FACTOR);
throttlingDetectionTolerance = config.getPositiveDoubleOrThrow(ConfigValue.THROTTLING_DETECTION_TOLERANCE);
this.throttlingConfig = throttlingConfig;
if (maxInFlightFactor < 1.0) {
throw new DittoConfigError(MessageFormat.format(
"The double value at <{0}> must be >= 1.0 but it was <{1}>!",
ConfigValue.MAX_IN_FLIGHT_FACTOR.getConfigPath(), maxInFlightFactor));
}
if (throttlingDetectionTolerance > 1.0) {
throw new DittoConfigError(MessageFormat.format(
"The double value at <{0}> must be <= 1.0 but it was <{1}>!",
ConfigValue.THROTTLING_DETECTION_TOLERANCE.getConfigPath(), maxInFlightFactor));
}
}

static ConnectionThrottlingConfig of(final Config config) {
Expand All @@ -65,24 +72,31 @@ public double getMaxInFlightFactor() {
return maxInFlightFactor;
}

@Override
public double getThrottlingDetectionTolerance() {
return throttlingDetectionTolerance;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final DefaultConnectionThrottlingConfig that = (DefaultConnectionThrottlingConfig) o;
return Double.compare(that.maxInFlightFactor, maxInFlightFactor) == 0 &&
Objects.equals(throttlingConfig, that.throttlingConfig);
return Double.compare(that.maxInFlightFactor, maxInFlightFactor) == 0
&& Double.compare(that.throttlingDetectionTolerance, throttlingDetectionTolerance) == 0
&& Objects.equals(throttlingConfig, that.throttlingConfig);
}

@Override
public int hashCode() {
return Objects.hash(maxInFlightFactor, throttlingConfig);
return Objects.hash(maxInFlightFactor, throttlingConfig, throttlingDetectionTolerance);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"maxInFlightFactor=" + maxInFlightFactor +
"throttlingDetectionTolerance=" + throttlingDetectionTolerance +
", throttlingConfig=" + throttlingConfig +
"]";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package org.eclipse.ditto.connectivity.service.messaging.monitoring;

import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;

/**
* Registry that provides monitors for the different use cases inside a connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.eclipse.ditto.connectivity.model.LogType;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.MonitoringLoggerConfig;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitorRegistry;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ public interface ConnectionMetricsCounter {
void recordFailure();

/**
* Record a successful operation.
* Record a successful operation at the given timestamp.
*/
void recordSuccess(long ts);

/**
* Record a failed operation.
* Record a failed operation at the given timestamp.
*/
void recordFailure(long ts);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.MetricDirection;
Expand Down Expand Up @@ -49,7 +48,7 @@ final class ConnectionMetricsCounterFactory {

/**
* An alert can be registered for a combination of MetricType and MetricDirection e.g. CONSUMED + INBOUND. These
* alerts will be instantiated using the registered Creator and used with newly created SlidingWindowCounters.
* alerts will be instantiated using the registered Creator and passed to created SlidingWindowCounters.
*/
private static final Map<MetricsAlert.Key, AlertsCreator> alerts = Map.of(
MetricsAlert.Key.CONSUMED_INBOUND,
Expand All @@ -63,18 +62,30 @@ final class ConnectionMetricsCounterFactory {
}
);

private ConnectionMetricsCounterFactory() {}

/**
* Create a new instance of a DefaultConnectionMetricsCounter from the given arguments.
*
* @param metricType the metric type
* @param metricDirection the metric direction
* @param connectionId the connection id required to build a metrics counter
* @param connectionType the connection type required to build a metrics counter
* @param address the monitored address
* @param clock the clock to be used
* @param connectivityConfig the connectivity config required to read throttling limits from the config
*/
static DefaultConnectionMetricsCounter create(
final MetricType metricType,
final MetricDirection metricDirection, final ConnectionId connectionId,
final MetricDirection metricDirection,
final ConnectionId connectionId,
final ConnectionType connectionType,
final String address,
final Clock clock, final ConnectivityConfig connectivityConfig) {
final Clock clock,
final ConnectivityConfig connectivityConfig) {
final Counter metricsCounter = metricsCounter(connectionId, connectionType, metricType, metricDirection);
final SlidingWindowCounter counter;
if (MetricType.THROTTLED == metricType) {
counter = SlidingWindowCounter.newBuilder(metricsCounter).clock(clock)
counter = SlidingWindowCounter.newBuilder(metricsCounter)
.clock(clock)
// we need to record for every minute of the last 24h if throttling occurred
.recordingMeasurementWindows(MeasurementWindow.ONE_DAY_WITH_ONE_MINUTE_RESOLUTION)
// reporting windows are the same as for the other metrics (1m, 1h, 1d)
Expand Down Expand Up @@ -117,7 +128,7 @@ private static MetricsAlert resolveOptionalAlert(final MetricDirection metricDir
.orElse(null);
}

private static int calculateThrottlingLimitFromConfig(final ConnectionType connectionType,
private static long calculateThrottlingLimitFromConfig(final ConnectionType connectionType,
final ConnectivityConfig config) {
switch (connectionType) {
case AMQP_10:
Expand All @@ -138,13 +149,19 @@ private static int calculateThrottlingLimitFromConfig(final ConnectionType conne
}
}

private static int perInterval(final ThrottlingConfig throttlingConfig, final Duration resolution) {
private static long perInterval(final ConnectionThrottlingConfig throttlingConfig, final Duration resolution) {
final double tolerance = throttlingConfig.getThrottlingDetectionTolerance();
final Duration interval = throttlingConfig.getInterval();
final float factor = (float) resolution.toMillis() / interval.toMillis();
// calculate factor to adjust the limit to the given resolution
final double factor = (double) resolution.toMillis() / interval.toMillis();
final int limit = throttlingConfig.getLimit();
return Math.round(limit * factor);
final double limitAdjustedToResolution = limit * factor;
// apply the configured tolerance to the resulting limit
return (long) (limitAdjustedToResolution * (1 - tolerance));
}

private ConnectionMetricsCounterFactory() {}

/**
* Creator interface for MetricsAlerts which are stored in the map of existing {@link #alerts}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ final class ThrottledMetricsAlert implements MetricsAlert {
* @param lookup a supplier to lookup the counter to record a possible threshold exceedance (the counter may not
* exist at the time when the alert is created)
*/
ThrottledMetricsAlert(final MeasurementWindow targetMeasurementWindow, final int threshold,
ThrottledMetricsAlert(final MeasurementWindow targetMeasurementWindow, final long threshold,
final Supplier<ConnectionMetricsCounter> lookup) {
this.lookup = lookup;
this.threshold = threshold;
Expand Down
14 changes: 12 additions & 2 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,13 @@ ditto {
limit = 100
limit = ${?AMQP10_CONSUMER_THROTTLING_LIMIT}

# Throttling detection tolerance in percent e.g. for a tolerance of 5% if consumed messages are above
# limit - 5% per interval the source is considered throttled.
throttling-detection-tolerance = 0.05
throttling-detection-tolerance = ${?AMQP_10_CONSUMER_THROTTLING_DETECTION_TOLERANCE}

# How many unacknowledged messages are allowed at any time as factor of ${limit} - must be >= 1.0
# This limit couples latency with throughput (long lagency before ack -> lower throughput)
# This limit couples latency with throughput (long latency before ack -> lower throughput)
max-in-flight-factor = 2.0
max-in-flight-factor = ${?AMQP10_CONSUMER_THROTTLING_MAX_IN_FLIGHT_FACTOR}
}
Expand Down Expand Up @@ -327,8 +332,13 @@ ditto {
limit = 100
limit = ${?KAFKA_CONSUMER_THROTTLING_LIMIT}

# Throttling detection tolerance in percent e.g. for a tolerance of 5% if consumed messages are above
# limit - 5% per interval the source is considered throttled.
throttling-detection-tolerance = 0.05
throttling-detection-tolerance = ${?KAFKA_CONSUMER_THROTTLING_DETECTION_TOLERANCE}

# How many unacknowledged messages are allowed at any time as factor of ${limit} - must be >= 1.0
# This limit couples latency with throughput (long lagency before ack -> lower throughput)
# This limit couples latency with throughput (long latency before ack -> lower throughput)
max-in-flight-factor = 2.0
max-in-flight-factor = ${?KAFKA_CONSUMER_THROTTLING_MAX_IN_FLIGHT_FACTOR}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {
softly.assertThat(underTest.getMaxInFlightFactor())
.as(ConnectionThrottlingConfig.ConfigValue.MAX_IN_FLIGHT_FACTOR.getConfigPath())
.isEqualTo(ConnectionThrottlingConfig.ConfigValue.MAX_IN_FLIGHT_FACTOR.getDefaultValue());
softly.assertThat(underTest.getThrottlingDetectionTolerance())
.as(ConnectionThrottlingConfig.ConfigValue.THROTTLING_DETECTION_TOLERANCE.getConfigPath())
.isEqualTo(ConnectionThrottlingConfig.ConfigValue.THROTTLING_DETECTION_TOLERANCE.getDefaultValue());
softly.assertThat(underTest.getInterval())
.as(ThrottlingConfig.ConfigValue.INTERVAL.getConfigPath())
.isEqualTo(ThrottlingConfig.ConfigValue.INTERVAL.getDefaultValue());
Expand All @@ -78,6 +81,9 @@ public void underTestReturnsValuesOfConfigFile() {
softly.assertThat(underTest.getMaxInFlightFactor())
.as(ConnectionThrottlingConfig.ConfigValue.MAX_IN_FLIGHT_FACTOR.getConfigPath())
.isEqualTo(2.0);
softly.assertThat(underTest.getThrottlingDetectionTolerance())
.as(ConnectionThrottlingConfig.ConfigValue.THROTTLING_DETECTION_TOLERANCE.getConfigPath())
.isEqualTo(0.05);
softly.assertThat(underTest.getLimit())
.as(ThrottlingConfig.ConfigValue.LIMIT.getConfigPath())
.isEqualTo(4711);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
throttling {
interval = 42s
limit = 4711
throttling-detection-tolerance = 0.05
max-in-flight = 1337
}

0 comments on commit e4ec681

Please sign in to comment.