Skip to content

Commit

Permalink
introduced explicit "enabled" option for ThrottlingConfig and by defa…
Browse files Browse the repository at this point in the history
…ult disable it in Ditto

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Feb 28, 2022
1 parent cab08f8 commit 0076d4b
Show file tree
Hide file tree
Showing 27 changed files with 136 additions and 105 deletions.
Expand Up @@ -28,18 +28,25 @@
@Immutable
final class DefaultThrottlingConfig implements ThrottlingConfig {

private final boolean enabled;
private final Duration interval;
private final int limit;

private DefaultThrottlingConfig(final ScopedConfig config) {
interval = config.getNonNegativeDurationOrThrow(ConfigValue.INTERVAL);
limit = config.getNonNegativeIntOrThrow(ConfigValue.LIMIT);
enabled = config.getBoolean(ConfigValue.ENABLED.getConfigPath());
interval = config.getNonNegativeAndNonZeroDurationOrThrow(ConfigValue.INTERVAL);
limit = config.getPositiveIntOrThrow(ConfigValue.LIMIT);
}

static DefaultThrottlingConfig of(final Config config) {
return new DefaultThrottlingConfig(ConfigWithFallback.newInstance(config, CONFIG_PATH, ConfigValue.values()));
}

@Override
public boolean isEnabled() {
return enabled;
}

@Override
public Duration getInterval() {
return interval;
Expand All @@ -59,18 +66,21 @@ public boolean equals(final Object o) {
return false;
}
final DefaultThrottlingConfig that = (DefaultThrottlingConfig) o;
return limit == that.limit && Objects.equals(interval, that.interval);
return enabled == that.enabled &&
limit == that.limit &&
Objects.equals(interval, that.interval);
}

@Override
public int hashCode() {
return Objects.hash(interval, limit);
return Objects.hash(enabled, interval, limit);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"interval=" + interval +
"enabled=" + enabled +
", interval=" + interval +
", limit=" + limit +
"]";
}
Expand Down
Expand Up @@ -35,16 +35,23 @@ public interface ThrottlingConfig {
String CONFIG_PATH = "throttling";

/**
* Returns the consumer throttling interval meaning in which duration may the configured
* Returns whether throttling should be enabled or not.
*
* @return whether throttling should be enabled or not.
*/
boolean isEnabled();

/**
* Returns the throttling interval meaning in which duration may the configured
* {@link #getLimit() limit} be processed before throttling further messages.
*
* @return the consumer throttling interval.
*/
Duration getInterval();

/**
* Returns the consumer throttling limit defining processed messages per configured
* {@link #getInterval()} interval}.
* Returns the throttling limit defining processed messages per configured
* {@link #getInterval() interval}.
*
* @return the consumer throttling limit.
*/
Expand All @@ -57,6 +64,7 @@ public interface ThrottlingConfig {
*/
default Config render() {
final Map<String, Object> map = new HashMap<>();
map.put(ConfigValue.ENABLED.getConfigPath(), isEnabled());
map.put(ConfigValue.INTERVAL.getConfigPath(), getInterval().toMillis() + "ms");
map.put(ConfigValue.LIMIT.getConfigPath(), getLimit());
return ConfigFactory.parseMap(map).atKey(CONFIG_PATH);
Expand All @@ -80,16 +88,21 @@ static ThrottlingConfig of(final Config config) {
enum ConfigValue implements KnownConfigValue {

/**
* The consumer throttling interval meaning in which duration may the configured
* Whether throttling should be enabled.
*/
ENABLED("enabled", false),

/**
* The throttling interval meaning in which duration may the configured
* {@link #LIMIT limit} be processed before throttling further messages.
*/
INTERVAL("interval", Duration.ofSeconds(1)),

/**
* The consumer throttling limit defining processed messages per configured
* The throttling limit defining processed messages per configured
* {@link #INTERVAL interval}.
*/
LIMIT("limit", 0);
LIMIT("limit", 100);

private final String path;
private final Object defaultValue;
Expand Down
Expand Up @@ -19,7 +19,6 @@
import java.time.Duration;

import org.assertj.core.api.JUnitSoftAssertions;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -52,6 +51,10 @@ public void testHashCodeAndEquals() {
public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {
final ThrottlingConfig underTest = ThrottlingConfig.of(ConfigFactory.empty());

softly.assertThat(underTest.isEnabled())
.as(ThrottlingConfig.ConfigValue.ENABLED.getConfigPath())
.isEqualTo(ThrottlingConfig.ConfigValue.ENABLED.getDefaultValue());

softly.assertThat(underTest.getInterval())
.as(ThrottlingConfig.ConfigValue.INTERVAL.getConfigPath())
.isEqualTo(ThrottlingConfig.ConfigValue.INTERVAL.getDefaultValue());
Expand All @@ -65,6 +68,10 @@ public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {
public void underTestReturnsValuesOfConfigFile() {
final ThrottlingConfig underTest = ThrottlingConfig.of(ConfigFactory.load("throttling-test"));

softly.assertThat(underTest.isEnabled())
.as(ThrottlingConfig.ConfigValue.ENABLED.getConfigPath())
.isEqualTo(true);

softly.assertThat(underTest.getInterval())
.as(ThrottlingConfig.ConfigValue.INTERVAL.getConfigPath())
.isEqualTo(Duration.ofSeconds(1234L));
Expand Down
1 change: 1 addition & 0 deletions base/service/src/test/resources/throttling-test.conf
@@ -1,4 +1,5 @@
throttling {
enabled = true
interval = 1234s
limit = 5678
}
Expand Up @@ -26,13 +26,6 @@
@Immutable
public interface Amqp10ConsumerConfig {

/**
* Return whether rate limit based on throughput _and_ acknowledgements is enabled.
*
* @return whether rate limit is enabled.
*/
boolean isRateLimitEnabled();

/**
* Return when to forget messages for which redelivery was requested (they may be consumed by another consumer).
*
Expand Down Expand Up @@ -64,11 +57,6 @@ static Amqp10ConsumerConfig of(final Config config) {
*/
enum ConfigValue implements KnownConfigValue {

/**
* Whether consumer rate limit is enabled.
*/
RATE_LIMIT_ENABLED("rate-limit-enabled", false),

/**
* When to forget messages for which redelivery was requested (they may be consumed by another consumer).
*/
Expand Down
Expand Up @@ -23,15 +23,15 @@
public interface ConnectionThrottlingConfig extends ThrottlingConfig {

/**
* Return the factor of many unacknowledged messages are allowed in relation to {@link #getLimit()}.
* Return the factor of many unacknowledged messages are allowed in relation to {@link #getLimit() limit}.
*
* @return the factor of maximum number of messages in flight.
*/
double getMaxInFlightFactor();

/**
* Returns how many unacknowledged messages are allowed utilizung {@link #getLimit()} and
* {@link #getMaxInFlightFactor()}.
* Returns how many unacknowledged messages are allowed utilizing {@link #getLimit() limit} and
* {@link #getMaxInFlightFactor() max inflight factor}.
*
* @return the maximum number of messages in flight.
*/
Expand Down
Expand Up @@ -30,12 +30,10 @@ final class DefaultAmqp10ConsumerConfig implements Amqp10ConsumerConfig {

private static final String CONFIG_PATH = "consumer";

private final boolean rateLimitEnabled;
private final Duration redeliveryExpectationTimeout;
private final ConnectionThrottlingConfig throttlingConfig;

private DefaultAmqp10ConsumerConfig(final ScopedConfig config) {
rateLimitEnabled = config.getBoolean(ConfigValue.RATE_LIMIT_ENABLED.getConfigPath());
redeliveryExpectationTimeout = config.getNonNegativeAndNonZeroDurationOrThrow(
ConfigValue.REDELIVERY_EXPECTATION_TIMEOUT);
throttlingConfig = ConnectionThrottlingConfig.of(config);
Expand All @@ -53,11 +51,6 @@ public static DefaultAmqp10ConsumerConfig of(final Config config) {
ConfigWithFallback.newInstance(config, CONFIG_PATH, ConfigValue.values()));
}

@Override
public boolean isRateLimitEnabled() {
return rateLimitEnabled;
}

@Override
public Duration getRedeliveryExpectationTimeout() {
return redeliveryExpectationTimeout;
Expand All @@ -77,21 +70,19 @@ public boolean equals(final Object o) {
return false;
}
final DefaultAmqp10ConsumerConfig that = (DefaultAmqp10ConsumerConfig) o;
return rateLimitEnabled == that.rateLimitEnabled &&
Objects.equals(redeliveryExpectationTimeout, that.redeliveryExpectationTimeout) &&
return Objects.equals(redeliveryExpectationTimeout, that.redeliveryExpectationTimeout) &&
Objects.equals(throttlingConfig, that.throttlingConfig);
}

@Override
public int hashCode() {
return Objects.hash(rateLimitEnabled, redeliveryExpectationTimeout, throttlingConfig);
return Objects.hash(redeliveryExpectationTimeout, throttlingConfig);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"rateLimitEnabled=" + rateLimitEnabled +
", redeliveryExpectationTimeout=" + redeliveryExpectationTimeout +
"redeliveryExpectationTimeout=" + redeliveryExpectationTimeout +
", throttlingConfig=" + throttlingConfig +
"]";
}
Expand Down
Expand Up @@ -57,6 +57,11 @@ static ConnectionThrottlingConfig of(final Config config) {
ConfigWithFallback.newInstance(config, CONFIG_PATH, ConfigValue.values()), throttlingConfig);
}

@Override
public boolean isEnabled() {
return throttlingConfig.isEnabled();
}

@Override
public Duration getInterval() {
return throttlingConfig.getInterval();
Expand Down
Expand Up @@ -141,8 +141,7 @@ private Sink<Object, NotUsed> mapMessage() {
);

final Flow<Object, InboundMappingOutcomes, NotUsed> flowWithOptionalThrottling;
if (throttlingConfig != null &&
throttlingConfig.getLimit() > 0 && throttlingConfig.getInterval().negated().isNegative()) {
if (throttlingConfig != null && throttlingConfig.isEnabled()) {
flowWithOptionalThrottling = mapMessageFlow
.throttle(throttlingConfig.getLimit(), throttlingConfig.getInterval(),
outcomes -> (int) outcomes.getOutcomes()
Expand Down
Expand Up @@ -30,10 +30,10 @@
@NotThreadSafe
final class MessageRateLimiter<S> {

private final boolean enabled;
private final int maxPerPeriod;
private final int maxInFlight;
private final Duration redeliveryExpectationTimeout;
private final boolean enabled;

/**
* Whether consumers are open before the next throttling decision.
Expand All @@ -56,29 +56,28 @@ final class MessageRateLimiter<S> {
private int consumedInPeriod = 0;

private MessageRateLimiter(final ConnectionThrottlingConfig connectionThrottlingConfig,
final Duration redeliveryExpectationTimeout, final boolean enabled) {
final Duration redeliveryExpectationTimeout) {
this.enabled = connectionThrottlingConfig.isEnabled();
this.maxPerPeriod = connectionThrottlingConfig.getLimit();
this.maxInFlight = connectionThrottlingConfig.getMaxInFlight();
this.redeliveryExpectationTimeout = maxDuration(redeliveryExpectationTimeout, Duration.ofSeconds(1L));
this.enabled = enabled;
}

private MessageRateLimiter(final ConnectionThrottlingConfig connectionThrottlingConfig,
final Duration redeliveryExpectationTimeout, final MessageRateLimiter<S> existingLimiter) {
this.enabled = connectionThrottlingConfig.isEnabled();
this.maxPerPeriod = connectionThrottlingConfig.getLimit();
this.maxInFlight = connectionThrottlingConfig.getMaxInFlight();
this.redeliveryExpectationTimeout = maxDuration(redeliveryExpectationTimeout, Duration.ofSeconds(1L));
this.enabled = existingLimiter.enabled;
this.isConsumerOpen = existingLimiter.isConsumerOpen;
this.pendingRedeliveries.addAll(existingLimiter.pendingRedeliveries);
this.inFlight = existingLimiter.inFlight;
this.consumedInPeriod = existingLimiter.consumedInPeriod;
}

static <S> MessageRateLimiter<S> of(final Amqp10Config config, final boolean enabled) {
static <S> MessageRateLimiter<S> of(final Amqp10Config config) {
return new MessageRateLimiter<>(config.getConsumerConfig().getThrottlingConfig(),
config.getConsumerConfig().getRedeliveryExpectationTimeout(),
enabled);
config.getConsumerConfig().getRedeliveryExpectationTimeout());
}

static <S> MessageRateLimiter<S> of(final Amqp10Config config, final MessageRateLimiter<S> existingLimiter) {
Expand Down
Expand Up @@ -15,6 +15,7 @@
import java.time.Duration;

import org.eclipse.ditto.connectivity.service.config.Amqp10Config;
import org.eclipse.ditto.connectivity.service.config.ConnectionThrottlingConfig;

import akka.actor.AbstractActor;
import akka.actor.Actor;
Expand Down Expand Up @@ -65,13 +66,13 @@ interface MessageRateLimiterBehavior<S> extends Actor, Timers {
* @return the rate limiter as a part of the actor state.
*/
default MessageRateLimiter<S> initMessageRateLimiter(final Amqp10Config config) {
final boolean enabled = config.getConsumerConfig().isRateLimitEnabled();
if (enabled) {
final ConnectionThrottlingConfig throttlingConfig = config.getConsumerConfig().getThrottlingConfig();
if (throttlingConfig.isEnabled()) {
// schedule periodic throughput check
timers().startPeriodicTimer(Control.CHECK_RATE_LIMIT, Control.CHECK_RATE_LIMIT,
config.getConsumerConfig().getThrottlingConfig().getInterval());
throttlingConfig.getInterval());
}
return MessageRateLimiter.of(config, enabled);
return MessageRateLimiter.of(config);
}

/**
Expand Down
Expand Up @@ -13,7 +13,6 @@
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -96,11 +95,9 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
.filter(committableMessage -> isNotDryRun(committableMessage.record(), dryRun))
.map(kafkaMessageTransformer::transform);

final int throttlingLimit = throttlingConfig.getLimit();
final Duration throttlingInterval = throttlingConfig.getInterval();
final Source<CommittableTransformationResult, Consumer.Control> throttledSource;
if (throttlingLimit > 0 && throttlingInterval.negated().isNegative()) {
throttledSource = source.throttle(throttlingLimit, throttlingInterval);
if (throttlingConfig.isEnabled()) {
throttledSource = source.throttle(throttlingConfig.getLimit(), throttlingConfig.getInterval());
} else {
throttledSource = source;
}
Expand Down
Expand Up @@ -13,7 +13,6 @@
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -90,11 +89,9 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
.map(kafkaMessageTransformer::transform)
.filter(result -> !result.isExpired());

final int throttlingLimit = throttlingConfig.getLimit();
final Duration throttlingInterval = throttlingConfig.getInterval();
final Source<TransformationResult, Consumer.Control> throttledSource;
if (throttlingLimit > 0 && throttlingInterval.negated().isNegative()) {
throttledSource = source.throttle(throttlingLimit, throttlingInterval);
if (throttlingConfig.isEnabled()) {
throttledSource = source.throttle(throttlingConfig.getLimit(), throttlingConfig.getInterval());
} else {
throttledSource = source;
}
Expand Down

0 comments on commit 0076d4b

Please sign in to comment.