From 2eac97aaff4b3da81c189385dbbc4031386a6f57 Mon Sep 17 00:00:00 2001 From: jbertram Date: Mon, 14 Sep 2015 11:18:31 -0500 Subject: [PATCH] ARTEMIS-210 count references It is possible for the closure of one resource to potentially impact another since they are now sharing the same ServerLocator instance. Keep track of references to avoid this. --- .../ra/ActiveMQRAManagedConnection.java | 2 +- .../artemis/ra/ActiveMQResourceAdapter.java | 184 +++++++++--------- .../artemis/ra/inflow/ActiveMQActivation.java | 2 +- .../ra/OutgoingConnectionTest.java | 51 +++++ 4 files changed, 150 insertions(+), 89 deletions(-) diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java index dc84ace6aad..9f628ed778a 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java @@ -301,7 +301,7 @@ public void destroy() throws ResourceException { // we must close the ActiveMQConnectionFactory because it contains a ServerLocator if (connectionFactory != null) { - connectionFactory.close(); + ra.closeConnectionFactory(mcf.getProperties()); } } catch (Throwable e) { diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java index 24c659fcc14..b4370c96764 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -43,17 +44,18 @@ import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; -import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.JMSFactoryType; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec; import org.apache.activemq.artemis.ra.recovery.RecoveryManager; -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.service.extensions.ServiceUtils; import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig; import org.apache.activemq.artemis.utils.SensitiveDataCodec; @@ -127,7 +129,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { * configured the exact same way. Using the same connection factory instance also makes connection load-balancing * behave as expected for outbound connections. */ - private final Map knownConnectionFactories = new HashMap(); + private final Map> knownConnectionFactories = new HashMap>(); /** * Constructor @@ -275,8 +277,8 @@ public void stop() { managedConnectionFactories.clear(); - for (ActiveMQConnectionFactory knownConnectionFactory : knownConnectionFactories.values()) { - knownConnectionFactory.close(); + for (Pair pair : knownConnectionFactories.values()) { + pair.getA().close(); } knownConnectionFactories.clear(); @@ -1600,119 +1602,119 @@ public void setJgroupsChannelRefName(String jgroupsChannelRefName) { raProperties.setJgroupsChannelRefName(jgroupsChannelRefName); } - public ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) { + public synchronized ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) { ActiveMQConnectionFactory cf; boolean known = false; - synchronized (knownConnectionFactories) { - if (!knownConnectionFactories.keySet().contains(overrideProperties)) { - List connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames(); + if (!knownConnectionFactories.keySet().contains(overrideProperties)) { + List connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames(); - String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress(); + String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress(); - Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA(); + Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA(); - String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile(); + String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile(); - String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName(); + String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName(); - String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass(); + String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass(); - if (ha == null) { - ha = ActiveMQClient.DEFAULT_IS_HA; - } + if (ha == null) { + ha = ActiveMQClient.DEFAULT_IS_HA; + } - if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) { - BroadcastEndpointFactory endpointFactory = null; + if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) { + BroadcastEndpointFactory endpointFactory = null; - if (jgroupsLocatorClassName != null) { - String jchannelRefName = raProperties.getJgroupsChannelRefName(); - JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName); - endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel); - } - else if (discoveryAddress != null) { - Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort(); - if (discoveryPort == null) { - discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT; - } - - String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress(); - endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1); - } - else if (jgroupsFileName != null) { - endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName); - } - Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout(); - if (refreshTimeout == null) { - refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT; + if (jgroupsLocatorClassName != null) { + String jchannelRefName = raProperties.getJgroupsChannelRefName(); + JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName); + endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel); + } + else if (discoveryAddress != null) { + Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort(); + if (discoveryPort == null) { + discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT; } - Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout(); + String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress(); + endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1); + } + else if (jgroupsFileName != null) { + endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName); + } + Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout(); + if (refreshTimeout == null) { + refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT; + } - if (initialTimeout == null) { - initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; - } + Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout(); - DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory); + if (initialTimeout == null) { + initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; + } - if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { - ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha); - } + DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory); - if (ha) { - cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF); - } - else { - cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF); - } + if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { + ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha); + } + + if (ha) { + cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF); } - else if (connectorClassName != null) { - TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()]; + else { + cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF); + } + } + else if (connectorClassName != null) { + TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()]; - List> connectionParams; - if (overrideProperties.getParsedConnectorClassNames() != null) { - connectionParams = overrideProperties.getParsedConnectionParameters(); + List> connectionParams; + if (overrideProperties.getParsedConnectorClassNames() != null) { + connectionParams = overrideProperties.getParsedConnectionParameters(); + } + else { + connectionParams = raProperties.getParsedConnectionParameters(); + } + + for (int i = 0; i < connectorClassName.size(); i++) { + TransportConfiguration tc; + if (connectionParams == null || i >= connectionParams.size()) { + tc = new TransportConfiguration(connectorClassName.get(i)); + ActiveMQRALogger.LOGGER.debug("No connector params provided using default"); } else { - connectionParams = raProperties.getParsedConnectionParameters(); + tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i)); } - for (int i = 0; i < connectorClassName.size(); i++) { - TransportConfiguration tc; - if (connectionParams == null || i >= connectionParams.size()) { - tc = new TransportConfiguration(connectorClassName.get(i)); - ActiveMQRALogger.LOGGER.debug("No connector params provided using default"); - } - else { - tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i)); - } - - transportConfigurations[i] = tc; - } + transportConfigurations[i] = tc; + } - if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { - ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" + - Arrays.toString(transportConfigurations) + " with ha=" + ha); - } + if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { + ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" + + Arrays.toString(transportConfigurations) + " with ha=" + ha); + } - if (ha) { - cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations); - } - else { - cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations); - } + if (ha) { + cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations); } else { - throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory"); + cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations); } - - setParams(cf, overrideProperties); - knownConnectionFactories.put(overrideProperties, cf); } else { - cf = knownConnectionFactories.get(overrideProperties); - known = true; + throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory"); } + + setParams(cf, overrideProperties); + knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1))); + } + else { + Pair pair = knownConnectionFactories.get(overrideProperties); + cf = pair.getA(); + pair.getB().incrementAndGet(); + known = true; } if (known && cf.getServerLocator().isClosed()) { @@ -1978,4 +1980,12 @@ public void setManagedConnectionFactory(ActiveMQRAManagedConnectionFactory activ public SensitiveDataCodec getCodecInstance() { return raProperties.getCodecInstance(); } + + public synchronized void closeConnectionFactory(ConnectionFactoryProperties properties) { + Pair pair = knownConnectionFactories.get(properties); + int references = pair.getB().decrementAndGet(); + if (pair.getA() != null && pair.getA() != defaultActiveMQConnectionFactory && references == 0) { + knownConnectionFactories.remove(properties).getA().close(); + } + } } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java index 52c05df1963..8e55061c14e 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java @@ -427,7 +427,7 @@ public void run() { } if (spec.isHasBeenUpdated() && factory != null) { - factory.close(); + ra.closeConnectionFactory(spec); factory = null; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java index db7f2cd7d9b..a1cf442d84a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java @@ -363,4 +363,55 @@ public void testSharedActiveMQConnectionFactory() throws Exception { } } } + + @Test + public void testSharedActiveMQConnectionFactoryWithClose() throws Exception { + Session s = null; + Session s2 = null; + ActiveMQRAManagedConnection mc = null; + ActiveMQRAManagedConnection mc2 = null; + + try { + server.getConfiguration().setSecurityEnabled(false); + resourceAdapter = new ActiveMQResourceAdapter(); + + resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName()); + MyBootstrapContext ctx = new MyBootstrapContext(); + resourceAdapter.start(ctx); + ActiveMQRAConnectionManager qraConnectionManager = new ActiveMQRAConnectionManager(); + ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory(); + mcf.setResourceAdapter(resourceAdapter); + ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager); + + QueueConnection queueConnection = qraConnectionFactory.createQueueConnection(); + s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + mc = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s).getManagedConnection(); + + QueueConnection queueConnection2 = qraConnectionFactory.createQueueConnection(); + s2 = queueConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + mc2 = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s2).getManagedConnection(); + + mc.destroy(); + + MessageProducer producer = s2.createProducer(ActiveMQJMSClient.createQueue(MDBQUEUE)); + producer.send(s2.createTextMessage("x")); + } + finally { + if (s != null) { + s.close(); + } + + if (mc != null) { + mc.destroy(); + } + + if (s2 != null) { + s2.close(); + } + + if (mc2 != null) { + mc2.destroy(); + } + } + } }