Skip to content

Commit

Permalink
modified transacted session behaviour after failover
Browse files Browse the repository at this point in the history
* when a session commits after being flagged as rolled back only during failover, automatically
  rolls back the session before throwing the HornetQException.TRANSACTION_ROLLED_BACK
* updated transaction-failover example
* added test in FailoverTest
  • Loading branch information
jmesnil committed Dec 4, 2009
1 parent 9ae251a commit 74cf822
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 15 deletions.
9 changes: 2 additions & 7 deletions examples/jms/transaction-failover/readme.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ <h1>JMS Failover With Transaction Example</h1>
<p>When a <em>transacted</em> JMS session is used, once-and-only once delivery is guaranteed.</p>
<ul>
<li>if the failover occurs while there is an in-flight transaction, the transaction will be flagged as <em>rollback only</em>. In that case, the JMS client
will need to rollback the session and retry the transaction work.</li>
will need to retry the transaction work.</li>
<li>if the failover occurs while there is <em>no</em> in-flight transaction, the failover will be transparent to the user.</li>
</ul>
<p>HornetQ also provides an example for <a href="../non-transactional-failover/readme.html">non-transaction failover</a>.</p>
Expand Down Expand Up @@ -80,15 +80,10 @@ <h2>Example step-by-step</h2>
session.commit();
} catch (TransactionRolledBackException e)
{
...
System.err.println("transaction has been rolled back: " + e.getMessage());
}
</pre>

<li>In the <code>catch (TransactionRolledBackException e)</code> block, we rollback the session
<pre class="prettyprint">
session.rollback();
</pre>

<li>We resend all the messages</li>
<pre class="prettyprint">
sendMessages(session, producer, numMessages, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,22 @@ public boolean runExample() throws Exception
// Step 8. We send half of the messages, kill the live server and send the remaining messages
sendMessages(session, producer, numMessages, true);

// Step 9. As failover occured during transaction, the session has been marked for rollback only
try
{
// Step 9. As failover occured during transaction, the session has been marked for rollback only
session.commit();
} catch (TransactionRolledBackException e)
{
// Step 10. We rollback the transaction
session.rollback();
System.err.println("transaction has been rolled back: " + e.getMessage());
}

// Step 11. We resend all the messages
// Step 10. We resend all the messages
sendMessages(session, producer, numMessages, false);
// Step 12. We commit the session succesfully: the messages will be all delivered to the activated backup server
// Step 11. We commit the session succesfully: the messages will be all delivered to the activated backup server
session.commit();


// Step 13. We are now transparently reconnected to server #0, the backup server.
// Step 12. We are now transparently reconnected to server #0, the backup server.
// We consume the messages sent before the crash of the live server and commit the session.
for (int i = 0; i < numMessages; i++)
{
Expand All @@ -102,7 +101,7 @@ public boolean runExample() throws Exception
}
finally
{
// Step 14. Be sure to close our resources!
// Step 13. Be sure to close our resources!

if (connection != null)
{
Expand Down
2 changes: 2 additions & 0 deletions src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,8 @@ public void commit() throws HornetQException

if (rollbackOnly)
{
rollback(false);

throw new HornetQException(TRANSACTION_ROLLED_BACK,
"The transaction was rolled back on failover to a backup server");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package org.hornetq.tests.integration.cluster.failover;

import static org.hornetq.tests.util.RandomUtil.randomInt;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -42,6 +44,7 @@
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.SimpleString;

/**
Expand Down Expand Up @@ -328,6 +331,90 @@ public void connectionFailed(HornetQException me)
assertEquals(0, sf.numConnections());
}

/**
* Test that once the transacted session has throw a TRANSACTION_ROLLED_BACK exception,
* it can be reused again
*/
public void testTransactedMessagesSentSoRollbackAndContinueWork() throws Exception
{
ClientSessionFactoryInternal sf = getSessionFactory();

sf.setBlockOnNonPersistentSend(true);
sf.setBlockOnPersistentSend(true);

ClientSession session = sf.createSession(false, false);

session.createQueue(ADDRESS, ADDRESS, null, true);

final CountDownLatch latch = new CountDownLatch(1);

class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
latch.countDown();
}

}

session.addFailureListener(new MyListener());

ClientProducer producer = session.createProducer(ADDRESS);

final int numMessages = 100;

for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(i % 2 == 0);

setBody(i, message);

message.putIntProperty("counter", i);

producer.send(message);
}

fail(session, latch);

assertTrue(session.isRollbackOnly());

try
{
session.commit();

fail("Should throw exception");
}
catch (HornetQException e)
{
assertEquals(HornetQException.TRANSACTION_ROLLED_BACK, e.getCode());
}

ClientMessage message = session.createClientMessage(false);
int counter = randomInt();
message.putIntProperty("counter", counter);

producer.send(message);

// session is working again
session.commit();

session.start();

ClientConsumer consumer = session.createConsumer(ADDRESS);

session.start();

message = consumer.receiveImmediate();

assertNotNull(message);
assertEquals(counter, message.getIntProperty("counter").intValue());

session.close();

assertEquals(0, sf.numSessions());

assertEquals(0, sf.numConnections());
}
public void testTransactedMessagesNotSentSoNoRollback() throws Exception
{
ClientSessionFactoryInternal sf = getSessionFactory();
Expand Down Expand Up @@ -409,7 +496,7 @@ public void connectionFailed(HornetQException me)
assertEquals(0, sf.numConnections());
}

public void testTransactedMessagesWithConsumerStartedBedoreFailover() throws Exception
public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws Exception
{
ClientSessionFactoryInternal sf = getSessionFactory();

Expand Down

0 comments on commit 74cf822

Please sign in to comment.