Skip to content

Commit

Permalink
Implementing Failure checks on XA for Bridges and other XA Failures
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Aug 2, 2013
1 parent e9b4175 commit 495ceb9
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 3 deletions.
Expand Up @@ -315,4 +315,7 @@ public interface HornetQMessageBundle

@Message(id = 119081, value = "No Discovery Group configuration named {0} found", format = Message.Format.MESSAGE_FORMAT)
HornetQException noDiscoveryGroupFound(DiscoveryGroupConfiguration dg);

@Message(id = 119082, value = "No Transaction set on the XA Session", format = Message.Format.MESSAGE_FORMAT)
HornetQException noTXSet();
}
Expand Up @@ -623,6 +623,11 @@ public void forceConsumerDelivery(final long consumerID, final long sequence) th

public void acknowledge(final long consumerID, final long messageID) throws Exception
{

// it will create a Null TX if not started.
// to avoid Failover issues with reconnected Failover
checkNullXA();

ServerConsumer consumer = consumers.get(consumerID);

if (consumer == null)
Expand Down Expand Up @@ -807,6 +812,8 @@ else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))

public synchronized void xaEnd(final Xid xid) throws Exception
{
checkFailedXA();

if (tx != null && tx.getXid().equals(xid))
{
if (tx.getState() == Transaction.State.SUSPENDED)
Expand Down Expand Up @@ -884,6 +891,8 @@ public synchronized void xaForget(final Xid xid) throws Exception

public synchronized void xaJoin(final Xid xid) throws Exception
{
checkFailedXA();

Transaction theTx = resourceManager.getTransaction(xid);

if (theTx == null)
Expand All @@ -907,6 +916,8 @@ public synchronized void xaJoin(final Xid xid) throws Exception

public synchronized void xaResume(final Xid xid) throws Exception
{
checkFailedXA();

if (tx != null)
{
final String msg = "Cannot resume, session is currently doing work in a transaction " + tx.getXid();
Expand Down Expand Up @@ -1018,6 +1029,8 @@ else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))

public synchronized void xaStart(final Xid xid) throws Exception
{
checkFailedXA();

if (tx != null)
{
final String msg = "Cannot start, session is already doing work in a transaction " + tx.getXid();
Expand All @@ -1044,9 +1057,30 @@ public synchronized void xaStart(final Xid xid) throws Exception
}
}

private void checkFailedXA() throws HornetQException
{
// this could happen over a failed TX, after failover for instance
if (tx != null && tx.isRollbackOnly())
{
try
{
tx.rollback();
}
catch (Exception e)
{
HornetQServerLogger.LOGGER.warn(e.getMessage(), e);
}
HornetQException ex = new HornetQException(XAException.XAER_NOTA, tx.getPendingException() != null ? tx.getPendingException().getMessage() : "There was a failure on the XA Transaction");
tx = null;
throw ex;
}
}

public synchronized void xaSuspend() throws Exception
{

checkFailedXA();

if (isTrace)
{
HornetQServerLogger.LOGGER.trace("xasuspend on " + this.tx);
Expand Down Expand Up @@ -1219,6 +1253,11 @@ public void receiveConsumerCredits(final long consumerID, final int credits) thr

public void sendLarge(final MessageInternal message) throws Exception
{
if (!checkNullXA())
{
return;
}

// need to create the LargeMessage before continue
long id = storageManager.generateUniqueID();

Expand All @@ -1239,6 +1278,11 @@ public void sendLarge(final MessageInternal message) throws Exception

public void send(final ServerMessage message, final boolean direct) throws Exception
{
if (!checkNullXA())
{
return;
}

//large message may come from StompSession directly, in which
//case the id header already generated.
if (!message.isLargeMessage())
Expand Down Expand Up @@ -1294,6 +1338,23 @@ public void send(final ServerMessage message, final boolean direct) throws Excep
}
}

private boolean checkNullXA()
{
if (xa && this.tx == null)
{
tx = newTransaction();
tx.markAsRollbackOnly(HornetQMessageBundle.BUNDLE.noTXSet());
return false;
}
else
if (tx != null && tx.isRollbackOnly())
{
return false;
}

return true;
}

public void sendContinuations(final int packetSize,
final long messageBodySize,
final byte[] body,
Expand Down
Expand Up @@ -32,6 +32,8 @@ static enum State
ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY
}

Exception getPendingException();

void prepare() throws Exception;

void commit() throws Exception;
Expand All @@ -50,6 +52,8 @@ static enum State

State getState();

boolean isRollbackOnly();

void setState(State state);

void markAsRollbackOnly(HornetQException exception);
Expand Down
Expand Up @@ -142,6 +142,11 @@ public long getCreateTime()
return createTime;
}

public Exception getPendingException()
{
return exception;
}

public boolean hasTimedOut(final long currentTime,final int defaultTimeout)
{
if(timeoutSeconds == - 1)
Expand Down Expand Up @@ -431,6 +436,11 @@ public void markAsRollbackOnly(final HornetQException exception1)
this.exception = exception1;
}

public boolean isRollbackOnly()
{
return state == State.ROLLBACK_ONLY || state == State.ROLLEDBACK;
}

public synchronized void addOperation(final TransactionOperation operation)
{
checkCreateOperations();
Expand Down
Expand Up @@ -1186,6 +1186,47 @@ public void testXAMessagesSentSoRollbackOnEnd() throws Exception
Assert.assertNull(message);
}

//start a tx but sending messages after crash
public void testXAMessagesSentSoRollbackOnEnd2() throws Exception
{
createSessionFactory();

ClientSession session = createSession(sf, true, false, false);

Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());

session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);

ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);

session.start(xid, XAResource.TMNOFLAGS);

crash(session);

// sendMessagesSomeDurable(session, producer);

producer.send(createMessage(session, 1, true));

try
{
session.end(xid, XAResource.TMSUCCESS);

Assert.fail("Should throw exception");
}
catch (XAException e)
{
// Assert.assertEquals(XAException.XAER_NOTA, e.errorCode);
}

ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);

session.start();

ClientMessage message = consumer.receiveImmediate();

Assert.assertNull(message);
}

public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
{
createSessionFactory();
Expand Down Expand Up @@ -1354,6 +1395,50 @@ public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
}
}

public void testXAMessagesConsumedSoRollbackOnEnd2() throws Exception
{
createSessionFactory();

ClientSession session1 = createSessionAndQueue();

ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);

for (int i = 0; i < NUM_MESSAGES; i++)
{
// some are durable, some are not!
producer.send(createMessage(session1, i, true));
}

session1.commit();

ClientSession session2 = createSession(sf, true, false, false);

ClientConsumer consumer = session2.createConsumer(FailoverTestBase.ADDRESS);

session2.start();

Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());

session2.start(xid, XAResource.TMNOFLAGS);

crash(session2);

receiveMessages(consumer);

try
{
session2.end(xid, XAResource.TMSUCCESS);

Assert.fail("Should throw exception");
}
catch (XAException e)
{
}

// Since the end was not accepted, the messages should be redelivered
receiveMessages(consumer);
}

public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
{
createSessionFactory();
Expand Down
Expand Up @@ -82,10 +82,11 @@ public void crash(ClientSession... sessions) throws Exception
public void crash(boolean waitFailure, ClientSession... sessions) throws Exception
{
CountDownLatch latch = new CountDownLatch(sessions.length);
for (ClientSession session : sessions)
CountDownSessionFailureListener listeners[] = new CountDownSessionFailureListener[sessions.length];
for (int i = 0; i < sessions.length; i++)
{
CountDownSessionFailureListener listener = new CountDownSessionFailureListener(latch);
session.addFailureListener(listener);
listeners[i] = new CountDownSessionFailureListener(latch);
sessions[i].addFailureListener(listeners[i]);
}

ClusterManager clusterManager = server.getClusterManager();
Expand All @@ -101,6 +102,10 @@ public void crash(boolean waitFailure, ClientSession... sessions) throws Excepti
Assert.assertTrue("Failed to stop the server! Latch count is " + latch.getCount() + " out of " +
sessions.length, ok);
}
for (int i = 0; i < sessions.length; i++)
{
sessions[i].removeFailureListener(listeners[i]);
}
}

public HornetQServer getServer()
Expand Down
Expand Up @@ -169,6 +169,12 @@ public State getState()
return null;
}

@Override
public boolean isRollbackOnly()
{
return false;
}

public Xid getXid()
{
return null;
Expand All @@ -179,6 +185,12 @@ public void markAsRollbackOnly(final HornetQException exception)

}

@Override
public Exception getPendingException()
{
return null;
}

public void prepare() throws Exception
{

Expand Down

0 comments on commit 495ceb9

Please sign in to comment.