Skip to content

Commit

Permalink
ARTEMIS-2065 Can't change queue routing-type between restarts
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Aug 30, 2018
1 parent 4f1e74b commit 3827c54
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 17 deletions.
Expand Up @@ -44,6 +44,7 @@

import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration;
Expand Down Expand Up @@ -1898,12 +1899,12 @@ void slowConsumerDetected(String sessionID,
void criticalSystemLog(Object component);

@LogMessage(level = Logger.Level.INFO)
@Message(id = 224076, value = "UnDeploying address {0}", format = Message.Format.MESSAGE_FORMAT)
@Message(id = 224076, value = "Undeploying address {0}", format = Message.Format.MESSAGE_FORMAT)
void undeployAddress(SimpleString addressName);

@LogMessage(level = Logger.Level.INFO)
@Message(id = 224077, value = "UnDeploying queue {0}", format = Message.Format.MESSAGE_FORMAT)
void undeployQueue(SimpleString queueName);
@Message(id = 224077, value = "Undeploying {0} queue {1}", format = Message.Format.MESSAGE_FORMAT)
void undeployQueue(RoutingType routingType, SimpleString queueName);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 224078, value = "The size of duplicate cache detection (<id_cache-size/>) appears to be too large {0}. It should be no greater than the number of messages that can be squeezed into conformation buffer (<confirmation-window-size/>) {1}.", format = Message.Format.MESSAGE_FORMAT)
Expand Down
Expand Up @@ -2601,6 +2601,9 @@ public void run() {
}, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
}

// Undeploy any addresses and queues not in config
undeployAddressesAndQueueNotInConfiguration();

// Deploy the rest of the stuff

// Deploy predefined addresses
Expand All @@ -2609,9 +2612,6 @@ public void run() {
// Deploy any predefined queues
deployQueuesFromConfiguration();

// Undeploy any addresses and queues not in config
undeployAddressesAndQueueNotInConfiguration();

// We need to call this here, this gives any dependent server a chance to deploy its own addresses
// this needs to be done before clustering is fully activated
callActivateCallbacks();
Expand Down Expand Up @@ -2698,25 +2698,28 @@ private void undeployAddressesAndQueueNotInConfiguration(Configuration configura
.map(CoreAddressConfiguration::getName)
.collect(Collectors.toSet());

Set<String> queuesInConfig = configuration.getAddressConfigurations().stream()
.map(CoreAddressConfiguration::getQueueConfigurations)
.flatMap(List::stream).map(CoreQueueConfiguration::getName)
.collect(Collectors.toSet());
Set<String> queuesInConfig = new HashSet<>();
for (CoreAddressConfiguration cac : configuration.getAddressConfigurations()) {
for (CoreQueueConfiguration cqc : cac.getQueueConfigurations()) {
// combine the routing-type and queue name as the unique identifier as it's possible to change the routing-type without changing the name
queuesInConfig.add(cqc.getRoutingType().toString() + cqc.getName());
}
}

for (SimpleString addressName : listAddressNames()) {
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());

if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) {
for (Queue queue : listQueues(addressName)) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
queue.deleteQueue(true);
}
ActiveMQServerLogger.LOGGER.undeployAddress(addressName);
removeAddressInfo(addressName, null);
} else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) {
for (Queue queue : listConfiguredQueues(addressName)) {
if (!queuesInConfig.contains(queue.getName().toString())) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
if (!queuesInConfig.contains(queue.getRoutingType().toString() + queue.getName().toString())) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
queue.deleteQueue(true);
}
}
Expand Down Expand Up @@ -3441,8 +3444,8 @@ public void reload(URL uri) throws Exception {
}

ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses");
deployAddressesFromConfiguration(config);
undeployAddressesAndQueueNotInConfiguration(config);
deployAddressesFromConfiguration(config);
configuration.setAddressConfigurations(config.getAddressConfigurations());
configuration.setQueueConfigurations(config.getQueueConfigurations());
}
Expand Down
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.Role;
Expand Down Expand Up @@ -196,9 +197,6 @@ private boolean tryConsume() throws JMSException {

}




@Test
public void testRedeployAddressQueue() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
Expand Down Expand Up @@ -268,6 +266,46 @@ public void run() {
}
}

@Test
public void testRedeployChangeQueueRoutingType() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL url1 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype.xml");
URL url2 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype-updated.xml");
Files.copy(url1.openStream(), brokerXML);

EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
embeddedActiveMQ.start();

final ReusableLatch latch = new ReusableLatch(1);

Runnable tick = new Runnable() {
@Override
public void run() {
latch.countDown();
}
};

embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);

try {
latch.await(10, TimeUnit.SECONDS);
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());

Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
latch.setCount(1);
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);

Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());
} finally {
embeddedActiveMQ.stop();
}
}



/**
Expand Down
@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.tests.integration.persistence;

import java.util.ArrayList;
import java.util.List;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test;

public class ConfigChangeTest extends ActiveMQTestBase {

private ActiveMQServer server;

@Test
public void testChangeQueueRoutingTypeOnRestart() throws Exception {
internalTestChangeQueueRoutingTypeOnRestart(false);
}

@Test
public void testChangeQueueRoutingTypeOnRestartNegative() throws Exception {
internalTestChangeQueueRoutingTypeOnRestart(true);
}

public void internalTestChangeQueueRoutingTypeOnRestart(boolean negative) throws Exception {
// if negative == true then the queue's routing type should *not* change

Configuration configuration = createDefaultInVMConfig();
configuration.addAddressesSetting("#", new AddressSettings()
.setConfigDeleteQueues(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE)
.setConfigDeleteAddresses(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE));

List addressConfigurations = new ArrayList();
CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration()
.setName("myAddress")
.addRoutingType(RoutingType.ANYCAST)
.addQueueConfiguration(new CoreQueueConfiguration()
.setName("myQueue")
.setAddress("myAddress")
.setRoutingType(RoutingType.ANYCAST));
addressConfigurations.add(addressConfiguration);
configuration.setAddressConfigurations(addressConfigurations);
server = createServer(true, configuration);
server.start();
server.stop();

addressConfiguration = new CoreAddressConfiguration()
.setName("myAddress")
.addRoutingType(RoutingType.MULTICAST)
.addQueueConfiguration(new CoreQueueConfiguration()
.setName("myQueue")
.setAddress("myAddress")
.setRoutingType(RoutingType.MULTICAST));
addressConfigurations.clear();
addressConfigurations.add(addressConfiguration);
configuration.setAddressConfigurations(addressConfigurations);

server.start();
assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType());
assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType());
server.stop();
}
}
@@ -0,0 +1,40 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

<core xmlns="urn:activemq:core">
<address-settings>
<address-setting match="#">
<config-delete-queues>FORCE</config-delete-queues>
</address-setting>
</address-settings>

<addresses>
<address name="myAddress">
<anycast>
<queue name="myQueue"/>
</anycast>
</address>
</addresses>
</core>
</configuration>
@@ -0,0 +1,40 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

<core xmlns="urn:activemq:core">
<address-settings>
<address-setting match="#">
<config-delete-queues>FORCE</config-delete-queues>
</address-setting>
</address-settings>

<addresses>
<address name="myAddress">
<multicast>
<queue name="myQueue"/>
</multicast>
</address>
</addresses>
</core>
</configuration>

0 comments on commit 3827c54

Please sign in to comment.