From af1f79bff503ee02ac119efceac65928f671fd1e Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 14 Dec 2015 20:11:53 -0500 Subject: [PATCH] ARTEMIS-302 more changes around XA reliability (resilience on failures) --- .../core/client/impl/ClientSessionImpl.java | 14 +- .../activemq/artemis/ra/ActiveMQRALogger.java | 10 +- .../ra/ActiveMQRAManagedConnection.java | 2 +- .../artemis/ra/ActiveMQRAXAResource.java | 21 +- .../artemis/ra/ActiveMQResourceAdapter.java | 188 ++--- .../artemis/ra/inflow/ActiveMQActivation.java | 76 +- .../ra/inflow/ActiveMQMessageHandler.java | 3 +- .../impl/journal/JournalStorageManager.java | 3 +- .../core/server/cluster/impl/BridgeImpl.java | 23 + .../core/server/impl/ServerConsumerImpl.java | 3 +- .../core/server/impl/ServerSessionImpl.java | 9 +- .../transaction/impl/TransactionImpl.java | 148 ++-- .../transaction/impl/TransactionImplTest.java | 673 ++++++++++++++++++ .../byteman/ClusteredBridgeReconnectTest.java | 228 ++++++ .../byteman/ConcurrentDeliveryCancelTest.java | 92 +-- .../tests/extras/byteman/TimeoutXATest.java | 12 +- ...BMultipleHandlersServerDisconnectTest.java | 116 ++- .../cluster/bridge/BridgeReconnectTest.java | 4 + .../failover/AsynchronousFailoverTest.java | 8 +- .../integration/ra/ResourceAdapterTest.java | 48 +- .../integration/xa/BasicXaRecoveryTest.java | 3 + .../tests/unit/ra/ResourceAdapterTest.java | 24 +- 22 files changed, 1371 insertions(+), 337 deletions(-) create mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java create mode 100644 tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredBridgeReconnectTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index dc8968006a0..221413c3bf5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -539,8 +539,13 @@ public void rollback() throws ActiveMQException { rollback(false); } - @Override - public void rollback(final boolean isLastMessageAsDelivered) throws ActiveMQException { + public void rollback(final boolean isLastMessageAsDelivered) throws ActiveMQException + { + rollback(isLastMessageAsDelivered, true); + } + + public void rollback(final boolean isLastMessageAsDelivered, final boolean waitConsumers) throws ActiveMQException + { if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { ActiveMQClientLogger.LOGGER.trace("calling rollback(isLastMessageAsDelivered=" + isLastMessageAsDelivered + ")"); } @@ -559,7 +564,7 @@ public void rollback(final boolean isLastMessageAsDelivered) throws ActiveMQExce // We need to make sure we don't get any inflight messages for (ClientConsumerInternal consumer : cloneConsumers()) { - consumer.clear(true); + consumer.clear(waitConsumers); } // Acks must be flushed here *after connection is stopped and all onmessages finished executing @@ -1173,7 +1178,7 @@ public void end(final Xid xid, final int flags) throws XAException { try { if (rollbackOnly) { try { - rollback(); + rollback(false, false); } catch (Throwable ignored) { ActiveMQClientLogger.LOGGER.debug("Error on rollback during end call!", ignored); @@ -1252,6 +1257,7 @@ public boolean setTransactionTimeout(final int seconds) throws XAException { return sessionContext.configureTransactionTimeout(seconds); } catch (Throwable t) { + markRollbackOnly(); // The TM will ignore any errors from here, if things are this screwed up we mark rollbackonly // This could occur if the TM interrupts the thread XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java index 6ec3a5e7def..7853f094b94 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java @@ -74,20 +74,20 @@ public interface ActiveMQRALogger extends BasicLogger { void awaitingJMSServerCreation(); @LogMessage(level = Logger.Level.INFO) - @Message(id = 151006, value = "Cluster topology change detected. Re-balancing connections.", format = Message.Format.MESSAGE_FORMAT) - void rebalancingConnections(); + @Message(id = 151006, value = "Cluster topology change detected. Re-balancing connections on even {0}.", format = Message.Format.MESSAGE_FORMAT) + void rebalancingConnections(String event); @LogMessage(level = Logger.Level.WARN) @Message(id = 152001, value = "problem resetting xa session after failure", format = Message.Format.MESSAGE_FORMAT) - void problemResettingXASession(); + void problemResettingXASession(@Cause Throwable t); @LogMessage(level = Logger.Level.WARN) @Message(id = 152002, value = "Unable to roll local transaction back", format = Message.Format.MESSAGE_FORMAT) void unableToRollbackTX(); @LogMessage(level = Logger.Level.WARN) - @Message(id = 152003, value = "unable to reset session after failure", format = Message.Format.MESSAGE_FORMAT) - void unableToResetSession(); + @Message(id = 152003, value = "unable to reset session after failure, we will place the MDB Inflow now in setup mode for activation={0}" , format = Message.Format.MESSAGE_FORMAT) + void unableToResetSession(String spec, @Cause Exception e); @LogMessage(level = Logger.Level.WARN) @Message(id = 152004, value = "Handling JMS exception failure", format = Message.Format.MESSAGE_FORMAT) diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java index 3cd1515593d..97c603264f9 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java @@ -811,7 +811,7 @@ else if (cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION) { private void createCF() { if (connectionFactory == null) { - connectionFactory = ra.createActiveMQConnectionFactory(mcf.getProperties()); + connectionFactory = ra.getConnectionFactory(mcf.getProperties()); } } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java index 093ee0f21b1..9d485f31f65 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java @@ -21,8 +21,8 @@ import javax.transaction.xa.Xid; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.client.impl.ActiveMQXAResource; +import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; /** * ActiveMQXAResource. @@ -76,13 +76,18 @@ public void start(final Xid xid, final int flags) throws XAException { ClientSessionInternal sessionInternal = (ClientSessionInternal) xaResource; try { - //this resets any tx stuff, we assume here that the tm and jca layer are well behaved when it comes to this - sessionInternal.resetIfNeeded(); - } - catch (ActiveMQException e) { - ActiveMQRALogger.LOGGER.problemResettingXASession(); - } - try { + try { + //this resets any tx stuff, we assume here that the tm and jca layer are well behaved when it comes to this + sessionInternal.resetIfNeeded(); + } + catch (ActiveMQException e) { + ActiveMQRALogger.LOGGER.problemResettingXASession(e); + + XAException xaException = new XAException(XAException.XAER_RMFAIL); + xaException.initCause(e); + throw xaException; + } + xaResource.start(xid, flags); } finally { diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java index f9781c19561..8c1a9b424ef 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java @@ -29,11 +29,12 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Hashtable; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; @@ -141,7 +142,7 @@ public ActiveMQResourceAdapter() { raProperties = new ActiveMQRAProperties(); configured = new AtomicBoolean(false); - activations = new ConcurrentHashMap<>(); + activations = Collections.synchronizedMap(new IdentityHashMap()); recoveryManager = new RecoveryManager(); } @@ -1570,7 +1571,7 @@ protected ActiveMQRAProperties getProperties() { */ protected void setup() throws ActiveMQException { raProperties.init(); - defaultActiveMQConnectionFactory = createActiveMQConnectionFactory(raProperties); + defaultActiveMQConnectionFactory = newConnectionFactory(raProperties); recoveryActiveMQConnectionFactory = createRecoveryActiveMQConnectionFactory(raProperties); Map recoveryConfProps = new HashMap<>(); @@ -1623,126 +1624,133 @@ public void setJgroupsChannelRefName(String jgroupsChannelRefName) { raProperties.setJgroupsChannelRefName(jgroupsChannelRefName); } - public synchronized ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) { + public synchronized ActiveMQConnectionFactory getConnectionFactory(final ConnectionFactoryProperties overrideProperties) { ActiveMQConnectionFactory cf; boolean known = false; if (!knownConnectionFactories.keySet().contains(overrideProperties)) { - List connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames(); + cf = newConnectionFactory(overrideProperties); + knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1))); + } + else { + Pair pair = knownConnectionFactories.get(overrideProperties); + cf = pair.getA(); + pair.getB().incrementAndGet(); + known = true; + } - String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress(); + if (known && cf.getServerLocator().isClosed()) { + knownConnectionFactories.remove(overrideProperties); + cf = newConnectionFactory(overrideProperties); + knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1))); + } - Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA(); + return cf; + } - String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile(); + public ActiveMQConnectionFactory newConnectionFactory(ConnectionFactoryProperties overrideProperties) { + ActiveMQConnectionFactory cf; + List connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames(); - String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName(); + String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress(); - String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass(); + Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA(); - if (ha == null) { - ha = ActiveMQClient.DEFAULT_IS_HA; - } + String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile(); - if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) { - BroadcastEndpointFactory endpointFactory = null; + String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName(); - if (jgroupsLocatorClassName != null) { - String jchannelRefName = raProperties.getJgroupsChannelRefName(); - JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName); - endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel); - } - else if (discoveryAddress != null) { - Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort(); - if (discoveryPort == null) { - discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT; - } - - String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress(); - endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1); - } - else if (jgroupsFileName != null) { - endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName); - } - Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout(); - if (refreshTimeout == null) { - refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT; - } + String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass(); + + if (ha == null) { + ha = ActiveMQClient.DEFAULT_IS_HA; + } - Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout(); + if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) { + BroadcastEndpointFactory endpointFactory = null; - if (initialTimeout == null) { - initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; + if (jgroupsLocatorClassName != null) { + String jchannelRefName = raProperties.getJgroupsChannelRefName(); + JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName); + endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel); + } + else if (discoveryAddress != null) { + Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort(); + if (discoveryPort == null) { + discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT; } - DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory); + String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress(); + endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1); + } + else if (jgroupsFileName != null) { + endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName); + } + Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout(); + if (refreshTimeout == null) { + refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT; + } - if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { - ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha); - } + Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout(); - if (ha) { - cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF); - } - else { - cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF); - } + if (initialTimeout == null) { + initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; } - else if (connectorClassName != null) { - TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()]; - List> connectionParams; - if (overrideProperties.getParsedConnectorClassNames() != null) { - connectionParams = overrideProperties.getParsedConnectionParameters(); - } - else { - connectionParams = raProperties.getParsedConnectionParameters(); - } + DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory); - for (int i = 0; i < connectorClassName.size(); i++) { - TransportConfiguration tc; - if (connectionParams == null || i >= connectionParams.size()) { - tc = new TransportConfiguration(connectorClassName.get(i)); - ActiveMQRALogger.LOGGER.debug("No connector params provided using default"); - } - else { - tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i)); - } - - transportConfigurations[i] = tc; - } + if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { + ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha); + } - if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { - ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" + - Arrays.toString(transportConfigurations) + " with ha=" + ha); - } + if (ha) { + cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF); + } + else { + cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF); + } + } + else if (connectorClassName != null) { + TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()]; + + List> connectionParams; + if (overrideProperties.getParsedConnectorClassNames() != null) { + connectionParams = overrideProperties.getParsedConnectionParameters(); + } + else { + connectionParams = raProperties.getParsedConnectionParameters(); + } - if (ha) { - cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations); + for (int i = 0; i < connectorClassName.size(); i++) { + TransportConfiguration tc; + if (connectionParams == null || i >= connectionParams.size()) { + tc = new TransportConfiguration(connectorClassName.get(i)); + ActiveMQRALogger.LOGGER.debug("No connector params provided using default"); } else { - cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations); + tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i)); } + + transportConfigurations[i] = tc; } - else { - throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory"); + + if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { + ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" + + Arrays.toString(transportConfigurations) + " with ha=" + ha); } - setParams(cf, overrideProperties); - knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1))); + if (ha) { + cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations); + } + else { + cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations); + } } else { - Pair pair = knownConnectionFactories.get(overrideProperties); - cf = pair.getA(); - pair.getB().incrementAndGet(); - known = true; - } - - if (known && cf.getServerLocator().isClosed()) { - knownConnectionFactories.remove(overrideProperties); - cf = createActiveMQConnectionFactory(overrideProperties); + throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory"); } + setParams(cf, overrideProperties); return cf; } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java index 79b9cb91b29..d7a242d8aa7 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java @@ -48,7 +48,6 @@ import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; -import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.ra.ActiveMQRABundle; @@ -420,12 +419,19 @@ public void run() { // nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up } - if (threadTearDown.isAlive()) { - if (factory != null) { - // This will interrupt any threads waiting on reconnect + if (factory != null) { + try { + // closing the factory will help making sure pending threads are closed factory.close(); - factory = null; } + catch (Throwable e) { + ActiveMQRALogger.LOGGER.warn(e); + } + + factory = null; + } + + if (threadTearDown.isAlive()) { threadTearDown.interrupt(); try { @@ -440,11 +446,6 @@ public void run() { } } - if (spec.isHasBeenUpdated() && factory != null) { - ra.closeConnectionFactory(spec); - factory = null; - } - nodes.clear(); lastReceived = false; @@ -465,23 +466,11 @@ protected void setupCF() throws Exception { factory = (ActiveMQConnectionFactory) fac; } else { - ActiveMQRAConnectionFactory raFact = (ActiveMQRAConnectionFactory) fac; - if (spec.isHasBeenUpdated()) { - factory = raFact.getResourceAdapter().createActiveMQConnectionFactory(spec); - } - else { - factory = raFact.getDefaultFactory(); - if (factory != ra.getDefaultActiveMQConnectionFactory()) { - ActiveMQRALogger.LOGGER.warnDifferentConnectionfactory(); - } - } + factory = ra.newConnectionFactory(spec); } } - else if (spec.isHasBeenUpdated()) { - factory = ra.createActiveMQConnectionFactory(spec); - } else { - factory = ra.getDefaultActiveMQConnectionFactory(); + factory = ra.newConnectionFactory(spec); } } @@ -627,9 +616,18 @@ public String toString() { return buffer.toString(); } - public void rebalance() { - ActiveMQRALogger.LOGGER.rebalancingConnections(); - reconnect(null); + public void startReconnectThread(final String threadName) { + if (trace) { + ActiveMQRALogger.LOGGER.trace("Starting reconnect Thread " + threadName + " on MDB activation " + this); + } + Runnable runnable = new Runnable() { + @Override + public void run() { + reconnect(null); + } + }; + Thread t = new Thread(runnable, threadName); + t.start(); } /** @@ -638,6 +636,9 @@ public void rebalance() { * @param failure if reconnecting in the event of a failure */ public void reconnect(Throwable failure) { + if (trace) { + ActiveMQRALogger.LOGGER.trace("reconnecting activation " + this); + } if (failure != null) { if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) { ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination()); @@ -728,6 +729,7 @@ public void release() { } private class RebalancingListener implements ClusterTopologyListener { + @Override public void nodeUP(TopologyMember member, boolean last) { boolean newNode = false; @@ -741,14 +743,8 @@ public void nodeUP(TopologyMember member, boolean last) { } if (lastReceived && newNode) { - Runnable runnable = new Runnable() { - @Override - public void run() { - rebalance(); - } - }; - Thread t = new Thread(runnable, "NodeUP Connection Rebalancer"); - t.start(); + ActiveMQRALogger.LOGGER.rebalancingConnections("nodeUp " + member.toString()); + startReconnectThread("NodeUP Connection Rebalancer"); } else if (last) { lastReceived = true; @@ -759,14 +755,8 @@ else if (last) { public void nodeDown(long eventUID, String nodeID) { if (nodes.remove(nodeID)) { removedNodes.put(nodeID, eventUID); - Runnable runnable = new Runnable() { - @Override - public void run() { - rebalance(); - } - }; - Thread t = new Thread(runnable, "NodeDOWN Connection Rebalancer"); - t.start(); + ActiveMQRALogger.LOGGER.rebalancingConnections("nodeDown " + nodeID); + startReconnectThread("NodeDOWN Connection Rebalancer"); } } } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java index a180fc78199..b0d64ccf5c0 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java @@ -386,7 +386,8 @@ public void onMessage(final ClientMessage message) { session.resetIfNeeded(); } catch (ActiveMQException e) { - ActiveMQRALogger.LOGGER.unableToResetSession(); + ActiveMQRALogger.LOGGER.unableToResetSession(activation.toString(), e); + activation.startReconnectThread("Reset MessageHandler after Failure Thread"); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index fbf96553642..77cfd0d457f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -27,6 +27,7 @@ import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -2924,7 +2925,7 @@ protected static class ScheduledDeliveryEncoding extends QueueEncoding { @Override public String toString() { - return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "]"; + return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "(" + new Date(scheduledDeliveryTime) + ")]"; } private ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index b8b30bcd74d..f638fbcf797 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -28,11 +28,13 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; @@ -218,6 +220,11 @@ public static final byte[] getDuplicateBytes(final UUID nodeUUID, final long mes return bytes; } + // for tests + public ClientSessionFactory getSessionFactory() { + return csf; + } + /* (non-Javadoc) * @see org.apache.activemq.artemis.core.server.Consumer#getDeliveringMessages() */ @@ -905,8 +912,24 @@ protected void connect() { scheduleRetryConnect(); } } + catch (ActiveMQInterruptedException e) { + ActiveMQServerLogger.LOGGER.errorConnectingBridge(e, this); + } + catch (InterruptedException e) { + ActiveMQServerLogger.LOGGER.errorConnectingBridge(e, this); + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorConnectingBridge(e, this); + if (csf != null) { + try { + csf.close(); + csf = null; + } + catch (Throwable ignored) { + } + } + fail(false); + scheduleRetryConnect(); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index e04c35c5a1e..4a8b16ac8f7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -285,7 +285,8 @@ public HandleStatus handle(final MessageReference ref) throws Exception { // If the consumer is stopped then we don't accept the message, it // should go back into the // queue for delivery later. - if (!started || transferring || !callback.isWritable(this)) { + // TCP-flow control has to be done first than everything else otherwise we may lose notifications + if (!callback.isWritable(this) || !started || transferring ) { return HandleStatus.BUSY; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index d036fdfacdd..e21102c9e78 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1047,7 +1047,7 @@ public synchronized void xaStart(final Xid xid) throws Exception { ActiveMQServerLogger.LOGGER.xidReplacedOnXStart(tx.getXid().toString(), xid.toString()); try { - if (!tx.isEffective()) { + if (tx.getState() != Transaction.State.PREPARED) { // we don't want to rollback anything prepared here if (tx.getXid() != null) { resourceManager.removeTransaction(tx.getXid()); @@ -1085,7 +1085,7 @@ public synchronized void xaFailed(final Xid xid) throws Exception { } if (theTX.isEffective()) { - ActiveMQServerLogger.LOGGER.debug("Client failed with Xid " + xid + " but the server already had it prepared"); + ActiveMQServerLogger.LOGGER.debug("Client failed with Xid " + xid + " but the server already had it " + theTX.getState()); tx = null; } else { @@ -1568,9 +1568,10 @@ private void doRollback(final boolean clientFailed, if (theTx.getState() == State.ROLLEDBACK) { Transaction newTX = newTransaction(); cancelAndRollback(clientFailed, newTX, wasStarted, toCancel); - throw new IllegalStateException("Transaction has already been rolled back"); } - cancelAndRollback(clientFailed, theTx, wasStarted, toCancel); + else { + cancelAndRollback(clientFailed, theTx, wasStarted, toCancel); + } } private void cancelAndRollback(boolean clientFailed, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index 3490feee212..ee90c4a4f56 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -32,6 +33,8 @@ public class TransactionImpl implements Transaction { + private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); + private List operations; private static final int INITIAL_NUM_PROPERTIES = 10; @@ -105,8 +108,7 @@ public TransactionImpl(final long id, final Xid xid, final StorageManager storag @Override public boolean isEffective() { - return state == State.PREPARED || state == State.COMMITTED; - + return state == State.PREPARED || state == State.COMMITTED || state == State.ROLLEDBACK; } @Override @@ -141,32 +143,43 @@ public long getCreateTime() { @Override public boolean hasTimedOut(final long currentTime, final int defaultTimeout) { - if (timeoutSeconds == -1) { - return getState() != Transaction.State.PREPARED && currentTime > createTime + defaultTimeout * 1000; - } - else { - return getState() != Transaction.State.PREPARED && currentTime > createTime + timeoutSeconds * 1000; + synchronized (timeoutLock) { + boolean timedout; + if (timeoutSeconds == -1) { + timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + defaultTimeout * 1000; + } + else { + timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + timeoutSeconds * 1000; + } + + if (timedout) { + markAsRollbackOnly(new ActiveMQException("TX Timeout")); + } + + return timedout; } } @Override public void prepare() throws Exception { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("TransactionImpl::prepare::" + this); + } storageManager.readLock(); try { synchronized (timeoutLock) { if (isEffective()) { - ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has already been prepared or committed before, just ignoring the prepare call"); + ActiveMQServerLogger.LOGGER.debug("TransactionImpl::prepare::" + this + " is being ignored"); return; } if (state == State.ROLLBACK_ONLY) { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this); + } + + internalRollback(); + if (exception != null) { - // this TX will never be rolled back, - // so we reset it now - beforeRollback(); - afterRollback(); - if (operations != null) { - operations.clear(); - } throw exception; } else { @@ -216,14 +229,17 @@ public void commit() throws Exception { @Override public void commit(final boolean onePhase) throws Exception { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("TransactionImpl::commit::" + this); + } synchronized (timeoutLock) { if (state == State.COMMITTED) { // I don't think this could happen, but just in case - ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been committed before, just ignoring the commit call"); + ActiveMQServerLogger.LOGGER.debug("TransactionImpl::commit::" + this + " is being ignored"); return; } if (state == State.ROLLBACK_ONLY) { - rollback(); + internalRollback(); if (exception != null) { throw exception; @@ -236,12 +252,12 @@ public void commit(final boolean onePhase) throws Exception { if (xid != null) { if (onePhase && state != State.ACTIVE || !onePhase && state != State.PREPARED) { - throw new IllegalStateException("Transaction is in invalid state " + state); + throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state); } } else { if (state != State.ACTIVE) { - throw new IllegalStateException("Transaction is in invalid state " + state); + throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state); } } @@ -249,6 +265,11 @@ public void commit(final boolean onePhase) throws Exception { doCommit(); + // We want to make sure that nothing else gets done after the commit is issued + // this will eliminate any possibility or races + final List operationsToComplete = this.operations; + this.operations = null; + // We use the Callback even for non persistence // If we are using non-persistence with replication, the replication manager will have // to execute this runnable in the correct order @@ -263,7 +284,7 @@ public void onError(final int errorCode, final String errorMessage) { @Override public void done() { - afterCommit(); + afterCommit(operationsToComplete); } }); @@ -285,44 +306,65 @@ protected void doCommit() throws Exception { @Override public void rollback() throws Exception { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("TransactionImpl::rollback::" + this); + } + synchronized (timeoutLock) { if (state == State.ROLLEDBACK) { // I don't think this could happen, but just in case - ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been rolledBack before, just ignoring the rollback call", new Exception("trace")); + ActiveMQServerLogger.LOGGER.debug("TransactionImpl::rollback::" + this + " is being ignored"); return; } if (xid != null) { if (state != State.PREPARED && state != State.ACTIVE && state != State.ROLLBACK_ONLY) { - throw new IllegalStateException("Transaction is in invalid state " + state); + throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state); } } else { if (state != State.ACTIVE && state != State.ROLLBACK_ONLY) { - throw new IllegalStateException("Transaction is in invalid state " + state); + throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state); } } - beforeRollback(); - - doRollback(); - state = State.ROLLEDBACK; + internalRollback(); + } + } - // We use the Callback even for non persistence - // If we are using non-persistence with replication, the replication manager will have - // to execute this runnable in the correct order - storageManager.afterCompleteOperations(new IOCallback() { + private void internalRollback() throws Exception { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("TransactionImpl::internalRollback " + this); + } - @Override - public void onError(final int errorCode, final String errorMessage) { - ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage); - } + beforeRollback(); - @Override - public void done() { - afterRollback(); - } - }); + try { + doRollback(); + state = State.ROLLEDBACK; + } + catch (IllegalStateException e) { + // Something happened before and the TX didn't make to the Journal / Storage + // We will like to execute afterRollback and clear anything pending + ActiveMQServerLogger.LOGGER.warn(e); } + // We want to make sure that nothing else gets done after the commit is issued + // this will eliminate any possibility or races + final List operationsToComplete = this.operations; + this.operations = null; + + // We use the Callback even for non persistence + // If we are using non-persistence with replication, the replication manager will have + // to execute this runnable in the correct order + storageManager.afterCompleteOperations(new IOCallback() { + + public void onError(final int errorCode, final String errorMessage) { + ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage); + } + + public void done() { + afterRollback(operationsToComplete); + } + }); } @Override @@ -361,10 +403,14 @@ public Xid getXid() { } @Override - public void markAsRollbackOnly(final ActiveMQException exception1) { + public void markAsRollbackOnly(final ActiveMQException exception) { synchronized (timeoutLock) { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("TransactionImpl::" + this + " marking rollbackOnly for " + exception.toString() + ", msg=" + exception.getMessage()); + } + if (isEffective()) { - ActiveMQServerLogger.LOGGER.debug("Trying to mark transaction " + this.id + " xid=" + this.xid + " as rollbackOnly but it was already effective (prepared or committed!)"); + ActiveMQServerLogger.LOGGER.debug("Trying to mark transaction " + this.id + " xid=" + this.xid + " as rollbackOnly but it was already effective (prepared, committed or rolledback!)"); return; } @@ -373,7 +419,7 @@ public void markAsRollbackOnly(final ActiveMQException exception1) { } state = State.ROLLBACK_ONLY; - this.exception = exception1; + this.exception = exception; } } @@ -434,19 +480,23 @@ private void checkCreateOperations() { } } - private synchronized void afterCommit() { - if (operations != null) { - for (TransactionOperation operation : operations) { + private synchronized void afterCommit(List oeprationsToComplete) { + if (oeprationsToComplete != null) { + for (TransactionOperation operation : oeprationsToComplete) { operation.afterCommit(this); } + // Help out GC here + oeprationsToComplete.clear(); } } - private synchronized void afterRollback() { - if (operations != null) { - for (TransactionOperation operation : operations) { + private synchronized void afterRollback(List oeprationsToComplete) { + if (oeprationsToComplete != null) { + for (TransactionOperation operation : oeprationsToComplete) { operation.afterRollback(this); } + // Help out GC here + oeprationsToComplete.clear(); } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java new file mode 100644 index 00000000000..3909c3c8f02 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -0,0 +1,673 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.transaction.impl; + +import javax.transaction.xa.Xid; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.core.journal.JournalLoadInformation; +import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.paging.PageTransactionInfo; +import org.apache.activemq.artemis.core.paging.PagedMessage; +import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.cursor.PagePosition; +import org.apache.activemq.artemis.core.persistence.GroupingInfo; +import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; +import org.apache.activemq.artemis.core.persistence.config.PersistedRoles; +import org.apache.activemq.artemis.core.persistence.impl.PageCountPending; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.replication.ReplicationManager; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.RouteContextList; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; +import org.apache.activemq.artemis.core.server.impl.JournalLoader; +import org.apache.activemq.artemis.core.transaction.ResourceManager; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperation; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Assert; +import org.junit.Test; + +public class TransactionImplTest extends ActiveMQTestBase { + + @Test + public void testTimeoutAndThenCommitWithARollback() throws Exception { + TransactionImpl tx = new TransactionImpl(newXID(), new FakeSM(), 10); + Assert.assertTrue(tx.hasTimedOut(System.currentTimeMillis() + 60000, 10)); + + final AtomicInteger commit = new AtomicInteger(0); + final AtomicInteger rollback = new AtomicInteger(0); + + tx.addOperation(new TransactionOperation() { + @Override + public void beforePrepare(Transaction tx) throws Exception { + + } + + @Override + public void afterPrepare(Transaction tx) { + + } + + @Override + public void beforeCommit(Transaction tx) throws Exception { + + } + + @Override + public void afterCommit(Transaction tx) { + System.out.println("commit..."); + commit.incrementAndGet(); + } + + @Override + public void beforeRollback(Transaction tx) throws Exception { + + } + + @Override + public void afterRollback(Transaction tx) { + System.out.println("rollback..."); + rollback.incrementAndGet(); + } + + @Override + public List getRelatedMessageReferences() { + return null; + } + + @Override + public List getListOnConsumer(long consumerID) { + return null; + } + }); + + for (int i = 0; i < 2; i++) { + try { + tx.commit(); + Assert.fail("Exception expected!"); + } + catch (ActiveMQException expected) { + } + } + + // it should just be ignored! + tx.rollback(); + + Assert.assertEquals(0, commit.get()); + Assert.assertEquals(1, rollback.get()); + + } + + @Test + public void testTimeoutThenRollbackWithRollback() throws Exception { + TransactionImpl tx = new TransactionImpl(newXID(), new FakeSM(), 10); + Assert.assertTrue(tx.hasTimedOut(System.currentTimeMillis() + 60000, 10)); + + final AtomicInteger commit = new AtomicInteger(0); + final AtomicInteger rollback = new AtomicInteger(0); + + tx.addOperation(new TransactionOperation() { + @Override + public void beforePrepare(Transaction tx) throws Exception { + + } + + @Override + public void afterPrepare(Transaction tx) { + + } + + @Override + public void beforeCommit(Transaction tx) throws Exception { + + } + + @Override + public void afterCommit(Transaction tx) { + System.out.println("commit..."); + commit.incrementAndGet(); + } + + @Override + public void beforeRollback(Transaction tx) throws Exception { + + } + + @Override + public void afterRollback(Transaction tx) { + System.out.println("rollback..."); + rollback.incrementAndGet(); + } + + @Override + public List getRelatedMessageReferences() { + return null; + } + + @Override + public List getListOnConsumer(long consumerID) { + return null; + } + }); + + tx.rollback(); + + // This is a case where another failure was detected (In parallel with the TX timeout for instance) + tx.markAsRollbackOnly(new ActiveMQException("rollback only again")); + tx.rollback(); + + Assert.assertEquals(0, commit.get()); + Assert.assertEquals(1, rollback.get()); + + } + + class FakeSM implements StorageManager { + + @Override + public OperationContext getContext() { + return null; + } + + @Override + public void lineUpContext() { + + } + + @Override + public OperationContext newContext(Executor executor) { + return null; + } + + @Override + public OperationContext newSingleThreadContext() { + return null; + } + + @Override + public void setContext(OperationContext context) { + + } + + @Override + public void stop(boolean ioCriticalError) throws Exception { + + } + + @Override + public void pageClosed(SimpleString storeName, int pageNumber) { + + } + + @Override + public void pageDeleted(SimpleString storeName, int pageNumber) { + + } + + @Override + public void pageWrite(PagedMessage message, int pageNumber) { + + } + + @Override + public void afterCompleteOperations(IOCallback run) { + run.done(); + } + + @Override + public boolean waitOnOperations(long timeout) throws Exception { + return false; + } + + @Override + public void waitOnOperations() throws Exception { + + } + + @Override + public void beforePageRead() throws Exception { + + } + + @Override + public void afterPageRead() throws Exception { + + } + + @Override + public ByteBuffer allocateDirectBuffer(int size) { + return null; + } + + @Override + public void freeDirectBuffer(ByteBuffer buffer) { + + } + + @Override + public void clearContext() { + + } + + @Override + public void confirmPendingLargeMessageTX(Transaction transaction, + long messageID, + long recordID) throws Exception { + + } + + @Override + public void confirmPendingLargeMessage(long recordID) throws Exception { + + } + + @Override + public void storeMessage(ServerMessage message) throws Exception { + + } + + @Override + public void storeReference(long queueID, long messageID, boolean last) throws Exception { + + } + + @Override + public void deleteMessage(long messageID) throws Exception { + + } + + @Override + public void storeAcknowledge(long queueID, long messageID) throws Exception { + + } + + @Override + public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception { + + } + + @Override + public void updateDeliveryCount(MessageReference ref) throws Exception { + + } + + @Override + public void updateScheduledDeliveryTime(MessageReference ref) throws Exception { + + } + + @Override + public void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception { + + } + + @Override + public void deleteDuplicateID(long recordID) throws Exception { + + } + + @Override + public void storeMessageTransactional(long txID, ServerMessage message) throws Exception { + + } + + @Override + public void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception { + + } + + @Override + public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception { + + } + + @Override + public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception { + + } + + @Override + public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception { + + } + + @Override + public void deleteCursorAcknowledge(long ackID) throws Exception { + + } + + @Override + public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception { + + } + + @Override + public void deletePageComplete(long ackID) throws Exception { + + } + + @Override + public void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception { + + } + + @Override + public void storeDuplicateIDTransactional(long txID, + SimpleString address, + byte[] duplID, + long recordID) throws Exception { + + } + + @Override + public void updateDuplicateIDTransactional(long txID, + SimpleString address, + byte[] duplID, + long recordID) throws Exception { + + } + + @Override + public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception { + + } + + @Override + public LargeServerMessage createLargeMessage() { + return null; + } + + @Override + public LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception { + return null; + } + + @Override + public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) { + return null; + } + + @Override + public void prepare(long txID, Xid xid) throws Exception { + + } + + @Override + public void commit(long txID) throws Exception { + + } + + @Override + public void commit(long txID, boolean lineUpContext) throws Exception { + + } + + @Override + public void rollback(long txID) throws Exception { + + } + + @Override + public void rollbackBindings(long txID) throws Exception { + + } + + @Override + public void commitBindings(long txID) throws Exception { + + } + + @Override + public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception { + + } + + @Override + public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception { + + } + + @Override + public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception { + + } + + @Override + public void deletePageTransactional(long recordID) throws Exception { + + } + + @Override + public JournalLoadInformation loadMessageJournal(PostOffice postOffice, + PagingManager pagingManager, + ResourceManager resourceManager, + Map queueInfos, + Map>> duplicateIDMap, + Set> pendingLargeMessages, + List pendingNonTXPageCounter, + JournalLoader journalLoader) throws Exception { + return null; + } + + @Override + public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception { + return 0; + } + + @Override + public void deleteHeuristicCompletion(long id) throws Exception { + + } + + @Override + public void addQueueBinding(long tx, Binding binding) throws Exception { + + } + + @Override + public void deleteQueueBinding(long tx, long queueBindingID) throws Exception { + + } + + @Override + public JournalLoadInformation loadBindingJournal(List queueBindingInfos, + List groupingInfos) throws Exception { + return null; + } + + @Override + public void addGrouping(GroupBinding groupBinding) throws Exception { + + } + + @Override + public void deleteGrouping(long tx, GroupBinding groupBinding) throws Exception { + + } + + @Override + public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception { + + } + + @Override + public void deleteAddressSetting(SimpleString addressMatch) throws Exception { + + } + + @Override + public List recoverAddressSettings() throws Exception { + return null; + } + + @Override + public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception { + + } + + @Override + public void deleteSecurityRoles(SimpleString addressMatch) throws Exception { + + } + + @Override + public List recoverPersistedRoles() throws Exception { + return null; + } + + @Override + public long storePageCounter(long txID, long queueID, long value) throws Exception { + return 0; + } + + @Override + public long storePendingCounter(long queueID, long pageID, int inc) throws Exception { + return 0; + } + + @Override + public void deleteIncrementRecord(long txID, long recordID) throws Exception { + + } + + @Override + public void deletePageCounter(long txID, long recordID) throws Exception { + + } + + @Override + public void deletePendingPageCounter(long txID, long recordID) throws Exception { + + } + + @Override + public long storePageCounterInc(long txID, long queueID, int add) throws Exception { + return 0; + } + + @Override + public long storePageCounterInc(long queueID, int add) throws Exception { + return 0; + } + + @Override + public Journal getBindingsJournal() { + return null; + } + + @Override + public Journal getMessageJournal() { + return null; + } + + @Override + public void startReplication(ReplicationManager replicationManager, + PagingManager pagingManager, + String nodeID, + boolean autoFailBack, + long initialReplicationSyncTimeout) throws Exception { + + } + + @Override + public boolean addToPage(PagingStore store, + ServerMessage msg, + Transaction tx, + RouteContextList listCtx) throws Exception { + return false; + } + + @Override + public void stopReplication() { + + } + + @Override + public void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes) throws Exception { + + } + + @Override + public void storeID(long journalID, long id) throws Exception { + + } + + @Override + public void deleteID(long journalD) throws Exception { + + } + + @Override + public void readLock() { + + } + + @Override + public void readUnLock() { + + } + + @Override + public void persistIdGenerator() { + + } + + @Override + public void start() throws Exception { + + } + + @Override + public void stop() throws Exception { + + } + + @Override + public boolean isStarted() { + return false; + } + + @Override + public long generateID() { + return 0; + } + + @Override + public long getCurrentID() { + return 0; + } + } +} diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredBridgeReconnectTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredBridgeReconnectTest.java new file mode 100644 index 00000000000..387493769fa --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredBridgeReconnectTest.java @@ -0,0 +1,228 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.artemis.tests.extras.byteman; + +import java.util.concurrent.CountDownLatch; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.management.CoreNotificationType; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.server.management.Notification; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * This will simulate a failure of a failure. + * The bridge could eventually during a race or multiple failures not be able to reconnect because it failed again. + * this should make the bridge to always reconnect itself. + */ +@RunWith(BMUnitRunner.class) +public class ClusteredBridgeReconnectTest extends ClusterTestBase { + + static ThreadLocal inConnect = new ThreadLocal(); + + public static void enterConnect() { + inConnect.set(Boolean.TRUE); + } + + public static void exitConnect() { + inConnect.set(null); + } + + public static volatile boolean shouldFail = false; + + public static void send() { + if (inConnect.get() != null) { + if (shouldFail) { + shouldFail = false; + throw new NullPointerException("just because it's a test..."); + } + } + } + + @Test + @BMRules( + rules = {@BMRule( + name = "enter", + targetClass = "org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl", + targetMethod = "connect", + targetLocation = "ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.ClusteredBridgeReconnectTest.enterConnect();"), @BMRule( + name = "exit", + targetClass = "org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl", + targetMethod = "connect", + targetLocation = "EXIT", + action = "org.apache.activemq.artemis.tests.extras.byteman.ClusteredBridgeReconnectTest.exitConnect();"), @BMRule( + name = "send", + targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl", + targetMethod = "send(org.apache.activemq.artemis.core.protocol.core.Packet)", + targetLocation = "EXIT", + action = "org.apache.activemq.artemis.tests.extras.byteman.ClusteredBridgeReconnectTest.send();") + + }) + public void testReconnectBridge() throws Exception { + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); + + startServers(0, 1); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + + createQueue(0, "queues.testaddress", "queue0", null, true); + createQueue(1, "queues.testaddress", "queue0", null, true); + + addConsumer(0, 0, "queue0", null); + addConsumer(1, 1, "queue0", null); + + waitForBindings(0, "queues.testaddress", 1, 1, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); + + waitForBindings(0, "queues.testaddress", 1, 1, false); + waitForBindings(1, "queues.testaddress", 1, 1, false); + + ClientSession session0 = sfs[0].createSession(); + ClientSession session1 = sfs[0].createSession(); + + session0.start(); + session1.start(); + + ClientProducer producer = session0.createProducer("queues.testaddress"); + + int NUMBER_OF_MESSAGES = 100; + + Assert.assertEquals(1, servers[0].getClusterManager().getClusterConnections().size()); + + ClusterConnectionImpl connection = servers[0].getClusterManager().getClusterConnections().toArray(new ClusterConnectionImpl[0])[0]; + Assert.assertEquals(1, connection.getRecords().size()); + + MessageFlowRecord record = connection.getRecords().values().toArray(new MessageFlowRecord[1])[0]; + ClusterConnectionBridge bridge = (ClusterConnectionBridge) record.getBridge(); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + ClientMessage msg = session0.createMessage(true); + producer.send(msg); + session0.commit(); + + if (i == 17) { + shouldFail = true; + bridge.getSessionFactory().getConnection().fail(new ActiveMQException("failed once!")); + } + } + + int cons0Count = 0, cons1Count = 0; + + while (true) { + ClientMessage msg = consumers[0].getConsumer().receive(1000); + if (msg == null) { + break; + } + cons0Count++; + msg.acknowledge(); + session0.commit(); + } + + while (true) { + ClientMessage msg = consumers[1].getConsumer().receive(1000); + if (msg == null) { + break; + } + cons1Count++; + msg.acknowledge(); + session1.commit(); + } + + Assert.assertEquals("cons0 = " + cons0Count + ", cons1 = " + cons1Count, NUMBER_OF_MESSAGES, cons0Count + cons1Count); + + session0.commit(); + session1.commit(); + + stopServers(0, 1); + + } + + static CountDownLatch latch; + static CountDownLatch latch2; + static Thread main; + + public static void pause(SimpleString clusterName) { + if (clusterName.toString().startsWith("queue0")) { + try { + latch2.countDown(); + latch.await(); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public static void pause2(Notification notification) { + if (notification.getType() == CoreNotificationType.BINDING_REMOVED) { + SimpleString clusterName = notification.getProperties().getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME); + boolean inMain = main == Thread.currentThread(); + if (clusterName.toString().startsWith("queue0") && !inMain) { + try { + latch2.countDown(); + latch.await(); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + public static void restart2() { + latch.countDown(); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + shouldFail = false; + } + + @Override + @After + public void tearDown() throws Exception { + closeAllConsumers(); + closeAllSessionFactories(); + closeAllServerLocatorsFactories(); + super.tearDown(); + } + + public boolean isNetty() { + return true; + } +} diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java index 5b2dd0dac5a..52cc9d8af97 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java @@ -74,6 +74,11 @@ public static void resetLatches(int numberOfThreads) { latchFlag.setCount(1); } + @Override + protected boolean usePersistence() { + return true; + } + @Test @BMRules( rules = {@BMRule( @@ -84,6 +89,7 @@ public static void resetLatches(int numberOfThreads) { action = "org.apache.activemq.artemis.tests.extras.byteman.ConcurrentDeliveryCancelTest.enterCancel();")}) public void testConcurrentCancels() throws Exception { + System.out.println(server.getConfiguration().getJournalLocation().toString()); server.getAddressSettingsRepository().clear(); AddressSettings settings = new AddressSettings(); settings.setMaxDeliveryAttempts(-1); @@ -184,18 +190,6 @@ public void run() { } } }); - // - // consumer.close(); - // - // threads.add(new Thread("ClientFailing") - // { - // public void run() - // { - // ClientSessionInternal impl = (ClientSessionInternal) ((HornetQSession)theSession).getCoreSession(); - // impl.getConnection().fail(new HornetQException("failure")); - // } - // }); - // for (Thread t : threads) { t.start(); @@ -213,47 +207,55 @@ public void run() { } Connection connection = cf.createConnection(); - connection.setClientID("myID"); - - Thread.sleep(2000); // I am too lazy to call end on all the transactions - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(queue); - HashMap mapCount = new HashMap<>(); - - while (true) { - TextMessage message = (TextMessage) consumer.receiveNoWait(); - if (message == null) { - break; - } + try { + connection.setClientID("myID"); - Integer value = message.getIntProperty("i"); + Thread.sleep(5000); // I am too lazy to call end on all the transactions + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + HashMap mapCount = new HashMap<>(); + + while (true) { + TextMessage message = (TextMessage) consumer.receiveNoWait(); + if (message == null) { + break; + } - AtomicInteger count = mapCount.get(value); - if (count == null) { - count = new AtomicInteger(0); - mapCount.put(message.getIntProperty("i"), count); - } + Integer value = message.getIntProperty("i"); - count.incrementAndGet(); - } + AtomicInteger count = mapCount.get(value); + if (count == null) { + count = new AtomicInteger(0); + mapCount.put(message.getIntProperty("i"), count); + } - boolean failed = false; - for (int i = 0; i < numberOfMessages; i++) { - AtomicInteger count = mapCount.get(i); - if (count == null) { - System.out.println("Message " + i + " not received"); - failed = true; + count.incrementAndGet(); } - else if (count.get() > 1) { - System.out.println("Message " + i + " received " + count.get() + " times"); - failed = true; + + boolean failed = false; + for (int i = 0; i < numberOfMessages; i++) { + AtomicInteger count = mapCount.get(i); + if (count == null) { + System.out.println("Message " + i + " not received"); + failed = true; + } + else if (count.get() > 1) { + System.out.println("Message " + i + " received " + count.get() + " times"); + failed = true; + } } - } - Assert.assertFalse("test failed, look at the system.out of the test for more infomration", failed); + if (failed) { + System.err.println("Failed"); + System.exit(-1); + } - connection.close(); + Assert.assertFalse("test failed, look at the system.out of the test for more infomration", failed); + } + finally { + connection.close(); + } } } diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java index 22611bce813..76679efa588 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java @@ -27,6 +27,7 @@ import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -91,7 +92,7 @@ public void setUp() throws Exception { @BMRule( name = "afterRollback TX", targetClass = "org.apache.activemq.artemis.core.transaction.impl.TransactionImpl", - targetMethod = "afterRollback()", + targetMethod = "afterRollback", targetLocation = "ENTRY", helper = "org.apache.activemq.artemis.tests.extras.byteman.TimeoutXATest", action = "afterRollback()")}) @@ -166,23 +167,20 @@ public void run() { Thread.sleep(1000); removingTXAwait0.countDown(); - enteredRollbackLatch.await(); + Assert.assertTrue(enteredRollbackLatch.await(10, TimeUnit.SECONDS)); waitingRollbackLatch.countDown(); t.join(); consumer.close(); -// -// connction2.start(); -// + consumer = session.createConsumer(queue); for (int i = 0; i < 10; i++) { Assert.assertNotNull(consumer.receive(5000)); } Assert.assertNull(consumer.receiveNoWait()); -// session.commit(); -// session.close(); + connection.close(); connction2.close(); diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java index 41f47b55ebe..063a254645a 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java @@ -70,6 +70,10 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase ServerLocator nettyLocator; + // This thread will keep bugging the handlers. + // if they behave well with XA, the test pass! + final AtomicBoolean running = new AtomicBoolean(true); + private volatile boolean playTXTimeouts = true; private volatile boolean playServerClosingSession = true; private volatile boolean playServerClosingConsumer = true; @@ -85,6 +89,7 @@ public void setUp() throws Exception { super.setUp(); createQueue(true, "outQueue"); DummyTMLocator.startTM(); + running.set(true); } @Override @@ -113,6 +118,11 @@ public void testReconnectMDBNoMessageLoss() throws Exception { server.getAddressSettingsRepository().addMatch("#", settings); ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter(); resourceAdapter = qResourceAdapter; + resourceAdapter.setConfirmationWindowSize(-1); + resourceAdapter.setCallTimeout(1000L); + resourceAdapter.setConsumerWindowSize(1024 * 1024); + resourceAdapter.setReconnectAttempts(-1); + resourceAdapter.setRetryInterval(100L); // qResourceAdapter.setTransactionManagerLocatorClass(DummyTMLocator.class.getName()); // qResourceAdapter.setTransactionManagerLocatorMethod("getTM"); @@ -125,17 +135,18 @@ public void testReconnectMDBNoMessageLoss() throws Exception { final int NUMBER_OF_SESSIONS = 10; ActiveMQActivationSpec spec = new ActiveMQActivationSpec(); - spec.setMaxSession(NUMBER_OF_SESSIONS); + spec.setTransactionTimeout(1); - spec.setReconnectAttempts(-1); - spec.setConfirmationWindowSize(-1); - spec.setReconnectInterval(1000); - spec.setCallTimeout(1000L); + spec.setMaxSession(NUMBER_OF_SESSIONS); + spec.setSetupAttempts(-1); + spec.setSetupInterval(100); spec.setResourceAdapter(qResourceAdapter); spec.setUseJNDI(false); spec.setDestinationType("javax.jms.Queue"); spec.setDestination(MDBQUEUE); - spec.setConsumerWindowSize(1024 * 1024); + + // Some the routines would be screwed up if using the default one + Assert.assertFalse(spec.isHasBeenUpdated()); TestEndpointFactory endpointFactory = new TestEndpointFactory(true); qResourceAdapter.endpointActivation(endpointFactory, spec); @@ -146,7 +157,6 @@ public void testReconnectMDBNoMessageLoss() throws Exception { final int NUMBER_OF_MESSAGES = 1000; Thread producer = new Thread() { - @Override public void run() { try { ServerLocator locator = createInVMLocator(0); @@ -155,11 +165,18 @@ public void run() { ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED); + StringBuffer buffer = new StringBuffer(); + + for (int b = 0; b < 500; b++) { + buffer.append("ab"); + } + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { ClientMessage message = session.createMessage(true); - message.getBodyBuffer().writeString("teststring " + i); + message.getBodyBuffer().writeString(buffer.toString() + i); + message.putIntProperty("i", i); clientProducer.send(message); @@ -181,12 +198,7 @@ public void run() { final AtomicBoolean metaDataFailed = new AtomicBoolean(false); - // This thread will keep bugging the handlers. - // if they behave well with XA, the test pass! - final AtomicBoolean running = new AtomicBoolean(true); - Thread buggerThread = new Thread() { - @Override public void run() { while (running.get()) { try { @@ -197,7 +209,7 @@ public void run() { return; } - List serverSessions = lookupServerSessions("resource-adapter"); + List serverSessions = lookupServerSessions("resource-adapter", NUMBER_OF_SESSIONS); System.err.println("Contains " + serverSessions.size() + " RA sessions"); @@ -256,6 +268,13 @@ else if (serverSessions.size() == NUMBER_OF_SESSIONS) { break; } + if (i == NUMBER_OF_MESSAGES * 0.50) { + // This is to make sure the MDBs will survive a reboot + // and no duplications or message loss will happen because of this + System.err.println("Rebooting the MDBs at least once!"); + activation.startReconnectThread("I"); + } + if (i == NUMBER_OF_MESSAGES * 0.90) { System.out.println("Disabled failures at " + i); playTXTimeouts = false; @@ -266,17 +285,7 @@ else if (serverSessions.size() == NUMBER_OF_SESSIONS) { System.out.println("Received " + i + " messages"); - Assert.assertNotNull(message); - message.acknowledge(); - - Integer value = message.getIntProperty("i"); - AtomicInteger mapCount = new AtomicInteger(1); - - mapCount = mapCounter.putIfAbsent(value, mapCount); - - if (mapCount != null) { - mapCount.incrementAndGet(); - } + doReceiveMessage(message); if (i % 200 == 0) { System.out.println("received " + i); @@ -285,6 +294,20 @@ else if (serverSessions.size() == NUMBER_OF_SESSIONS) { } session.commit(); + + while (true) { + ClientMessage message = consumer.receiveImmediate(); + if (message == null) { + break; + } + + System.out.println("Received extra message " + message); + + doReceiveMessage(message); + } + + session.commit(); + Assert.assertNull(consumer.receiveImmediate()); StringWriter writer = new StringWriter(); @@ -328,14 +351,42 @@ else if (atomicInteger.get() > 1) { } - private List lookupServerSessions(String parameter) { - List serverSessions = new LinkedList<>(); + private void doReceiveMessage(ClientMessage message) throws Exception { + Assert.assertNotNull(message); + message.acknowledge(); + Integer value = message.getIntProperty("i"); + AtomicInteger mapCount = new AtomicInteger(1); - for (ServerSession session : server.getSessions()) { - if (session.getMetaData(parameter) != null) { - serverSessions.add(session); - } + mapCount = mapCounter.putIfAbsent(value, mapCount); + + if (mapCount != null) { + mapCount.incrementAndGet(); } + } + + private List lookupServerSessions(String parameter, int numberOfSessions) { + long timeout = System.currentTimeMillis() + 50000; + List serverSessions = new LinkedList(); + do { + if (!serverSessions.isEmpty()) { + System.err.println("Retry on serverSessions!!! currently with " + serverSessions.size()); + serverSessions.clear(); + try { + Thread.sleep(100); + } + catch (Exception e) { + break; + } + } + serverSessions.clear(); + for (ServerSession session : server.getSessions()) { + if (session.getMetaData(parameter) != null) { + serverSessions.add(session); + } + } + } while (running.get() && serverSessions.size() != numberOfSessions && timeout > System.currentTimeMillis()); + + System.err.println("Returning " + serverSessions.size() + " sessions"); return serverSessions; } @@ -347,7 +398,6 @@ public TestEndpointFactory(boolean deliveryTransacted) { isDeliveryTransacted = deliveryTransacted; } - @Override public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException { TestEndpoint retEnd = new TestEndpoint(); if (xaResource != null) { @@ -356,7 +406,6 @@ public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableE return retEnd; } - @Override public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException { return isDeliveryTransacted; } @@ -397,7 +446,6 @@ public void beforeDelivery(Method method) throws NoSuchMethodException, Resource } - @Override public void onMessage(Message message) { Integer value = 0; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java index e937b26ff04..03a1b6d477a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java @@ -680,6 +680,10 @@ public void testDeliveringCountOnBridgeConnectionFailure() throws Exception { } } + for (int i = 0; i < 100 && queue.getDeliveringCount() != 0; i++) { + Thread.sleep(10); + } + System.out.println("Check.. DeliveringCount: " + queue.getDeliveringCount()); assertEquals("Delivering count of a source queue should be zero on connection failure", 0, queue.getDeliveringCount()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest.java index f7a6d257ee8..d21bd84d2c0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -185,7 +186,12 @@ private void runTest(final TestRunner runnable) throws Throwable { AsynchronousFailoverTest.log.info("Fail complete"); - t.join(); + t.join(TimeUnit.SECONDS.toMillis(20)); + if (t.isAlive()) { + System.out.println(threadDump("Thread still running from the test")); + t.interrupt(); + fail("Test didn't complete successful, thread still running"); + } runnable.checkForExceptions(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java index 272214b8940..aea899cf791 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java @@ -16,13 +16,21 @@ */ package org.apache.activemq.artemis.tests.integration.ra; +import javax.jms.Connection; +import javax.resource.ResourceException; +import javax.resource.spi.endpoint.MessageEndpoint; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; -import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; @@ -35,16 +43,6 @@ import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec; import org.junit.Test; -import javax.jms.Connection; -import javax.resource.ResourceException; -import javax.resource.spi.endpoint.MessageEndpoint; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; - public class ResourceAdapterTest extends ActiveMQRATestBase { @Test @@ -86,29 +84,17 @@ public void testStartStopActivationManyTimes() throws Exception { ServerLocatorImpl serverLocator = (ServerLocatorImpl) ra.getDefaultActiveMQConnectionFactory().getServerLocator(); - Field f = Class.forName(ServerLocatorImpl.class.getName()).getDeclaredField("factories"); - Set resources = ra.getRecoveryManager().getResources(); - f.setAccessible(true); - - Set factories = (Set) f.get(serverLocator); - for (int i = 0; i < 10; i++) { System.out.println(i); - assertEquals(factories.size(), 0); activation.start(); - assertEquals(factories.size(), 15); assertEquals(1, resources.size()); activation.stop(); - assertEquals(factories.size(), 0); } - System.out.println("before RA stop => " + factories.size()); ra.stop(); assertEquals(0, resources.size()); - System.out.println("after RA stop => " + factories.size()); - assertEquals(factories.size(), 0); locator.close(); } @@ -402,7 +388,7 @@ public void testResourceAdapterSetupNoOverrideDiscovery() throws Exception { spec.setUseJNDI(false); spec.setDestinationType("javax.jms.Queue"); spec.setDestination(MDBQUEUE); - ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec); + ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec); DiscoveryGroupConfiguration dc = fac.getServerLocator().getDiscoveryGroupConfiguration(); UDPBroadcastEndpointFactory udpDg = (UDPBroadcastEndpointFactory) dc.getBroadcastEndpointFactory(); assertEquals(udpDg.getGroupAddress(), "231.6.6.6"); @@ -430,7 +416,7 @@ public void testResourceAdapterSetupOverrideDiscovery() throws Exception { spec.setDiscoveryPort(1234); spec.setDiscoveryInitialWaitTimeout(1L); spec.setDiscoveryRefreshTimeout(1L); - ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec); + ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec); DiscoveryGroupConfiguration dc = fac.getServerLocator().getDiscoveryGroupConfiguration(); UDPBroadcastEndpointFactory udpDg = (UDPBroadcastEndpointFactory) dc.getBroadcastEndpointFactory(); assertEquals(udpDg.getGroupAddress(), "231.6.6.6"); @@ -455,7 +441,7 @@ public void testResourceAdapterSetupNoHAOverride() throws Exception { spec.setDestinationType("javax.jms.Queue"); spec.setDestination(MDBQUEUE); - ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec); + ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec); assertTrue(fac.isHA()); @@ -477,7 +463,7 @@ public void testResourceAdapterSetupNoHADefault() throws Exception { spec.setDestinationType("javax.jms.Queue"); spec.setDestination(MDBQUEUE); - ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec); + ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec); assertFalse(fac.isHA()); @@ -499,7 +485,7 @@ public void testResourceAdapterSetupHAOverride() throws Exception { spec.setDestinationType("javax.jms.Queue"); spec.setDestination(MDBQUEUE); spec.setHA(true); - ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec); + ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec); assertTrue(fac.isHA()); @@ -522,7 +508,7 @@ public void testResourceAdapterSetupNoReconnectAttemptsOverride() throws Excepti spec.setDestinationType("javax.jms.Queue"); spec.setDestination(MDBQUEUE); - ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec); + ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec); assertEquals(100, fac.getReconnectAttempts()); @@ -544,7 +530,7 @@ public void testResourceAdapterSetupReconnectAttemptDefault() throws Exception { spec.setDestinationType("javax.jms.Queue"); spec.setDestination(MDBQUEUE); - ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec); + ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec); assertEquals(-1, fac.getReconnectAttempts()); @@ -566,7 +552,7 @@ public void testResourceAdapterSetupReconnectAttemptsOverride() throws Exception spec.setDestinationType("javax.jms.Queue"); spec.setDestination(MDBQUEUE); spec.setReconnectAttempts(100); - ActiveMQConnectionFactory fac = qResourceAdapter.createActiveMQConnectionFactory(spec); + ActiveMQConnectionFactory fac = qResourceAdapter.getConnectionFactory(spec); assertEquals(100, fac.getReconnectAttempts()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java index 6cb6d0bab2d..61069ac4140 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java @@ -381,6 +381,9 @@ public void testNonPersistent(final boolean commit) throws Exception { else { clientSession.rollback(xid); } + + xids = clientSession.recover(XAResource.TMSTARTRSCAN); + Assert.assertEquals(xids.length, 0); } @Test diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ResourceAdapterTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ResourceAdapterTest.java index 82612490bca..3932f7550e9 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ResourceAdapterTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ResourceAdapterTest.java @@ -24,22 +24,22 @@ import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; -import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.activemq.artemis.ra.ConnectionFactoryProperties; import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnectionFactory; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; +import org.apache.activemq.artemis.ra.ConnectionFactoryProperties; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Assert; import org.junit.Test; @@ -99,7 +99,7 @@ public void test2DefaultConnectionFactorySame() throws Exception { public void testCreateConnectionFactoryNoOverrides() throws Exception { ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter(); ra.setConnectorClassName(InVMConnectorFactory.class.getName()); - ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(new ConnectionFactoryProperties()); + ActiveMQConnectionFactory factory = ra.getConnectionFactory(new ConnectionFactoryProperties()); Assert.assertEquals(factory.getCallTimeout(), ActiveMQClient.DEFAULT_CALL_TIMEOUT); Assert.assertEquals(factory.getClientFailureCheckPeriod(), ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD); Assert.assertEquals(factory.getClientID(), null); @@ -211,7 +211,7 @@ public void testCreateConnectionFactoryOverrides() throws Exception { connectionFactoryProperties.setThreadPoolMaxSize(17); connectionFactoryProperties.setTransactionBatchSize(18); connectionFactoryProperties.setUseGlobalPools(!ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS); - ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(connectionFactoryProperties); + ActiveMQConnectionFactory factory = ra.getConnectionFactory(connectionFactoryProperties); Assert.assertEquals(factory.getCallTimeout(), 1); Assert.assertEquals(factory.getClientFailureCheckPeriod(), 2); Assert.assertEquals(factory.getClientID(), "myid"); @@ -245,7 +245,7 @@ public void testCreateConnectionFactoryOverrideConnector() throws Exception { ArrayList value = new ArrayList<>(); value.add(NettyConnectorFactory.class.getName()); connectionFactoryProperties.setParsedConnectorClassNames(value); - ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(connectionFactoryProperties); + ActiveMQConnectionFactory factory = ra.getConnectionFactory(connectionFactoryProperties); ActiveMQConnectionFactory defaultFactory = ra.getDefaultActiveMQConnectionFactory(); Assert.assertNotSame(factory, defaultFactory); } @@ -258,7 +258,7 @@ public void testCreateConnectionFactoryOverrideDiscovery() throws Exception { connectionFactoryProperties.setDiscoveryAddress("myhost"); connectionFactoryProperties.setDiscoveryPort(5678); connectionFactoryProperties.setDiscoveryLocalBindAddress("newAddress"); - ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(connectionFactoryProperties); + ActiveMQConnectionFactory factory = ra.getConnectionFactory(connectionFactoryProperties); ActiveMQConnectionFactory defaultFactory = ra.getDefaultActiveMQConnectionFactory(); Assert.assertNotSame(factory, defaultFactory); DiscoveryGroupConfiguration dc = factory.getServerLocator().getDiscoveryGroupConfiguration(); @@ -272,7 +272,7 @@ public void testCreateConnectionFactoryOverrideDiscovery() throws Exception { public void testCreateConnectionFactoryMultipleConnectors() { ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter(); ra.setConnectorClassName(NETTY_CONNECTOR_FACTORY + "," + INVM_CONNECTOR_FACTORY + "," + NETTY_CONNECTOR_FACTORY); - ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(new ConnectionFactoryProperties()); + ActiveMQConnectionFactory factory = ra.getConnectionFactory(new ConnectionFactoryProperties()); TransportConfiguration[] configurations = factory.getServerLocator().getStaticTransportConfigurations(); assertNotNull(configurations); assertEquals(3, configurations.length); @@ -289,7 +289,7 @@ public void testCreateConnectionFactoryMultipleConnectorsAndParams() { ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter(); ra.setConnectorClassName(NETTY_CONNECTOR_FACTORY + "," + INVM_CONNECTOR_FACTORY + "," + NETTY_CONNECTOR_FACTORY); ra.setConnectionParameters("host=host1;port=61616, serverid=0, host=host2;port=61617"); - ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(new ConnectionFactoryProperties()); + ActiveMQConnectionFactory factory = ra.getConnectionFactory(new ConnectionFactoryProperties()); TransportConfiguration[] configurations = factory.getServerLocator().getStaticTransportConfigurations(); assertNotNull(configurations); assertEquals(3, configurations.length); @@ -316,7 +316,7 @@ public void testCreateConnectionFactoryMultipleConnectorsOverride() { value.add(NETTY_CONNECTOR_FACTORY); value.add(INVM_CONNECTOR_FACTORY); overrideProperties.setParsedConnectorClassNames(value); - ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(overrideProperties); + ActiveMQConnectionFactory factory = ra.getConnectionFactory(overrideProperties); TransportConfiguration[] configurations = factory.getServerLocator().getStaticTransportConfigurations(); assertNotNull(configurations); assertEquals(3, configurations.length); @@ -351,7 +351,7 @@ public void testCreateConnectionFactoryMultipleConnectorsOverrideAndParams() { map3.put("serverid", "1"); connectionParameters.add(map3); overrideProperties.setParsedConnectionParameters(connectionParameters); - ActiveMQConnectionFactory factory = ra.createActiveMQConnectionFactory(overrideProperties); + ActiveMQConnectionFactory factory = ra.getConnectionFactory(overrideProperties); TransportConfiguration[] configurations = factory.getServerLocator().getStaticTransportConfigurations(); assertNotNull(configurations); assertEquals(3, configurations.length); @@ -372,7 +372,7 @@ public void testCreateConnectionFactoryThrowsException() throws Exception { ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter(); ConnectionFactoryProperties connectionFactoryProperties = new ConnectionFactoryProperties(); try { - ra.createActiveMQConnectionFactory(connectionFactoryProperties); + ra.getConnectionFactory(connectionFactoryProperties); Assert.fail("should throw exception"); } catch (IllegalArgumentException e) {