From dc76e2a6a00964990fa50ec091d96473616aba87 Mon Sep 17 00:00:00 2001 From: jbertram Date: Mon, 18 Jul 2016 16:34:22 -0500 Subject: [PATCH 1/2] ARTEMIS-640 Allow config of cxn TTL check interval Add connection-ttl-check-interval configuration attribute to allow control of how frequently connection TTL checks are performed. --- .../api/config/ActiveMQDefaultConfiguration.java | 7 +++++++ .../artemis/core/config/Configuration.java | 4 ++++ .../core/config/impl/ConfigurationImpl.java | 16 ++++++++++++++++ .../deployers/impl/FileConfigurationParser.java | 2 ++ .../resources/schema/artemis-configuration.xsd | 8 ++++++++ .../core/config/impl/FileConfigurationTest.java | 1 + .../resources/ConfigurationTest-full-config.xml | 1 + docs/user-manual/en/configuration-index.md | 1 + docs/user-manual/en/connection-ttl.md | 5 +++++ .../client/IncompatibleVersionTest.java | 3 +-- .../integration/client/TemporaryQueueTest.java | 5 ++--- .../tests/integration/remoting/PingTest.java | 3 +-- 12 files changed, 49 insertions(+), 7 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index e8fe1c51ae8..b95e5157edf 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -423,6 +423,9 @@ public static String getDefaultHapolicyBackupStrategy() { // Default large messages table name, used with Database storage type private static final String DEFAULT_LARGE_MESSAGES_TABLE_NAME = "LARGE_MESSAGES"; + // Default period to wait between connection TTL checks + public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000; + /** * If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers. */ @@ -1130,4 +1133,8 @@ public static String getDefaultDriverClassName() { public static String getDefaultLargeMessagesTableName() { return DEFAULT_LARGE_MESSAGES_TABLE_NAME; } + + public static long getDefaultConnectionTtlCheckInterval() { + return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 8eb7f10689c..400709ef1f1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -955,4 +955,8 @@ Configuration addDiscoveryGroupConfiguration(final String key, /** It will return all the connectors in a toString manner for debug purposes. */ String debugConnectors(); + Configuration setConnectionTtlCheckInterval(long connectionTtlCheckInterval); + + long getConnectionTtlCheckInterval(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 6ecdc77e9c2..8f1f6f51177 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -239,6 +239,8 @@ public class ConfigurationImpl implements Configuration, Serializable { protected boolean populateValidatedUser = ActiveMQDefaultConfiguration.isDefaultPopulateValidatedUser(); + private long connectionTtlCheckInterval = ActiveMQDefaultConfiguration.getDefaultConnectionTtlCheckInterval(); + /** * Parent folder for all data folders. */ @@ -1365,6 +1367,17 @@ public ConfigurationImpl setPopulateValidatedUser(boolean populateValidatedUser) return this; } + @Override + public long getConnectionTtlCheckInterval() { + return connectionTtlCheckInterval; + } + + @Override + public ConfigurationImpl setConnectionTtlCheckInterval(long connectionTtlCheckInterval) { + this.connectionTtlCheckInterval = connectionTtlCheckInterval; + return this; + } + @Override public int hashCode() { final int prime = 31; @@ -1440,6 +1453,7 @@ public int hashCode() { result = prime * result + (wildcardRoutingEnabled ? 1231 : 1237); result = prime * result + (resolveProtocols ? 1231 : 1237); result = prime * result + (int) (journalLockAcquisitionTimeout ^ (journalLockAcquisitionTimeout >>> 32)); + result = prime * result + (int) (connectionTtlCheckInterval ^ (connectionTtlCheckInterval >>> 32)); return result; } @@ -1692,6 +1706,8 @@ else if (!securitySettings.equals(other.securitySettings)) return false; if (journalLockAcquisitionTimeout != other.journalLockAcquisitionTimeout) return false; + if (connectionTtlCheckInterval != other.connectionTtlCheckInterval) + return false; return true; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index deda1ad7d74..e884c31bb01 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -278,6 +278,8 @@ public void parseMainConfig(final Element e, final Configuration config) throws config.setPopulateValidatedUser(getBoolean(e, "populate-validated-user", config.isPopulateValidatedUser())); + config.setConnectionTtlCheckInterval(getLong(e, "connection-ttl-check-interval", config.getConnectionTtlCheckInterval(), Validators.GT_ZERO)); + // parsing cluster password String passwordText = getString(e, "cluster-password", null, Validators.NO_CHECK); diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 53e5aa89e5e..5ac86a0bdf1 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -238,6 +238,14 @@ + + + + how often (in ms) to check connections for ttl violation + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 3f4edd8140c..27c997fd371 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -103,6 +103,7 @@ public void testDefaults() { Assert.assertEquals(true, conf.isGracefulShutdownEnabled()); Assert.assertEquals(12345, conf.getGracefulShutdownTimeout()); Assert.assertEquals(true, conf.isPopulateValidatedUser()); + Assert.assertEquals(98765, conf.getConnectionTtlCheckInterval()); Assert.assertEquals("largemessagesdir", conf.getLargeMessagesDirectory()); Assert.assertEquals(95, conf.getMemoryWarningThreshold()); diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 8bd540deaee..3639da4a935 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -51,6 +51,7 @@ 127 true true + 98765 org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor1 org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor2 diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md index 57b36e0bef8..bf4461c0d0e 100644 --- a/docs/user-manual/en/configuration-index.md +++ b/docs/user-manual/en/configuration-index.md @@ -35,6 +35,7 @@ Name | Description [cluster-password](clusters.md "Clusters") | Cluster password. It applies to all cluster configurations. [cluster-user](clusters.md "Clusters") | Cluster username. It applies to all cluster configurations. [connection-ttl-override](connection-ttl.md) | if set, this will override how long (in ms) to keep a connection alive without receiving a ping. -1 disables this setting. Default -1 +[connection-ttl-check-period](connection-ttl.md) | how often (in ms) to check connections for ttl violation. Default 2000 [connectors.connector](configuring-transports.md "Understanding Connectors") | The URL for the connector. This is a list [create-bindings-dir](persistence.md "Configuring the bindings journal") | true means that the server will create the bindings directory on start up. Default=true [create-journal-dir](persistence.md) | true means that the journal directory will be created. Default=true diff --git a/docs/user-manual/en/connection-ttl.md b/docs/user-manual/en/connection-ttl.md index c24b5ac874c..f4b6738dba2 100644 --- a/docs/user-manual/en/connection-ttl.md +++ b/docs/user-manual/en/connection-ttl.md @@ -114,6 +114,11 @@ server side. This can be done by specifying the The default value for `connection-ttl-override` is `-1` which means "do not override" (i.e. let clients use their own values). +The logic to check connections for TTL violations runs periodically on +the broker. By default, the checks are done every 2,000 milliseconds. +However, this can be changed if necessary by using the +`connection-ttl-check-interval` attribute. + ## Closing core sessions or JMS connections that you have failed to close As previously discussed, it's important that all core client sessions diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/IncompatibleVersionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/IncompatibleVersionTest.java index 21905df538e..e2c3ae52786 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/IncompatibleVersionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/IncompatibleVersionTest.java @@ -37,7 +37,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; -import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.version.impl.VersionImpl; @@ -162,7 +161,7 @@ private void doTestClientVersionCompatibility(boolean compatible) throws Excepti fail("Invalid Exception type:" + e.getType()); } long start = System.currentTimeMillis(); - while (System.currentTimeMillis() < start + 3 * RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL) { + while (System.currentTimeMillis() < start + 3 * server.getConfiguration().getConnectionTtlCheckInterval()) { if (server.getConnectionCount() == 0) { break; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TemporaryQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TemporaryQueueTest.java index 9503ddb7245..a25805cd157 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TemporaryQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TemporaryQueueTest.java @@ -37,7 +37,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; import org.apache.activemq.artemis.core.remoting.CloseListener; -import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; @@ -475,7 +474,7 @@ public boolean intercept(final Packet packet, final RemotingConnection connectio session = sf.createSession(false, true, true); session.createTemporaryQueue(address, queue); - assertTrue("server has not received any ping from the client", pingOnServerLatch.await(2 * RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL, TimeUnit.MILLISECONDS)); + assertTrue("server has not received any ping from the client", pingOnServerLatch.await(2 * server.getConfiguration().getConnectionTtlCheckInterval(), TimeUnit.MILLISECONDS)); assertEquals(1, server.getConnectionCount()); RemotingConnection remotingConnection = server.getRemotingService().getConnections().iterator().next(); @@ -490,7 +489,7 @@ public void connectionClosed() { ((ClientSessionInternal) session).getConnection().fail(new ActiveMQInternalErrorException("simulate a client failure")); // let some time for the server to clean the connections - assertTrue("server has not closed the connection", serverCloseLatch.await(2 * RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL + 2 * TemporaryQueueTest.CONNECTION_TTL, TimeUnit.MILLISECONDS)); + assertTrue("server has not closed the connection", serverCloseLatch.await(2 * server.getConfiguration().getConnectionTtlCheckInterval() + 2 * TemporaryQueueTest.CONNECTION_TTL, TimeUnit.MILLISECONDS)); // The next getCount will be asynchronously done at the end of failure. We will wait some time until it has reached there. for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && server.getConnectionCount() > 0; ) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/PingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/PingTest.java index ed4506f5258..564c6b616fd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/PingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/PingTest.java @@ -31,7 +31,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping; import org.apache.activemq.artemis.core.remoting.CloseListener; -import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; @@ -397,7 +396,7 @@ public void connectionClosed() { Assert.assertTrue(clientLatch.await(8 * PingTest.CLIENT_FAILURE_CHECK_PERIOD, TimeUnit.MILLISECONDS)); // Server connection will be closed too, when client closes client side connection after failure is detected - Assert.assertTrue(serverLatch.await(2 * RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL, TimeUnit.MILLISECONDS)); + Assert.assertTrue(serverLatch.await(2 * server.getConfiguration().getConnectionTtlCheckInterval(), TimeUnit.MILLISECONDS)); long start = System.currentTimeMillis(); while (true) { From 89e0c461e5128768513943e60f6e3992f066f529 Mon Sep 17 00:00:00 2001 From: jbertram Date: Fri, 15 Jul 2016 08:36:24 -0500 Subject: [PATCH 2/2] ARTEMIS-611 refactor STOMP cxn TTL + heart-beat Adds 3 new URI properties for STOMP acceptors to allow finer grained configuration of heart-beat / connection-TTL behavior. --- .../impl/netty/TransportConstants.java | 9 + .../core/protocol/stomp/StompConnection.java | 6 +- .../protocol/stomp/StompProtocolManager.java | 2 +- .../stomp/v11/StompFrameHandlerV11.java | 130 ++++------- .../core/remoting/server/RemotingService.java | 6 + .../server/impl/RemotingServiceImpl.java | 130 +++++++---- .../en/protocols-interoperability.md | 65 ++++-- .../integration/stomp/StompOverHttpTest.java | 4 +- .../stomp/StompOverWebsocketTest.java | 4 +- .../tests/integration/stomp/StompTest.java | 18 ++ .../integration/stomp/StompTestBase.java | 88 +++++-- .../integration/stomp/v11/StompV11Test.java | 214 ++++++++++++++++++ .../stomp/v11/StompV11TestBase.java | 2 +- 13 files changed, 504 insertions(+), 174 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index f373639f931..53dc204d833 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -187,6 +187,12 @@ public class TransportConstants { public static final String CONNECTION_TTL = "connectionTtl"; + public static final String CONNECTION_TTL_MAX = "connectionTtlMax"; + + public static final String CONNECTION_TTL_MIN = "connectionTtlMin"; + + public static final String HEART_BEAT_TO_CONNECTION_TTL_MODIFIER = "heartBeatToConnectionTtlModifier"; + public static final String STOMP_ENABLE_MESSAGE_ID = "stomp-enable-message-id"; public static final String STOMP_MIN_LARGE_MESSAGE_SIZE = "stomp-min-large-message-size"; @@ -230,6 +236,9 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT); allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE); allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL); + allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MAX); + allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MIN); + allowableAcceptorKeys.add(TransportConstants.HEART_BEAT_TO_CONNECTION_TTL_MODIFIER); allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID); allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED); allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword()); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 1cfd0a5718d..5396c5b03dc 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -269,7 +269,7 @@ public void destroy() { } } - Acceptor getAcceptorUsed() { + public Acceptor getAcceptorUsed() { return acceptorUsed; } @@ -720,4 +720,8 @@ public int getMinLargeMessageSize() { return minLargeMessageSize; } + public StompProtocolManager getManager() { + return manager; + } + } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 7642e69faa6..2c8751c2745 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -55,7 +55,7 @@ /** * StompProtocolManager */ -class StompProtocolManager extends AbstractProtocolManager { +public class StompProtocolManager extends AbstractProtocolManager { // Constants ----------------------------------------------------- // Attributes ---------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java index 7f284dd016a..0eb1951f794 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java @@ -29,7 +29,10 @@ import org.apache.activemq.artemis.core.protocol.stomp.StompFrame; import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.utils.CertificateUtil; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; @@ -92,7 +95,7 @@ public StompFrame onConnect(StompFrame frame) { response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "0,0"); } else { - response.addHeader(Stomp.Headers.Connected.HEART_BEAT, heartBeater.getServerHeartBeatValue()); + response.addHeader(Stomp.Headers.Connected.HEART_BEAT, Long.toString(heartBeater.serverPingPeriod) + "," + Long.toString(heartBeater.clientPingResponse)); } } } @@ -231,7 +234,7 @@ public void replySent(StompFrame reply) { } private void startHeartBeat() { - if (heartBeater != null) { + if (heartBeater != null && heartBeater.serverPingPeriod != 0) { heartBeater.start(); } } @@ -242,31 +245,50 @@ public StompFrame createPingFrame() { return frame; } - //server heart beat - //algorithm: - //(a) server ping: if server hasn't sent any frame within serverPing - //interval, send a ping. - //(b) accept ping: if server hasn't received any frame within - // 2*serverAcceptPing, disconnect! + /* + * HeartBeater functions: + * (a) server ping: if server hasn't sent any frame within serverPingPeriod interval, send a ping + * (b) configure connection ttl so that org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.FailureCheckAndFlushThread + * can deal with closing connections which go stale + */ private class HeartBeater extends Thread { private static final int MIN_SERVER_PING = 500; - private static final int MIN_CLIENT_PING = 500; - long serverPing = 0; - long serverAcceptPing = 0; + long serverPingPeriod = 0; + long clientPingResponse; volatile boolean shutdown = false; - AtomicLong lastPingTime = new AtomicLong(0); - AtomicLong lastAccepted = new AtomicLong(0); - StompFrame pingFrame; + AtomicLong lastPingTimestamp = new AtomicLong(0); + ConnectionEntry connectionEntry; - private HeartBeater(long clientPing, long clientAcceptPing) { - if (clientPing != 0) { - serverAcceptPing = clientPing > MIN_CLIENT_PING ? clientPing : MIN_CLIENT_PING; + private HeartBeater(final long clientPing, final long clientAcceptPing) { + connectionEntry = ((RemotingServiceImpl)connection.getManager().getServer().getRemotingService()).getConnectionEntry(connection.getID()); + clientPingResponse = clientPing; + + String ttlMaxStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.CONNECTION_TTL_MAX); + long ttlMax = ttlMaxStr == null ? Long.MAX_VALUE : Long.valueOf(ttlMaxStr); + + String ttlMinStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.CONNECTION_TTL_MIN); + long ttlMin = ttlMinStr == null ? 500 : Long.valueOf(ttlMinStr); + + String heartBeatToTtlModifierStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.HEART_BEAT_TO_CONNECTION_TTL_MODIFIER); + double heartBeatToTtlModifier = heartBeatToTtlModifierStr == null ? 2 : Double.valueOf(heartBeatToTtlModifierStr); + + // The connection's TTL should be clientPing * 2, MIN_CLIENT_PING, or ttlMax set on the acceptor + long connectionTtl = (long) (clientPing * heartBeatToTtlModifier); + if (connectionTtl < ttlMin) { + connectionTtl = ttlMin; + clientPingResponse = (long) (ttlMin / heartBeatToTtlModifier); } + else if (connectionTtl > ttlMax) { + connectionTtl = ttlMax; + clientPingResponse = (long) (ttlMax / heartBeatToTtlModifier); + } + ActiveMQServerLogger.LOGGER.info("Setting TTL to: " + connectionTtl); + connectionEntry.ttl = connectionTtl; if (clientAcceptPing != 0) { - serverPing = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING; + serverPingPeriod = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING; } } @@ -275,85 +297,32 @@ public synchronized void shutdown() { this.notify(); } - public String getServerHeartBeatValue() { - return String.valueOf(serverPing) + "," + String.valueOf(serverAcceptPing); - } - public void pinged() { - lastPingTime.set(System.currentTimeMillis()); + lastPingTimestamp.set(System.currentTimeMillis()); } @Override public void run() { - lastAccepted.set(System.currentTimeMillis()); - pingFrame = createPingFrame(); - synchronized (this) { while (!shutdown) { - long dur1 = 0; - long dur2 = 0; - - if (serverPing != 0) { - dur1 = System.currentTimeMillis() - lastPingTime.get(); - if (dur1 >= serverPing) { - lastPingTime.set(System.currentTimeMillis()); - connection.ping(pingFrame); - dur1 = 0; - } + long lastPingPeriod = System.currentTimeMillis() - lastPingTimestamp.get(); + if (lastPingPeriod >= serverPingPeriod) { + lastPingTimestamp.set(System.currentTimeMillis()); + connection.ping(createPingFrame()); + lastPingPeriod = 0; } - - if (serverAcceptPing != 0) { - dur2 = System.currentTimeMillis() - lastAccepted.get(); - - if (dur2 > (2 * serverAcceptPing)) { - connection.disconnect(false); - shutdown = true; - break; - } - } - - long waitTime1 = 0; - long waitTime2 = 0; - - if (serverPing > 0) { - waitTime1 = serverPing - dur1; - } - - if (serverAcceptPing > 0) { - waitTime2 = serverAcceptPing * 2 - dur2; - } - - long waitTime = 10L; - - if ((waitTime1 > 0) && (waitTime2 > 0)) { - waitTime = Math.min(waitTime1, waitTime2); - } - else if (waitTime1 > 0) { - waitTime = waitTime1; - } - else if (waitTime2 > 0) { - waitTime = waitTime2; - } - try { - this.wait(waitTime); + this.wait(serverPingPeriod - lastPingPeriod); } catch (InterruptedException e) { } } } } - - public void pingAccepted() { - this.lastAccepted.set(System.currentTimeMillis()); - } } @Override public void requestAccepted(StompFrame request) { - if (heartBeater != null) { - heartBeater.pingAccepted(); - } } @Override @@ -403,10 +372,7 @@ protected boolean parseCommand() throws ActiveMQStompException { // either "[\r]\n"s or "\n"s) while (true) { if (workingBuffer[offset] == NEW_LINE) { - if (heartBeater != null) { - //client ping - heartBeater.pingAccepted(); - } + //client ping nextChar = false; } else if (workingBuffer[offset] == CR) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java index 061e5a67c5f..ea3107c4603 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java @@ -20,6 +20,7 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.BaseInterceptor; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -99,4 +100,9 @@ public interface RemotingService { */ Acceptor getAcceptor(String name); + Acceptor createAcceptor(String name, String uri) throws Exception; + + Acceptor createAcceptor(TransportConfiguration transportConfiguration); + + void destroyAcceptor(String name) throws Exception; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 3a073e929f5..1a8e32b915a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -67,6 +67,7 @@ import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; +import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ReusableLatch; @@ -77,8 +78,6 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif private static final Logger logger = Logger.getLogger(RemotingServiceImpl.class); - public static final long CONNECTION_TTL_CHECK_INTERVAL = 2000; - // Attributes ---------------------------------------------------- private volatile boolean started = false; @@ -119,6 +118,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif private AtomicLong totalConnectionCount = new AtomicLong(0); + private long connectionTtlCheckInterval; + // Static -------------------------------------------------------- // Constructors -------------------------------------------------- @@ -163,6 +164,8 @@ public RemotingServiceImpl(final ClusterManager clusterManager, if (protocolManagerFactories != null) { loadProtocolManagerFactories(protocolManagerFactories); } + + this.connectionTtlCheckInterval = config.getConnectionTtlCheckInterval(); } private void setInterceptors(Configuration configuration) { @@ -198,67 +201,94 @@ public ThreadFactory run() { threadPool = Executors.newCachedThreadPool(tFactory); for (TransportConfiguration info : acceptorsConfig) { - try { - AcceptorFactory factory = server.getServiceRegistry().getAcceptorFactory(info.getName(), info.getFactoryClassName()); + createAcceptor(info); + } - Map selectedProtocolFactories = new ConcurrentHashMap<>(); + /** + * Don't start the acceptors here. Only start the acceptors at the every end of the start-up process to avoid + * race conditions. See {@link #startAcceptors()}. + */ - @SuppressWarnings("deprecation") - String protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, null, info.getParams()); - if (protocol != null) { - ActiveMQServerLogger.LOGGER.warnDeprecatedProtocol(); - locateProtocols(protocol, info, selectedProtocolFactories); - } + // This thread checks connections that need to be closed, and also flushes confirmations + failureCheckAndFlushThread = new FailureCheckAndFlushThread(connectionTtlCheckInterval); - String protocols = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOLS_PROP_NAME, null, info.getParams()); + failureCheckAndFlushThread.start(); - if (protocols != null) { - locateProtocols(protocols, info, selectedProtocolFactories); - } + started = true; + } - ClusterConnection clusterConnection = lookupClusterConnection(info); + @Override + public Acceptor createAcceptor(String name, String uri) throws Exception { + AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser(); - // If empty: we get the default list - if (selectedProtocolFactories.isEmpty()) { - selectedProtocolFactories = protocolMap; - } + List configurations = parser.newObject(parser.expandURI(uri), name); - Map selectedProtocols = new ConcurrentHashMap<>(); - for (Map.Entry entry: selectedProtocolFactories.entrySet()) { - selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors)); - } + return createAcceptor(configurations.get(0)); + } + @Override + public Acceptor createAcceptor(TransportConfiguration info) { + Acceptor acceptor = null; - Acceptor acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols); + try { + AcceptorFactory factory = server.getServiceRegistry().getAcceptorFactory(info.getName(), info.getFactoryClassName()); - if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) { - acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal); - } + Map selectedProtocolFactories = new ConcurrentHashMap<>(); - acceptors.put(info.getName(), acceptor); + @SuppressWarnings("deprecation") + String protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, null, info.getParams()); + if (protocol != null) { + ActiveMQServerLogger.LOGGER.warnDeprecatedProtocol(); + locateProtocols(protocol, info, selectedProtocolFactories); + } - if (managementService != null) { - acceptor.setNotificationService(managementService); + String protocols = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOLS_PROP_NAME, null, info.getParams()); - managementService.registerAcceptor(acceptor, info); - } + if (protocols != null) { + locateProtocols(protocols, info, selectedProtocolFactories); } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorCreatingAcceptor(e, info.getFactoryClassName()); + + ClusterConnection clusterConnection = lookupClusterConnection(info); + + // If empty: we get the default list + if (selectedProtocolFactories.isEmpty()) { + selectedProtocolFactories = protocolMap; } - } - /** - * Don't start the acceptors here. Only start the acceptors at the every end of the start-up process to avoid - * race conditions. See {@link #startAcceptors()}. - */ + Map selectedProtocols = new ConcurrentHashMap<>(); + for (Entry entry: selectedProtocolFactories.entrySet()) { + selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors)); + } - // This thread checks connections that need to be closed, and also flushes confirmations - failureCheckAndFlushThread = new FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL); - failureCheckAndFlushThread.start(); + acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols); - started = true; + if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) { + acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal); + } + + acceptors.put(info.getName(), acceptor); + + if (managementService != null) { + acceptor.setNotificationService(managementService); + + managementService.registerAcceptor(acceptor, info); + } + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorCreatingAcceptor(e, info.getFactoryClassName()); + } + + return acceptor; + } + + @Override + public void destroyAcceptor(String name) throws Exception { + Acceptor acceptor = acceptors.get(name); + if (acceptor != null) { + acceptor.stop(); + acceptors.remove(name); + } } @Override @@ -423,6 +453,17 @@ private RemotingConnection getConnection(final Object remotingConnectionID) { } } + public ConnectionEntry getConnectionEntry(final Object remotingConnectionID) { + ConnectionEntry entry = connections.get(remotingConnectionID); + + if (entry != null) { + return entry; + } + else { + return null; + } + } + @Override public RemotingConnection removeConnection(final Object remotingConnectionID) { ConnectionEntry entry = connections.remove(remotingConnectionID); @@ -647,6 +688,7 @@ public void close(final boolean criticalError) { @Override public void run() { while (!closed) { + ActiveMQServerLogger.LOGGER.info("Checking..."); try { long now = System.currentTimeMillis(); diff --git a/docs/user-manual/en/protocols-interoperability.md b/docs/user-manual/en/protocols-interoperability.md index 5c19311aab7..5b486d0f11b 100644 --- a/docs/user-manual/en/protocols-interoperability.md +++ b/docs/user-manual/en/protocols-interoperability.md @@ -256,15 +256,6 @@ set). Apache ActiveMQ Artemis currently doesn't support virtual hosting, which means the 'host' header in CONNECT fram will be ignored. -#### Heart-beating - -Apache ActiveMQ Artemis specifies a minimum value for both client and server heart-beat -intervals. The minimum interval for both client and server heartbeats is -500 milliseconds. That means if a client sends a CONNECT frame with -heartbeat values lower than 500, the server will defaults the value to -500 milliseconds regardless the values of the 'heart-beat' header in the -frame. - ### Mapping Stomp destinations to Apache ActiveMQ Artemis addresses and queues Stomp clients deals with *destinations* when sending messages and @@ -278,7 +269,14 @@ specified destination is mapped to an address. When a Stomp client subscribes (or unsubscribes) for a destination (using a `SUBSCRIBE` or `UNSUBSCRIBE` frame), the destination is mapped to an Apache ActiveMQ Artemis queue. -### STOMP and connection-ttl +### STOMP heart-beating and connection-ttl + +Apache ActiveMQ Artemis specifies a minimum value for both client and server heart-beat +intervals. The minimum interval for both client and server heartbeats is +500 milliseconds. That means if a client sends a CONNECT frame with +heartbeat values lower than 500, the server will defaults the value to +500 milliseconds regardless the values of the 'heart-beat' header in the +frame. Well behaved STOMP clients will always send a DISCONNECT frame before closing their connections. In this case the server will clear up any @@ -288,19 +286,50 @@ they crash the server will have no way of knowing immediately whether the client is still alive or not. STOMP connections therefore default to a connection-ttl value of 1 minute (see chapter on [connection-ttl](#connection-ttl) for more information. This value can -be overridden using connection-ttl-override. - -If you need a specific connectionTtl for your stomp connections without -affecting the connectionTtlOverride setting, you can configure your -stomp acceptor with the "connectionTtl" property, which is used to set -the ttl for connections that are created from that acceptor. For -example: +be overridden using the `connection-ttl-override` property or if you +need a specific connectionTtl for your stomp connections without +affecting the broker-wide `connection-ttl-override` setting, you can +configure your stomp acceptor with the "connectionTtl" property, which +is used to set the ttl for connections that are created from that acceptor. +For example: tcp://localhost:61613?protocols=STOMP;connectionTtl=20000 The above configuration will make sure that any stomp connection that is created from that acceptor will have its connection-ttl set to 20 -seconds. +seconds. The `connectionTtl` set on an acceptor will take precedence over +`connection-ttl-override`. + +Since Stomp 1.0 doesn't support heart-beating then all connections from +Stomp 1.0 clients will have a connection TTL imposed upon them by the broker +based on the aforementioned configuration options. Likewise, any Stomp 1.1 +or 1.2 clients that don't specify a heart-beat or disable heart-beating +(e.g. by sending `0,0` in the `heart-beat` header) will have a connection +TTL imposed upon them by the broker. + +For Stomp 1.1 and 1.2 clients which send a valid `heart-beat` header then +their connection TTL will be set accordingly. However, the broker will not +set the connection TTL to the same value as the specified in the `heart-beat` +since even small network delays could then cause spurious disconnects. Instead, +the value in the heart-beat will be multiplied by the `heartBeatConnectionTtlModifer` +specified on the acceptor. The `heartBeatConnectionTtlModifer` is a decimal +value that defaults to 2.0 so for example, if a client sends a `heart-beat` +frame of `1000,0` the the connection TTL will be set to `2000` so that the +ping frames sent every 1000 milliseconds will have a sufficient cushion so as +not to be considered late and trigger a disconnect. + +The minimum and maximum connection TTL allowed can also be specified on the +acceptor via the `connectionTtlMin` and `connectionTtlMax` properties respectively. +The default `connectionTtlMin` is 500 and the default `connectionTtlMax` is Java's +`Long.MAX_VALUE` meaning there essentially is no max connection TTL by default. +Keep in mind that the `heartBeatConnectionTtlModifer` is relevant here. For +example, if a client sends a `heart-beat` header of `20000,0` and the acceptor +is using a `connectionTtlMax` of `30000` and a default `heartBeatConnectionTtlModifer` +of `2.0` then the connection TTL would be `40000` (i.e. `20000` * `2.0`) which would +exceed the `connectionTtlMax`. In this case the server would respond to the client +with a `heart-beat` header of `0,15000` (i.e. `30000` / `2.0`). As described +previously, this is to make sure there is a sufficient cushion for the client +heart-beats. The same kind of calculation is done for `connectionTtlMin`. > **Note** > diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java index 8e588f9aab7..783f35f6fdf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java @@ -37,13 +37,13 @@ public class StompOverHttpTest extends StompTest { @Override - protected void addChannelHandlers(SocketChannel ch) { + protected void addChannelHandlers(int index, SocketChannel ch) { ch.pipeline().addLast(new HttpRequestEncoder()); ch.pipeline().addLast(new HttpResponseDecoder()); ch.pipeline().addLast(new HttpHandler()); ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); - ch.pipeline().addLast(new StompClientHandler()); + ch.pipeline().addLast(new StompClientHandler(index)); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java index 39cd4f737bf..fa8c048a1d4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java @@ -63,12 +63,12 @@ public StompOverWebsocketTest(Boolean useBinaryFrames) { } @Override - protected void addChannelHandlers(SocketChannel ch) throws URISyntaxException { + protected void addChannelHandlers(int index, SocketChannel ch) throws URISyntaxException { ch.pipeline().addLast("http-codec", new HttpClientCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(8192)); ch.pipeline().addLast(new WebsocketHandler(WebSocketClientHandshakerFactory.newHandshaker(new URI("ws://localhost:8080/websocket"), WebSocketVersion.V13, null, false, null))); ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); - ch.pipeline().addLast(new StompClientHandler()); + ch.pipeline().addLast(new StompClientHandler(index)); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 81558987e9b..f28f5b2fda1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -51,6 +51,24 @@ public class StompTest extends StompTestBase { private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + @Test + public void testConnectionTTL() throws Exception { + int index = 1; + int port = 61614; + + server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000").start(); + createBootstrap(index, port); + String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + sendFrame(index, frame); + frame = receiveFrame(index, 10000); + + Assert.assertTrue(frame.startsWith("CONNECTED")); + + Thread.sleep(5000); + + assertChannelClosed(index); + } + @Test public void testSendManyMessages() throws Exception { MessageConsumer consumer = session.createConsumer(queue); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java index 9baf1232a74..7f73e488117 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java @@ -29,8 +29,10 @@ import java.net.Socket; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -97,13 +99,15 @@ public abstract class StompTestBase extends ActiveMQTestBase { protected boolean autoCreateServer = true; - private Bootstrap bootstrap; + private List bootstraps = new ArrayList<>(); - private Channel channel; +// private Channel channel; - private BlockingQueue priorityQueue; + private List> priorityQueues = new ArrayList<>(); - private EventLoopGroup group; + private List groups = new ArrayList<>(); + + private List channels = new ArrayList<>(); // Implementation methods // ------------------------------------------------------------------------- @@ -111,7 +115,6 @@ public abstract class StompTestBase extends ActiveMQTestBase { @Before public void setUp() throws Exception { super.setUp(); - priorityQueue = new ArrayBlockingQueue<>(1000); if (autoCreateServer) { server = createServer(); addServer(server.getActiveMQServer()); @@ -133,18 +136,27 @@ public void setUp() throws Exception { } private void createBootstrap() { - group = new NioEventLoopGroup(); - bootstrap = new Bootstrap(); - bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer() { + createBootstrap(0, port); + } + + protected void createBootstrap(int port) { + createBootstrap(0, port); + } + + protected void createBootstrap(final int index, int port) { + priorityQueues.add(index, new ArrayBlockingQueue(1000)); + groups.add(index, new NioEventLoopGroup()); + bootstraps.add(index, new Bootstrap()); + bootstraps.get(index).group(groups.get(index)).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { - addChannelHandlers(ch); + addChannelHandlers(index, ch); } }); // Start the client. try { - channel = bootstrap.connect("localhost", port).sync().channel(); + channels.add(index, bootstraps.get(index).connect("localhost", port).sync().channel()); handshake(); } catch (InterruptedException e) { @@ -156,10 +168,10 @@ public void initChannel(SocketChannel ch) throws Exception { protected void handshake() throws InterruptedException { } - protected void addChannelHandlers(SocketChannel ch) throws URISyntaxException { + protected void addChannelHandlers(int index, SocketChannel ch) throws URISyntaxException { ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); - ch.pipeline().addLast(new StompClientHandler()); + ch.pipeline().addLast(new StompClientHandler(index)); } protected void setUpAfterServer() throws Exception { @@ -224,9 +236,13 @@ public void tearDown() throws Exception { if (autoCreateServer) { connection.close(); - if (group != null) { - channel.close(); - group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS); + for (EventLoopGroup group : groups) { + if (group != null) { + for (Channel channel : channels) { + channel.close(); + } + group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS); + } } } super.tearDown(); @@ -234,8 +250,8 @@ public void tearDown() throws Exception { protected void cleanUp() throws Exception { connection.close(); - if (group != null) { - group.shutdown(); + if (groups.get(0) != null) { + groups.get(0).shutdown(); } } @@ -244,7 +260,7 @@ protected void reconnect() throws Exception { } protected void reconnect(long sleep) throws Exception { - group.shutdown(); + groups.get(0).shutdown(); if (sleep > 0) { Thread.sleep(sleep); @@ -278,22 +294,38 @@ protected String getTopicPrefix() { } protected void assertChannelClosed() throws InterruptedException { - boolean closed = channel.closeFuture().await(5000); + assertChannelClosed(0); + } + + protected void assertChannelClosed(int index) throws InterruptedException { + boolean closed = channels.get(index).closeFuture().await(5000); assertTrue("channel not closed", closed); } public void sendFrame(String data) throws Exception { - channel.writeAndFlush(data); + sendFrame(0, data); + } + + public void sendFrame(int index, String data) throws Exception { + channels.get(index).writeAndFlush(data); } public void sendFrame(byte[] data) throws Exception { + sendFrame(0, data); + } + + public void sendFrame(int index, byte[] data) throws Exception { ByteBuf buffer = Unpooled.buffer(data.length); buffer.writeBytes(data); - channel.writeAndFlush(buffer); + channels.get(index).writeAndFlush(buffer); } public String receiveFrame(long timeOut) throws Exception { - String msg = priorityQueue.poll(timeOut, TimeUnit.MILLISECONDS); + return receiveFrame(0, timeOut); + } + + public String receiveFrame(int index, long timeOut) throws Exception { + String msg = priorityQueues.get(index).poll(timeOut, TimeUnit.MILLISECONDS); return msg; } @@ -344,6 +376,11 @@ public boolean isSecurityEnabled() { } class StompClientHandler extends SimpleChannelInboundHandler { + int index = 0; + + StompClientHandler(int index) { + this.index = index; + } StringBuffer currentMessage = new StringBuffer(""); @@ -356,7 +393,12 @@ protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Except String actualMessage = fullMessage.substring(0, messageEnd); fullMessage = fullMessage.substring(messageEnd + 2); currentMessage = new StringBuffer(""); - priorityQueue.add(actualMessage); + BlockingQueue queue = priorityQueues.get(index); + if (queue == null) { + queue = new ArrayBlockingQueue(1000); + priorityQueues.add(index, queue); + } + queue.add(actualMessage); if (fullMessage.length() > 0) { channelRead(ctx, fullMessage); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java index 8e6fbb4891c..dfcd1b9d9ea 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java @@ -740,6 +740,220 @@ public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception { } } + @Test + public void testHeartBeatToTTL() throws Exception { + ClientStompFrame frame; + ClientStompFrame reply; + int port = 61614; + + server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000&connectionTtlMin=5000&connectionTtlMax=10000").start(); + StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + + //no heart beat at all if heat-beat absent + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + + reply = connection.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + Thread.sleep(3000); + + assertEquals(0, connection.getFrameQueueSize()); + + try { + connection.disconnect(); + fail("Channel should be closed here already due to TTL"); + } + catch (Exception e) { + // ignore + } + + //no heart beat for (0,0) + connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "0,0"); + frame.addHeader("accept-version", "1.0,1.1"); + + reply = connection.sendFrame(frame); + + IntegrationTestLogger.LOGGER.info("Reply: " + reply); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("0,0", reply.getHeader("heart-beat")); + + Thread.sleep(3000); + + assertEquals(0, connection.getFrameQueueSize()); + + try { + connection.disconnect(); + fail("Channel should be closed here already due to TTL"); + } + catch (Exception e) { + // ignore + } + + //heart-beat (1,0), should receive a min client ping accepted by server + connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "1,0"); + frame.addHeader("accept-version", "1.0,1.1"); + + reply = connection.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("0,2500", reply.getHeader("heart-beat")); + + Thread.sleep(7000); + + //now server side should be disconnected because we didn't send ping for 2 sec + frame = connection.createFrame("SEND"); + frame.addHeader("destination", getQueuePrefix() + getQueueName()); + frame.addHeader("content-type", "text/plain"); + frame.setBody("Hello World"); + + //send will fail + try { + connection.sendFrame(frame); + fail("connection should have been destroyed by now"); + } + catch (IOException e) { + //ignore + } + + //heart-beat (1,0), start a ping, then send a message, should be ok. + connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "1,0"); + frame.addHeader("accept-version", "1.0,1.1"); + + reply = connection.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("0,2500", reply.getHeader("heart-beat")); + + System.out.println("========== start pinger!"); + + connection.startPinger(2500); + + Thread.sleep(7000); + + //now server side should be disconnected because we didn't send ping for 2 sec + frame = connection.createFrame("SEND"); + frame.addHeader("destination", getQueuePrefix() + getQueueName()); + frame.addHeader("content-type", "text/plain"); + frame.setBody("Hello World"); + + //send will be ok + connection.sendFrame(frame); + + connection.stopPinger(); + + connection.disconnect(); + + //heart-beat (20000,0), should receive a max client ping accepted by server + connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "20000,0"); + frame.addHeader("accept-version", "1.0,1.1"); + + reply = connection.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("0,5000", reply.getHeader("heart-beat")); + + Thread.sleep(12000); + + //now server side should be disconnected because we didn't send ping for 2 sec + frame = connection.createFrame("SEND"); + frame.addHeader("destination", getQueuePrefix() + getQueueName()); + frame.addHeader("content-type", "text/plain"); + frame.setBody("Hello World"); + + //send will fail + try { + connection.sendFrame(frame); + fail("connection should have been destroyed by now"); + } + catch (IOException e) { + //ignore + } + } + + @Test + public void testHeartBeatToConnectionTTLModifier() throws Exception { + ClientStompFrame frame; + ClientStompFrame reply; + StompClientConnection connection; + int port = 61614; + + server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1").start(); + + connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "5000,0"); + frame.addHeader("accept-version", "1.0,1.1"); + + reply = connection.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("0,5000", reply.getHeader("heart-beat")); + + Thread.sleep(6000); + + try { + connection.disconnect(); + fail("Connection should be closed here already due to TTL"); + } + catch (Exception e) { + // ignore + } + + server.getActiveMQServer().getRemotingService().destroyAcceptor("test"); + server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1.5").start(); + + connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "5000,0"); + frame.addHeader("accept-version", "1.0,1.1"); + + reply = connection.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("0,5000", reply.getHeader("heart-beat")); + + Thread.sleep(6000); + + connection.disconnect(); + } + @Test public void testNack() throws Exception { connV11.connect(defUser, defPass); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java index d37d90b615e..f7374557692 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java @@ -103,7 +103,7 @@ protected JMSServerManager createServer() throws Exception { params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1"); TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); - Configuration config = createBasicConfig().setPersistenceEnabled(persistenceEnabled).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + Configuration config = createBasicConfig().setPersistenceEnabled(persistenceEnabled).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())).setConnectionTtlCheckInterval(500); ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));