Skip to content

Commit

Permalink
[BZ 1377703] Slow consumer detection not working when paging
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Jan 9, 2017
1 parent 125602c commit 3855d95
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 17 deletions.
Expand Up @@ -2782,13 +2782,14 @@ private long calculateRedeliveryDelay(final AddressSettings addressSettings, fin

public float getRate()
{
long locaMessageAdded = getMessagesAdded();
float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
if (timeSlice == 0)
{
messagesAddedSnapshot.getAndSet(messagesAdded);
messagesAddedSnapshot.getAndSet(locaMessageAdded);
return 0.0f;
}
return BigDecimal.valueOf((messagesAdded - messagesAddedSnapshot.getAndSet(messagesAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
}

// Inner classes
Expand Down
Expand Up @@ -33,10 +33,13 @@
import org.hornetq.api.core.management.CoreNotificationType;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.settings.impl.SlowConsumerPolicy;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -49,21 +52,25 @@
public class SlowConsumerTest extends ServiceTestBase
{
private boolean isNetty = false;
private boolean isPaging = false;

// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters
@Parameterized.Parameters(name = "persistent={0}, paging={1}")
public static Collection getParameters()
{
return Arrays.asList(new Object[][]{
{true},
{false}
{true, false},
{false, false},
{true, true},
{false, true}
});
}

public SlowConsumerTest(boolean isNetty)
public SlowConsumerTest(boolean isNetty, boolean isPaging)
{
this.isNetty = isNetty;
this.isPaging = isPaging;
}

private HornetQServer server;
Expand All @@ -78,17 +85,33 @@ public void setUp() throws Exception
{
super.setUp();

server = createServer(false, isNetty);
server = createServer(true, isNetty);

AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(2);
addressSettings.setSlowConsumerCheckPeriod(1);
addressSettings.setSlowConsumerThreshold(10);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);

if (isPaging)
{
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setMaxSizeBytes(10 * 1024);
addressSettings.setPageSizeBytes(1024);
}
else
{
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
addressSettings.setMaxSizeBytes(-1);
addressSettings.setPageSizeBytes(1024);

}

server.start();

server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);

server.createQueue(QUEUE, QUEUE, null, true, false).getPageSubscription().getPagingStore().startPaging();

locator = createFactory(isNetty);
}

Expand All @@ -99,10 +122,10 @@ public void testSlowConsumerKilled() throws Exception

ClientSession session = addClientSession(sf.createSession(false, true, true, false));

session.createQueue(QUEUE, QUEUE, null, false);

ClientProducer producer = addClientProducer(session.createProducer(QUEUE));

assertPaging();

final int numMessages = 25;

for (int i = 0; i < numMessages; i++)
Expand All @@ -126,6 +149,19 @@ public void testSlowConsumerKilled() throws Exception
}
}

private void assertPaging() throws Exception
{
Queue queue = server.locateQueue(QUEUE);
if (isPaging)
{
Assert.assertTrue(queue.getPageSubscription().isPaging());
}
else
{
Assert.assertFalse(queue.getPageSubscription().isPaging());
}
}

@Test
public void testSlowConsumerNotification() throws Exception
{
Expand All @@ -134,16 +170,21 @@ public void testSlowConsumerNotification() throws Exception

ClientSession session = addClientSession(sf.createSession(false, true, true, false));

session.createQueue(QUEUE, QUEUE, null, false);

AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(2);
addressSettings.setSlowConsumerThreshold(10);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
if (!isPaging)
{
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
addressSettings.setMaxSizeBytes(-1);
}

server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);

assertPaging();

ClientProducer producer = addClientProducer(session.createProducer(QUEUE));

final int numMessages = 25;
Expand Down Expand Up @@ -195,7 +236,7 @@ public void onMessage(ClientMessage message)
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();

assertTrue(notifLatch.await(3, TimeUnit.SECONDS));
assertTrue(notifLatch.await(15, TimeUnit.SECONDS));
}

@Test
Expand All @@ -205,8 +246,6 @@ public void testSlowConsumerSpared() throws Exception

ClientSession session = addClientSession(sf.createSession(true, true));

session.createQueue(QUEUE, QUEUE, null, false);

ClientProducer producer = addClientProducer(session.createProducer(QUEUE));

final int numMessages = 5;
Expand All @@ -216,6 +255,8 @@ public void testSlowConsumerSpared() throws Exception
producer.send(createTextMessage(session, "m" + i));
}

assertPaging();

ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();

Expand All @@ -238,8 +279,6 @@ public void testFastThenSlowConsumerSpared() throws Exception

final ClientSession producerSession = addClientSession(sf.createSession(true, true));

session.createQueue(QUEUE, QUEUE, null, false);

final ClientProducer producer = addClientProducer(producerSession.createProducer(QUEUE));

final AtomicLong messagesProduced = new AtomicLong(0);
Expand Down Expand Up @@ -289,6 +328,8 @@ public void run()

t.start();

assertPaging();

ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();

Expand Down

0 comments on commit 3855d95

Please sign in to comment.