Skip to content

Commit

Permalink
[#1081] Fix client actors not reacting to connectivity config updates.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jun 26, 2021
1 parent ddae6dc commit 8a5f674
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
Expand Up @@ -358,20 +358,21 @@ public void postStop() {

@Override
public void onConnectivityConfigModified(final ConnectivityConfig modifiedConfig) {
connectionContext = connectionContext.withConnectivityConfig(modifiedConfig);
final var modifiedContext = connectionContext.withConnectivityConfig(modifiedConfig);
if (hasInboundMapperConfigChanged(modifiedConfig)) {
logger.debug("Config changed for InboundMappingProcessor, recreating it.");
final var inboundMappingProcessor =
InboundMappingProcessor.of(connectionContext, getContext().getSystem(), protocolAdapter, logger);
InboundMappingProcessor.of(modifiedContext, getContext().getSystem(), protocolAdapter, logger);
inboundMappingProcessorActor.tell(new ReplaceInboundMappingProcessor(inboundMappingProcessor), getSelf());
}
if (hasOutboundMapperConfigChanged(modifiedConfig)) {
logger.debug("Config changed for OutboundMappingProcessor, recreating it.");
final var outboundMappingProcessor =
OutboundMappingProcessor.of(connectionContext, getContext().getSystem(), protocolAdapter, logger);
OutboundMappingProcessor.of(modifiedContext, getContext().getSystem(), protocolAdapter, logger);
outboundMappingProcessorActor.tell(new ReplaceOutboundMappingProcessor(outboundMappingProcessor),
getSelf());
}
connectionContext = modifiedContext;
}

/**
Expand Down
Expand Up @@ -434,14 +434,14 @@ public ThreadSafeDittoLoggingAdapter log() {

@Override
public void onConnectivityConfigModified(final ConnectivityConfig connectivityConfig) {
this.connectivityConfig = connectivityConfig;
final Amqp10Config amqp10Config = connectivityConfig.getConnectionConfig().getAmqp10Config();
if (hasMessageRateLimiterConfigChanged(amqp10Config)) {
this.messageRateLimiter = MessageRateLimiter.of(amqp10Config, messageRateLimiter);
log.info("Built new rate limiter from existing one with modified config: {}", amqp10Config);
} else {
log.debug("Relevant config for MessageRateLimiter unchanged, do nothing.");
}
this.connectivityConfig = connectivityConfig;
}

private boolean hasMessageRateLimiterConfigChanged(final Amqp10Config amqp10Config) {
Expand Down

0 comments on commit 8a5f674

Please sign in to comment.