Skip to content

Commit

Permalink
only create producers with static addresses (no placeholders) on star…
Browse files Browse the repository at this point in the history
…tup, adapt AmqpClientActorTest to be closer to reality by tracking the created producers

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch-si.com>
  • Loading branch information
dguggemos committed Aug 14, 2019
1 parent 7732ccf commit d92dd7e
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 91 deletions.
4 changes: 2 additions & 2 deletions legal/NOTICE-THIRD-PARTY.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
* Source: https://github.com/ralfstx/minimal-json


## Caffeine cache (2.7.0)
## Caffeine cache (2.8.0)

* Maven coordinates: `com.github.ben-manes.caffeine:caffeine:2.7.0`
* Maven coordinates: `com.github.ben-manes.caffeine:caffeine:2.8.0`
* License: [Apache-2.0](licenses/Apache-2.0.txt)
* Project: https://github.com/ben-manes/caffeine
* Source: https://github.com/ben-manes/caffeine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
import org.apache.qpid.proton.amqp.Symbol;
import org.eclipse.ditto.model.base.common.Placeholders;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
Expand All @@ -51,6 +52,7 @@
import org.eclipse.ditto.services.connectivity.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.services.connectivity.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.services.connectivity.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.services.connectivity.util.ConnectionLogUtil;
import org.eclipse.ditto.services.models.connectivity.ExternalMessage;
import org.eclipse.ditto.services.utils.akka.LogUtil;

Expand Down Expand Up @@ -89,20 +91,25 @@ public final class AmqpPublisherActor extends BasePublisherActor<AmqpTarget> {
private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);

private final Session session;
private final LinkedHashMap<Destination, MessageProducer> replyToMap;
private final Map<Destination, MessageProducer> targetMap;
private final int replyToCacheSize;
private final LinkedHashMap<Destination, MessageProducer> dynamicTargets;
private final Map<Destination, MessageProducer> staticTargets;
private final int producerCacheSize;

@SuppressWarnings("unused")
private AmqpPublisherActor(final String connectionId, final List<Target> targets, final Session session,
final ConnectionConfig connectionConfig) {
super(connectionId, targets);
ConnectionLogUtil.enhanceLogWithConnectionId(log, connectionId);
this.session = checkNotNull(session, "session");
this.targetMap = new HashMap<>();
this.replyToMap = new LinkedHashMap<>(); // insertion order important for maintenance of reply-to cache
replyToCacheSize = checkArgument(connectionConfig.getAmqp10Config().getReplyToCacheSize(), i -> i > 0,
() -> "reply-to-cache-size must be 1 or more");
createAllTargetProducers(targets);
this.staticTargets = new HashMap<>();
this.dynamicTargets = new LinkedHashMap<>(); // insertion order important for maintenance of producer cache
producerCacheSize = checkArgument(connectionConfig.getAmqp10Config().getProducerCacheSize(), i -> i > 0,
() -> "producer-cache-size must be 1 or more");

// we open producers for static addresses (no placeholders) on startup and try to reopen them when closed.
// producers for other addresses (with placeholders, reply-to) are opened on demand and may be closed to
// respect the cache size limit
createStaticTargetProducers(targets);
}

/**
Expand All @@ -122,14 +129,14 @@ static Props props(final String connectionId, final List<Target> targets, final

@Override
protected void preEnhancement(final ReceiveBuilder receiveBuilder) {
receiveBuilder.match(ProducerClosedStatusReport.class, this::handleConnectionStatusReport);
receiveBuilder.match(ProducerClosedStatusReport.class, this::handleProducerClosedStatusReport);
}

private void handleConnectionStatusReport(final ProducerClosedStatusReport report) {
private void handleProducerClosedStatusReport(final ProducerClosedStatusReport report) {
final MessageProducer producer = report.getMessageProducer();
log.info("Got closed JMS producer '{}'", producer);
findByValue(replyToMap, producer).map(Map.Entry::getKey).forEach(replyToMap::remove);
findByValue(targetMap, producer).map(Map.Entry::getKey).forEach(this::createTargetProducer);
findByValue(dynamicTargets, producer).map(Map.Entry::getKey).forEach(dynamicTargets::remove);
findByValue(staticTargets, producer).map(Map.Entry::getKey).forEach(this::createTargetProducer);
}

@Override
Expand Down Expand Up @@ -223,7 +230,6 @@ private Message toJmsMessage(final ExternalMessage externalMessage, final AmqpTa
message = session.createMessage();
}
// replace default destination of session by message's actual destination
message.setJMSDestination(amqpTarget.getJmsDestination());

// some headers must be handled differently to be passed to amqp message
final Map<String, String> headers = externalMessage.getHeaders();
Expand Down Expand Up @@ -256,26 +262,26 @@ private Message toJmsMessage(final ExternalMessage externalMessage, final AmqpTa
@Nullable
private MessageProducer getProducer(final Destination destination) {
final MessageProducer messageProducer;
if (targetMap.containsKey(destination)) {
messageProducer = targetMap.get(destination);
if (staticTargets.containsKey(destination)) {
messageProducer = staticTargets.get(destination);
} else {
messageProducer = replyToMap.computeIfAbsent(destination, this::createReplyToProducer);
messageProducer = dynamicTargets.computeIfAbsent(destination, this::createProducer);
maintainReplyToMap();
}
return messageProducer;
}

private void maintainReplyToMap() {
// cache maintenance strategy = discard eldest
while (replyToMap.size() > replyToCacheSize) {
final Map.Entry<Destination, MessageProducer> cachedProducer = replyToMap.entrySet().iterator().next();
while (dynamicTargets.size() > producerCacheSize) {
final Map.Entry<Destination, MessageProducer> cachedProducer = dynamicTargets.entrySet().iterator().next();
closeCachedProducer(cachedProducer);
replyToMap.remove(cachedProducer.getKey());
dynamicTargets.remove(cachedProducer.getKey());
}
}

@Nullable
private MessageProducer createReplyToProducer(final Destination destination) {
private MessageProducer createProducer(final Destination destination) {
log.debug("Creating AMQP Producer for '{}'", destination);
try {
return session.createProducer(destination);
Expand All @@ -301,8 +307,8 @@ private void closeCachedProducer(final Map.Entry<Destination, MessageProducer> c
@Override
public void postStop() throws Exception {
super.postStop();
replyToMap.entrySet().forEach(this::closeCachedProducer);
targetMap.entrySet().forEach(this::closeCachedProducer);
dynamicTargets.entrySet().forEach(this::closeCachedProducer);
staticTargets.entrySet().forEach(this::closeCachedProducer);
}

@Override
Expand All @@ -313,7 +319,7 @@ protected DiagnosticLoggingAdapter log() {
// create a target producer. the previous incarnation, if any, must be closed.
private void createTargetProducer(final Destination destination) {
try {
targetMap.put(destination, session.createProducer(destination));
staticTargets.put(destination, session.createProducer(destination));
log.info("Target producer <{}> created", destination);
} catch (final JMSException jmsException) {
// target producer not creatable; stop self and request restart by parent
Expand All @@ -325,10 +331,13 @@ private void createTargetProducer(final Destination destination) {
}
}

private void createAllTargetProducers(final List<Target> targets) {
private void createStaticTargetProducers(final List<Target> targets) {
// using loop so that already created targets are closed on exception
for (final Target target : targets) {
createTargetProducer(toPublishTarget(target.getAddress()).getJmsDestination());
// only targets with static addresses should stay open
if (!Placeholders.containsAnyPlaceholder(target.getAddress())) {
createTargetProducer(toPublishTarget(target.getAddress()).getJmsDestination());
}
}
}

Expand All @@ -343,7 +352,6 @@ private static String jmsExceptionToString(final JMSException jmsException) {

private static Stream<Map.Entry<Destination, MessageProducer>> findByValue(
final Map<Destination, MessageProducer> producerMap, final MessageProducer value) {

return producerMap.entrySet().stream().filter(entry -> Objects.equals(entry.getValue(), value));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ public interface Amqp10Config {
int getConsumerThrottlingLimit();

/**
* Returns how many reply-to addresses to cache.
* Returns how many message producers to cache.
*
* @return the reply-to cache size.
* @return the message producer cache size.
*/
int getReplyToCacheSize();
int getProducerCacheSize();

/**
* An enumeration of the known config path expressions and their associated default values for
Expand All @@ -66,9 +66,9 @@ enum Amqp10ConfigValue implements KnownConfigValue {
CONSUMER_THROTTLING_LIMIT("consumer.throttling.limit", 100),

/**
* How many reply-to addresses to cache per client actor.
* How many message producers to cache per client actor.
*/
REPLY_TO_CACHE_SIZE("reply-to-cache-size", 10);
PRODUCER_CACHE_SIZE("producer-cache-size", 10);

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public final class DefaultAmqp10Config implements Amqp10Config {

private final Duration consumerThrottlingInterval;
private final int consumerThrottlingLimit;
private final int replyToCacheSize;
private final int producerCacheSize;

private DefaultAmqp10Config(final ScopedConfig config) {
consumerThrottlingInterval = config.getDuration(Amqp10ConfigValue.CONSUMER_THROTTLING_INTERVAL.getConfigPath());
consumerThrottlingLimit = config.getInt(Amqp10ConfigValue.CONSUMER_THROTTLING_LIMIT.getConfigPath());
replyToCacheSize = config.getInt(Amqp10ConfigValue.REPLY_TO_CACHE_SIZE.getConfigPath());
producerCacheSize = config.getInt(Amqp10ConfigValue.PRODUCER_CACHE_SIZE.getConfigPath());
}

/**
Expand All @@ -62,8 +62,8 @@ public int getConsumerThrottlingLimit() {
}

@Override
public int getReplyToCacheSize() {
return replyToCacheSize;
public int getProducerCacheSize() {
return producerCacheSize;
}

@Override
Expand All @@ -76,21 +76,21 @@ public boolean equals(final Object o) {
}
final DefaultAmqp10Config that = (DefaultAmqp10Config) o;
return consumerThrottlingLimit == that.consumerThrottlingLimit &&
replyToCacheSize == that.replyToCacheSize &&
producerCacheSize == that.producerCacheSize &&
Objects.equals(consumerThrottlingInterval, that.consumerThrottlingInterval);
}

@Override
public int hashCode() {
return Objects.hash(consumerThrottlingInterval, consumerThrottlingLimit, replyToCacheSize);
return Objects.hash(consumerThrottlingInterval, consumerThrottlingLimit, producerCacheSize);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"consumerThrottlingInterval=" + consumerThrottlingInterval +
", consumerThrottlingLimit=" + consumerThrottlingLimit +
", replyToCacheSize=" + replyToCacheSize +
", producerCacheSize=" + producerCacheSize +
"]";
}
}
Loading

0 comments on commit d92dd7e

Please sign in to comment.