Skip to content
Browse files

HORNETQ-1328 - Fix order of totalIterator()

  • Loading branch information...
1 parent a74c8b2 commit a44686837cc12c4ee004f32bbe4724d11b1dd9f4 @jbertram jbertram committed Feb 20, 2014
View
30 hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
@@ -21,6 +21,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -2964,36 +2965,45 @@ public TotalQueueIterator()
@Override
public boolean hasNext()
{
+ if (messagesIterator.hasNext())
+ {
+ return true;
+ }
+ if (interIterator.hasNext())
+ {
+ return true;
+ }
if (pageIter != null)
{
if (pageIter.hasNext())
{
return true;
}
}
- if (interIterator.hasNext())
- {
- return true;
- }
- return messagesIterator.hasNext();
+ return false;
}
@Override
public MessageReference next()
{
+ if (messagesIterator.hasNext())
+ {
+ return messagesIterator.next();
+ }
+ if (interIterator.hasNext())
+ {
+ return interIterator.next();
+ }
if (pageIter != null)
{
if (pageIter.hasNext())
{
return pageIter.next();
}
}
- if (interIterator.hasNext())
- {
- return interIterator.next();
- }
- return messagesIterator.next();
+
+ throw new NoSuchElementException();
}
@Override
View
68 tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
@@ -22,18 +22,29 @@
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.HandleStatus;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.unit.core.server.impl.fakes.FakeConsumer;
import org.hornetq.tests.unit.core.server.impl.fakes.FakeFilter;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.FutureLatch;
+import org.hornetq.utils.LinkedListIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -1563,4 +1574,61 @@ public void run()
}
}
+ @Test
+ public void testTotalIteratorOrder() throws Exception
+ {
+ final String MY_ADDRESS = "myAddress";
+ final String MY_QUEUE = "myQueue";
+
+ HornetQServer server = HornetQServers.newHornetQServer(createDefaultConfig(false), true);
+
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setPageSizeBytes(10 * 1024);
+ defaultSetting.setMaxSizeBytes(20 * 1024);
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory factory = createSessionFactory(locator);
+ ClientSession session = addClientSession(factory.createSession(false, true, true));
+
+ session.createQueue(MY_ADDRESS, MY_QUEUE, true);
+
+ ClientProducer producer = addClientProducer(session.createProducer(MY_ADDRESS));
+
+ for (int i = 0; i < 50; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeBytes(new byte[1024]);
+ message.putIntProperty("order", i);
+ producer.send(message);
+ }
+
+ producer.close();
+ session.close();
+ factory.close();
+ locator.close();
+
+ Queue queue = ((LocalQueueBinding) server.getPostOffice().getBinding(new SimpleString(MY_QUEUE))).getQueue();
+ LinkedListIterator<MessageReference> totalIterator = queue.totalIterator();
+
+ try
+ {
+ int i = 0;
+ while (totalIterator.hasNext())
+ {
+ MessageReference ref = totalIterator.next();
+ Assert.assertEquals(i++, ref.getMessage().getIntProperty("order").intValue());
+ }
+ }
+ finally
+ {
+ totalIterator.close();
+ server.stop();
+ }
+ }
}

0 comments on commit a446868

Please sign in to comment.
Something went wrong with that request. Please try again.