Skip to content

Commit

Permalink
Allow to throttle the inbound stream after payload mapping
Browse files Browse the repository at this point in the history
* We can now throttle the inbound stream based on the number of generated
  messages within ditto
* The only consumer which is supporting this right now is kafka

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 21, 2021
1 parent 38ab037 commit 1cca944
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.api.InboundSignal;
import org.eclipse.ditto.connectivity.api.OutboundSignal;
Expand Down Expand Up @@ -315,7 +316,7 @@ protected void init(final ConnectionContext connectionContext) {
outboundMappingProcessorActor = actorPair.second();

final Sink<Object, NotUsed> inboundDispatchingSink =
startInboundDispatchingSink(connection, protocolAdapter, outboundMappingProcessorActor);
getInboundDispatchingSink(connection, protocolAdapter, outboundMappingProcessorActor);
inboundMappingSink = getInboundMappingSink(connectionContext, protocolAdapter, inboundDispatchingSink);
subscriptionManager = startSubscriptionManager(proxyActorSelection,
connectionContext.getConnectivityConfig().getClientConfig());
Expand Down Expand Up @@ -1667,13 +1668,13 @@ private Pair<ActorRef, ActorRef> startOutboundActors(final ConnectionContext con
}

/**
* Starts the {@link InboundDispatchingSink} responsible for signal de-multiplexing and acknowledgement
* Gets the {@link InboundDispatchingSink} responsible for signal de-multiplexing and acknowledgement
* aggregation.
*
* @return the ref to the started {@link InboundDispatchingSink}
* @throws DittoRuntimeException when mapping processor could not get started.
*/
private Sink<Object, NotUsed> startInboundDispatchingSink(final Connection connection,
private Sink<Object, NotUsed> getInboundDispatchingSink(final Connection connection,
final ProtocolAdapter protocolAdapter,
final ActorRef outboundMappingProcessorActor) {

Expand Down Expand Up @@ -1720,9 +1721,14 @@ private Sink<Object, NotUsed> getInboundMappingSink(final ConnectionContext conn
processorPoolSize,
inboundDispatchingSink,
mappingConfig,
getThrottlingConfig().orElse(null),
messageMappingProcessorDispatcher);
}

protected Optional<ThrottlingConfig> getThrottlingConfig() {
return Optional.empty();
}

/**
* Start the subscription manager. Requires MessageMappingProcessorActor to be started to work.
* Creates an actor materializer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
Expand Down Expand Up @@ -47,17 +50,20 @@ public final class InboundMappingSink {
private final MessageDispatcher messageMappingProcessorDispatcher;
private final Sink<Object, ?> inboundDispatchingSink;
private final InboundMappingProcessor initialInboundMappingProcessor;
@Nullable private final ThrottlingConfig throttlingConfig;

private InboundMappingSink(final InboundMappingProcessor initialInboundMappingProcessor,
final ConnectionId connectionId,
final int processorPoolSize,
final Sink<Object, ?> inboundDispatchingSink,
final MappingConfig mappingConfig,
@Nullable final ThrottlingConfig throttlingConfig,
final MessageDispatcher messageMappingProcessorDispatcher) {

this.messageMappingProcessorDispatcher = messageMappingProcessorDispatcher;
this.initialInboundMappingProcessor = initialInboundMappingProcessor;
this.inboundDispatchingSink = inboundDispatchingSink;
this.throttlingConfig = throttlingConfig;

logger = DittoLoggerFactory.getThreadSafeLogger(InboundMappingSink.class)
.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connectionId);
Expand All @@ -74,6 +80,7 @@ private InboundMappingSink(final InboundMappingProcessor initialInboundMappingPr
* @param processorPoolSize how many message processing may happen in parallel per direction (incoming or outgoing).
* @param inboundDispatchingSink used to dispatch inbound signals.
* @param mappingConfig The mapping config.
* @param throttlingConfig the throttling config.
* @param messageMappingProcessorDispatcher The dispatcher which is used for async mapping.
* @return the Akka configuration Props object.
*/
Expand All @@ -83,13 +90,15 @@ public static Sink<Object, NotUsed> createSink(
final int processorPoolSize,
final Sink<Object, ?> inboundDispatchingSink,
final MappingConfig mappingConfig,
@Nullable final ThrottlingConfig throttlingConfig,
final MessageDispatcher messageMappingProcessorDispatcher) {

final var inboundMappingSink = new InboundMappingSink(inboundMappingProcessor,
connectionId,
processorPoolSize,
inboundDispatchingSink,
mappingConfig,
throttlingConfig,
messageMappingProcessorDispatcher);

return inboundMappingSink.getSink();
Expand All @@ -111,22 +120,30 @@ private Sink<Object, NotUsed> getSink() {
}

private Sink<Object, NotUsed> mapMessage() {
final Sink<MappingContext, NotUsed> mappingSink = Flow.<MappingContext>create()
final Flow<Object, InboundMappingOutcomes, NotUsed> mapMessageFlow = Flow.create()
.statefulMapConcat(StatefulExternalMessageHandler::new)
// parallelize potentially CPU-intensive payload mapping on this actor's dispatcher
.mapAsync(processorPoolSize, mappingContext -> CompletableFuture.supplyAsync(
() -> {
logger.debug("Received inbound Message to map: {}", mappingContext);
return mapInboundMessage(mappingContext.message, mappingContext.mappingProcessor);
},
messageMappingProcessorDispatcher)
)
);

final Flow<Object, InboundMappingOutcomes, NotUsed> flowWithOptionalThrottling;
if (throttlingConfig != null) {
flowWithOptionalThrottling = mapMessageFlow
.throttle(throttlingConfig.getLimit(), throttlingConfig.getInterval(),
outcomes -> outcomes.getOutcomes().size());
} else {
flowWithOptionalThrottling = mapMessageFlow;
}

return flowWithOptionalThrottling
// map returns outcome
.map(Object.class::cast)
.to(inboundDispatchingSink);

return Flow.create()
.statefulMapConcat(StatefulExternalMessageHandler::new)
.to(mappingSink);
}

private int determinePoolSize(final int connectionPoolSize, final int maxPoolSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -25,6 +26,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
Expand Down Expand Up @@ -212,7 +214,7 @@ private void startKafkaConsumer(final ConsumerData consumerData, final boolean d
final DefaultKafkaConsumerSourceSupplier sourceSupplier =
new DefaultKafkaConsumerSourceSupplier(propertiesFactory, consumerData.getAddress(), dryRun);
final Props consumerActorProps =
KafkaConsumerActor.props(connection(), kafkaConfig.getConsumerConfig(), sourceSupplier,
KafkaConsumerActor.props(connection(), sourceSupplier,
consumerData.getAddress(), getInboundMappingSink(), consumerData.getSource(), dryRun);
final ActorRef consumerActor =
startChildActorConflictFree(consumerData.getActorNamePrefix(), consumerActorProps);
Expand Down Expand Up @@ -240,6 +242,11 @@ protected CompletionStage<Status.Status> startPublisherActor() {
return CompletableFuture.completedFuture(DONE);
}

@Override
protected Optional<ThrottlingConfig> getThrottlingConfig() {
return Optional.of(kafkaConfig.getConsumerConfig().getThrottlingConfig());
}

private void stopPublisherActor() {
if (kafkaPublisherActor != null) {
logger.debug("Stopping child actor <{}>.", kafkaPublisherActor.path());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.eclipse.ditto.connectivity.model.EnforcementFilterFactory;
import org.eclipse.ditto.connectivity.model.ResourceStatus;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.service.config.KafkaConsumerConfig;
import org.eclipse.ditto.connectivity.service.messaging.AcknowledgeableMessage;
import org.eclipse.ditto.connectivity.service.messaging.BaseConsumerActor;
import org.eclipse.ditto.connectivity.service.messaging.internal.ImmutableConnectionFailure;
Expand Down Expand Up @@ -70,7 +69,6 @@ final class KafkaConsumerActor extends BaseConsumerActor {

@SuppressWarnings("unused")
private KafkaConsumerActor(final Connection connection,
final KafkaConsumerConfig kafkaConfig,
final KafkaConsumerSourceSupplier sourceSupplier,
final String sourceAddress, final Sink<Object, NotUsed> inboundMappingSink,
final Source source, final boolean dryRun) {
Expand All @@ -85,15 +83,15 @@ private KafkaConsumerActor(final Connection connection,
: input -> null;
final KafkaMessageTransformer kafkaMessageTransformer =
new KafkaMessageTransformer(source, sourceAddress, headerEnforcementFilterFactory, inboundMonitor);
kafkaStream = new KafkaConsumerStream(kafkaConfig, sourceSupplier, kafkaMessageTransformer, dryRun,
kafkaStream = new KafkaConsumerStream(sourceSupplier, kafkaMessageTransformer, dryRun,
Materializer.createMaterializer(this::getContext));
}

static Props props(final Connection connection, final KafkaConsumerConfig kafkaConfig,
static Props props(final Connection connection,
final KafkaConsumerSourceSupplier sourceSupplier, final String sourceAddress,
final Sink<Object, NotUsed> inboundMappingSink, final Source source,
final boolean dryRun) {
return Props.create(KafkaConsumerActor.class, connection, kafkaConfig, sourceSupplier, sourceAddress,
return Props.create(KafkaConsumerActor.class, connection, sourceSupplier, sourceAddress,
inboundMappingSink, source, dryRun);
}

Expand Down Expand Up @@ -153,16 +151,13 @@ private final class KafkaConsumerStream {
@Nullable private Consumer.Control consumerControl;

private KafkaConsumerStream(
final KafkaConsumerConfig kafkaConfig,
final KafkaConsumerSourceSupplier sourceSupplier,
final KafkaMessageTransformer kafkaMessageTransformer,
final boolean dryRun,
final Materializer materializer) {

this.materializer = materializer;
runnableKafkaStream = sourceSupplier.get()
.throttle(kafkaConfig.getThrottlingConfig().getLimit(),
kafkaConfig.getThrottlingConfig().getInterval())
.filter(consumerRecord -> isNotDryRun(consumerRecord, dryRun))
.filter(consumerRecord -> consumerRecord.value() != null)
.filter(this::isNotExpired)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ protected Sink<Object, NotUsed> setupInboundMappingSink(final ActorRef clientAct
TestProbe.apply(actorSystem).ref(), actorSystem, actorSystem.settings().config());

return InboundMappingSink.createSink(inboundMappingProcessor, CONNECTION_ID, 99,
inboundDispatchingSink, connectivityConfig.getMappingConfig(),
inboundDispatchingSink, connectivityConfig.getMappingConfig(), null,
actorSystem.dispatchers().defaultGlobalDispatcher());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ ActorRef createInboundMappingProcessorActor(final ActorRef proxyActor,

final Sink<Object, NotUsed> inboundMappingSink =
InboundMappingSink.createSink(inboundMappingProcessor, CONNECTION_ID, 99,
inboundDispatchingSink, TestConstants.CONNECTIVITY_CONFIG.getMappingConfig(),
inboundDispatchingSink, TestConstants.CONNECTIVITY_CONFIG.getMappingConfig(), null,
actorSystem.dispatchers().defaultGlobalDispatcher());

return Source.actorRef(99, OverflowStrategy.dropNew())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public void onError() {
1,
inboundSink,
TestConstants.MAPPING_CONFIG,
null,
system.dispatchers().defaultGlobalDispatcher());
final ActorRef underTest = Source.actorRef(1, OverflowStrategy.dropNew())
.to(inboundMappingSink)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ private Sink<Object, NotUsed> setupMappingSink(final ActorRef testRef,

final MessageDispatcher messageDispatcher = actorSystem.dispatchers().defaultGlobalDispatcher();
return InboundMappingSink.createSink(inboundMappingProcessor, CONNECTION_ID, 99, inboundDispatchingSink,
TestConstants.MAPPING_CONFIG, messageDispatcher);
TestConstants.MAPPING_CONFIG, null, messageDispatcher);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.eclipse.ditto.connectivity.service.messaging.TestConstants.header;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -28,13 +26,11 @@
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.eclipse.ditto.base.model.common.ResponseType;
import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.HeaderMapping;
import org.eclipse.ditto.connectivity.model.PayloadMapping;
import org.eclipse.ditto.connectivity.model.ReplyTarget;
import org.eclipse.ditto.connectivity.service.config.KafkaConsumerConfig;
import org.eclipse.ditto.connectivity.service.messaging.AbstractConsumerActorTest;
import org.eclipse.ditto.connectivity.service.messaging.TestConstants;
import org.junit.Before;
Expand Down Expand Up @@ -63,18 +59,11 @@ public class KafkaConsumerActorTest extends AbstractConsumerActorTest<ConsumerRe
private static final String CUSTOM_KEY = "the.key";
private static final String CUSTOM_TIMESTAMP = "the.timestamp";

private KafkaConsumerConfig kafkaConsumerConfig;
private BoundedSourceQueue<ConsumerRecord<String, String>> sourceQueue;
private Source<ConsumerRecord<String, String>, Consumer.Control> source;

@Before
public void initKafka() {
kafkaConsumerConfig = Mockito.mock(KafkaConsumerConfig.class);
final ThrottlingConfig throttlingConfig = Mockito.mock(ThrottlingConfig.class);
when(throttlingConfig.getInterval()).thenReturn(Duration.ofSeconds(1));
when(throttlingConfig.getLimit()).thenReturn(10);
when(kafkaConsumerConfig.getThrottlingConfig()).thenReturn(throttlingConfig);

final Consumer.Control control = Mockito.mock(Consumer.Control.class);
final Pair<BoundedSourceQueue<ConsumerRecord<String, String>>, Source<ConsumerRecord<String, String>, NotUsed>>
sourcePair = Source.<ConsumerRecord<String, String>>queue(20)
Expand Down Expand Up @@ -117,7 +106,7 @@ protected Props getConsumerActorProps(final Sink<Object, NotUsed> inboundMapping
));
final HeaderMapping mappingWithSpecialKafkaHeaders = ConnectivityModelFactory.newHeaderMapping(map);

return KafkaConsumerActor.props(CONNECTION, kafkaConsumerConfig, () -> source, "kafka", inboundMappingSink,
return KafkaConsumerActor.props(CONNECTION, () -> source, "kafka", inboundMappingSink,
ConnectivityModelFactory.newSourceBuilder()
.authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT)
.address("kafka")
Expand Down

0 comments on commit 1cca944

Please sign in to comment.