Skip to content

Commit

Permalink
ARTEMIS-4162 support deleting addresses & queues w/o usage check
Browse files Browse the repository at this point in the history
There are certain use-cases where addresses will be auto-created and
never have a direct binding created on them. Because of this they will
never be auto-deleted. If a large number of these addresses build up
they will consume a problematic amount of heap space.

One specific example of this use-case is an MQTT subscriber with a
wild-card subscription and a large number of MQTT producers sending one
or two messages a large number of different MQTT topics covered by the
wild-card. Since no bindings are ever created on any of these individual
addresses (e.g. from a subscription queue) they will never be
auto-deleted, but they will eventually consume a large amount of heap.
The only way to deal with these addresses is to manually delete them.

There are also situations  where queues may be created and never have
any messages sent to them or never have a consumer connect. These
queues will never be auto-deleted so they must be deleted manually.

This commit adds the ability to configure the broker to skip the usage
check so that these kinds of addresses and queues can be deleted
automatically.
  • Loading branch information
jbertram committed Mar 3, 2023
1 parent 6874556 commit b76c672
Show file tree
Hide file tree
Showing 21 changed files with 323 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {

private static final String AUTO_DELETE_QUEUES_MESSAGE_COUNT = "auto-delete-queues-message-count";

private static final String AUTO_DELETE_QUEUES_SKIP_USAGE_CHECK = "auto-delete-queues-skip-usage-check";

private static final String CONFIG_DELETE_QUEUES = "config-delete-queues";

private static final String AUTO_CREATE_ADDRESSES = "auto-create-addresses";
Expand All @@ -295,6 +297,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {

private static final String AUTO_DELETE_ADDRESSES_DELAY = "auto-delete-addresses-delay";

private static final String AUTO_DELETE_ADDRESSES_SKIP_USAGE_CHECK = "auto-delete-addresses-skip-usage-check";

private static final String CONFIG_DELETE_ADDRESSES = "config-delete-addresses";

private static final String CONFIG_DELETE_DIVERTS = "config-delete-diverts";
Expand Down Expand Up @@ -337,8 +341,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {

private static final String ENABLE_INGRESS_TIMESTAMP = "enable-ingress-timestamp";

private static final String SUPPRESS_SESSION_NOTIFICATIONS = "suppress-session-notifications";

private boolean validateAIO = false;

private boolean printPageMaxSizeUsed = false;
Expand Down Expand Up @@ -1372,6 +1374,8 @@ protected Pair<String, AddressSettings> parseAddressSettings(final Node node) {
long autoDeleteQueuesMessageCount = XMLUtil.parseLong(child);
Validators.MINUS_ONE_OR_GE_ZERO.validate(AUTO_DELETE_QUEUES_MESSAGE_COUNT, autoDeleteQueuesMessageCount);
addressSettings.setAutoDeleteQueuesMessageCount(autoDeleteQueuesMessageCount);
} else if (AUTO_DELETE_QUEUES_SKIP_USAGE_CHECK.equalsIgnoreCase(name)) {
addressSettings.setAutoDeleteQueuesSkipUsageCheck(XMLUtil.parseBoolean(child));
} else if (CONFIG_DELETE_QUEUES.equalsIgnoreCase(name)) {
String value = getTrimmedTextContent(child);
Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_QUEUES, value);
Expand All @@ -1385,6 +1389,8 @@ protected Pair<String, AddressSettings> parseAddressSettings(final Node node) {
long autoDeleteAddressesDelay = XMLUtil.parseLong(child);
Validators.GE_ZERO.validate(AUTO_DELETE_ADDRESSES_DELAY, autoDeleteAddressesDelay);
addressSettings.setAutoDeleteAddressesDelay(autoDeleteAddressesDelay);
} else if (AUTO_DELETE_ADDRESSES_SKIP_USAGE_CHECK.equalsIgnoreCase(name)) {
addressSettings.setAutoDeleteAddressesSkipUsageCheck(XMLUtil.parseBoolean(child));
} else if (CONFIG_DELETE_ADDRESSES.equalsIgnoreCase(name)) {
String value = getTrimmedTextContent(child);
Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_ADDRESSES, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,8 @@ public interface AddressManager {

void scanAddresses(MirrorController mirrorController) throws Exception;

boolean checkAutoRemoveAddress(SimpleString address,
AddressInfo addressInfo,
boolean checkAutoRemoveAddress(AddressInfo addressInfo,
AddressSettings settings,
boolean ignoreDelay) throws Exception;

boolean checkAutoRemoveAddress(SimpleString address,
AddressInfo addressInfo,
AddressSettings settings) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -1038,8 +1038,8 @@ private void deleteDuplicateCache(SimpleString address) throws Exception {

@Override
public boolean isAddressBound(final SimpleString address) throws Exception {
Bindings bindings = lookupBindingsForAddress(address);
return bindings != null && !bindings.getBindings().isEmpty();
Collection<Binding> bindings = getDirectBindings(address);
return bindings != null && !bindings.isEmpty();
}

@Override
Expand Down Expand Up @@ -1970,15 +1970,16 @@ public void run() {
}
}

private static boolean queueWasUsed(Queue queue) {
return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged() > 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() != -1;
private static boolean queueWasUsed(Queue queue, AddressSettings settings) {
return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged() > 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() != -1 || settings.getAutoDeleteQueuesSkipUsageCheck();
}

/** To be used by the AddressQueueReaper.
* It is also exposed for tests through PostOfficeTestAccessor */
void reapAddresses(boolean initialCheck) {
getLocalQueues().forEach(queue -> {
if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && (initialCheck || QueueManagerImpl.delayCheck(queue)) && QueueManagerImpl.messageCountCheck(queue) && (initialCheck || queueWasUsed(queue))) {
AddressSettings settings = addressSettingsRepository.getMatch(queue.getAddress().toString());
if (!queue.isInternalQueue() && queue.isAutoDelete() && QueueManagerImpl.consumerCountCheck(queue) && (initialCheck || QueueManagerImpl.delayCheck(queue, settings)) && QueueManagerImpl.messageCountCheck(queue) && (initialCheck || queueWasUsed(queue, settings))) {
if (initialCheck || queue.isSwept()) {
if (logger.isDebugEnabled()) {
if (initialCheck) {
Expand All @@ -2003,7 +2004,7 @@ void reapAddresses(boolean initialCheck) {
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());

try {
if (addressManager.checkAutoRemoveAddress(address, addressInfo, settings, initialCheck)) {
if (addressManager.checkAutoRemoveAddress(addressInfo, settings, initialCheck)) {
if (initialCheck || addressInfo.isSwept()) {

server.autoRemoveAddressInfo(address, null);
Expand Down Expand Up @@ -2033,12 +2034,6 @@ void reapAddresses(boolean initialCheck) {
}
}

public boolean checkAutoRemoveAddress(SimpleString address,
AddressInfo addressInfo,
AddressSettings settings) throws Exception {
return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address) && addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay());
}

private Stream<Queue> getLocalQueues() {
return addressManager.getBindings()
.filter(binding -> binding.getType() == BindingType.LOCAL_QUEUE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,17 +368,18 @@ private void validateRoutingTypes(SimpleString addressName, EnumSet<RoutingType>
}

@Override
public boolean checkAutoRemoveAddress(SimpleString address,
AddressInfo addressInfo,
AddressSettings settings) throws Exception {
return checkAutoRemoveAddress(address, addressInfo, settings, false);
public boolean checkAutoRemoveAddress(AddressInfo addressInfo,
AddressSettings settings,
boolean ignoreDelay) throws Exception {
return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !bindingsFactory.isAddressBound(addressInfo.getName()) && addressWasUsed(addressInfo, settings) && (ignoreDelay || delayCheck(addressInfo, settings));
}

@Override
public boolean checkAutoRemoveAddress(SimpleString address,
AddressInfo addressInfo,
AddressSettings settings, boolean ignoreDelay) throws Exception {
return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !bindingsFactory.isAddressBound(address) && (ignoreDelay || addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay()));
private boolean delayCheck(AddressInfo addressInfo, AddressSettings settings) {
return (!settings.isAutoDeleteAddressesSkipUsageCheck() && System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay()) || (settings.isAutoDeleteAddressesSkipUsageCheck() && System.currentTimeMillis() - addressInfo.getCreatedTimestamp() >= settings.getAutoDeleteAddressesDelay());
}

private boolean addressWasUsed(AddressInfo addressInfo, AddressSettings settings) {
return addressInfo.getBindingRemovedTimestamp() != -1 || settings.isAutoDeleteAddressesSkipUsageCheck();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1540,10 +1540,10 @@ void slowConsumerDetected(String sessionID,
@LogMessage(id = 224111, value = "Both 'whitelist' and 'allowlist' detected. Configuration 'whitelist' is deprecated, please use only the 'allowlist' configuration", level = LogMessage.Level.WARN)
void useOnlyAllowList();

@LogMessage(id = 224112, value = "Auto removing Queue {} with queueID={} on address={}", level = LogMessage.Level.INFO)
@LogMessage(id = 224112, value = "Auto removing queue {} with queueID={} on address={}", level = LogMessage.Level.INFO)
void autoRemoveQueue(String name, long queueID, String address);

@LogMessage(id = 224113, value = "Auto removing Address {}", level = LogMessage.Level.INFO)
@LogMessage(id = 224113, value = "Auto removing address {}", level = LogMessage.Level.INFO)
void autoRemoveAddress(String name);

@LogMessage(id = 224114, value = "Address control block, blocking message production on address '{}'. Clients will not get further credit.", level = LogMessage.Level.INFO)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,4 +542,8 @@ default void errorProcessing(Consumer consumer, Throwable t, MessageReference me
default QueueConfiguration getQueueConfiguration() {
return null;
}

default long getCreatedTimestamp() {
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,14 @@ public class AddressInfo {

private static final AtomicLongFieldUpdater<AddressInfo> unRoutedMessageCountUpdater = AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "unRoutedMessageCount");

private long bindingRemovedTimestamp = -1;
private volatile long bindingRemovedTimestamp = -1;

private volatile boolean paused = false;

private PostOffice postOffice;
private StorageManager storageManager;
private HierarchicalRepositoryChangeListener repositoryChangeListener;
private long createdTimestamp = -1;

public boolean isSwept() {
return swept;
Expand All @@ -96,11 +97,11 @@ private AddressInfo() {
}

public AddressInfo(String name) {
this(SimpleString.toSimpleString(name), EnumSet.noneOf(RoutingType.class));
this(SimpleString.toSimpleString(name), EMPTY_ROUTING_TYPES);
}

public AddressInfo(SimpleString name) {
this(name, EnumSet.noneOf(RoutingType.class));
this(name, EMPTY_ROUTING_TYPES);
}

/**
Expand All @@ -110,6 +111,7 @@ public AddressInfo(SimpleString name) {
*/
public AddressInfo(SimpleString name, EnumSet<RoutingType> routingTypes) {
this.name = CompositeAddress.extractAddressName(name);
this.createdTimestamp = System.currentTimeMillis();
setRoutingTypes(routingTypes);
}

Expand All @@ -120,6 +122,7 @@ public AddressInfo(SimpleString name, EnumSet<RoutingType> routingTypes) {
*/
public AddressInfo(SimpleString name, RoutingType routingType) {
this.name = CompositeAddress.extractAddressName(name);
this.createdTimestamp = System.currentTimeMillis();
addRoutingType(routingType);
}

Expand Down Expand Up @@ -313,6 +316,9 @@ public String toString() {
buff.append("}");
buff.append(", autoCreated=").append(autoCreated);
buff.append(", paused=").append(paused);
buff.append(", bindingRemovedTimestamp=").append(bindingRemovedTimestamp);
buff.append(", swept=").append(swept);
buff.append(", createdTimestamp=").append(createdTimestamp);
buff.append("]");
return buff.toString();
}
Expand Down Expand Up @@ -388,6 +394,7 @@ public String toJSON() {
}
builder.add("routingTypes", arrayBuilder);
}
builder.add("created-timestamp", createdTimestamp);

return builder.build().toString();
}
Expand All @@ -412,6 +419,9 @@ protected void setJson(String key, Object value) {
JsonNumber jsonNumber = (JsonNumber)rtValue;
this.addRoutingType(RoutingType.getType((byte)jsonNumber.intValue()));
}
} else if (key.equals("created-timestamp")) {
JsonNumber jsonLong = (JsonNumber) value;
this.createdTimestamp = jsonLong.longValue();
}
}

Expand All @@ -427,4 +437,7 @@ public static AddressInfo fromJSON(String jsonString) {
return result;
}

public long getCreatedTimestamp() {
return createdTimestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ private void checkIDSupplier(NodeStore<MessageReference> nodeStore) {

private volatile long ringSize;

private volatile long createdTimestamp = -1;

@Override
public boolean isSwept() {
return swept;
Expand Down Expand Up @@ -635,6 +637,8 @@ public QueueImpl(final QueueConfiguration queueConfiguration,
final QueueFactory factory) {
super(server == null ? EmptyCriticalAnalyzer.getInstance() : server.getCriticalAnalyzer(), CRITICAL_PATHS);

this.createdTimestamp = System.currentTimeMillis();

this.id = queueConfiguration.getId();

this.address = queueConfiguration.getAddress();
Expand Down Expand Up @@ -1603,6 +1607,11 @@ public synchronized void setRingSize(long ringSize) {
this.ringSize = ringSize;
}

@Override
public long getCreatedTimestamp() {
return createdTimestamp;
}

public long getMessageCountForRing() {
return (long) pendingMetrics.getMessageCount();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ private static void purge(Queue queue) {

public static void performAutoDeleteQueue(ActiveMQServer server, Queue queue) {
SimpleString queueName = queue.getName();
AddressSettings settings = server.getAddressSettingsRepository().getMatch(queue.getAddress().toString());

if (logger.isDebugEnabled()) {
logger.debug("deleting auto-created queue \"{}\": consumerCount = {}; messageCount = {}; isAutoDelete = {}", queueName, queue.getConsumerCount(), queue.getMessageCount(), queue.isAutoDelete());
Expand All @@ -80,16 +79,12 @@ public static void performAutoDeleteQueue(ActiveMQServer server, Queue queue) {
}
}

public static boolean isAutoDelete(Queue queue) {
return queue.isAutoDelete();
}

public static boolean messageCountCheck(Queue queue) {
return queue.getAutoDeleteMessageCount() == -1 || queue.getMessageCount() <= queue.getAutoDeleteMessageCount();
}

public static boolean delayCheck(Queue queue) {
return System.currentTimeMillis() - queue.getConsumerRemovedTimestamp() >= queue.getAutoDeleteDelay();
public static boolean delayCheck(Queue queue, AddressSettings settings) {
return (!settings.getAutoDeleteQueuesSkipUsageCheck() && System.currentTimeMillis() - queue.getConsumerRemovedTimestamp() >= queue.getAutoDeleteDelay()) || (settings.getAutoDeleteQueuesSkipUsageCheck() && System.currentTimeMillis() - queue.getCreatedTimestamp() >= queue.getAutoDeleteDelay());
}

public static boolean consumerCountCheck(Queue queue) {
Expand Down

0 comments on commit b76c672

Please sign in to comment.