Skip to content
This repository has been archived by the owner on Jun 3, 2019. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
[JBTM-2617] Make JMS recovery helper to manage connection for every r…
…ecovery call
  • Loading branch information
Gytis Trikleris committed Feb 15, 2016
1 parent e70df2d commit 781869d
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 71 deletions.
Expand Up @@ -24,8 +24,9 @@
import javax.jms.JMSException;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import com.arjuna.ats.jta.recovery.XAResourceRecoveryHelper;
import org.jboss.logging.Logger;
Expand All @@ -34,7 +35,7 @@
* @author <a href="mailto:gytis@redhat.com">Gytis Trikleris</a>
*/
public class JmsXAResourceRecoveryHelper implements XAResourceRecoveryHelper,
AutoCloseable {
XAResource {

private static final Logger LOGGER =
Logger.getLogger(JmsXAResourceRecoveryHelper.class);
Expand All @@ -43,8 +44,6 @@ public class JmsXAResourceRecoveryHelper implements XAResourceRecoveryHelper,

private XAConnection xaConnection;

private XASession xaSession;

public JmsXAResourceRecoveryHelper(XAConnectionFactory xaConnectionFactory) {
this.xaConnectionFactory = xaConnectionFactory;
}
Expand All @@ -55,75 +54,125 @@ public boolean initialise(String properties) {
LOGGER.trace("Initialise with properties=" + properties);
}

return initialiseConnection() && initialiseSession();
return true;
}

@Override
public XAResource[] getXAResources() {
XAResource xaResource = xaSession.getXAResource();

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Returning XA resource: " + xaResource);
LOGGER.trace("Returning XA resource: " + this);
}

return new XAResource[] {
xaResource
};
return new XAResource[] { this };
}

@Override
public void close() {
public void start(Xid xid, int i) throws XAException {
try {
xaSession.close();
} catch (JMSException e) {
LOGGER.warn(e.getMessage(), e);
getDelegate().start(xid, i);
} finally {
disconnect();
}
}

@Override
public void end(Xid xid, int i) throws XAException {
try {
xaConnection.close();
} catch (JMSException e) {
LOGGER.warn(e.getMessage(), e);
getDelegate().end(xid, i);
} finally {
disconnect();
}
}

private boolean initialiseConnection() {
if (xaConnection != null) {
return true;
@Override
public int prepare(Xid xid) throws XAException {
try {
return getDelegate().prepare(xid);
} finally {
disconnect();
}
}

@Override
public void commit(Xid xid, boolean b) throws XAException {
try {
// TODO it's not efficient to keep connection forever
xaConnection = xaConnectionFactory.createXAConnection();
} catch (JMSException e) {
LOGGER.warn(e.getMessage(), e);
return false;
getDelegate().commit(xid, b);
} finally {
disconnect();
}
}

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Created xa connection: " + xaConnection);
@Override
public void rollback(Xid xid) throws XAException {
try {
getDelegate().rollback(xid);
} finally {
disconnect();
}
}

return true;
@Override
public Xid[] recover(int i) throws XAException {
try {
return getDelegate().recover(i);
} finally {
disconnect();
}
}

private boolean initialiseSession() {
if (xaSession != null) {
return true;
@Override
public boolean isSameRM(XAResource xaResource) throws XAException {
try {
return getDelegate().isSameRM(xaResource);
} finally {
disconnect();
}
}

@Override
public void forget(Xid xid) throws XAException {
try {
// TODO it's not efficient to keep session forever
xaSession = xaConnection.createXASession();
} catch (JMSException e) {
LOGGER.warn(e.getMessage(), e);
return false;
getDelegate().forget(xid);
} finally {
disconnect();
}
}

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Created xa session: " + xaSession);
@Override
public int getTransactionTimeout() throws XAException {
try {
return getDelegate().getTransactionTimeout();
} finally {
disconnect();
}
}

return true;
@Override
public boolean setTransactionTimeout(int i) throws XAException {
try {
return getDelegate().setTransactionTimeout(i);
} finally {
disconnect();
}
}

private XAResource getDelegate() throws XAException {
try {
xaConnection = xaConnectionFactory.createXAConnection();
return xaConnection.createXASession().getXAResource();
} catch (JMSException e) {
LOGGER.warn("Failed to get delegate XA resource: " + e.getMessage());
throw new XAException(e.getMessage());
}
}

private void disconnect() throws XAException {
try {
xaConnection.close();
} catch (JMSException e) {
LOGGER.warn("Failed to close connection: " + e.getMessage());
throw new XAException(e.getMessage());
}
}

}
Expand Up @@ -21,7 +21,6 @@
*/
package org.jboss.narayana.jta.jms.classic;

import com.arjuna.ats.jta.recovery.XAResourceRecoveryHelper;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
Expand All @@ -35,7 +34,6 @@
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -62,7 +60,6 @@ public class JmsXAResourceRecoveryHelperTests {
@Before
public void before() throws Exception {
MockitoAnnotations.initMocks(this);

when(xaConnectionFactoryMock.createXAConnection())
.thenReturn(xaConnectionMock);
when(xaConnectionMock.createXASession()).thenReturn(xaSessionMock);
Expand All @@ -72,29 +69,110 @@ public void before() throws Exception {
}

@Test
public void shouldInitialiseOnlyOnce() throws Exception {
assertTrue(recoveryHelper.initialise(null));
assertTrue(recoveryHelper.initialise(null));
public void shouldGetXAResource() throws Exception {
XAResource[] xaResources = recoveryHelper.getXAResources();

assertEquals(1, xaResources.length);
assertThat(xaResources[0], sameInstance(recoveryHelper));
}

@Test
public void shouldStart() throws Exception {
recoveryHelper.start(null, 0);
verify(xaConnectionFactoryMock, times(1)).createXAConnection();
verify(xaConnectionMock, times(1)).createXASession();
verify(xaSessionMock, times(1)).getXAResource();
verify(xaResourceMock, times(1)).start(null, 0);
verify(xaConnectionMock, times(1)).close();
}

@Test
public void shouldGetXAResource() throws Exception {
recoveryHelper.initialise(null);
public void shouldEnd() throws Exception {
recoveryHelper.end(null, 0);
verify(xaConnectionFactoryMock, times(1)).createXAConnection();
verify(xaConnectionMock, times(1)).createXASession();
verify(xaSessionMock, times(1)).getXAResource();
verify(xaResourceMock, times(1)).end(null, 0);
verify(xaConnectionMock, times(1)).close();
}

XAResource[] xaResources = recoveryHelper.getXAResources();
@Test
public void shouldPrepare() throws Exception {
recoveryHelper.prepare(null);
verify(xaConnectionFactoryMock, times(1)).createXAConnection();
verify(xaConnectionMock, times(1)).createXASession();
verify(xaSessionMock, times(1)).getXAResource();
verify(xaResourceMock, times(1)).prepare(null);
verify(xaConnectionMock, times(1)).close();
}

assertEquals(1, xaResources.length);
assertThat(xaResources[0], sameInstance(xaResourceMock));
@Test
public void shouldCommit() throws Exception {
recoveryHelper.commit(null, true);
verify(xaConnectionFactoryMock, times(1)).createXAConnection();
verify(xaConnectionMock, times(1)).createXASession();
verify(xaSessionMock, times(1)).getXAResource();
verify(xaResourceMock, times(1)).commit(null, true);
verify(xaConnectionMock, times(1)).close();
}

@Test
public void shouldRollback() throws Exception {
recoveryHelper.rollback(null);
verify(xaConnectionFactoryMock, times(1)).createXAConnection();
verify(xaConnectionMock, times(1)).createXASession();
verify(xaSessionMock, times(1)).getXAResource();
verify(xaResourceMock, times(1)).rollback(null);
verify(xaConnectionMock, times(1)).close();
}

@Test
public void shouldRecover() throws Exception {
recoveryHelper.recover(0);
verify(xaConnectionFactoryMock, times(1)).createXAConnection();
verify(xaConnectionMock, times(1)).createXASession();
verify(xaSessionMock, times(1)).getXAResource();
verify(xaResourceMock, times(1)).recover(0);
verify(xaConnectionMock, times(1)).close();
}

@Test
public void shouldCloseResources() throws Exception {
recoveryHelper.initialise(null);
recoveryHelper.close();
verify(xaSessionMock, times(1)).close();
public void shouldCheckSameRM() throws Exception {
recoveryHelper.isSameRM(null);
verify(xaConnectionFactoryMock, times(1)).createXAConnection();
verify(xaConnectionMock, times(1)).createXASession();
verify(xaSessionMock, times(1)).getXAResource();
verify(xaResourceMock, times(1)).isSameRM(null);
verify(xaConnectionMock, times(1)).close();
}

@Test
public void shouldForget() throws Exception {
recoveryHelper.forget(null);
verify(xaConnectionFactoryMock, times(1)).createXAConnection();
verify(xaConnectionMock, times(1)).createXASession();
verify(xaSessionMock, times(1)).getXAResource();
verify(xaResourceMock, times(1)).forget(null);
verify(xaConnectionMock, times(1)).close();
}

@Test
public void shouldGetTransactionTimeout() throws Exception {
recoveryHelper.getTransactionTimeout();
verify(xaConnectionFactoryMock, times(1)).createXAConnection();
verify(xaConnectionMock, times(1)).createXASession();
verify(xaSessionMock, times(1)).getXAResource();
verify(xaResourceMock, times(1)).getTransactionTimeout();
verify(xaConnectionMock, times(1)).close();
}

@Test
public void shouldSetTransactionTimeout() throws Exception {
recoveryHelper.setTransactionTimeout(0);
verify(xaConnectionFactoryMock, times(1)).createXAConnection();
verify(xaConnectionMock, times(1)).createXASession();
verify(xaSessionMock, times(1)).getXAResource();
verify(xaResourceMock, times(1)).setTransactionTimeout(0);
verify(xaConnectionMock, times(1)).close();
}

Expand Down
Expand Up @@ -57,14 +57,7 @@ public abstract class AbstractIntegrationTests {

protected Queue queue;

protected JmsXAResourceRecoveryHelper jmsXAResourceRecoveryHelper;

protected void closeResources() {
if (jmsXAResourceRecoveryHelper != null) {
jmsXAResourceRecoveryHelper.close();
jmsXAResourceRecoveryHelper = null;
}

if (connection != null) {
try {
connection.close();
Expand Down Expand Up @@ -132,15 +125,8 @@ protected void initJms() throws Exception {
}

protected void initJmsRecovery() throws Exception {
if (jmsXAResourceRecoveryHelper != null) {
throw new IntegrationTestRuntimeException(
"JmsXAResourceRecoveryHelper is already registered");
}

jmsXAResourceRecoveryHelper = new JmsXAResourceRecoveryHelper(
(XAConnectionFactory) jmsServer.lookup(JmsHelper.FACTORY_NAME));
jmsXAResourceRecoveryHelper.initialise(null);
registerRecoveryHelper(jmsXAResourceRecoveryHelper);
registerRecoveryHelper(new JmsXAResourceRecoveryHelper(
(XAConnectionFactory) jmsServer.lookup(JmsHelper.FACTORY_NAME)));
}

protected void registerRecoveryHelper(XAResourceRecoveryHelper helper) {
Expand Down

0 comments on commit 781869d

Please sign in to comment.