Skip to content

Commit

Permalink
https://issues.jboss.org/browse/JBPAPP-6091 - move message is not ign…
Browse files Browse the repository at this point in the history
…oring duplicates (feature was only working with moveMessages)
  • Loading branch information
clebertsuconic committed Mar 14, 2011
1 parent 26a54de commit f472fdf
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ public boolean moveMessage(final long messageID, final String otherQueueName, fi
throw new IllegalArgumentException("No queue found for " + otherQueueName);
}

return queue.moveReference(messageID, binding.getAddress());
return queue.moveReference(messageID, binding.getAddress(), rejectDuplicates);
}
finally
{
Expand Down
15 changes: 5 additions & 10 deletions src/main/org/hornetq/core/server/impl/QueueImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ public void expire(final MessageReference ref) throws Exception
{
if (expiryAddress != null)
{
move(expiryAddress, ref, true);
move(expiryAddress, ref, true, false);
}
else
{
Expand Down Expand Up @@ -1101,7 +1101,7 @@ public synchronized boolean moveReference(final long messageID,
deliveringCount.incrementAndGet();
try
{
move(toAddress, ref);
move(toAddress, ref, false, rejectDuplicate);
}
catch (Exception e)
{
Expand Down Expand Up @@ -1627,11 +1627,6 @@ public int getNumberOfReferences()
return messageReferences.size();
}

private void move(final SimpleString toAddress, final MessageReference ref) throws Exception
{
move(toAddress, ref, false);
}

private void move(final SimpleString toAddress,
final Transaction tx,
final MessageReference ref,
Expand Down Expand Up @@ -1711,7 +1706,7 @@ private void sendToDeadLetterAddress(final MessageReference ref) throws Exceptio
QueueImpl.log.warn("Message has reached maximum delivery attempts, sending it to Dead Letter Address " + deadLetterAddress +
" from " +
name);
move(deadLetterAddress, ref, false);
move(deadLetterAddress, ref, false, false);
}
}
else
Expand All @@ -1723,15 +1718,15 @@ private void sendToDeadLetterAddress(final MessageReference ref) throws Exceptio
}
}

private void move(final SimpleString address, final MessageReference ref, final boolean expiry) throws Exception
private void move(final SimpleString address, final MessageReference ref, final boolean expiry, final boolean rejectDuplicate) throws Exception
{
Transaction tx = new TransactionImpl(storageManager);

ServerMessage copyMessage = makeCopy(ref, expiry);

copyMessage.setAddress(address);

postOffice.route(copyMessage, tx, false);
postOffice.route(copyMessage, tx, false, rejectDuplicate);

acknowledge(tx, ref);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,92 @@ public void testMoveMessagesBack() throws Exception

}

public void testMoveMessagesBack2() throws Exception
{
server.createQueue(new SimpleString("q1"), new SimpleString("q1"), null, true, false);
server.createQueue(new SimpleString("q2"), new SimpleString("q2"), null, true, false);

ServerLocator locator = createInVMNonHALocator();

ClientSessionFactory sf = locator.createSessionFactory();

ClientSession session = sf.createSession(true, true);

ClientProducer prod1 = session.createProducer("q1");

int NUMBER_OF_MSGS = 10;

for (int i = 0; i < NUMBER_OF_MSGS; i++)
{
ClientMessage msg = session.createMessage(true);

msg.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + i));

prod1.send(msg);
}

session.commit();

ClientConsumer consumer = session.createConsumer("q1", true);
session.start();

assertNotNull(consumer.receive(5000));
consumer.close();

QueueControl q1Control = ManagementControlHelper.createQueueControl(new SimpleString("q1"),
new SimpleString("q1"),
mbeanServer);

QueueControl q2Control = ManagementControlHelper.createQueueControl(new SimpleString("q2"),
new SimpleString("q2"),
mbeanServer);

assertEquals(NUMBER_OF_MSGS, q1Control.moveMessages(null, "q2"));

long messageIDs[] = new long[NUMBER_OF_MSGS];

consumer = session.createConsumer("q2", true);

for (int i = 0 ; i < NUMBER_OF_MSGS; i++)
{
ClientMessage msg = consumer.receive(5000);
assertNotNull(msg);
messageIDs[i] = msg.getMessageID();
}

assertNull(consumer.receiveImmediate());

consumer.close();

for (int i = 0 ; i < NUMBER_OF_MSGS; i++)
{
q2Control.moveMessage(messageIDs[i], "q1");
}


session.start();
consumer = session.createConsumer("q1");

for (int i = 0; i < NUMBER_OF_MSGS; i++)
{
ClientMessage msg = consumer.receive(5000);
System.out.println("msg = " + msg);
assertNotNull(msg);
msg.acknowledge();
}

consumer.close();

session.deleteQueue("q1");

session.deleteQueue("q2");

session.close();

locator.close();

}

public void testPauseAndResume()
{
long counterPeriod = 1000;
Expand Down

0 comments on commit f472fdf

Please sign in to comment.