Skip to content

Commit

Permalink
Throttle kafka consumer stream also after consuming a record from Kaf…
Browse files Browse the repository at this point in the history
…ka (throttling after payload mapping has already been implemented). This early throttling avoids being overloaded with malformed messages and processing them at full speed.

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Oct 25, 2021
1 parent f1ccd79 commit 879c323
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.service.config.ConnectionThrottlingConfig;
import org.eclipse.ditto.connectivity.service.messaging.AcknowledgeableMessage;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
Expand Down Expand Up @@ -55,7 +56,7 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
AtLeastOnceConsumerStream(
final AtLeastOnceKafkaConsumerSourceSupplier sourceSupplier,
final CommitterSettings committerSettings,
final int consumerMaxInflight,
final ConnectionThrottlingConfig throttlingConfig,
final KafkaMessageTransformer kafkaMessageTransformer,
final boolean dryRun,
final Materializer materializer,
Expand Down Expand Up @@ -87,8 +88,9 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
consumerControl = sourceSupplier.get()
.filter(committableMessage -> isNotDryRun(committableMessage.record(), dryRun))
.map(kafkaMessageTransformer::transform)
.throttle(throttlingConfig.getLimit(), throttlingConfig.getInterval())
.flatMapConcat(this::processTransformationResult)
.mapAsync(consumerMaxInflight, x -> x)
.mapAsync(throttlingConfig.getMaxInFlight(), x -> x)
.toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
.run(materializer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.service.config.ConnectionThrottlingConfig;
import org.eclipse.ditto.connectivity.service.messaging.AcknowledgeableMessage;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
Expand Down Expand Up @@ -50,7 +51,7 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {

AtMostOnceConsumerStream(
final AtMostOnceKafkaConsumerSourceSupplier sourceSupplier,
final int consumerMaxInflight,
final ConnectionThrottlingConfig throttlingConfig,
final KafkaMessageTransformer kafkaMessageTransformer,
final boolean dryRun,
final Materializer materializer,
Expand Down Expand Up @@ -81,8 +82,9 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
.filter(consumerRecord -> isNotDryRun(consumerRecord, dryRun))
.map(kafkaMessageTransformer::transform)
.filter(result -> !result.isExpired())
.throttle(throttlingConfig.getLimit(), throttlingConfig.getInterval())
.flatMapConcat(this::processTransformationResult)
.mapAsync(consumerMaxInflight, x -> x)
.mapAsync(throttlingConfig.getMaxInFlight(), x -> x)
.toMat(Sink.ignore(), Consumer::createDrainingControl)
.run(materializer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ KafkaConsumerStream newAtMostOnceConsumerStream(

final KafkaMessageTransformer kafkaMessageTransformer = buildKafkaMessageTransformer(inboundMonitor);
return new AtMostOnceConsumerStream(atMostOnceKafkaConsumerSourceSupplier,
throttlingConfig.getMaxInFlight(),
throttlingConfig,
kafkaMessageTransformer,
dryRun,
materializer,
Expand All @@ -105,7 +105,7 @@ KafkaConsumerStream newAtLeastOnceConsumerStream(
final KafkaMessageTransformer kafkaMessageTransformer = buildKafkaMessageTransformer(inboundMonitor);
return new AtLeastOnceConsumerStream(atLeastOnceKafkaConsumerSourceSupplier,
propertiesFactory.getCommitterSettings(),
throttlingConfig.getMaxInFlight(),
throttlingConfig,
kafkaMessageTransformer,
dryRun,
materializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionMetricsResponse;
import org.eclipse.ditto.connectivity.service.config.ClientConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectionConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectionConfigProvider;
import org.eclipse.ditto.connectivity.service.config.ConnectionThrottlingConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.MonitoringConfig;
Expand Down Expand Up @@ -161,6 +161,7 @@ public final class TestConstants {
public static final PingConfig PING_CONFIG;
public static final ProtocolConfig PROTOCOL_CONFIG;
public static final MonitoringConfig MONITORING_CONFIG;
public static final ConnectionThrottlingConfig KAFKA_THROTTLING_CONFIG;

static {
final DefaultScopedConfig dittoScopedConfig = DefaultScopedConfig.dittoScoped(CONFIG);
Expand All @@ -172,6 +173,7 @@ public final class TestConstants {
PING_CONFIG = CONNECTIVITY_CONFIG.getPingConfig();
PROTOCOL_CONFIG = CONNECTIVITY_CONFIG.getProtocolConfig();
MONITORING_CONFIG = CONNECTIVITY_CONFIG.getMonitoringConfig();
KAFKA_THROTTLING_CONFIG = CONNECTION_CONFIG.getKafkaConfig().getConsumerConfig().getThrottlingConfig();
}

private static final ConnectionType TYPE = ConnectionType.AMQP_10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.service.messaging.AcknowledgeableMessage;
import org.eclipse.ditto.connectivity.service.messaging.TestConstants;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -91,7 +92,7 @@ public void isImmutable() {
}

@Test
public void appliesBackPressureWhenMessagesAreNotAcknowledged() throws InterruptedException {
public void appliesBackPressureWhenMessagesAreNotAcknowledged() {
new TestKit(actorSystem) {{
/*
* Given we have a kafka source which emits records that are all transformed to External messages.
Expand All @@ -113,13 +114,14 @@ public void appliesBackPressureWhenMessagesAreNotAcknowledged() throws Interrupt
.thenReturn(CommittableTransformationResult.of(result, committableMessage.committableOffset()));
final ConnectionMonitor connectionMonitor = mock(ConnectionMonitor.class);
final ConnectionMonitor ackMonitor = mock(ConnectionMonitor.class);
final int maxInflight = 10;
final int maxInflight = TestConstants.KAFKA_THROTTLING_CONFIG.getMaxInFlight();
final Materializer materializer = Materializer.createMaterializer(actorSystem);
final Sink<DittoRuntimeException, TestSubscriber.Probe<DittoRuntimeException>> dreSink =
TestSink.create(actorSystem);

// When starting the stream
new AtLeastOnceConsumerStream(sourceSupplier, CommitterSettings.apply(actorSystem), maxInflight,
new AtLeastOnceConsumerStream(sourceSupplier, CommitterSettings.apply(actorSystem),
TestConstants.KAFKA_THROTTLING_CONFIG,
messageTransformer, false, materializer,
connectionMonitor, ackMonitor, inboundMappingSink, dreSink);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.service.messaging.AcknowledgeableMessage;
import org.eclipse.ditto.connectivity.service.messaging.TestConstants;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -88,7 +89,7 @@ public void isImmutable() {
}

@Test
public void appliesBackPressureWhenMessagesAreNotAcknowledged() throws InterruptedException {
public void appliesBackPressureWhenMessagesAreNotAcknowledged() {
new TestKit(actorSystem) {{
/*
* Given we have a kafka source which emits records that are all transformed to External messages.
Expand All @@ -104,13 +105,14 @@ public void appliesBackPressureWhenMessagesAreNotAcknowledged() throws Interrupt
when(messageTransformer.transform(ArgumentMatchers.<ConsumerRecord<String, String>>any())).thenReturn(
result);
final ConnectionMonitor connectionMonitor = mock(ConnectionMonitor.class);
final int maxInflight = 10;
final int maxInflight = TestConstants.KAFKA_THROTTLING_CONFIG.getMaxInFlight();
final Materializer materializer = Materializer.createMaterializer(actorSystem);
final Sink<DittoRuntimeException, TestSubscriber.Probe<DittoRuntimeException>> dreSink =
TestSink.create(actorSystem);

// When starting the stream
new AtMostOnceConsumerStream(sourceSupplier, maxInflight, messageTransformer, false, materializer,
new AtMostOnceConsumerStream(sourceSupplier, TestConstants.KAFKA_THROTTLING_CONFIG, messageTransformer,
false, materializer,
connectionMonitor, inboundMappingSink, dreSink);

inboundSinkProbe.ensureSubscription();
Expand Down

0 comments on commit 879c323

Please sign in to comment.