Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,17 @@
public class TransactedStoreUsageSuspendResumeTest {
private static final Logger LOG = LoggerFactory.getLogger(TransactedStoreUsageSuspendResumeTest.class);

private static final int MAX_MESSAGES = 10000;
/**
* Keep volume modest while still triggering store usage blocking. Smaller
* store limits combined with these numbers still exercise the same flow
* control behaviour but in a fraction of the time.
*/
private static final int MAX_MESSAGES = 1000;
private static final int RETAIN_QUEUE_FILL_COUNT = 400;
private static final int SHORT_RETAIN_QUEUE_FILL_COUNT = 200;
private static final int STORE_USAGE_LIMIT = 4 * 1024 * 1024;
private static final int RECEIVE_TIMEOUT_MILLIS = 5000;
private static final int MAX_IDLE_RECEIVES = 12; // 1 minute (RECEIVE_TIMEOUT_MILLIS times MAX_IDLE_RECEIVES attempts)

private static final String QUEUE_NAME = "test.queue";

Expand Down Expand Up @@ -85,11 +95,18 @@ public void run() {

MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));

int idleReceives = 0;
do {
Message message = consumer.receive(5000);
Message message = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
if (message != null) {
session.commit();
messagesReceivedCountDown.countDown();
idleReceives = 0;
} else {
idleReceives++;
if (idleReceives >= MAX_IDLE_RECEIVES) {
Assert.fail("Timed out waiting for messages, remaining: " + messagesReceivedCountDown.getCount());
}
}
if (messagesReceivedCountDown.getCount() % 500 == 0) {
LOG.info("remaining to receive: " + messagesReceivedCountDown.getCount());
Expand Down Expand Up @@ -121,7 +138,7 @@ public void setup() throws Exception {
kahaDB.setCompactAcksAfterNoGC(5);
broker.setPersistenceAdapter(kahaDB);

broker.getSystemUsage().getStoreUsage().setLimit(7*1024*1024);
broker.getSystemUsage().getStoreUsage().setLimit(STORE_USAGE_LIMIT);

broker.start();
broker.waitUntilStarted();
Expand Down Expand Up @@ -179,7 +196,7 @@ private void sendMessages() throws Exception {
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[10]);

for (int i=0; i<1240; i++) {
for (int i = 0; i < RETAIN_QUEUE_FILL_COUNT; i++) {
// mostly fill the store with retained messages
// so consumer only has a small bit of store usage to work with
producer.send(retainQueue, message);
Expand All @@ -190,13 +207,13 @@ private void sendMessages() throws Exception {
// some daylight in needed between retainQ and regularQ to free up the store
// log4j.logger.org.apache.activemq.store.kahadb.MessageDatabase=TRACE
Destination shortRetainQueue = session.createQueue(QUEUE_NAME + "-retain-short");
for (int i=0; i<1240; i++) {
for (int i = 0; i < SHORT_RETAIN_QUEUE_FILL_COUNT; i++) {
producer.send(shortRetainQueue, message);
session.commit();
}

MessageConsumer consumer = session.createConsumer(shortRetainQueue);
for (int i=0; i<1240; i++) {
for (int i = 0; i < SHORT_RETAIN_QUEUE_FILL_COUNT; i++) {
consumer.receive(4000);
session.commit();
}
Expand Down