Skip to content

Commit

Permalink
remove throttling limits currently in place by default for Ditto:
Browse files Browse the repository at this point in the history
* Kafka consumer was limited to receive 100 msg/s
* AMQP 1.0 consumer was limited to receive 100 msg/s
* Websocket was limited to receive 100 msg/s

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Feb 28, 2022
1 parent bfbfa62 commit cab08f8
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ static ThrottlingConfig of(final Config config) {

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code Amqp10Config}.
* {@code ThrottlingConfig}.
*/
enum ConfigValue implements KnownConfigValue {

Expand All @@ -89,7 +89,7 @@ enum ConfigValue implements KnownConfigValue {
* The consumer throttling limit defining processed messages per configured
* {@link #INTERVAL interval}.
*/
LIMIT("limit", 100);
LIMIT("limit", 0);

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ enum ConfigValue implements KnownConfigValue {
/**
* Whether consumer rate limit is enabled.
*/
RATE_LIMIT_ENABLED("rate-limit-enabled", true),
RATE_LIMIT_ENABLED("rate-limit-enabled", false),

/**
* When to forget messages for which redelivery was requested (they may be consumed by another consumer).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ private Sink<Object, NotUsed> mapMessage() {
);

final Flow<Object, InboundMappingOutcomes, NotUsed> flowWithOptionalThrottling;
if (throttlingConfig != null) {
if (throttlingConfig != null &&
throttlingConfig.getLimit() > 0 && throttlingConfig.getInterval().negated().isNegative()) {
flowWithOptionalThrottling = mapMessageFlow
.throttle(throttlingConfig.getLimit(), throttlingConfig.getInterval(),
outcomes -> (int) outcomes.getOutcomes()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -90,10 +91,21 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
.run(materializer);

this.materializer = materializer;
consumerControl = sourceSupplier.get()

final var source = sourceSupplier.get()
.filter(committableMessage -> isNotDryRun(committableMessage.record(), dryRun))
.map(kafkaMessageTransformer::transform)
.throttle(throttlingConfig.getLimit(), throttlingConfig.getInterval())
.map(kafkaMessageTransformer::transform);

final int throttlingLimit = throttlingConfig.getLimit();
final Duration throttlingInterval = throttlingConfig.getInterval();
final Source<CommittableTransformationResult, Consumer.Control> throttledSource;
if (throttlingLimit > 0 && throttlingInterval.negated().isNegative()) {
throttledSource = source.throttle(throttlingLimit, throttlingInterval);
} else {
throttledSource = source;
}

consumerControl = throttledSource
.flatMapConcat(this::processTransformationResult)
.mapAsync(throttlingConfig.getMaxInFlight(), x -> x)
.toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -83,11 +84,22 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
"Please contact the service team", result
)))
.run(materializer);
consumerControl = sourceSupplier.get()

final var source = sourceSupplier.get()
.filter(consumerRecord -> isNotDryRun(consumerRecord, dryRun))
.map(kafkaMessageTransformer::transform)
.filter(result -> !result.isExpired())
.throttle(throttlingConfig.getLimit(), throttlingConfig.getInterval())
.filter(result -> !result.isExpired());

final int throttlingLimit = throttlingConfig.getLimit();
final Duration throttlingInterval = throttlingConfig.getInterval();
final Source<TransformationResult, Consumer.Control> throttledSource;
if (throttlingLimit > 0 && throttlingInterval.negated().isNegative()) {
throttledSource = source.throttle(throttlingLimit, throttlingInterval);
} else {
throttledSource = source;
}

consumerControl = throttledSource
.flatMapConcat(this::processTransformationResult)
.mapAsync(throttlingConfig.getMaxInFlight(), x -> x)
.toMat(Sink.ignore(), Consumer::createDrainingControl)
Expand Down
8 changes: 5 additions & 3 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ ditto {
amqp10 {
consumer {
# Whether rate limit according to throughput and acknowledgement is enabled.
rate-limit-enabled = true
rate-limit-enabled = false
rate-limit-enabled = ${?AMQP10_CONSUMER_RATE_LIMIT_ENABLED}

# When to forget an NAcked redelivery=true message -- those may be consumed by another consumer
Expand Down Expand Up @@ -361,13 +361,15 @@ ditto {
consumer {
throttling {
# Interval at which the consumer is throttled. Values smaller than 1s are treated as 1s.
# Disable throttling with a value of zero.
interval = 1s
interval = ${?KAFKA_CONSUMER_THROTTLING_INTERVAL}

# The maximum number of messages the consumer is allowed to receive within the configured
# throttling interval e.g. 100 msgs/s.
# Values smaller than 1 are treated as 1.
limit = 100
# Values smaller than 0 are treated as 0.
# Disable throttling with a value of zero.
limit = 0
limit = ${?KAFKA_CONSUMER_THROTTLING_LIMIT}

# Throttling detection tolerance in percent e.g. for a tolerance of 5% if consumed messages are above
Expand Down
4 changes: 2 additions & 2 deletions gateway/service/src/main/resources/gateway.conf
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ ditto {

# The maximum number of messages the websocket is allowed to receive within the configured
# throttling interval e.g. 100 msgs/s. Disable throttling with a value of zero.
limit = 100
limit = 0
limit = ${?GATEWAY_WEBSOCKET_THROTTLING_LIMIT}
}
}
Expand All @@ -134,7 +134,7 @@ ditto {

# The maximum number of things a search SSE connection is allowed to send within the configured
# throttling interval e.g. 100 things/s. Disable throttling with a value of zero.
limit = 100
limit = 0
limit = ${?GATEWAY_SSE_THROTTLING_LIMIT}
}
}
Expand Down

0 comments on commit cab08f8

Please sign in to comment.