Skip to content

Commit

Permalink
ARTEMIS-2064 make address & queue deployment more robust
Browse files Browse the repository at this point in the history
Any failure to deploy an address or queue will short-circuit the broker
initialization process preventing any other addresses or queues from
being deployed as well as other critical resources like acceptors, etc.

(cherry picked from commit b0d30d4)
  • Loading branch information
jbertram committed Aug 30, 2018
1 parent 6852114 commit 4ce8743
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 27 deletions.
Expand Up @@ -65,4 +65,14 @@ public CoreAddressConfiguration addQueueConfiguration(CoreQueueConfiguration que
public List<CoreQueueConfiguration> getQueueConfigurations() {
return queueConfigurations;
}

@Override
public String toString() {
return "CoreAddressConfiguration[" +
"name=" + name +
", routingTypes=" + routingTypes +
", queueConfigurations=" + queueConfigurations +
"]";
}

}
Expand Up @@ -183,6 +183,7 @@ public int hashCode() {
result = prime * result + ((exclusive == null) ? 0 : exclusive.hashCode());
result = prime * result + ((lastValue == null) ? 0 : lastValue.hashCode());
result = prime * result + (maxConsumerConfigured ? 1331 : 1337);
result = prime * result + ((routingType == null) ? 0 : routingType.hashCode());
return result;
}

Expand Down Expand Up @@ -237,6 +238,27 @@ public boolean equals(Object obj) {
} else if (!lastValue.equals(other.lastValue)) {
return false;
}
if (routingType == null) {
if (other.routingType != null)
return false;
} else if (!routingType.equals(other.routingType)) {
return false;
}
return true;
}

@Override
public String toString() {
return "CoreQueueConfiguration[" +
"name=" + name +
", address=" + address +
", routingType=" + routingType +
", durable=" + durable +
", filterString=" + filterString +
", maxConsumers=" + maxConsumers +
", purgeOnNoConsumers=" + purgeOnNoConsumers +
", exclusive=" + exclusive +
", lastValue=" + lastValue +
"]";
}
}
Expand Up @@ -764,8 +764,7 @@ private void parseAddresses(final Element e, final Configuration config) {
Element node = (Element) elements.item(0);
NodeList list = node.getElementsByTagName("address");
for (int i = 0; i < list.getLength(); i++) {
CoreAddressConfiguration addrConfig = parseAddressConfiguration(list.item(i));
config.getAddressConfigurations().add(addrConfig);
config.addAddressConfiguration(parseAddressConfiguration(list.item(i)));
}
}
}
Expand Down
Expand Up @@ -1593,6 +1593,16 @@ void slowConsumerDetected(String sessionID,
@Message(id = 22273, value = "Address \"{0}\" is full. Bridge {1} will disconnect", format = Message.Format.MESSAGE_FORMAT)
void bridgeAddressFull(String addressName, String bridgeName);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 222274, value = "Failed to deploy address {0}: {1}",
format = Message.Format.MESSAGE_FORMAT)
void problemDeployingAddress(String addressName, String message);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 222275, value = "Failed to deploy queue {0}: {1}",
format = Message.Format.MESSAGE_FORMAT)
void problemDeployingQueue(String queueName, String message);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);
Expand Down
Expand Up @@ -2530,37 +2530,45 @@ private void deployAddressesFromConfiguration() throws Exception {

private void deployAddressesFromConfiguration(Configuration configuration) throws Exception {
for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) {
AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes());
addOrUpdateAddressInfo(info);
ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString());
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
try {
ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString());
AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes());
addOrUpdateAddressInfo(info);
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.problemDeployingAddress(config.getName(), e.getMessage());
}
}
}

private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) {
SimpleString queueName = SimpleString.toSimpleString(config.getName());
ActiveMQServerLogger.LOGGER.deployQueue(config.getName(), config.getAddress(), config.getRoutingType().toString());
AddressSettings as = addressSettingsRepository.getMatch(config.getAddress());
// determine if there is an address::queue match; update it if so
int maxConsumerAddressSetting = as.getDefaultMaxConsumers();
int maxConsumerQueueConfig = config.getMaxConsumers();
int maxConsumer = (config.isMaxConsumerConfigured()) ? maxConsumerQueueConfig : maxConsumerAddressSetting;
if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
updateQueue(config.getName(), config.getRoutingType(), maxConsumer, config.getPurgeOnNoConsumers(),
config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive());
} else {
// if the address::queue doesn't exist then create it
try {
createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(),
queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()),
config.isDurable(),false,false,false,false,maxConsumer,config.getPurgeOnNoConsumers(),
config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive(),
config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue(), true);
} catch (ActiveMQQueueExistsException e) {
// the queue may exist on a *different* address
ActiveMQServerLogger.LOGGER.warn(e.getMessage());
try {
SimpleString queueName = SimpleString.toSimpleString(config.getName());
ActiveMQServerLogger.LOGGER.deployQueue(config.getName(), config.getAddress(), config.getRoutingType().toString());
AddressSettings as = addressSettingsRepository.getMatch(config.getAddress());
// determine if there is an address::queue match; update it if so
int maxConsumerAddressSetting = as.getDefaultMaxConsumers();
int maxConsumerQueueConfig = config.getMaxConsumers();
int maxConsumer = (config.isMaxConsumerConfigured()) ? maxConsumerQueueConfig : maxConsumerAddressSetting;
if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
updateQueue(config.getName(), config.getRoutingType(), maxConsumer, config.getPurgeOnNoConsumers(),
config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive());
} else {
// if the address::queue doesn't exist then create it
try {
createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(),
queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()),
config.isDurable(),false,false,false,false,maxConsumer,config.getPurgeOnNoConsumers(),
config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive(),
config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue(), true);
} catch (ActiveMQQueueExistsException e) {
// the queue may exist on a *different* address
ActiveMQServerLogger.LOGGER.warn(e.getMessage());
}
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.problemDeployingQueue(config.getName(), e.getMessage());
}
}
}
Expand Down
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;

import java.util.UUID;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(BMUnitRunner.class)
public class AddressDeploymentFailedTest extends ActiveMQTestBase {

@Test
@BMRule(name = "blow up address deployment",
targetClass = "org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl",
targetMethod = "addOrUpdateAddressInfo(AddressInfo)",
targetLocation = "EXIT",
action = "throw new IllegalStateException(\"test exception\")")
public void testAddressDeploymentFailure() throws Exception {
ActiveMQServer server = createServer(false, createDefaultNettyConfig());
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(UUID.randomUUID().toString()).addRoutingType(RoutingType.ANYCAST));
server.start();
assertTrue(server.getRemotingService().isStarted());
}
}
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;

import java.util.UUID;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(BMUnitRunner.class)
public class QueueDeploymentFailedTest extends ActiveMQTestBase {

@Test
@BMRule(name = "blow up queue deployment",
targetClass = "org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl",
targetMethod = "createQueue(SimpleString,RoutingType,SimpleString,SimpleString,SimpleString,boolean,boolean,boolean,boolean,boolean,int,boolean,boolean,boolean,int,long,boolean",
targetLocation = "EXIT",
action = "throw new IllegalStateException(\"test exception\")")
public void testQueueDeploymentFailure() throws Exception {
ActiveMQServer server = createServer(false, createDefaultNettyConfig());
String address = UUID.randomUUID().toString();
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(address).addRoutingType(RoutingType.ANYCAST).addQueueConfiguration(new CoreQueueConfiguration().setName(UUID.randomUUID().toString()).setRoutingType(RoutingType.ANYCAST).setAddress(address)));
server.start();
assertTrue(server.getRemotingService().isStarted());
}
}

0 comments on commit 4ce8743

Please sign in to comment.