Skip to content

Commit

Permalink
ARTEMIS-1999 Broker uses 100% core's CPU time if msg grouping is used
Browse files Browse the repository at this point in the history
The deliver loop won't give up trying to deliver messages when
back-pressure kicks in (credits and/or TCP) if msg grouping is used and
there are many consumers registered: this change will allow the loop
to exit by instructing the logic that the group consumer is the only
consumer to check.
  • Loading branch information
franz1981 committed Aug 1, 2018
1 parent a7e4ce4 commit 226d12f
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2484,10 +2484,10 @@ private void deliver() {
}
}

if (pos == endPos) {
// Round robin'd all
if (pos == endPos || groupConsumer != null) {
// Round robin'd all or groupConsumer is defined

if (noDelivery == size) {
if (noDelivery == size || (groupConsumer != null && noDelivery > 0)) {
if (handledconsumer != null) {
// this shouldn't really happen,
// however I'm keeping this as an assertion case future developers ever change the logic here on this class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
Expand Down Expand Up @@ -245,4 +246,63 @@ public void disconnect() {
Assert.assertTrue(gotLatch);
}

@Test
public void testGroupMessageWithManyConsumers() throws Exception {
final CountDownLatch firstMessageHandled = new CountDownLatch(1);
final CountDownLatch finished = new CountDownLatch(2);
final Consumer groupConsumer = new FakeConsumer() {

int count = 0;

@Override
public synchronized HandleStatus handle(MessageReference reference) {
if (count == 0) {
//the first message is handled and will be used to determine this consumer
//to be the group consumer
count++;
firstMessageHandled.countDown();
return HandleStatus.HANDLED;
} else if (count <= 2) {
//the next two attempts to send the second message will be done
//attempting a direct delivery and an async one after that
count++;
finished.countDown();
return HandleStatus.BUSY;
} else {
//this shouldn't happen, because the last attempt to deliver
//the second message should have stop the delivery loop:
//it will succeed just to let the message being handled and
//reduce the message count to 0
return HandleStatus.HANDLED;
}
}
};
final Consumer noConsumer = new FakeConsumer() {
@Override
public synchronized HandleStatus handle(MessageReference reference) {
Assert.fail("this consumer isn't allowed to consume any message");
throw new AssertionError();
}
};
final QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1,
null, null, false, true, false,
scheduledExecutor, null, null, null,
ArtemisExecutor.delegate(executor), null, null);
queue.addConsumer(groupConsumer);
queue.addConsumer(noConsumer);
final MessageReference firstMessageReference = generateReference(queue, 1);
final SimpleString groupName = SimpleString.toSimpleString("group");
firstMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName);
final MessageReference secondMessageReference = generateReference(queue, 2);
secondMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName);
queue.addTail(firstMessageReference, true);
Assert.assertTrue("first message isn't handled", firstMessageHandled.await(3000, TimeUnit.MILLISECONDS));
Assert.assertEquals("group consumer isn't correctly set", groupConsumer, queue.getGroups().get(groupName));
queue.addTail(secondMessageReference, true);
final boolean atLeastTwoDeliverAttempts = finished.await(3000, TimeUnit.MILLISECONDS);
Assert.assertTrue(atLeastTwoDeliverAttempts);
Thread.sleep(1000);
Assert.assertEquals("The second message should be in the queue", 1, queue.getMessageCount());
}

}

0 comments on commit 226d12f

Please sign in to comment.