Skip to content
Permalink
Browse files
ARTEMIS-3720 Improvements on Paging MaxMessages usage
I "used" the broker a little bit around max-messages and found a few minor issues.
  • Loading branch information
clebertsuconic committed Mar 21, 2022
1 parent 7bdb3dc commit 5f22a5192657efd7d1c2a2e36e27e7a8cc9693d4
Show file tree
Hide file tree
Showing 11 changed files with 556 additions and 30 deletions.
@@ -133,8 +133,14 @@ ${cluster-security.settings}${cluster.settings}${replicated.settings}${shared-st
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->


<!-- if max-size-bytes and max-size-messages were both enabled, the system will enter into paging
based on the first attribute to hits the maximum value -->
<!-- limit for the address in bytes, -1 means unlimited -->
<max-size-bytes>-1</max-size-bytes>
<!-- limit for the address in messages, -1 means unlimited -->
<max-size-messages>-1</max-size-messages>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>${full-policy}</address-full-policy>
<auto-create-queues>${auto-create}</auto-create-queues>
@@ -1,9 +1,6 @@
<!-- the system will enter into page mode once you hit this limit.
This is an estimate in bytes of how much the messages are using in memory
<!-- the system will enter into page mode once you hit this limit. This is an estimate in bytes of how much the messages are using in memory

The system will use half of the available memory (-Xmx) by default for the global-max-size.
You may specify a different value here if you need to customize it to your needs.
The system will use half of the available memory (-Xmx) by default for the global-max-size.
You may specify a different value here if you need to customize it to your needs.

<global-max-size>100Mb</global-max-size>

-->
<global-max-size>100Mb</global-max-size> -->
@@ -452,10 +452,7 @@ public void parseMainConfig(final Element e, final Configuration config) throws

long globalMaxMessages = getLong(e, GLOBAL_MAX_MESSAGES, -1, Validators.MINUS_ONE_OR_GT_ZERO);

if (globalMaxSize > 0) {
config.setGlobalMaxMessages(globalMaxMessages);
}

config.setGlobalMaxMessages(globalMaxMessages);

config.setMaxDiskUsage(getInteger(e, MAX_DISK_USAGE, config.getMaxDiskUsage(), Validators.PERCENTAGE_OR_MINUS_ONE));

@@ -131,6 +131,10 @@ default long getGlobalSize() {
return 0;
}

default long getGlobalMessages() {
return 0;
}

/**
* Use this when you have no refernce of an address. (anonymous AMQP Producers for example)
* @param runWhenAvailable
@@ -149,4 +153,8 @@ default void checkStorage(Runnable runWhenAvailable) {
default long getMaxSize() {
return 0;
}

default long getMaxMessages() {
return 0;
}
}
@@ -80,6 +80,8 @@ private void setGlobalFull(boolean globalFull) {

private long maxSize;

private long maxMessages;

private volatile boolean cleanupEnabled = true;

private volatile boolean diskFull = false;
@@ -117,6 +119,7 @@ public PagingManagerImpl(final PagingStoreFactory pagingSPI,
this.addressSettingsRepository = addressSettingsRepository;
addressSettingsRepository.registerListener(this);
this.maxSize = maxSize;
this.maxMessages = maxMessages;
this.globalSizeMetric = new SizeAwareMetric(maxSize, maxSize, maxMessages, maxMessages);
globalSizeMetric.setSizeEnabled(maxSize >= 0);
globalSizeMetric.setElementsEnabled(maxMessages >= 0);
@@ -132,16 +135,22 @@ SizeAwareMetric getSizeAwareMetric() {


/** To be used in tests only called through PagingManagerTestAccessor */
void resetMaxSize(long maxSize, long maxElements) {
void resetMaxSize(long maxSize, long maxMessages) {
this.maxSize = maxSize;
this.globalSizeMetric.setMax(maxSize, maxSize, maxElements, maxElements);
this.maxMessages = maxMessages;
this.globalSizeMetric.setMax(maxSize, maxSize, maxMessages, maxMessages);
}

@Override
public long getMaxSize() {
return maxSize;
}

@Override
public long getMaxMessages() {
return maxMessages;
}

public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) {
this(pagingSPI, addressSettingsRepository, -1, -1, null);
@@ -186,6 +195,11 @@ public long getGlobalSize() {
return globalSizeMetric.getSize();
}

@Override
public long getGlobalMessages() {
return globalSizeMetric.getElements();
}

protected void checkMemoryRelease() {
if (!diskFull && (maxSize < 0 || !globalFull) && !blockedStored.isEmpty()) {
if (!memoryCallback.isEmpty()) {
@@ -530,14 +530,18 @@ public void stopPaging() {
final boolean isPaging = this.paging;
if (isPaging) {
paging = false;
ActiveMQServerLogger.LOGGER.pageStoreStop(storeName, size.getSize(), maxSize, pagingManager.getGlobalSize());
ActiveMQServerLogger.LOGGER.pageStoreStop(storeName, getPageInfo());
}
this.cursorProvider.onPageModeCleared();
} finally {
lock.writeLock().unlock();
}
}

private String getPageInfo() {
return String.format("size=%d bytes (%d messages); maxSize=%d bytes (%d messages); globalSize=%d bytes (%d messages); globalMaxSize=%d bytes (%d messages);", size.getSize(), size.getElements(), maxSize, maxMessages, pagingManager.getGlobalSize(), pagingManager.getGlobalMessages(), pagingManager.getMaxSize(), pagingManager.getMaxMessages());
}

@Override
public boolean startPaging() {
if (!running) {
@@ -575,7 +579,7 @@ public boolean startPaging() {
}
}
paging = true;
ActiveMQServerLogger.LOGGER.pageStoreStart(storeName, size.getSize(), maxSize, pagingManager.getGlobalSize());
ActiveMQServerLogger.LOGGER.pageStoreStart(storeName, getPageInfo());

return true;
} finally {
@@ -764,7 +768,7 @@ public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable
if (pagingManager.isDiskFull()) {
ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
} else {
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, size.getSize(), maxSize, pagingManager.getGlobalSize());
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, getPageInfo());
}
blocking = true;
}
@@ -813,7 +817,7 @@ public boolean checkReleasedMemory() {
if (!onMemoryFreedRunnables.isEmpty()) {
executor.execute(this::memoryReleased);
if (blocking) {
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, size.getSize(), maxSize);
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, getPageInfo());
blocking = false;
return true;
}
@@ -848,7 +852,7 @@ public boolean page(Message message,
// Address is full, we just pretend we are paging, and drop the data
if (!printedDropMessagesWarning) {
printedDropMessagesWarning = true;
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, size.getSize(), maxSize, pagingManager.getGlobalSize());
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo());
}
return true;
} else {
@@ -288,8 +288,8 @@ void messageWithDuplicateID(Object duplicateProperty,
void switchingNIO();

@LogMessage(level = Logger.Level.INFO)
@Message(id = 221046, value = "Unblocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes: {2}", format = Message.Format.MESSAGE_FORMAT)
void unblockingMessageProduction(SimpleString addressName, long currentSize, long maxSize);
@Message(id = 221046, value = "Unblocking message production on address ''{0}''; {1}", format = Message.Format.MESSAGE_FORMAT)
void unblockingMessageProduction(SimpleString addressName, String sizeInfo);

@LogMessage(level = Logger.Level.INFO)
@Message(id = 221047, value = "Backup Server has scaled down to live server", format = Message.Format.MESSAGE_FORMAT)
@@ -615,12 +615,12 @@ void slowConsumerDetected(String sessionID,
void pageStoreStartIOError(@Cause Exception e);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 222038, value = "Starting paging on address ''{0}''; size is currently: {1} bytes; max-size-bytes: {2}; global-size-bytes: {3}", format = Message.Format.MESSAGE_FORMAT)
void pageStoreStart(SimpleString storeName, long addressSize, long maxSize, long globalMaxSize);
@Message(id = 222038, value = "Starting paging on address ''{0}''; {1}", format = Message.Format.MESSAGE_FORMAT)
void pageStoreStart(SimpleString storeName, String sizeInfo);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 222039, value = "Messages sent to address ''{0}'' are being dropped; size is currently: {1} bytes; max-size-bytes: {2}; global-size-bytes: {3}", format = Message.Format.MESSAGE_FORMAT)
void pageStoreDropMessages(SimpleString storeName, long addressSize, long maxSize, long globalMaxSize);
@Message(id = 222039, value = "Messages sent to address ''{0}'' are being dropped; {1}", format = Message.Format.MESSAGE_FORMAT)
void pageStoreDropMessages(SimpleString storeName, String sizeInfo);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 222040, value = "Server is stopped", format = Message.Format.MESSAGE_FORMAT)
@@ -1218,8 +1218,8 @@ void slowConsumerDetected(String sessionID,
void missingClusterConfigForScaleDown(String scaleDownCluster);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 222183, value = "Blocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes on address: {2}, global-max-size is {3}", format = Message.Format.MESSAGE_FORMAT)
void blockingMessageProduction(SimpleString addressName, long currentSize, long maxSize, long globalMaxSize);
@Message(id = 222183, value = "Blocking message production on address ''{0}''; {1}", format = Message.Format.MESSAGE_FORMAT)
void blockingMessageProduction(SimpleString addressName, String pageInfo);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 222184,
@@ -2181,8 +2181,8 @@ void slowConsumerDetected(String sessionID,
void enableTraceForCriticalAnalyzer();

@LogMessage(level = Logger.Level.WARN)
@Message(id = 224108, value = "Stopped paging on address ''{0}''; size is currently: {1} bytes; max-size-bytes: {2}; global-size-bytes: {3}", format = Message.Format.MESSAGE_FORMAT)
void pageStoreStop(SimpleString storeName, long addressSize, long maxSize, long globalMaxSize);
@Message(id = 224108, value = "Stopped paging on address ''{0}''; {1}", format = Message.Format.MESSAGE_FORMAT)
void pageStoreStop(SimpleString storeName, String pageInfo);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 224109, value = "ConnectionRouter {0} not found", format = Message.Format.MESSAGE_FORMAT)
@@ -861,7 +861,6 @@
</args>
</configuration>
</execution>
<!-- END JmxReplicatedMultipleFailbackTest -->
<execution>
<phase>test-compile</phase>
<id>create-paging</id>
@@ -882,6 +881,46 @@
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-paging-address-messages</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<configuration>${basedir}/target/classes/servers/pagingAddressMaxMessages</configuration>
<allowAnonymous>true</allowAnonymous>
<user>admin</user>
<password>admin</password>
<instance>${basedir}/target/pagingAddressMaxMessages</instance>
<args>
<!-- this is needed to run the server remotely -->
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-paging-global-messages</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<configuration>${basedir}/target/classes/servers/pagingGlobalMaxMessages</configuration>
<allowAnonymous>true</allowAnonymous>
<user>admin</user>
<password>admin</password>
<instance>${basedir}/target/pagingGlobalMaxMessages</instance>
<args>
<!-- this is needed to run the server remotely -->
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
</args>
</configuration>
</execution>
<!-- used on TransferTest -->
<execution>
<phase>test-compile</phase>

0 comments on commit 5f22a51

Please sign in to comment.