From e5460de8139c0ef7e9bc28ab32665496767c00f8 Mon Sep 17 00:00:00 2001 From: Michael Andre Pearce Date: Wed, 24 May 2017 08:33:19 +0100 Subject: [PATCH] ARTEMIS-1179: Add Optional Client JMS Destination Cache Add topic and queue cache maps in Session. Add configuration to use cache or not with defaulting to false, which keeps existing behaviour as the default. --- .../jms/client/ActiveMQConnection.java | 9 ++++- .../jms/client/ActiveMQConnectionFactory.java | 23 ++++++++--- .../artemis/jms/client/ActiveMQSession.java | 38 +++++++++++++++++-- .../jms/client/ActiveMQXAConnection.java | 3 +- .../artemis/jms/client/ActiveMQXASession.java | 3 +- .../artemis/uri/ConnectionFactoryURITest.java | 12 ++++++ .../ActiveMQRAManagedConnectionFactory.java | 8 ++++ .../artemis/ra/ActiveMQResourceAdapter.java | 31 +++++++++++++++ .../ra/ConnectionFactoryProperties.java | 17 +++++++++ docs/user-manual/en/perf-tuning.md | 5 +++ docs/user-manual/en/using-jms.md | 11 ++++++ .../artemis-ra-rar/src/main/resources/ra.xml | 6 +++ .../ra/ActiveMQResourceAdapterConfigTest.java | 6 +++ 13 files changed, 159 insertions(+), 13 deletions(-) diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java index c90e630ce1a..90ab9525994 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java @@ -126,6 +126,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme private final int transactionBatchSize; + private final boolean cacheDestinations; + private ClientSession initialSession; private final Exception creationStack; @@ -143,6 +145,7 @@ public ActiveMQConnection(final ConnectionFactoryOptions options, final String clientID, final int dupsOKBatchSize, final int transactionBatchSize, + final boolean cacheDestinations, final ClientSessionFactory sessionFactory) { this.options = options; @@ -164,6 +167,8 @@ public ActiveMQConnection(final ConnectionFactoryOptions options, this.transactionBatchSize = transactionBatchSize; + this.cacheDestinations = cacheDestinations; + creationStack = new Exception(); } @@ -654,9 +659,9 @@ protected ActiveMQSession createAMQSession(boolean isXA, ClientSession session, int type) { if (isXA) { - return new ActiveMQXASession(options, this, transacted, true, acknowledgeMode, session, type); + return new ActiveMQXASession(options, this, transacted, true, acknowledgeMode, cacheDestinations, session, type); } else { - return new ActiveMQSession(options, this, transacted, false, acknowledgeMode, session, type); + return new ActiveMQSession(options, this, transacted, false, acknowledgeMode, cacheDestinations, session, type); } } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java index dee81573d15..06acd25bb01 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java @@ -84,6 +84,8 @@ public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Exte private String deserializationWhiteList; + private boolean cacheDestinations; + private boolean finalizeChecks; @Override @@ -428,6 +430,15 @@ public synchronized void setTransactionBatchSize(final int transactionBatchSize) this.transactionBatchSize = transactionBatchSize; } + public synchronized boolean isCacheDestinations() { + return this.cacheDestinations; + } + + public synchronized void setCacheDestinations(final boolean cacheDestinations) { + checkWrite(); + this.cacheDestinations = cacheDestinations; + } + public synchronized long getClientFailureCheckPeriod() { return serverLocator.getClientFailureCheckPeriod(); } @@ -766,19 +777,19 @@ protected synchronized ActiveMQConnection createConnectionInternal(final String if (isXA) { if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) { - connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory); + connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory); } else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) { - connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory); + connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory); } else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) { - connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory); + connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory); } } else { if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) { - connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory); + connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory); } else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) { - connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory); + connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory); } else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) { - connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory); + connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory); } } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 2ea145a77d1..a8aceec8eff 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -45,8 +45,10 @@ import javax.transaction.xa.XAResource; import java.io.Serializable; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; @@ -95,6 +97,12 @@ public class ActiveMQSession implements QueueSession, TopicSession { private final Set consumers = new HashSet<>(); + private final boolean cacheDestination; + + private final Map topicCache = new ConcurrentHashMap<>(); + + private final Map queueCache = new ConcurrentHashMap<>(); + // Constructors -------------------------------------------------- protected ActiveMQSession(final ConnectionFactoryOptions options, @@ -102,6 +110,7 @@ protected ActiveMQSession(final ConnectionFactoryOptions options, final boolean transacted, final boolean xa, final int ackMode, + final boolean cacheDestination, final ClientSession session, final int sessionType) { this.options = options; @@ -117,6 +126,8 @@ protected ActiveMQSession(final ConnectionFactoryOptions options, this.transacted = transacted; this.xa = xa; + + this.cacheDestination = cacheDestination; } // Session implementation ---------------------------------------- @@ -255,6 +266,8 @@ public void close() throws JMSException { throw JMSExceptionHelper.convertFromActiveMQException(e); } } + topicCache.clear(); + queueCache.clear(); } @Override @@ -367,7 +380,17 @@ public Queue createQueue(final String queueName) throws JMSException { } try { - return internalCreateQueue(queueName, false); + Queue queue = null; + if (cacheDestination) { + queue = queueCache.get(queueName); + } + if (queue == null) { + queue = internalCreateQueue(queueName, false); + } + if (cacheDestination) { + queueCache.put(queueName, queue); + } + return queue; } catch (ActiveMQException e) { throw JMSExceptionHelper.convertFromActiveMQException(e); } @@ -396,9 +419,18 @@ public Topic createTopic(final String topicName) throws JMSException { if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) { throw new IllegalStateException("Cannot create a topic on a QueueSession"); } - try { - return internalCreateTopic(topicName, false); + Topic topic = null; + if (cacheDestination) { + topic = topicCache.get(topicName); + } + if (topic == null) { + topic = internalCreateTopic(topicName, false); + } + if (cacheDestination) { + topicCache.put(topicName, topic); + } + return topic; } catch (ActiveMQException e) { throw JMSExceptionHelper.convertFromActiveMQException(e); } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java index 4407fbbb737..fcc9bb25168 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java @@ -41,8 +41,9 @@ public ActiveMQXAConnection(final ConnectionFactoryOptions options, final String clientID, final int dupsOKBatchSize, final int transactionBatchSize, + final boolean cacheDestinations, final ClientSessionFactory sessionFactory) { - super(options, username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, sessionFactory); + super(options, username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, sessionFactory); } @Override diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXASession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXASession.java index 4a7694fc8e2..6ec936e84be 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXASession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXASession.java @@ -36,8 +36,9 @@ protected ActiveMQXASession(final ConnectionFactoryOptions options, boolean transacted, boolean xa, int ackMode, + boolean cacheDestinations, ClientSession session, int sessionType) { - super(options, connection, transacted, xa, ackMode, session, sessionType); + super(options, connection, transacted, xa, ackMode, cacheDestinations, session, sessionType); } } diff --git a/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java b/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java index dc2d459790b..9c1b9b8d6c9 100644 --- a/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java +++ b/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java @@ -400,6 +400,18 @@ public void testJGroupsPropertiesURI() throws Exception { checkEquals(bean, connectionFactoryWithHA, factory); } + @Test + public void testCacheDestinations() throws Exception { + ActiveMQConnectionFactory factory = parser.newObject(new URI("tcp://localhost:3030"), null); + + Assert.assertFalse(factory.isCacheDestinations()); + + factory = parser.newObject(new URI("tcp://localhost:3030?cacheDestinations=true"), null); + + Assert.assertTrue(factory.isCacheDestinations()); + + } + private void populate(StringBuilder sb, BeanUtilsBean bean, ActiveMQConnectionFactory factory) throws IllegalAccessException, InvocationTargetException { diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java index a3541a6ec4b..da99a1326ac 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java @@ -580,6 +580,14 @@ public void setUseGlobalPools(final Boolean useGlobalPools) { mcfProperties.setUseGlobalPools(useGlobalPools); } + public Boolean isCacheDestinations() { + return mcfProperties.isCacheDestinations(); + } + + public void setCacheDestinations(final Boolean cacheDestinations) { + mcfProperties.setCacheDestinations(cacheDestinations); + } + public Integer getScheduledThreadPoolMaxSize() { return mcfProperties.getScheduledThreadPoolMaxSize(); } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java index e9b13239f2a..36eabeb85ca 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java @@ -634,6 +634,32 @@ public Boolean isFailoverOnInitialConnection() { return raProperties.isFailoverOnInitialConnection(); } + /** + * Set cacheDestinations + * + * @param cacheDestinations The value + */ + public void setCacheDestinations(final Boolean cacheDestinations) { + if (ActiveMQResourceAdapter.trace) { + ActiveMQRALogger.LOGGER.trace("setCacheDestinations(" + cacheDestinations + ")"); + } + + raProperties.setCacheDestinations(cacheDestinations); + } + + /** + * Get isCacheDestinations + * + * @return The value + */ + public Boolean isCacheDestinations() { + if (ActiveMQResourceAdapter.trace) { + ActiveMQRALogger.LOGGER.trace("isCacheDestinations()"); + } + + return raProperties.isCacheDestinations(); + } + /** * Set compressLargeMessage * @@ -1911,6 +1937,11 @@ private void setParams(final ActiveMQConnectionFactory cf, final ConnectionFacto cf.setFailoverOnInitialConnection(val); } + val = overrideProperties.isCacheDestinations() != null ? overrideProperties.isCacheDestinations() : raProperties.isCacheDestinations(); + if (val != null) { + cf.setCacheDestinations(val); + } + Integer val2 = overrideProperties.getConsumerMaxRate() != null ? overrideProperties.getConsumerMaxRate() : raProperties.getConsumerMaxRate(); if (val2 != null) { cf.setConsumerMaxRate(val2); diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java index 442952fb37b..ba0484aa78e 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java @@ -112,6 +112,8 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions { private Boolean useGlobalPools; + private Boolean cacheDestinations; + private Integer initialMessagePacketSize; private Integer scheduledThreadPoolMaxSize; @@ -612,6 +614,21 @@ public void setUseGlobalPools(final Boolean useGlobalPools) { this.useGlobalPools = useGlobalPools; } + public Boolean isCacheDestinations() { + if (ConnectionFactoryProperties.trace) { + ActiveMQRALogger.LOGGER.trace("isCacheDestinations()"); + } + return cacheDestinations; + } + + public void setCacheDestinations(final Boolean cacheDestinations) { + if (ConnectionFactoryProperties.trace) { + ActiveMQRALogger.LOGGER.trace("setCacheDestinations(" + cacheDestinations + ")"); + } + hasBeenUpdated = true; + this.cacheDestinations = cacheDestinations; + } + public Integer getScheduledThreadPoolMaxSize() { if (ConnectionFactoryProperties.trace) { ActiveMQRALogger.LOGGER.trace("getScheduledThreadPoolMaxSize()"); diff --git a/docs/user-manual/en/perf-tuning.md b/docs/user-manual/en/perf-tuning.md index f131a151a16..010c19feb47 100644 --- a/docs/user-manual/en/perf-tuning.md +++ b/docs/user-manual/en/perf-tuning.md @@ -150,6 +150,11 @@ tuning: java.lang.String does not require copying before it is written to the wire, so if you re-use `SimpleString` instances between calls then you can avoid some unnecessary copying. + +- If using frameworks like Spring, configure destinations permanently broker side + and enable `destinationCache` on the client side. + See the [Setting The Destination Cache](using-jms.md) + for more information on this. ## Tuning Transport Settings diff --git a/docs/user-manual/en/using-jms.md b/docs/user-manual/en/using-jms.md index d921cafd79a..6646293210d 100644 --- a/docs/user-manual/en/using-jms.md +++ b/docs/user-manual/en/using-jms.md @@ -377,3 +377,14 @@ consumer to send acknowledgements in batches rather than individually saving valuable bandwidth. This can be configured on the connection factory via the `transactionBatchSize` element and is set in bytes. The default is 1024 \* 1024. + +### Setting The Destination Cache + +Many frameworks such as Spring resolve the destination by name on every operation, +this can cause a performance issue and extra calls to the broker, +in a scenario where destinations (addresses) are permanent broker side, +such as they are managed by a platform or operations team. +using `destinationCache` element, you can toggle on the destination cache +to improve the performance and reduce the calls to the broker. +This should not be used if destinations (addresses) are not permanent broker side, +as in dynamic creation/deletion. diff --git a/examples/features/sub-modules/artemis-ra-rar/src/main/resources/ra.xml b/examples/features/sub-modules/artemis-ra-rar/src/main/resources/ra.xml index db571a33be8..6292ee68298 100644 --- a/examples/features/sub-modules/artemis-ra-rar/src/main/resources/ra.xml +++ b/examples/features/sub-modules/artemis-ra-rar/src/main/resources/ra.xml @@ -258,6 +258,12 @@ PasswordCodec java.lang.String org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;key=clusterpassword;algorithm=something + + + Cache destinations per session + CacheDestinations + java.lang.Boolean + false --> diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java index cd051fe6c20..e4dd15cab95 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java @@ -253,6 +253,12 @@ public class ActiveMQResourceAdapterConfigTest extends ActiveMQTestBase { " boolean\n" + " \n" + " \n" + + " " + + " Cache destinations per session" + + " CacheDestinations" + + " boolean" + + " " + + " " + " \n" + " max number of threads for scheduled thread pool\n" + " ScheduledThreadPoolMaxSize\n" +