Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

https://issues.jboss.org/browse/HORNETQ-640 implemented XA recovery API

  • Loading branch information...
commit 4d7e33f6d1aa22a4d5a8b751d4df4166a01f6558 1 parent d9e80f5
@andytaylor andytaylor authored
View
348 examples/javaee/xarecovery/server/jbossts-properties.xml
@@ -1,348 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<transaction-service>
- <properties name="documentation">
- <!--
- This is the JBossTS configuration file. Note that starting with JBossTS 4.6
- the names jbossjta-properties.xml and jbossjts-properties.xml are obsolete.
-
- Both the JTA and JTS versions of JBossTS now read their configuration from
- jbossts-properties.xml, although the contents of the file differs between
- the JTA and JTS. Care should be taken to use the correct version of the file.
-
- ***************************
-
- Property values may be literals or be tokens of the form ${p1[,p2][:v]}
- in which case the token values are substituted for the values of the corresponding system
- properties as follows:
-
- - Any occurance of ${p} with the System.getProperty(p) value.
- If there is no such property p defined, then the ${p} reference will remain unchanged.
-
- - If the property reference is of the form ${p:v} and there is no such property p,
- then the default value v will be returned.
-
- - If the property reference is of the form ${p1,p2} or ${p1,p2:v} then
- the primary and the secondary properties will be tried in turn, before
- returning either the unchanged input, or the default value.
-
- The property ${/} is replaced with System.getProperty("file.separator")
- value and the property ${:} is replaced with System.getProperty("path.separator").
-
- Note this substitution applies to property values only at the point they are read from
- the config file. Tokens in system properties won't be substituted.
- -->
- </properties>
- <properties depends="common" name="arjuna">
- <!--
- Transaction Reaper Timeout (default is 120000 ms).
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.txReaperTimeout" value="120000"/>
- <!--
- Transaction Reaper Mode, can be: PERIODIC or DYNAMIC. Default is DYNAMIC.
- -->
- <property name="com.arjuna.ats.arjuna.coordinator.txReaperMode" value="DYNAMIC"/>
- <!--
- Transaction Reaper Cancel Wait Period (default is 500 ms, min is 10 msecs).
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.txReaperCancelWaitPeriod" value="500"/>
- <!--
- Transaction Reaper Cancel Fail Wait Period (default is 500 ms, min is 10 msecs).
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.txReaperCancelFailWaitPeriod" value="500"/>
- <!--
- Transaction Reaper Zombie Max (default is 8).
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.txReaperZombieMax" value="8"/>
- <!--
- (default is NO)
- -->
- <property name="com.arjuna.ats.arjuna.coordinator.asyncCommit" value="NO"/>
- <!--
- (default is NO)
- -->
- <property name="com.arjuna.ats.arjuna.coordinator.asyncPrepare" value="NO"/>
- <!--
- (default is YES)
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.commitOnePhase" value="YES"/>
- <!--
- (default is defaultStore)
- -->
- <property name="com.arjuna.ats.arjuna.objectstore.localOSRoot" value="defaultStore"/>
- <!--
- default is under user.home - must be writeable!)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.objectStoreDir" value="PutObjectStoreDirHere"/>
- <!--
- (default is ON)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.objectStoreSync" value="ON"/>
- <!--
- (default is ShadowNoFileLockStore)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.objectStoreType" value="ShadowNoFileLockStore"/>
- <!--
- (default is 255)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.hashedDirectories" value="255"/>
- <!--
- (default is ON)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.transactionSync" value="ON"/>
- <!--
- (Must be unique across all Arjuna instances.)
- -->
- <property name="com.arjuna.ats.arjuna.xa.nodeIdentifier" value="1"/>
- <!--
- Base port number for determining a unique number to associate with an instance of the transaction service
- (which is needed in order to support multiple instances on the same machine).
- Use the value 0 to allow the system to select the first available port number.
- If the port number is non-zero and the port is in use then the value will be incremented until either a successful binding
- to the loopback address is created or until the the maximum number of ports (specified by the
- com.arjuna.ats.internal.arjuna.utils.SocketProcessIdMaxPorts property) have been tried or until the port number
- reaches the maximum possible port number.
- -->
- <property
- name="com.arjuna.ats.internal.arjuna.utils.SocketProcessIdPort" value="0"/>
- <!--
- The maximum number of ports to try starting from the value specified by the property
- com.arjuna.ats.internal.arjuna.utils.SocketProcessIdPort. Any non-numeric or value less than 1 will
- defautl to 1.
- -->
- <property
- name="com.arjuna.ats.internal.arjuna.utils.SocketProcessIdMaxPorts" value="1"/>
- <!--
- Run the TransactionStatusManager to allow out-of-process recovery managers to query
- the status of transactions owned by this coordinator. Default is YES.
- This can be set to NO in cases where an ObjectStore is used only by one transaction manager
- and the recovery manager for that store is in the same JVM. In any other cases disabling the
- TransactionStatusManager may cause crash recovery to misbehave.
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.transactionStatusManagerEnable" value="YES"/>
- <!-- property
- name="com.arjuna.ats.arjuna.coordinator.actionStore"
- value="HashedActionStore"
- value="JDBCActionStore"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcTxDbAccess"
- value="JDBCAccess"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.objectStoreType"
- value="ShadowNoFileLockStore"
- value="JDBCStore"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcUserDbAccess"
- value="JDBCAccess"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcPoolSizeInitial"
- value="1"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcPoolSizeMaximum"
- value="1"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcPoolPutConnections"
- value="false"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.internal.arjuna.objectstore.cacheStore.size"
- value=""
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.internal.arjuna.objectstore.cacheStore.period"
- value=""
- -->
- <!--
- The location for creating temporary files, e.g., Uids.
- Default is under user.home.
- IMPORTANT: make sure the directory is lockable, e.g., /tmp on Unix
- may not be!
- -->
- <!--
- <property
- name="com.arjuna.ats.arjuna.common.varDir"
- value="var"/>
- -->
- <!-- Should beforeCompletion synchronizations be fired even when it is known the
- transaction can't commit e.g. is marked rollbackOnly? (default NO).
- Note that turning this on still does not guarantee the syncronizations will run in all cases
- e.g. explicit rollback() calls. JTS users should also take into account the supportRollbackSync
- property which affects both beforeCompletion and afterCompletion syncs. -->
- <!--
- <property name="com.arjuna.ats.coordinator.beforeCompletionWhenRollbackOnly" value="NO"/>
- -->
- </properties>
- <properties name="common">
- <!-- CLF 2.0 properties -->
- <property name="com.arjuna.common.util.logging.DebugLevel"
- type="System" value="0x00000000"/>
- <property name="com.arjuna.common.util.logging.FacilityLevel"
- type="System" value="0xffffffff"/>
- <property name="com.arjuna.common.util.logging.VisibilityLevel"
- type="System" value="0xffffffff"/>
- <property name="com.arjuna.common.util.logger" type="System" value="log4j"/>
- </properties>
- <properties depends="arjuna" name="txoj">
- <!--
- (default is LockStore of installation - must be writeable!)
- -->
- <!--
- <property
- name="com.arjuna.ats.txoj.lockstore.lockStoreDir"
- value="LockStore"/>
- -->
- <!--
- (default is BasicLockStore)
- -->
- <property name="com.arjuna.ats.txoj.lockstore.lockStoreType" value="BasicLockStore"/>
- <!--
- (default is NO)
- -->
- <property name="com.arjuna.ats.txoj.lockstore.multipleLockStore" value="NO"/>
- <!--
- (default is YES)
- -->
- <property name="com.arjuna.ats.txoj.lockstore.singleLockStore" value="YES"/>
- <!--
- (default is YES)
- -->
- <property
- name="com.arjuna.ats.txoj.lockstore.allowNestedLocking" value="YES"/>
- </properties>
- <properties depends="arjuna" name="jta">
- <!--
- Support subtransactions in the JTA layer?
- Default is NO.
- -->
- <property name="com.arjuna.ats.jta.supportSubtransactions" value="NO"/>
- <property name="com.arjuna.ats.jta.jtaTMImplementation" value="com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple"/>
- <!--
- com.arjuna.ats.internal.jta.transaction.jts.TransactionManagerImple
- -->
- <property name="com.arjuna.ats.jta.jtaUTImplementation" value="com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple"/>
- <!--
- com.arjuna.ats.internal.jta.transaction.jts.UserTransactionImple
- -->
- <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
- value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"/>
-
- <!--you'll need something like this if the HornetQ Server is remote-->
- <!--
- <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
- value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/>-->
-
- <!--you'll need something like this if the HornetQ Server is remote and has failover configured-->
- <!--
- <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
- value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/>-->
-
-
- <property name="com.arjuna.ats.jta.xaRecoveryNode" value="1"/>
- </properties>
- <properties depends="arjuna,txoj,jta" name="recoverymanager">
- <!--
- Properties used only by the RecoveryManager.
- -->
- <!--
- Periodic recovery settings.
- Time values in this section are in seconds.
- -->
- <!--
- Interval in seconds between initiating the periodic recovery modules.
- Default is 120 seconds.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.periodicRecoveryPeriod" value="120"/>
- <!--
- Interval in seconds between first and second pass of periodic recovery.
- Default is 10 seconds.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.recoveryBackoffPeriod" value="10"/>
- <!--
- The port number on which the recovery manager listens.
- -->
- <property name="com.arjuna.ats.arjuna.recovery.recoveryPort" value="4712"/>
- <!--
- The address on which the recovery manager listens.
- If running within an AS then the address the AS is bound to (jboss.bind.address) takes precedence
- -->
- <property name="com.arjuna.ats.arjuna.recovery.recoveryAddress" value=""/>
- <!--
- Periodic recovery modules to use. Invoked in sort-order of names.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.recoveryExtension1" value="com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule"/>
- <property
- name="com.arjuna.ats.arjuna.recovery.recoveryExtension2" value="com.arjuna.ats.internal.txoj.recovery.TORecoveryModule"/>
- <property
- name="com.arjuna.ats.arjuna.recovery.recoveryExtension3" value="com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule"/>
- <!--
- Expired entry removal
- -->
- <!--
- Expiry scanners to use (order of invocation is random).
- Names must begin with "com.arjuna.ats.arjuna.recovery.expiryScanner"
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.expiryScannerTransactionStatusManager" value="com.arjuna.ats.internal.arjuna.recovery.ExpiredTransactionStatusManagerScanner"/>
- <!--
- Interval, in hours, between running the expiry scanners.
- This can be quite long. The absolute value determines the interval -
- if the value is negative, the scan will NOT be run until after one
- interval has elapsed. If positive the first scan will be immediately
- after startup. Zero will prevent any scanning.
- Default = 12 = run immediately, then every 12 hours.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.expiryScanInterval" value="12"/>
- <!--
- This is the interval, in hours, after which a process that cannot be contacted will be considered dead.
- It should be long enough to avoid accidentally removing valid entries due to short lived
- transient errors such as network downtime. Zero = Never removed. Default is 12.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.transactionStatusManagerExpiryTime" value="12"/>
- <!--
- Use this to fix the port on which the TransactionStatusManager listens,
- The default behaviour is to use any free port.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.transactionStatusManagerPort" value="0"/>
- <!--
- Use this to fix the address on which the TransactionStatusManager binds,
- The default behaviour is to use the loopback address (ie localhost).
- If running within an AS then the address the AS is bound to (jboss.bind.address) takes precedence
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.transactionStatusManagerAddress" value=""/>
- <!--
- For cases where the recovery manager is in process with the transaction manager and nothing else uses
- the ObjectStore, it is possible to disable the socket based recovery listener by setting this to NO.
- Caution: use of this property can allow multiple recovery processes to run on the same ObjectStore
- if you are not careful. That in turn can lead to incorrect transaction processing. Use with care.
- -->
- <property name="com.arjuna.ats.arjuna.recovery.recoveryListener" value="YES"/>
- </properties>
- <properties depends="jta" name="jdbc">
- <!--
- property name="com.arjuna.ats.jdbc.isolationLevel" value="TRANSACTION_SERIALIZABLE"/>
- -->
- </properties>
-</transaction-service>
View
72 src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java
@@ -0,0 +1,72 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.jms.server.recovery;
+
+import org.jboss.tm.XAResourceRecovery;
+
+import javax.transaction.xa.XAResource;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 9/20/11
+ */
+public class HornetQResourceRecovery implements XAResourceRecovery
+{
+ private final XARecoveryConfig config;
+
+ private final HornetQXAResourceWrapper resourceWrapper;
+
+ public HornetQResourceRecovery(XARecoveryConfig config)
+ {
+ this.config = config;
+ resourceWrapper = new HornetQXAResourceWrapper(config);
+ }
+
+ public XAResource[] getXAResources()
+ {
+ return new XAResource[]{resourceWrapper};
+ }
+
+ public void close()
+ {
+ resourceWrapper.close();
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ HornetQResourceRecovery that = (HornetQResourceRecovery) o;
+
+ if (config != null ? !config.equals(that.config) : that.config != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return config != null ? config.hashCode() : 0;
+ }
+}
View
224 src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java
@@ -1,224 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.server.recovery;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.transaction.xa.XAResource;
-
-import com.arjuna.ats.jta.recovery.XAResourceRecovery;
-
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.logging.Logger;
-
-/**
- *
- * A XAResourceRecovery instance that can be used to recover any JMS provider.
- *
- * In reality only recover,rollback and commit will be called but we still need to
- * be implement all methods just in case.
- *
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class HornetQXAResourceRecovery implements XAResourceRecovery
-{
- private final boolean trace = HornetQXAResourceRecovery.log.isTraceEnabled();
-
- private static final Logger log = Logger.getLogger(HornetQXAResourceRecovery.class);
-
- private boolean hasMore;
-
- private HornetQXAResourceWrapper res;
-
- public HornetQXAResourceRecovery()
- {
- if (trace)
- {
- HornetQXAResourceRecovery.log.trace("Constructing HornetQXAResourceRecovery");
- }
- }
-
- public boolean initialise(final String config)
- {
- if (HornetQXAResourceRecovery.log.isTraceEnabled())
- {
- HornetQXAResourceRecovery.log.trace(this + " intialise: " + config);
- }
-
- String[] configs = config.split(";");
- XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length];
- for (int i = 0, configsLength = configs.length; i < configsLength; i++)
- {
- String s = configs[i];
- ConfigParser parser = new ConfigParser(s);
- String connectorFactoryClassName = parser.getConnectorFactoryClassName();
- Map<String, Object> connectorParams = parser.getConnectorParameters();
- String username = parser.getUsername();
- String password = parser.getPassword();
- TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
- xaRecoveryConfigs[i] = new XARecoveryConfig(transportConfiguration, username, password);
- }
-
-
-
- res = new HornetQXAResourceWrapper(xaRecoveryConfigs);
-
- if (HornetQXAResourceRecovery.log.isTraceEnabled())
- {
- HornetQXAResourceRecovery.log.trace(this + " initialised");
- }
-
- return true;
- }
-
- public boolean hasMoreResources()
- {
- if (HornetQXAResourceRecovery.log.isTraceEnabled())
- {
- HornetQXAResourceRecovery.log.trace(this + " hasMoreResources");
- }
-
- /*
- * The way hasMoreResources is supposed to work is as follows:
- * For each "sweep" the recovery manager will call hasMoreResources, then if it returns
- * true it will call getXAResource.
- * It will repeat that until hasMoreResources returns false.
- * Then the sweep is over.
- * For the next sweep hasMoreResources should return true, etc.
- *
- * In our case where we only need to return one XAResource per sweep,
- * hasMoreResources should basically alternate between true and false.
- *
- *
- */
-
- hasMore = !hasMore;
-
- return hasMore;
- }
-
- public XAResource getXAResource()
- {
- if (HornetQXAResourceRecovery.log.isTraceEnabled())
- {
- HornetQXAResourceRecovery.log.trace(this + " getXAResource");
- }
-
- return res;
- }
-
- public XAResource[] getXAResources()
- {
- return new XAResource[] { res };
- }
-
- @Override
- protected void finalize()
- {
- res.close();
- }
-
- public static class ConfigParser
- {
- private final String connectorFactoryClassName;
-
- private final Map<String, Object> connectorParameters;
-
- private String username;
-
- private String password;
-
- public ConfigParser(final String config)
- {
- if (config == null || config.length() == 0)
- {
- throw new IllegalArgumentException("Must specify provider connector factory class name in config");
- }
-
- String[] strings = config.split(",");
-
- // First (mandatory) param is the connector factory class name
- if (strings.length < 1)
- {
- throw new IllegalArgumentException("Must specify provider connector factory class name in config");
- }
-
- connectorFactoryClassName = strings[0].trim();
-
- // Next two (optional) parameters are the username and password to use for creating the session for recovery
-
- if (strings.length >= 2)
- {
-
- username = strings[1].trim();
- if (username.length() == 0)
- {
- username = null;
- }
-
- if (strings.length == 2)
- {
- throw new IllegalArgumentException("If username is specified, password must be specified too");
- }
-
- password = strings[2].trim();
- if (password.length() == 0)
- {
- password = null;
- }
- }
-
- // other tokens are for connector configurations
- connectorParameters = new HashMap<String, Object>();
- if (strings.length >= 3)
- {
- for (int i = 3; i < strings.length; i++)
- {
- String[] str = strings[i].split("=");
- if (str.length == 2)
- {
- connectorParameters.put(str[0].trim(), str[1].trim());
- }
- }
- }
- }
-
- public String getConnectorFactoryClassName()
- {
- return connectorFactoryClassName;
- }
-
- public Map<String, Object> getConnectorParameters()
- {
- return connectorParameters;
- }
-
- public String getUsername()
- {
- return username;
- }
-
- public String getPassword()
- {
- return password;
- }
- }
-}
View
26 src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
@@ -60,7 +60,7 @@
private XARecoveryConfig[] xaRecoveryConfigs;
- private TransportConfiguration currentConnection;
+ //private TransportConfiguration currentConnection;
public HornetQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
{
@@ -71,7 +71,7 @@ public HornetQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
public Xid[] recover(final int flag) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("Recover " + currentConnection);
+ HornetQXAResourceWrapper.log.debug("Recover " + xaResource);
try
{
return xaResource.recover(flag);
@@ -85,7 +85,7 @@ public HornetQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
public void commit(final Xid xid, final boolean onePhase) throws XAException
{
XAResource xaResource = getDelegate(true);
- HornetQXAResourceWrapper.log.debug("Commit " + currentConnection + " xid " + " onePhase=" + onePhase);
+ HornetQXAResourceWrapper.log.debug("Commit " + xaResource + " xid " + " onePhase=" + onePhase);
try
{
xaResource.commit(xid, onePhase);
@@ -99,7 +99,7 @@ public void commit(final Xid xid, final boolean onePhase) throws XAException
public void rollback(final Xid xid) throws XAException
{
XAResource xaResource = getDelegate(true);
- HornetQXAResourceWrapper.log.debug("Rollback " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("Rollback " + xaResource + " xid ");
try
{
xaResource.rollback(xid);
@@ -113,7 +113,7 @@ public void rollback(final Xid xid) throws XAException
public void forget(final Xid xid) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("Forget " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("Forget " + xaResource + " xid ");
try
{
xaResource.forget(xid);
@@ -145,7 +145,7 @@ public boolean isSameRM(XAResource xaRes) throws XAException
public int prepare(final Xid xid) throws XAException
{
XAResource xaResource = getDelegate(true);
- HornetQXAResourceWrapper.log.debug("prepare " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("prepare " + xaResource + " xid ");
try
{
return xaResource.prepare(xid);
@@ -159,7 +159,7 @@ public int prepare(final Xid xid) throws XAException
public void start(final Xid xid, final int flags) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("start " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("start " + xaResource + " xid ");
try
{
xaResource.start(xid, flags);
@@ -173,7 +173,7 @@ public void start(final Xid xid, final int flags) throws XAException
public void end(final Xid xid, final int flags) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("end " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("end " + xaResource + " xid ");
try
{
xaResource.end(xid, flags);
@@ -187,7 +187,7 @@ public void end(final Xid xid, final int flags) throws XAException
public int getTransactionTimeout() throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("getTransactionTimeout " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("getTransactionTimeout " + xaResource + " xid ");
try
{
return xaResource.getTransactionTimeout();
@@ -201,7 +201,7 @@ public int getTransactionTimeout() throws XAException
public boolean setTransactionTimeout(final int seconds) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("setTransactionTimeout " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("setTransactionTimeout " + xaResource + " xid ");
try
{
return xaResource.setTransactionTimeout(seconds);
@@ -214,7 +214,7 @@ public boolean setTransactionTimeout(final int seconds) throws XAException
public void connectionFailed(final HornetQException me, boolean failedOver)
{
- HornetQXAResourceWrapper.log.warn("Notified of connection failure in xa recovery connectionFactory for provider " + currentConnection + " will attempt reconnect on next pass",
+ HornetQXAResourceWrapper.log.warn("Notified of connection failure in xa recovery connectionFactory for provider " + csf + " will attempt reconnect on next pass",
me);
close();
}
@@ -299,7 +299,7 @@ protected XAResource connect() throws Exception
try
{
- serverLocator = HornetQClient.createServerLocatorWithoutHA(xaRecoveryConfig.getTransportConfiguration());
+ serverLocator = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator();
serverLocator.disableFinalizeCheck();
csf = serverLocator.createSessionFactory();
if (xaRecoveryConfig.getUsername() == null)
@@ -320,12 +320,10 @@ protected XAResource connect() throws Exception
synchronized (HornetQXAResourceWrapper.lock)
{
delegate = cs;
- currentConnection = xaRecoveryConfig.getTransportConfiguration();
}
return delegate;
}
- currentConnection = null;
throw new HornetQException(HornetQException.NOT_CONNECTED);
}
View
36 src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java
@@ -0,0 +1,36 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.jms.server.recovery;
+
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.ra.HornetQRAConnectionFactory;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 9/20/11
+ */
+public interface RecoveryRegistry
+{
+ void register(HornetQResourceRecovery resourceRecovery);
+
+ void unRegister(HornetQResourceRecovery resourceRecovery);
+}
View
37 src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
@@ -13,7 +13,7 @@
package org.hornetq.jms.server.recovery;
-import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.jms.client.HornetQConnectionFactory;
/**
* @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
@@ -24,20 +24,20 @@
*/
public class XARecoveryConfig
{
- private final TransportConfiguration transportConfiguration;
+ private final HornetQConnectionFactory hornetQConnectionFactory;
private final String username;
private final String password;
- public XARecoveryConfig(TransportConfiguration transportConfiguration, String username, String password)
+ public XARecoveryConfig(HornetQConnectionFactory hornetQConnectionFactory, String username, String password)
{
- this.transportConfiguration = transportConfiguration;
+ this.hornetQConnectionFactory = hornetQConnectionFactory;
this.username = username;
this.password = password;
}
- public TransportConfiguration getTransportConfiguration()
+ public HornetQConnectionFactory getHornetQConnectionFactory()
{
- return transportConfiguration;
+ return hornetQConnectionFactory;
}
public String getUsername()
@@ -49,4 +49,29 @@ public String getPassword()
{
return password;
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ XARecoveryConfig that = (XARecoveryConfig) o;
+
+ if (hornetQConnectionFactory != null ? !hornetQConnectionFactory.equals(that.hornetQConnectionFactory) : that.hornetQConnectionFactory != null)
+ return false;
+ if (password != null ? !password.equals(that.password) : that.password != null) return false;
+ if (username != null ? !username.equals(that.username) : that.username != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = hornetQConnectionFactory != null ? hornetQConnectionFactory.hashCode() : 0;
+ result = 31 * result + (username != null ? username.hashCode() : 0);
+ result = 31 * result + (password != null ? password.hashCode() : 0);
+ return result;
+ }
}
View
2  src/main/org/hornetq/ra/HornetQRAManagedConnection.java
@@ -249,6 +249,8 @@ public void destroy() throws ResourceException
HornetQRAManagedConnection.log.debug("Error unsetting the exception listener " + this, e);
}
+ mcf.stop();
+
destroyHandles();
try
View
15 src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
@@ -30,6 +30,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.recovery.HornetQResourceRecovery;
/**
* HornetQ ManagedConectionFactory
@@ -76,6 +77,11 @@
*/
private HornetQConnectionFactory connectionFactory;
+ /*
+ * The resource recovery if there is one
+ * */
+ private HornetQResourceRecovery resourceRecovery;
+
/**
* Constructor
*/
@@ -747,6 +753,7 @@ protected synchronized HornetQConnectionFactory getHornetQConnectionFactory() th
if (connectionFactory == null)
{
connectionFactory = ra.createHornetQConnectionFactory(mcfProperties);
+ resourceRecovery = ra.getRecoveryManager().register(connectionFactory, null, null);
}
return connectionFactory;
}
@@ -791,4 +798,12 @@ private HornetQRAConnectionRequestInfo getCRI(final HornetQRAConnectionRequestIn
return info;
}
}
+
+ public void stop()
+ {
+ if(resourceRecovery != null)
+ {
+ ra.getRecoveryManager().unRegister(resourceRecovery);
+ }
+ }
}
View
22 src/main/org/hornetq/ra/HornetQResourceAdapter.java
@@ -41,8 +41,11 @@
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.recovery.RecoveryRegistry;
+import org.hornetq.jms.server.recovery.XARecoveryConfig;
import org.hornetq.ra.inflow.HornetQActivation;
import org.hornetq.ra.inflow.HornetQActivationSpec;
+import org.hornetq.ra.recovery.RecoveryManager;
/**
* The resource adapter for HornetQ
@@ -106,6 +109,8 @@
private String unparsedJndiParams;
+ RecoveryManager recoveryManager;
+
/**
* Constructor
*/
@@ -119,6 +124,7 @@ public HornetQResourceAdapter()
raProperties = new HornetQRAProperties();
configured = new AtomicBoolean(false);
activations = new ConcurrentHashMap<ActivationSpec, HornetQActivation>();
+ recoveryManager = new RecoveryManager();
}
public TransactionManager getTM()
@@ -209,6 +215,8 @@ public void start(final BootstrapContext ctx) throws ResourceAdapterInternalExce
locateTM();
+ recoveryManager.start();
+
this.ctx = ctx;
HornetQResourceAdapter.log.info("HornetQ resource adaptor started");
@@ -241,8 +249,12 @@ public void stop()
if (defaultHornetQConnectionFactory != null)
{
defaultHornetQConnectionFactory.close();
+
+ XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
}
+ recoveryManager.stop();
+
HornetQResourceAdapter.log.info("HornetQ resource adapter stopped");
}
@@ -1381,6 +1393,13 @@ public ClientSession createSession(final ClientSessionFactory parameterFactory,
}
+
+
+ public RecoveryManager getRecoveryManager()
+ {
+ return recoveryManager;
+ }
+
/**
* Get the resource adapter properties
*
@@ -1402,6 +1421,7 @@ protected HornetQRAProperties getProperties()
protected void setup() throws HornetQException
{
defaultHornetQConnectionFactory = createHornetQConnectionFactory(raProperties);
+ recoveryManager.register(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
}
public Map<ActivationSpec, HornetQActivation> getActivations()
@@ -1439,7 +1459,7 @@ public HornetQConnectionFactory createHornetQConnectionFactory(final ConnectionF
{
ha = HornetQClient.DEFAULT_IS_HA;
}
-
+
if (connectorClassName != null)
{
TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
View
14 src/main/org/hornetq/ra/Util.java
@@ -19,7 +19,7 @@
import javax.transaction.TransactionManager;
import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.ClassloadingUtil;
+import org.hornetq.jms.server.recovery.RecoveryRegistry;
/**
* Various utility functions
@@ -257,7 +257,7 @@ public static TransactionManager locateTM(final String locatorClass, final Strin
{
try
{
- ClassLoader loader = HornetQResourceAdapter.class.getClassLoader();
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
Class<?> aClass = loader.loadClass(locatorClass);
Object o = aClass.newInstance();
Method m = aClass.getMethod(locatorMethod);
@@ -266,14 +266,18 @@ public static TransactionManager locateTM(final String locatorClass, final Strin
catch (Throwable e)
{
log.debug(e.getMessage(), e);
+ return null;
}
+ }
+
+ public static RecoveryRegistry locateRecoveryRegistry(final String locatorClass)
+ {
try
{
ClassLoader loader = Thread.currentThread().getContextClassLoader();
Class<?> aClass = loader.loadClass(locatorClass);
Object o = aClass.newInstance();
- Method m = aClass.getMethod(locatorMethod);
- return (TransactionManager)m.invoke(o);
+ return (RecoveryRegistry)o;
}
catch (Throwable e)
{
@@ -281,6 +285,4 @@ public static TransactionManager locateTM(final String locatorClass, final Strin
return null;
}
}
-
-
}
View
12 src/main/org/hornetq/ra/inflow/HornetQActivation.java
@@ -37,6 +37,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.server.recovery.HornetQResourceRecovery;
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.Util;
@@ -106,7 +107,10 @@
// Whether we are in the failure recovery loop
private AtomicBoolean inFailure = new AtomicBoolean(false);
-
+
+ /** Used to unregister recovery once this endpoint has gone*/
+ private HornetQResourceRecovery resourceRecovery;
+
static
{
try
@@ -314,6 +318,11 @@ protected synchronized void teardown()
{
HornetQActivation.log.debug("Tearing down " + spec);
+ if(resourceRecovery != null)
+ {
+ ra.getRecoveryManager().unRegister(resourceRecovery);
+ }
+
for (HornetQMessageHandler handler : handlers)
{
handler.teardown();
@@ -331,6 +340,7 @@ protected void setupCF() throws Exception
if (spec.isHasBeenUpdated())
{
factory = ra.createHornetQConnectionFactory(spec);
+ resourceRecovery = ra.getRecoveryManager().register(factory, spec.getUser(), spec.getPassword());
}
else
{
View
164 src/main/org/hornetq/ra/recovery/RecoveryManager.java
@@ -0,0 +1,164 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.ra.recovery;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.recovery.HornetQResourceRecovery;
+import org.hornetq.jms.server.recovery.RecoveryRegistry;
+import org.hornetq.jms.server.recovery.XARecoveryConfig;
+import org.hornetq.ra.Util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 9/21/11
+ */
+public class RecoveryManager
+{
+ private static Logger log = Logger.getLogger(RecoveryManager.class);
+
+ private RecoveryRegistry registry;
+
+ private String resourceRecoveryClassNames = "org.jboss.as.integration.hornetq.recovery.AS5RecoveryRegistry;org.jboss.as.messaging.jms.AS7RecoveryRegistry";
+
+ private Map<XARecoveryConfig, HornetQResourceRecovery> configMap = new HashMap<XARecoveryConfig, HornetQResourceRecovery>();
+
+ public void start()
+ {
+ locateRecoveryRegistry();
+ }
+
+ public HornetQResourceRecovery register(HornetQConnectionFactory factory, String userName, String password)
+ {
+ if(!isRegistered(factory))
+ {
+ XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(factory, userName, password);
+ HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
+ registry.register(resourceRecovery);
+ configMap.put(xaRecoveryConfig, resourceRecovery);
+ return resourceRecovery;
+ }
+ return null;
+ }
+
+ public void unRegister(HornetQResourceRecovery resourceRecovery)
+ {
+ registry.unRegister(resourceRecovery);
+ resourceRecovery.close();
+ }
+
+ public void stop()
+ {
+ for (HornetQResourceRecovery hornetQResourceRecovery : configMap.values())
+ {
+ registry.unRegister(hornetQResourceRecovery);
+ hornetQResourceRecovery.close();
+ }
+ configMap.clear();
+ }
+
+ private void locateRecoveryRegistry()
+ {
+ String locatorClasses[] = resourceRecoveryClassNames.split(";");
+
+ for (int i = 0 ; i < locatorClasses.length; i++)
+ {
+ registry = Util.locateRecoveryRegistry(locatorClasses[i]);
+ if (registry != null)
+ {
+ break;
+ }
+ }
+
+ if (registry == null)
+ {
+ registry = new RecoveryRegistry()
+ {
+ public void register(HornetQResourceRecovery resourceRecovery)
+ {
+ //no op
+ }
+
+ public void unRegister(HornetQResourceRecovery xaRecoveryConfig)
+ {
+ //no op
+ }
+ };
+ }
+ else
+ {
+ log.debug("Recovery Registry located = " + registry);
+ }
+ }
+
+
+ public boolean isRegistered(HornetQConnectionFactory factory)
+ {
+ for (XARecoveryConfig xaRecoveryConfig : configMap.keySet())
+ {
+ TransportConfiguration[] transportConfigurations = factory.getServerLocator().getStaticTransportConfigurations();
+
+ if (transportConfigurations != null)
+ {
+ TransportConfiguration[] xaConfigurations = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator().getStaticTransportConfigurations();
+ if(xaConfigurations == null)
+ {
+ break;
+ }
+ if(transportConfigurations.length != xaConfigurations.length)
+ {
+ break;
+ }
+ boolean theSame=true;
+ for(int i = 0; i < transportConfigurations.length; i++)
+ {
+ TransportConfiguration tc = transportConfigurations[i];
+ TransportConfiguration xaTc = xaConfigurations[i];
+ if(!tc.equals(xaTc))
+ {
+ theSame = false;
+ break;
+ }
+ }
+ if(theSame)
+ {
+ return theSame;
+ }
+ }
+ else
+ {
+ DiscoveryGroupConfiguration discoveryGroupConfiguration = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator().getDiscoveryGroupConfiguration();
+ if(discoveryGroupConfiguration != null && discoveryGroupConfiguration.equals(factory.getDiscoveryGroupConfiguration()))
+ {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+}
Please sign in to comment.
Something went wrong with that request. Please try again.