From 354409952ed7cd3c9d7883e21389e75ed9a022b5 Mon Sep 17 00:00:00 2001 From: JiriOndrusek Date: Wed, 22 Nov 2017 15:38:03 +0100 Subject: [PATCH] [ARTEMIS-1527] - [Artemis Testsuite] ActiveMQMessageHandlerTest#testServerShutdownAndReconnect fails --- .../core/client/impl/ClientConsumerImpl.java | 33 ++++++- .../client/impl/ClientSessionFactoryImpl.java | 94 +++++++++++-------- .../core/client/impl/ClientSessionImpl.java | 10 +- .../core/impl/ActiveMQSessionContext.java | 6 +- .../spi/core/remoting/SessionContext.java | 4 +- .../client/HornetQClientSessionContext.java | 6 +- 6 files changed, 104 insertions(+), 49 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java index f937c405e5c..865be332985 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -131,6 +132,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { private final ClassLoader contextClassLoader; + private final ScheduledExecutorService scheduledExecutorService; + // Constructors // --------------------------------------------------------------------------------- @@ -146,7 +149,8 @@ public ClientConsumerImpl(final ClientSessionInternal session, final Executor flowControlExecutor, final SessionContext sessionContext, final ClientSession.QueueQuery queueInfo, - final ClassLoader contextClassLoader) { + final ClassLoader contextClassLoader, + final ScheduledExecutorService scheduledExecutorService) { this.consumerContext = consumerContext; this.queueName = queueName; @@ -173,6 +177,8 @@ public ClientConsumerImpl(final ClientSessionInternal session, this.flowControlExecutor = flowControlExecutor; + this.scheduledExecutorService = scheduledExecutorService; + if (logger.isTraceEnabled()) { logger.trace(this + ":: being created at", new Exception("trace")); } @@ -930,10 +936,29 @@ private void waitForOnMessageToComplete(boolean waitForOnMessage) { sessionExecutor.execute(future); - boolean ok = future.await(ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS); + //wait for result using scheduled executor + scheduledExecutorService.schedule(new AwaitFutureTask(future), ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS); + } + + /** + * Waiting for future result has to be scheduled to allow other tasks to be executed during waiting. + * (original code - future.await(ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS)) + */ + private class AwaitFutureTask implements Runnable { + private FutureLatch futureLatch; + + public AwaitFutureTask(FutureLatch futureLatch) { + this.futureLatch = futureLatch; + } + + @Override + public void run() { + //dont wait for a long tome, wait period is frced by scheduling tasks into future + boolean ok = futureLatch.await(1); - if (!ok) { - ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing(); + if (!ok) { + ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing(); + } } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index addbbbde733..e6486ddc4f6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -669,7 +669,7 @@ private ClientSession createSessionInternal(final String username, SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge); - ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor()); + ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), scheduledThreadPool); synchronized (sessions) { if (closed || !clientProtocolManager.isAlive()) { @@ -783,57 +783,75 @@ private void getConnectionWithRetry(final int reconnectAttempts) { retryIntervalMultiplier, new Exception("trace")); } - long interval = retryInterval; + //first attempt of rery connection is executed without timeout, so it is not scheduled + new ScheduledRetryConnection( 0, retryInterval).run(); + } - int count = 0; + /** + * Each attempt of retry connection has to be scheduled to allow other threads to be executed while retry connection is still runnning. + */ + private class ScheduledRetryConnection implements Runnable { + private int count; + private long interval; - while (clientProtocolManager.isAlive()) { - if (logger.isDebugEnabled()) { - logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts); - } + public ScheduledRetryConnection(int count, long interval) { + this.count = count; + this.interval = interval; + } + + @Override + public void run() { - if (getConnection() != null) { + if (clientProtocolManager.isAlive()) { if (logger.isDebugEnabled()) { - logger.debug("Reconnection successful"); + logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts); } - return; - } else { - // Failed to get connection - - if (reconnectAttempts != 0) { - count++; - if (reconnectAttempts != -1 && count == reconnectAttempts) { - if (reconnectAttempts != 1) { - ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts); + if (getConnection() != null) { + if (logger.isDebugEnabled()) { + logger.debug("Reconnection successful"); + } + return; + } else { + // Failed to get connection + if (reconnectAttempts != 0) { + count++; + if (reconnectAttempts != -1 && count == reconnectAttempts) { + if (reconnectAttempts != 1) { + ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts); + } + return; } - return; - } + try { + //wait interval is changed to 1, real waiting is achieved by scheduling thread with interval "interval" + if (clientProtocolManager.waitOnLatch(1)) { + return; + } + } catch (InterruptedException ignore) { + throw new ActiveMQInterruptedException(createTrace); + } - if (ClientSessionFactoryImpl.logger.isTraceEnabled()) { - ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier); - } + // Exponential back-off + long newInterval = (long) (interval * retryIntervalMultiplier); - try { - if (clientProtocolManager.waitOnLatch(interval)) { - return; + if (newInterval > maxRetryInterval) { + newInterval = maxRetryInterval; } - } catch (InterruptedException ignore) { - throw new ActiveMQInterruptedException(createTrace); - } - // Exponential back-off - long newInterval = (long) (interval * retryIntervalMultiplier); + interval = newInterval; - if (newInterval > maxRetryInterval) { - newInterval = maxRetryInterval; - } + if (ClientSessionFactoryImpl.logger.isTraceEnabled()) { + ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier); + } - interval = newInterval; - } else { - logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory"); - return; + ScheduledRetryConnection command = new ScheduledRetryConnection(count, interval); + //schedule next check + scheduledThreadPool.schedule(command, interval, TimeUnit.MILLISECONDS); + } else { + logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory"); + return; + } } } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index d5a867da224..0a2d4d0ae0e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -74,6 +75,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // to be sent to consumers as consumers will need a separate consumer for flow control private final Executor flowControlExecutor; + private final ScheduledExecutorService scheduledExecutorService; + /** * All access to producers are guarded (i.e. synchronized) on itself. */ @@ -172,7 +175,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final SessionContext sessionContext, final Executor executor, final Executor flowControlExecutor, - final Executor closeExecutor) throws ActiveMQException { + final Executor closeExecutor, + final ScheduledExecutorService scheduledExecutorService) throws ActiveMQException { this.sessionFactory = sessionFactory; this.name = name; @@ -230,6 +234,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi confirmationWindowWarning = sessionFactory.getConfirmationWindowWarning(); this.closeExecutor = closeExecutor; + + this.scheduledExecutorService = scheduledExecutorService; } // ClientSession implementation @@ -1508,7 +1514,7 @@ private ClientConsumer internalCreateConsumer(final SimpleString queueName, final boolean browseOnly) throws ActiveMQException { checkClosed(); - ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor); + ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor, scheduledExecutorService); addConsumer(consumer); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index d2bcc963cf3..e584cc46380 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -269,7 +270,8 @@ public ClientConsumerInternal createConsumer(SimpleString queueName, int ackBatchSize, boolean browseOnly, Executor executor, - Executor flowControlExecutor) throws ActiveMQException { + Executor flowControlExecutor, + ScheduledExecutorService scheduledExecutorService) throws ActiveMQException { long consumerID = idGenerator.generateID(); ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID); @@ -282,7 +284,7 @@ public ClientConsumerInternal createConsumer(SimpleString queueName, // could be overridden on the queue settings // The value we send is just a hint - return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); + return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL(), scheduledExecutorService); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 1f15cc68933..64d485982b8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -20,6 +20,7 @@ import javax.transaction.xa.Xid; import java.util.HashMap; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; @@ -233,7 +234,8 @@ public abstract ClientConsumerInternal createConsumer(SimpleString queueName, int ackBatchSize, boolean browseOnly, Executor executor, - Executor flowControlExecutor) throws ActiveMQException; + Executor flowControlExecutor, + ScheduledExecutorService scheduledThreadPool) throws ActiveMQException; /** * Performs a round trip to the server requesting what is the current tx timeout on the session diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java index caa94a10f6e..343629d0270 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.hornetq.client; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; @@ -82,7 +83,8 @@ public ClientConsumerInternal createConsumer(SimpleString queueName, int ackBatchSize, boolean browseOnly, Executor executor, - Executor flowControlExecutor) throws ActiveMQException { + Executor flowControlExecutor, + ScheduledExecutorService scheduledExecutorService) throws ActiveMQException { long consumerID = idGenerator.generateID(); ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID); @@ -95,7 +97,7 @@ public ClientConsumerInternal createConsumer(SimpleString queueName, // could be overridden on the queue settings // The value we send is just a hint - return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); + return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL(), scheduledExecutorService); } }