From 4d242a7c7b8241c894a67ddb7165af260aa81fe2 Mon Sep 17 00:00:00 2001 From: jbertram Date: Thu, 10 Apr 2014 17:06:10 -0500 Subject: [PATCH] HORNETQ-1342 - Clean up DuplicateIDCache on queue removal --- .../core/postoffice/DuplicateIDCache.java | 2 + .../postoffice/impl/DuplicateIDCacheImpl.java | 23 +++- .../core/postoffice/impl/PostOfficeImpl.java | 17 +++ .../integration/DuplicateDetectionTest.java | 101 +++++++++++++++++- 4 files changed, 139 insertions(+), 4 deletions(-) diff --git a/hornetq-server/src/main/java/org/hornetq/core/postoffice/DuplicateIDCache.java b/hornetq-server/src/main/java/org/hornetq/core/postoffice/DuplicateIDCache.java index 478978b5387..5af293a00e6 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/postoffice/DuplicateIDCache.java +++ b/hornetq-server/src/main/java/org/hornetq/core/postoffice/DuplicateIDCache.java @@ -37,4 +37,6 @@ public interface DuplicateIDCache void load(List> theIds) throws Exception; void load(final Transaction tx, final byte[] duplID); + + void clear() throws Exception; } diff --git a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java index 56e3cd23c18..99406288b49 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java @@ -33,7 +33,7 @@ * * @author Tim Fox * - * Created 8 Dec 2008 16:35:55 + * Created 8 Dec 2008 16:35:55 */ public class DuplicateIDCacheImpl implements DuplicateIDCache { @@ -117,7 +117,7 @@ public void load(final List> theIds) throws Exception } - public void deleteFromCache(byte [] duplicateID) throws Exception + public void deleteFromCache(byte[] duplicateID) throws Exception { ByteArrayHolder bah = new ByteArrayHolder(duplicateID); @@ -241,6 +241,23 @@ private synchronized void addToCacheInMemory(final byte[] duplID, final long rec } } + public void clear() throws Exception + { + synchronized (this) + { + long tx = storageManager.generateUniqueID(); + for (Pair id : ids) + { + storageManager.deleteDuplicateIDTransactional(tx, id.getB()); + } + storageManager.commit(tx); + + ids.clear(); + cache.clear(); + pos = 0; + } + } + private final class AddDuplicateIDOperation extends TransactionOperationAbstract { final byte[] duplID; @@ -296,7 +313,7 @@ public boolean equals(final Object other) { if (other instanceof ByteArrayHolder) { - ByteArrayHolder s = (ByteArrayHolder)other; + ByteArrayHolder s = (ByteArrayHolder) other; if (bytes.length != s.bytes.length) { diff --git a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java index 670fe5e1a9f..8bd431dd8e9 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java @@ -500,6 +500,8 @@ public synchronized Binding removeBinding(final SimpleString uniqueName, Transac pagingManager.deletePageStore(binding.getAddress()); managementService.unregisterAddress(binding.getAddress()); + + deleteDuplicateCache(binding.getAddress()); } if (binding.getType() == BindingType.LOCAL_QUEUE) @@ -542,6 +544,16 @@ else if (binding.getType() == BindingType.DIVERT) return binding; } + private void deleteDuplicateCache(SimpleString address) throws Exception + { + DuplicateIDCache cache = duplicateIDCaches.remove(address); + + if (cache != null) + { + cache.clear(); + } + } + @Override public boolean isAddressBound(final SimpleString address) throws Exception { @@ -825,6 +837,11 @@ public DuplicateIDCache getDuplicateIDCache(final SimpleString address) return cache; } + public ConcurrentMap getDuplicateIDCaches() + { + return duplicateIDCaches; + } + public Object getNotificationLock() { return notificationLock; diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java index 38246f73aa9..fc66974052a 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java @@ -29,6 +29,7 @@ import org.hornetq.api.core.client.HornetQClient; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.config.Configuration; +import org.hornetq.core.postoffice.impl.PostOfficeImpl; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.transaction.impl.XidImpl; import org.hornetq.tests.util.ServiceTestBase; @@ -126,6 +127,105 @@ public void testSimpleDuplicateDetecion() throws Exception locator.close(); } + @Test + public void testDuplicateIDCacheMemoryRetentionForNonTemporaryQueues() throws Exception + { + testDuplicateIDCacheMemoryRetention(false); + } + + @Test + public void testDuplicateIDCacheMemoryRetentionForTemporaryQueues() throws Exception + { + testDuplicateIDCacheMemoryRetention(true); + } + + @Test + public void testDuplicateIDCacheJournalRetentionForNonTemporaryQueues() throws Exception + { + testDuplicateIDCacheMemoryRetention(false); + + messagingService.stop(); + + messagingService.start(); + + Assert.assertEquals(0, ((PostOfficeImpl) messagingService.getPostOffice()).getDuplicateIDCaches().size()); + } + + @Test + public void testDuplicateIDCacheJournalRetentionForTemporaryQueues() throws Exception + { + testDuplicateIDCacheMemoryRetention(true); + + messagingService.stop(); + + messagingService.start(); + + Assert.assertEquals(0, ((PostOfficeImpl) messagingService.getPostOffice()).getDuplicateIDCaches().size()); + } + + public void testDuplicateIDCacheMemoryRetention(boolean temporary) throws Exception + { + final int TEST_SIZE = 100; + + ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY)); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true); + + session.start(); + + Assert.assertEquals(0, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size()); + + final SimpleString addressName = new SimpleString("DuplicateDetectionTestAddress"); + + for (int i = 0; i < TEST_SIZE; i++) + { + final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue_" + i); + + if (temporary) + { + session.createTemporaryQueue(addressName, queueName, null); + } + else + { + session.createQueue(addressName, queueName, null, true); + } + + ClientProducer producer = session.createProducer(addressName); + + ClientConsumer consumer = session.createConsumer(queueName); + + ClientMessage message = createMessage(session, 1); + SimpleString dupID = new SimpleString("abcdefg"); + message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData()); + producer.send(message); + ClientMessage message2 = consumer.receive(1000); + Assert.assertEquals(1, message2.getObjectProperty(propKey)); + + message = createMessage(session, 2); + message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData()); + producer.send(message); + message2 = consumer.receiveImmediate(); + Assert.assertNull(message2); + + producer.close(); + consumer.close(); + + Assert.assertEquals(1, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size()); + session.deleteQueue(queueName); + Assert.assertEquals(0, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size()); + } + + session.close(); + + sf.close(); + + locator.close(); + + Assert.assertEquals(0, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size()); + } + @Test public void testSimpleDuplicateDetectionWithString() throws Exception { @@ -558,7 +658,6 @@ public void testRollbackThenSend() throws Exception session.commit(); - message = consumer.receive(5000); assertNotNull(message); assertTrue(message.getStringProperty("key").equals(dupID1.toString()));