Skip to content

Commit

Permalink
This closes #1345
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Jun 22, 2017
2 parents be8eb3e + f63f130 commit a970b41
Show file tree
Hide file tree
Showing 14 changed files with 616 additions and 5 deletions.
Expand Up @@ -24,6 +24,7 @@
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;

/**
Expand Down Expand Up @@ -156,6 +157,16 @@ public void validate(final String name, final Object value) {
}
};

public static final Validator DELETION_POLICY_TYPE = new Validator() {
@Override
public void validate(final String name, final Object value) {
String val = (String) value;
if (val == null || !val.equals(DeletionPolicy.OFF.toString()) && !val.equals(DeletionPolicy.FORCE.toString())) {
throw ActiveMQMessageBundle.BUNDLE.invalidDeletionPolicyType(val);
}
}
};

public static final Validator MESSAGE_LOAD_BALANCING_TYPE = new Validator() {
@Override
public void validate(final String name, final Object value) {
Expand Down
Expand Up @@ -72,6 +72,7 @@
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.utils.ByteUtil;
Expand Down Expand Up @@ -193,10 +194,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {

private static final String AUTO_DELETE_QUEUES = "auto-delete-queues";

private static final String CONFIG_DELETE_QUEUES = "config-delete-queues";

private static final String AUTO_CREATE_ADDRESSES = "auto-create-addresses";

private static final String AUTO_DELETE_ADDRESSES = "auto-delete-addresses";

private static final String CONFIG_DELETE_ADDRESSES = "config-delete-addresses";

private static final String DEFAULT_PURGE_ON_NO_CONSUMERS = "default-purge-on-no-consumers";

private static final String DEFAULT_MAX_CONSUMERS = "default-max-consumers";
Expand Down Expand Up @@ -985,10 +990,20 @@ protected Pair<String, AddressSettings> parseAddressSettings(final Node node) {
addressSettings.setAutoCreateQueues(XMLUtil.parseBoolean(child));
} else if (AUTO_DELETE_QUEUES.equalsIgnoreCase(name)) {
addressSettings.setAutoDeleteQueues(XMLUtil.parseBoolean(child));
} else if (CONFIG_DELETE_QUEUES.equalsIgnoreCase(name)) {
String value = getTrimmedTextContent(child);
Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_QUEUES, value);
DeletionPolicy policy = Enum.valueOf(DeletionPolicy.class, value);
addressSettings.setConfigDeleteQueues(policy);
} else if (AUTO_CREATE_ADDRESSES.equalsIgnoreCase(name)) {
addressSettings.setAutoCreateAddresses(XMLUtil.parseBoolean(child));
} else if (AUTO_DELETE_ADDRESSES.equalsIgnoreCase(name)) {
addressSettings.setAutoDeleteAddresses(XMLUtil.parseBoolean(child));
} else if (CONFIG_DELETE_ADDRESSES.equalsIgnoreCase(name)) {
String value = getTrimmedTextContent(child);
Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_ADDRESSES, value);
DeletionPolicy policy = Enum.valueOf(DeletionPolicy.class, value);
addressSettings.setConfigDeleteAddresses(policy);
} else if (MANAGEMENT_BROWSE_PAGE_SIZE.equalsIgnoreCase(name)) {
addressSettings.setManagementBrowsePageSize(XMLUtil.parseInt(child));
} else if (DEFAULT_PURGE_ON_NO_CONSUMERS.equalsIgnoreCase(name)) {
Expand Down
Expand Up @@ -427,5 +427,7 @@ IllegalStateException invalidRoutingTypeUpdate(String queueName,
String address,
Set<RoutingType> supportedRoutingTypes);

@Message(id = 119212, value = "Invalid deletion policy type {0}", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException invalidDeletionPolicyType(String val);

}
Expand Up @@ -1582,4 +1582,12 @@ void slowConsumerDetected(String sessionID,
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224075, value = "Cannot find pageTX id = {0}", format = Message.Format.MESSAGE_FORMAT)
void journalCannotFindPageTX(Long id);

@LogMessage(level = Logger.Level.INFO)
@Message(id = 224076, value = "UnDeploying address {0}", format = Message.Format.MESSAGE_FORMAT)
void undeployAddress(SimpleString addressName);

@LogMessage(level = Logger.Level.INFO)
@Message(id = 224077, value = "UnDeploying queue {0}", format = Message.Format.MESSAGE_FORMAT)
void undeployQueue(SimpleString queueName);
}
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
Expand Down Expand Up @@ -151,6 +152,7 @@
import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
Expand Down Expand Up @@ -2238,6 +2240,9 @@ public void run() {
// Deploy any predefined queues
deployQueuesFromConfiguration();

// Undeploy any addresses and queues not in config
undeployAddressesAndQueueNotInConfiguration();

// We need to call this here, this gives any dependent server a chance to deploy its own addresses
// this needs to be done before clustering is fully activated
callActivateCallbacks();
Expand Down Expand Up @@ -2313,6 +2318,53 @@ private void deploySecurityFromConfiguration() {
}
}

private void undeployAddressesAndQueueNotInConfiguration() throws Exception {
undeployAddressesAndQueueNotInConfiguration(configuration);
}

private void undeployAddressesAndQueueNotInConfiguration(Configuration configuration) throws Exception {
Set<String> addressesInConfig = configuration.getAddressConfigurations().stream()
.map(CoreAddressConfiguration::getName)
.collect(Collectors.toSet());

Set<String> queuesInConfig = configuration.getAddressConfigurations().stream()
.map(CoreAddressConfiguration::getQueueConfigurations)
.flatMap(List::stream).map(CoreQueueConfiguration::getName)
.collect(Collectors.toSet());

for (SimpleString addressName : listAddressNames()) {
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());

if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) {
for (Queue queue : listQueues(addressName)) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
queue.deleteQueue(true);
}
ActiveMQServerLogger.LOGGER.undeployAddress(addressName);
removeAddressInfo(addressName, null);
} else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) {
for (Queue queue : listConfiguredQueues(addressName)) {
if (!queuesInConfig.contains(queue.getName().toString())) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
queue.deleteQueue(true);
}
}
}
}
}

private Set<SimpleString> listAddressNames() {
return postOffice.getAddresses();
}

private List<Queue> listConfiguredQueues(SimpleString address) throws Exception {
return listQueues(address).stream().filter(queue -> !queue.isAutoCreated() && !queue.isInternalQueue()).collect(Collectors.toList());
}

private List<Queue> listQueues(SimpleString address) throws Exception {
return postOffice.listQueuesForAddress(address);
}

private void deployAddressesFromConfiguration() throws Exception {
deployAddressesFromConfiguration(configuration);
}
Expand Down Expand Up @@ -2818,6 +2870,7 @@ public void reload(URL uri) throws Exception {
}
ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses");
deployAddressesFromConfiguration(config);
undeployAddressesAndQueueNotInConfiguration(config);
}
}

Expand Down
Expand Up @@ -71,10 +71,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable

public static final boolean DEFAULT_AUTO_DELETE_QUEUES = true;

public static final DeletionPolicy DEFAULT_CONFIG_DELETE_QUEUES = DeletionPolicy.OFF;

public static final boolean DEFAULT_AUTO_CREATE_ADDRESSES = true;

public static final boolean DEFAULT_AUTO_DELETE_ADDRESSES = true;

public static final DeletionPolicy DEFAULT_CONFIG_DELETE_ADDRESSES = DeletionPolicy.OFF;

public static final long DEFAULT_REDISTRIBUTION_DELAY = -1;

public static final long DEFAULT_EXPIRY_DELAY = -1;
Expand Down Expand Up @@ -148,10 +152,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable

private Boolean autoDeleteQueues = null;

private DeletionPolicy configDeleteQueues = null;

private Boolean autoCreateAddresses = null;

private Boolean autoDeleteAddresses = null;

private DeletionPolicy configDeleteAddresses = null;

private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;

private Long maxSizeBytesRejectThreshold = null;
Expand Down Expand Up @@ -194,8 +202,10 @@ public AddressSettings(AddressSettings other) {
this.autoDeleteJmsTopics = other.autoDeleteJmsTopics;
this.autoCreateQueues = other.autoCreateQueues;
this.autoDeleteQueues = other.autoDeleteQueues;
this.configDeleteQueues = other.configDeleteQueues;
this.autoCreateAddresses = other.autoCreateAddresses;
this.autoDeleteAddresses = other.autoDeleteAddresses;
this.configDeleteAddresses = other.configDeleteAddresses;
this.managementBrowsePageSize = other.managementBrowsePageSize;
this.queuePrefetch = other.queuePrefetch;
this.maxSizeBytesRejectThreshold = other.maxSizeBytesRejectThreshold;
Expand Down Expand Up @@ -270,6 +280,15 @@ public AddressSettings setAutoDeleteQueues(Boolean autoDeleteQueues) {
return this;
}

public DeletionPolicy getConfigDeleteQueues() {
return configDeleteQueues != null ? configDeleteQueues : AddressSettings.DEFAULT_CONFIG_DELETE_QUEUES;
}

public AddressSettings setConfigDeleteQueues(DeletionPolicy configDeleteQueues) {
this.configDeleteQueues = configDeleteQueues;
return this;
}

public boolean isAutoCreateAddresses() {
return autoCreateAddresses != null ? autoCreateAddresses : AddressSettings.DEFAULT_AUTO_CREATE_ADDRESSES;
}
Expand All @@ -288,6 +307,15 @@ public AddressSettings setAutoDeleteAddresses(Boolean autoDeleteAddresses) {
return this;
}

public DeletionPolicy getConfigDeleteAddresses() {
return configDeleteAddresses != null ? configDeleteAddresses : AddressSettings.DEFAULT_CONFIG_DELETE_ADDRESSES;
}

public AddressSettings setConfigDeleteAddresses(DeletionPolicy configDeleteAddresses) {
this.configDeleteAddresses = configDeleteAddresses;
return this;
}

public int getDefaultMaxConsumers() {
return defaultMaxConsumers != null ? defaultMaxConsumers : ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
}
Expand Down Expand Up @@ -594,12 +622,18 @@ public void merge(final AddressSettings merged) {
if (autoDeleteQueues == null) {
autoDeleteQueues = merged.autoDeleteQueues;
}
if (configDeleteQueues == null) {
configDeleteQueues = merged.configDeleteQueues;
}
if (autoCreateAddresses == null) {
autoCreateAddresses = merged.autoCreateAddresses;
}
if (autoDeleteAddresses == null) {
autoDeleteAddresses = merged.autoDeleteAddresses;
}
if (configDeleteAddresses == null) {
configDeleteAddresses = merged.configDeleteAddresses;
}
if (managementBrowsePageSize == null) {
managementBrowsePageSize = merged.managementBrowsePageSize;
}
Expand Down Expand Up @@ -687,10 +721,25 @@ public void decode(ActiveMQBuffer buffer) {

autoDeleteQueues = BufferHelper.readNullableBoolean(buffer);

policyStr = buffer.readNullableSimpleString();

if (policyStr != null) {
configDeleteQueues = DeletionPolicy.valueOf(policyStr.toString());
} else {
configDeleteQueues = null;
}

autoCreateAddresses = BufferHelper.readNullableBoolean(buffer);

autoDeleteAddresses = BufferHelper.readNullableBoolean(buffer);

policyStr = buffer.readNullableSimpleString();

if (policyStr != null) {
configDeleteAddresses = DeletionPolicy.valueOf(policyStr.toString());
} else {
configDeleteAddresses = null;
}
managementBrowsePageSize = BufferHelper.readNullableInteger(buffer);

maxSizeBytesRejectThreshold = BufferHelper.readNullableLong(buffer);
Expand Down Expand Up @@ -731,9 +780,9 @@ public int getEncodeSize() {
BufferHelper.sizeOfNullableBoolean(autoCreateJmsTopics) +
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsTopics) +
BufferHelper.sizeOfNullableBoolean(autoCreateQueues) +
BufferHelper.sizeOfNullableBoolean(autoDeleteQueues) +
BufferHelper.sizeOfNullableBoolean(autoDeleteQueues) + BufferHelper.sizeOfNullableSimpleString(configDeleteQueues != null ? configDeleteQueues.toString() : null) +
BufferHelper.sizeOfNullableBoolean(autoCreateAddresses) +
BufferHelper.sizeOfNullableBoolean(autoDeleteAddresses) +
BufferHelper.sizeOfNullableBoolean(autoDeleteAddresses) + BufferHelper.sizeOfNullableSimpleString(configDeleteAddresses != null ? configDeleteAddresses.toString() : null) +
BufferHelper.sizeOfNullableInteger(managementBrowsePageSize) +
BufferHelper.sizeOfNullableLong(maxSizeBytesRejectThreshold) +
BufferHelper.sizeOfNullableInteger(defaultMaxConsumers) +
Expand Down Expand Up @@ -794,10 +843,14 @@ public void encode(ActiveMQBuffer buffer) {

BufferHelper.writeNullableBoolean(buffer, autoDeleteQueues);

buffer.writeNullableSimpleString(configDeleteQueues != null ? new SimpleString(configDeleteQueues.toString()) : null);

BufferHelper.writeNullableBoolean(buffer, autoCreateAddresses);

BufferHelper.writeNullableBoolean(buffer, autoDeleteAddresses);

buffer.writeNullableSimpleString(configDeleteAddresses != null ? new SimpleString(configDeleteAddresses.toString()) : null);

BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize);

BufferHelper.writeNullableLong(buffer, maxSizeBytesRejectThreshold);
Expand Down Expand Up @@ -843,8 +896,10 @@ public int hashCode() {
result = prime * result + ((autoDeleteJmsTopics == null) ? 0 : autoDeleteJmsTopics.hashCode());
result = prime * result + ((autoCreateQueues == null) ? 0 : autoCreateQueues.hashCode());
result = prime * result + ((autoDeleteQueues == null) ? 0 : autoDeleteQueues.hashCode());
result = prime * result + ((configDeleteQueues == null) ? 0 : configDeleteQueues.hashCode());
result = prime * result + ((autoCreateAddresses == null) ? 0 : autoCreateAddresses.hashCode());
result = prime * result + ((autoDeleteAddresses == null) ? 0 : autoDeleteAddresses.hashCode());
result = prime * result + ((configDeleteAddresses == null) ? 0 : configDeleteAddresses.hashCode());
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
result = prime * result + ((maxSizeBytesRejectThreshold == null) ? 0 : maxSizeBytesRejectThreshold.hashCode());
Expand Down Expand Up @@ -992,6 +1047,11 @@ public boolean equals(Object obj) {
return false;
} else if (!autoDeleteQueues.equals(other.autoDeleteQueues))
return false;
if (configDeleteQueues == null) {
if (other.configDeleteQueues != null)
return false;
} else if (!configDeleteQueues.equals(other.configDeleteQueues))
return false;
if (autoCreateAddresses == null) {
if (other.autoCreateAddresses != null)
return false;
Expand All @@ -1002,6 +1062,11 @@ public boolean equals(Object obj) {
return false;
} else if (!autoDeleteAddresses.equals(other.autoDeleteAddresses))
return false;
if (configDeleteAddresses == null) {
if (other.configDeleteAddresses != null)
return false;
} else if (!configDeleteAddresses.equals(other.configDeleteAddresses))
return false;
if (managementBrowsePageSize == null) {
if (other.managementBrowsePageSize != null)
return false;
Expand Down Expand Up @@ -1101,10 +1166,14 @@ public String toString() {
autoCreateQueues +
", autoDeleteQueues=" +
autoDeleteQueues +
", configDeleteQueues=" +
configDeleteQueues +
", autoCreateAddresses=" +
autoCreateAddresses +
", autoDeleteAddresses=" +
autoDeleteAddresses +
", configDeleteAddresses=" +
configDeleteAddresses +
", managementBrowsePageSize=" +
managementBrowsePageSize +
", defaultMaxConsumers=" +
Expand Down
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.settings.impl;

public enum DeletionPolicy {
OFF, FORCE;
}

0 comments on commit a970b41

Please sign in to comment.