From 3855d95709a7fe5284f898b33e7b95bde7d039e1 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 9 Jan 2017 11:35:39 -0500 Subject: [PATCH] [BZ 1377703] Slow consumer detection not working when paging --- .../hornetq/core/server/impl/QueueImpl.java | 5 +- .../integration/client/SlowConsumerTest.java | 71 +++++++++++++++---- 2 files changed, 59 insertions(+), 17 deletions(-) diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java index 594b525aba0..f781d712bfb 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java @@ -2782,13 +2782,14 @@ private long calculateRedeliveryDelay(final AddressSettings addressSettings, fin public float getRate() { + long locaMessageAdded = getMessagesAdded(); float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f); if (timeSlice == 0) { - messagesAddedSnapshot.getAndSet(messagesAdded); + messagesAddedSnapshot.getAndSet(locaMessageAdded); return 0.0f; } - return BigDecimal.valueOf((messagesAdded - messagesAddedSnapshot.getAndSet(messagesAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue(); + return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue(); } // Inner classes diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SlowConsumerTest.java index 80ec5921194..3c231ff5848 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SlowConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SlowConsumerTest.java @@ -33,10 +33,13 @@ import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.ManagementHelper; import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.Queue; +import org.hornetq.core.settings.impl.AddressFullMessagePolicy; import org.hornetq.core.settings.impl.AddressSettings; import org.hornetq.core.settings.impl.SlowConsumerPolicy; import org.hornetq.tests.util.RandomUtil; import org.hornetq.tests.util.ServiceTestBase; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -49,21 +52,25 @@ public class SlowConsumerTest extends ServiceTestBase { private boolean isNetty = false; + private boolean isPaging = false; // this will ensure that all tests in this class are run twice, // once with "true" passed to the class' constructor and once with "false" - @Parameterized.Parameters + @Parameterized.Parameters(name = "persistent={0}, paging={1}") public static Collection getParameters() { return Arrays.asList(new Object[][]{ - {true}, - {false} + {true, false}, + {false, false}, + {true, true}, + {false, true} }); } - public SlowConsumerTest(boolean isNetty) + public SlowConsumerTest(boolean isNetty, boolean isPaging) { this.isNetty = isNetty; + this.isPaging = isPaging; } private HornetQServer server; @@ -78,17 +85,33 @@ public void setUp() throws Exception { super.setUp(); - server = createServer(false, isNetty); + server = createServer(true, isNetty); AddressSettings addressSettings = new AddressSettings(); - addressSettings.setSlowConsumerCheckPeriod(2); + addressSettings.setSlowConsumerCheckPeriod(1); addressSettings.setSlowConsumerThreshold(10); addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); + if (isPaging) + { + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + addressSettings.setMaxSizeBytes(10 * 1024); + addressSettings.setPageSizeBytes(1024); + } + else + { + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + addressSettings.setMaxSizeBytes(-1); + addressSettings.setPageSizeBytes(1024); + + } + server.start(); server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); + server.createQueue(QUEUE, QUEUE, null, true, false).getPageSubscription().getPagingStore().startPaging(); + locator = createFactory(isNetty); } @@ -99,10 +122,10 @@ public void testSlowConsumerKilled() throws Exception ClientSession session = addClientSession(sf.createSession(false, true, true, false)); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); + assertPaging(); + final int numMessages = 25; for (int i = 0; i < numMessages; i++) @@ -126,6 +149,19 @@ public void testSlowConsumerKilled() throws Exception } } + private void assertPaging() throws Exception + { + Queue queue = server.locateQueue(QUEUE); + if (isPaging) + { + Assert.assertTrue(queue.getPageSubscription().isPaging()); + } + else + { + Assert.assertFalse(queue.getPageSubscription().isPaging()); + } + } + @Test public void testSlowConsumerNotification() throws Exception { @@ -134,16 +170,21 @@ public void testSlowConsumerNotification() throws Exception ClientSession session = addClientSession(sf.createSession(false, true, true, false)); - session.createQueue(QUEUE, QUEUE, null, false); - AddressSettings addressSettings = new AddressSettings(); addressSettings.setSlowConsumerCheckPeriod(2); addressSettings.setSlowConsumerThreshold(10); addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY); + if (!isPaging) + { + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + addressSettings.setMaxSizeBytes(-1); + } server.getAddressSettingsRepository().removeMatch(QUEUE.toString()); server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); + assertPaging(); + ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); final int numMessages = 25; @@ -195,7 +236,7 @@ public void onMessage(ClientMessage message) ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); session.start(); - assertTrue(notifLatch.await(3, TimeUnit.SECONDS)); + assertTrue(notifLatch.await(15, TimeUnit.SECONDS)); } @Test @@ -205,8 +246,6 @@ public void testSlowConsumerSpared() throws Exception ClientSession session = addClientSession(sf.createSession(true, true)); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); final int numMessages = 5; @@ -216,6 +255,8 @@ public void testSlowConsumerSpared() throws Exception producer.send(createTextMessage(session, "m" + i)); } + assertPaging(); + ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); session.start(); @@ -238,8 +279,6 @@ public void testFastThenSlowConsumerSpared() throws Exception final ClientSession producerSession = addClientSession(sf.createSession(true, true)); - session.createQueue(QUEUE, QUEUE, null, false); - final ClientProducer producer = addClientProducer(producerSession.createProducer(QUEUE)); final AtomicLong messagesProduced = new AtomicLong(0); @@ -289,6 +328,8 @@ public void run() t.start(); + assertPaging(); + ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); session.start();