Permalink
Browse files

Fixed bug with duplicate detection on depage and transactional send

  • Loading branch information...
1 parent 02dee30 commit 7391970db4b1f3ff81859700b6a2e1918fa26661 @clebertsuconic clebertsuconic committed Dec 6, 2009
@@ -49,6 +49,7 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
+import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.SimpleString;
@@ -282,7 +283,7 @@ public void returnProducerCredits(final int credits)
}
public void addSize(final ServerMessage message, final boolean add)
- {
+ {
long size = message.getMemoryEstimate();
if (add)
@@ -302,7 +303,7 @@ public void addSize(final ServerMessage message, final boolean add)
public void addSize(final MessageReference reference, final boolean add)
{
long size = MessageReferenceImpl.getMemoryEstimate();
-
+
if (add)
{
checkReleaseProducerFlowControlCredits(size);
@@ -400,20 +401,20 @@ public synchronized void stop() throws Exception
if (running)
{
running = false;
-
+
final CountDownLatch latch = new CountDownLatch(1);
-
+
executor.execute(new Runnable()
{
public void run()
{
latch.countDown();
}
});
-
+
if (!latch.await(60, TimeUnit.SECONDS))
{
- log.warn("Timed out on waiting PagingStore " + this.address + " to shutdown");
+ log.warn("Timed out on waiting PagingStore " + this.address + " to shutdown");
}
if (currentPage != null)
@@ -726,7 +727,7 @@ private synchronized void checkReleaseProducerFlowControlCredits(final long size
}
}
- private void addSize(final long size)
+ private void addSize(final long size)
{
if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
{
@@ -948,55 +949,67 @@ private boolean onDepage(final int pageId, final SimpleString destination, final
final long transactionIdDuringPaging = pagedMessage.getTransactionID();
+ postOffice.route(message, depageTransaction);
+
+ // This means the page is duplicated. So we need to ignore this
+ if (depageTransaction.getState() == State.ROLLBACK_ONLY)
+ {
+ break;
+ }
+
+ PageTransactionInfo pageUserTransaction = null;
+
if (transactionIdDuringPaging >= 0)
{
- final PageTransactionInfo pageTransactionInfo = pagingManager.getTransaction(transactionIdDuringPaging);
+ pageUserTransaction = pagingManager.getTransaction(transactionIdDuringPaging);
- if (pageTransactionInfo == null)
+ if (pageUserTransaction == null)
{
- log.warn("Transaction " + pagedMessage.getTransactionID() +
- " used during paging not found, ignoring message " +
- message);
+ // This is not supposed to happen
+ log.warn("Transaction " + pagedMessage.getTransactionID() + " used during paging not found");
continue;
}
+ else
+ {
- // This is to avoid a race condition where messages are depaged
- // before the commit arrived
+ // This is to avoid a race condition where messages are depaged
+ // before the commit arrived
- while (running && !pageTransactionInfo.waitCompletion(500))
- {
- // This is just to give us a chance to interrupt the process..
- // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying
- // the shutdown of the server
- if (isTrace)
+ while (running && !pageUserTransaction.waitCompletion(500))
{
- trace("Waiting pageTransaction to complete");
+ // This is just to give us a chance to interrupt the process..
+ // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying
+ // the shutdown of the server
+ if (isTrace)
+ {
+ trace("Waiting pageTransaction to complete");
+ }
}
- }
- if (!running)
- {
- break;
- }
+ if (!running)
+ {
+ break;
+ }
- if (!pageTransactionInfo.isCommit())
- {
- if (isTrace)
+ if (!pageUserTransaction.isCommit())
{
- trace("Rollback was called after prepare, ignoring message " + message);
+ if (isTrace)
+ {
+ trace("Rollback was called after prepare, ignoring message " + message);
+ }
+ continue;
}
- continue;
}
- // Update information about transactions
- if (message.isDurable())
- {
- pageTransactionInfo.decrement();
- pageTransactionsToUpdate.add(pageTransactionInfo);
- }
}
- postOffice.route(message, depageTransaction);
+ // Update information about transactions
+ // This needs to be done after routing because of duplication detection
+ if (pageUserTransaction != null && message.isDurable())
+ {
+ pageUserTransaction.decrement();
+ pageTransactionsToUpdate.add(pageUserTransaction);
+ }
}
if (!running)
@@ -1023,7 +1036,7 @@ private boolean onDepage(final int pageId, final SimpleString destination, final
}
depageTransaction.commit();
-
+
storageManager.waitOnOperations();
if (isTrace)
@@ -95,6 +95,9 @@ private static void trace(String msg)
private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
private final ConcurrentMap<Long, LargeServerMessage> largeMessages = new ConcurrentHashMap<Long, LargeServerMessage>();
+
+ // Used on tests, to simulate failures on delete pages
+ private boolean deletePages = true;
// Constructors --------------------------------------------------
public ReplicationEndpointImpl(final HornetQServer server)
@@ -285,6 +288,12 @@ public void compareJournalInformation(JournalLoadInformation[] journalInformatio
}
}
+
+ /** Used on tests only. To simulate missing page deletes*/
+ public void setDeletePages(final boolean deletePages)
+ {
+ this.deletePages = deletePages;
+ }
/**
* @param journalInformation
@@ -504,7 +513,10 @@ private void handlePageEvent(final ReplicationPageEventMessage packet) throws Ex
{
if (packet.isDelete())
{
- page.delete();
+ if (deletePages)
+ {
+ page.delete();
+ }
}
else
{
@@ -854,6 +854,12 @@ public GroupingHandler getGroupingHandler()
// Public
// ---------------------------------------------------------------------------------------
+ /** For tests only */
+ public ReplicationEndpoint getReplicationEndpoint()
+ {
+ return this.replicationEndpoint;
+ }
+
// Package protected
// ----------------------------------------------------------------------------
@@ -17,7 +17,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
@@ -29,7 +28,9 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.utils.SimpleString;
@@ -120,10 +121,20 @@ public void beforeReconnect(HornetQException exception)
session.commit();
+ ReplicationEndpointImpl endpoint = null;
+
if (failBeforeConsume)
{
failSession(session, latch);
}
+ else
+ {
+ endpoint = (ReplicationEndpointImpl)((HornetQServerImpl)server1Service).getReplicationEndpoint();
+ if (endpoint != null)
+ {
+ endpoint.setDeletePages(false);
+ }
+ }
session.start();
@@ -145,6 +156,11 @@ public void beforeReconnect(HornetQException exception)
session.commit();
+ if (endpoint != null)
+ {
+ endpoint.setDeletePages(true);
+ }
+
if (!failBeforeConsume)
{
failSession(session, latch);
@@ -69,18 +69,31 @@
public void testCrashDuringDeleteFile() throws Exception
{
- pageAndFail();
+ doTestCrashDuringDeleteFile(false);
+ }
+
+ public void testCrashDuringDeleteFileTransacted() throws Exception
+ {
+ doTestCrashDuringDeleteFile(true);
+ }
+
+ public void doTestCrashDuringDeleteFile(final boolean transacted) throws Exception
+ {
+ pageAndFail(transacted);
File pageDir = new File(getPageDir());
File directories[] = pageDir.listFiles();
assertEquals(1, directories.length);
- // When depage happened, a new empty page was supposed to be opened, what will create 3 files
- assertEquals("Missing a file, supposed to have address.txt, 1st page and 2nd page",
- 3,
- directories[0].list().length);
+ if (!transacted)
+ {
+ // When depage happened, a new empty page was supposed to be opened, what will create 3 files
+ assertEquals("Missing a file, supposed to have address.txt, 1st page and 2nd page",
+ 3,
+ directories[0].list().length);
+ }
Configuration config = createDefaultConfig();
@@ -122,7 +135,7 @@ public void testCrashDuringDeleteFile() throws Exception
/** This method will leave garbage on paging.
* It will not delete page files as if the server crashed right after commit,
* and before removing the file*/
- private void pageAndFail() throws Exception
+ private void pageAndFail(final boolean transacted) throws Exception
{
clearData();
Configuration config = createDefaultConfig();
@@ -142,22 +155,26 @@ private void pageAndFail() throws Exception
sf.setBlockOnPersistentSend(true);
sf.setBlockOnAcknowledge(true);
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+ ClientSession session = sf.createSession(null, null, false, !transacted, !transacted, false, 0);
session.createQueue(ADDRESS, ADDRESS, null, true);
ClientProducer producer = session.createProducer(ADDRESS);
ClientMessage message = session.createClientMessage(true);
message.getBodyBuffer().writeBytes(new byte[1024]);
-
+
PagingStore store = server.getPostOffice().getPagingManager().getPageStore(ADDRESS);
int messages = 0;
while (!store.isPaging())
{
producer.send(message);
messages++;
+ if (transacted && messages % 100 == 0)
+ {
+ session.commit();
+ }
}
for (int i = 0; i < 2; i++)
@@ -166,6 +183,8 @@ private void pageAndFail() throws Exception
producer.send(message);
}
+ session.commit();
+
session.close();
assertTrue(server.getPostOffice().getPagingManager().getTotalMemory() > 0);

0 comments on commit 7391970

Please sign in to comment.