diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java index d6d628ad7b6..cb064f77b86 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java @@ -51,7 +51,17 @@ public class TransactedStoreUsageSuspendResumeTest { private static final Logger LOG = LoggerFactory.getLogger(TransactedStoreUsageSuspendResumeTest.class); - private static final int MAX_MESSAGES = 10000; + /** + * Keep volume modest while still triggering store usage blocking. Smaller + * store limits combined with these numbers still exercise the same flow + * control behaviour but in a fraction of the time. + */ + private static final int MAX_MESSAGES = 1000; + private static final int RETAIN_QUEUE_FILL_COUNT = 400; + private static final int SHORT_RETAIN_QUEUE_FILL_COUNT = 200; + private static final int STORE_USAGE_LIMIT = 4 * 1024 * 1024; + private static final int RECEIVE_TIMEOUT_MILLIS = 5000; + private static final int MAX_IDLE_RECEIVES = 12; // 1 minute (RECEIVE_TIMEOUT_MILLIS times MAX_IDLE_RECEIVES attempts) private static final String QUEUE_NAME = "test.queue"; @@ -85,11 +95,18 @@ public void run() { MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + int idleReceives = 0; do { - Message message = consumer.receive(5000); + Message message = consumer.receive(RECEIVE_TIMEOUT_MILLIS); if (message != null) { session.commit(); messagesReceivedCountDown.countDown(); + idleReceives = 0; + } else { + idleReceives++; + if (idleReceives >= MAX_IDLE_RECEIVES) { + Assert.fail("Timed out waiting for messages, remaining: " + messagesReceivedCountDown.getCount()); + } } if (messagesReceivedCountDown.getCount() % 500 == 0) { LOG.info("remaining to receive: " + messagesReceivedCountDown.getCount()); @@ -121,7 +138,7 @@ public void setup() throws Exception { kahaDB.setCompactAcksAfterNoGC(5); broker.setPersistenceAdapter(kahaDB); - broker.getSystemUsage().getStoreUsage().setLimit(7*1024*1024); + broker.getSystemUsage().getStoreUsage().setLimit(STORE_USAGE_LIMIT); broker.start(); broker.waitUntilStarted(); @@ -179,7 +196,7 @@ private void sendMessages() throws Exception { BytesMessage message = session.createBytesMessage(); message.writeBytes(new byte[10]); - for (int i=0; i<1240; i++) { + for (int i = 0; i < RETAIN_QUEUE_FILL_COUNT; i++) { // mostly fill the store with retained messages // so consumer only has a small bit of store usage to work with producer.send(retainQueue, message); @@ -190,13 +207,13 @@ private void sendMessages() throws Exception { // some daylight in needed between retainQ and regularQ to free up the store // log4j.logger.org.apache.activemq.store.kahadb.MessageDatabase=TRACE Destination shortRetainQueue = session.createQueue(QUEUE_NAME + "-retain-short"); - for (int i=0; i<1240; i++) { + for (int i = 0; i < SHORT_RETAIN_QUEUE_FILL_COUNT; i++) { producer.send(shortRetainQueue, message); session.commit(); } MessageConsumer consumer = session.createConsumer(shortRetainQueue); - for (int i=0; i<1240; i++) { + for (int i = 0; i < SHORT_RETAIN_QUEUE_FILL_COUNT; i++) { consumer.receive(4000); session.commit(); }