Skip to content

Commit

Permalink
ARTEMIS-1999 Broker uses 100% core's CPU time if msg grouping is used
Browse files Browse the repository at this point in the history
The deliver loop won't give up trying to deliver messages when
back-pressure kicks in (credits and/or TCP) if msg grouping is used and
there are many consumers registered: this change will allow the loop
to exit by instructing the logic that the group consumer is the only
consumer to check.

(cherry picked from commit 8dd0e94)
  • Loading branch information
franz1981 authored and clebertsuconic committed Aug 8, 2018
1 parent b6ec824 commit 0b24d0b
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 2 deletions.
Expand Up @@ -2365,7 +2365,12 @@ private void deliver() {
}
}

if (pos == endPos) {
if (groupConsumer != null || exclusive) {
if (noDelivery > 0) {
break;
}
noDelivery = 0;
} else if (pos == endPos) {
// Round robin'd all

if (noDelivery == size) {
Expand Down Expand Up @@ -2917,7 +2922,7 @@ private boolean deliverDirect(final MessageReference ref) {
return true;
}

if (pos == startPos) {
if (pos == startPos || groupConsumer != null || exclusive) {
// Tried them all
break;
}
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand Down Expand Up @@ -1289,6 +1290,65 @@ public void testTotalIteratorOrder() throws Exception {
}
}

@Test
public void testGroupMessageWithManyConsumers() throws Exception {
final CountDownLatch firstMessageHandled = new CountDownLatch(1);
final CountDownLatch finished = new CountDownLatch(2);
final Consumer groupConsumer = new FakeConsumer() {

int count = 0;

@Override
public synchronized HandleStatus handle(MessageReference reference) {
if (count == 0) {
//the first message is handled and will be used to determine this consumer
//to be the group consumer
count++;
firstMessageHandled.countDown();
return HandleStatus.HANDLED;
} else if (count <= 2) {
//the next two attempts to send the second message will be done
//attempting a direct delivery and an async one after that
count++;
finished.countDown();
return HandleStatus.BUSY;
} else {
//this shouldn't happen, because the last attempt to deliver
//the second message should have stop the delivery loop:
//it will succeed just to let the message being handled and
//reduce the message count to 0
return HandleStatus.HANDLED;
}
}
};
final Consumer noConsumer = new FakeConsumer() {
@Override
public synchronized HandleStatus handle(MessageReference reference) {
Assert.fail("this consumer isn't allowed to consume any message");
throw new AssertionError();
}
};
final QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1,
null, null, false, true, false,
scheduledExecutor, null, null, null,
ArtemisExecutor.delegate(executor), null, null);
queue.addConsumer(groupConsumer);
queue.addConsumer(noConsumer);
final MessageReference firstMessageReference = generateReference(queue, 1);
final SimpleString groupName = SimpleString.toSimpleString("group");
firstMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName);
final MessageReference secondMessageReference = generateReference(queue, 2);
secondMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName);
queue.addTail(firstMessageReference, true);
Assert.assertTrue("first message isn't handled", firstMessageHandled.await(3000, TimeUnit.MILLISECONDS));
Assert.assertEquals("group consumer isn't correctly set", groupConsumer, queue.getGroups().get(groupName));
queue.addTail(secondMessageReference, true);
final boolean atLeastTwoDeliverAttempts = finished.await(3000, TimeUnit.MILLISECONDS);
Assert.assertTrue(atLeastTwoDeliverAttempts);
Thread.sleep(1000);
Assert.assertEquals("The second message should be in the queue", 1, queue.getMessageCount());
}

private QueueImpl getNonDurableQueue() {
return getQueue(QueueImplTest.queue1, false, false, null);
}
Expand Down

0 comments on commit 0b24d0b

Please sign in to comment.