Skip to content

Commit

Permalink
HORNETQ-1342 - Clean up DuplicateIDCache on queue removal
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Apr 14, 2014
1 parent 57e9f5c commit 4d242a7
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 4 deletions.
Expand Up @@ -37,4 +37,6 @@ public interface DuplicateIDCache
void load(List<Pair<byte[], Long>> theIds) throws Exception;

void load(final Transaction tx, final byte[] duplID);

void clear() throws Exception;
}
Expand Up @@ -33,7 +33,7 @@
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
* Created 8 Dec 2008 16:35:55
* Created 8 Dec 2008 16:35:55
*/
public class DuplicateIDCacheImpl implements DuplicateIDCache
{
Expand Down Expand Up @@ -117,7 +117,7 @@ public void load(final List<Pair<byte[], Long>> theIds) throws Exception
}


public void deleteFromCache(byte [] duplicateID) throws Exception
public void deleteFromCache(byte[] duplicateID) throws Exception
{
ByteArrayHolder bah = new ByteArrayHolder(duplicateID);

Expand Down Expand Up @@ -241,6 +241,23 @@ private synchronized void addToCacheInMemory(final byte[] duplID, final long rec
}
}

public void clear() throws Exception
{
synchronized (this)
{
long tx = storageManager.generateUniqueID();
for (Pair<ByteArrayHolder, Long> id : ids)
{
storageManager.deleteDuplicateIDTransactional(tx, id.getB());
}
storageManager.commit(tx);

ids.clear();
cache.clear();
pos = 0;
}
}

private final class AddDuplicateIDOperation extends TransactionOperationAbstract
{
final byte[] duplID;
Expand Down Expand Up @@ -296,7 +313,7 @@ public boolean equals(final Object other)
{
if (other instanceof ByteArrayHolder)
{
ByteArrayHolder s = (ByteArrayHolder)other;
ByteArrayHolder s = (ByteArrayHolder) other;

if (bytes.length != s.bytes.length)
{
Expand Down
Expand Up @@ -500,6 +500,8 @@ public synchronized Binding removeBinding(final SimpleString uniqueName, Transac
pagingManager.deletePageStore(binding.getAddress());

managementService.unregisterAddress(binding.getAddress());

deleteDuplicateCache(binding.getAddress());
}

if (binding.getType() == BindingType.LOCAL_QUEUE)
Expand Down Expand Up @@ -542,6 +544,16 @@ else if (binding.getType() == BindingType.DIVERT)
return binding;
}

private void deleteDuplicateCache(SimpleString address) throws Exception
{
DuplicateIDCache cache = duplicateIDCaches.remove(address);

if (cache != null)
{
cache.clear();
}
}

@Override
public boolean isAddressBound(final SimpleString address) throws Exception
{
Expand Down Expand Up @@ -825,6 +837,11 @@ public DuplicateIDCache getDuplicateIDCache(final SimpleString address)
return cache;
}

public ConcurrentMap<SimpleString, DuplicateIDCache> getDuplicateIDCaches()
{
return duplicateIDCaches;
}

public Object getNotificationLock()
{
return notificationLock;
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.tests.util.ServiceTestBase;
Expand Down Expand Up @@ -126,6 +127,105 @@ public void testSimpleDuplicateDetecion() throws Exception
locator.close();
}

@Test
public void testDuplicateIDCacheMemoryRetentionForNonTemporaryQueues() throws Exception
{
testDuplicateIDCacheMemoryRetention(false);
}

@Test
public void testDuplicateIDCacheMemoryRetentionForTemporaryQueues() throws Exception
{
testDuplicateIDCacheMemoryRetention(true);
}

@Test
public void testDuplicateIDCacheJournalRetentionForNonTemporaryQueues() throws Exception
{
testDuplicateIDCacheMemoryRetention(false);

messagingService.stop();

messagingService.start();

Assert.assertEquals(0, ((PostOfficeImpl) messagingService.getPostOffice()).getDuplicateIDCaches().size());
}

@Test
public void testDuplicateIDCacheJournalRetentionForTemporaryQueues() throws Exception
{
testDuplicateIDCacheMemoryRetention(true);

messagingService.stop();

messagingService.start();

Assert.assertEquals(0, ((PostOfficeImpl) messagingService.getPostOffice()).getDuplicateIDCaches().size());
}

public void testDuplicateIDCacheMemoryRetention(boolean temporary) throws Exception
{
final int TEST_SIZE = 100;

ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));

ClientSessionFactory sf = createSessionFactory(locator);

ClientSession session = sf.createSession(false, true, true);

session.start();

Assert.assertEquals(0, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size());

final SimpleString addressName = new SimpleString("DuplicateDetectionTestAddress");

for (int i = 0; i < TEST_SIZE; i++)
{
final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue_" + i);

if (temporary)
{
session.createTemporaryQueue(addressName, queueName, null);
}
else
{
session.createQueue(addressName, queueName, null, true);
}

ClientProducer producer = session.createProducer(addressName);

ClientConsumer consumer = session.createConsumer(queueName);

ClientMessage message = createMessage(session, 1);
SimpleString dupID = new SimpleString("abcdefg");
message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
ClientMessage message2 = consumer.receive(1000);
Assert.assertEquals(1, message2.getObjectProperty(propKey));

message = createMessage(session, 2);
message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message2 = consumer.receiveImmediate();
Assert.assertNull(message2);

producer.close();
consumer.close();

Assert.assertEquals(1, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size());
session.deleteQueue(queueName);
Assert.assertEquals(0, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size());
}

session.close();

sf.close();

locator.close();

Assert.assertEquals(0, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size());
}

@Test
public void testSimpleDuplicateDetectionWithString() throws Exception
{
Expand Down Expand Up @@ -558,7 +658,6 @@ public void testRollbackThenSend() throws Exception
session.commit();



message = consumer.receive(5000);
assertNotNull(message);
assertTrue(message.getStringProperty("key").equals(dupID1.toString()));
Expand Down

0 comments on commit 4d242a7

Please sign in to comment.