Skip to content

Commit

Permalink
ARTEMIS-2094 - Fix Configuration change loss when network Issue
Browse files Browse the repository at this point in the history
(cherry picked from commit cce0e19)
  • Loading branch information
michaelandrepearce authored and clebertsuconic committed Sep 26, 2018
1 parent 4f6a13e commit a59a589
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 30 deletions.
Expand Up @@ -2390,15 +2390,7 @@ public void run() {
}

// Deploy the rest of the stuff

// Deploy predefined addresses
deployAddressesFromConfiguration();

// Deploy any predefined queues
deployQueuesFromConfiguration();

// Undeploy any addresses and queues not in config
undeployAddressesAndQueueNotInConfiguration();
deployReloadableConfigFromConfiguration(configuration);

// We need to call this here, this gives any dependent server a chance to deploy its own addresses
// this needs to be done before clustering is fully activated
Expand Down Expand Up @@ -2620,6 +2612,8 @@ private void deployAddressSettingsFromConfiguration() {
}
}



private JournalLoadInformation[] loadJournals() throws Exception {
JournalLoader journalLoader = activation.createJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer);

Expand Down Expand Up @@ -3169,30 +3163,36 @@ private final class ConfigurationFileReloader implements ReloadCallback {

@Override
public void reload(URL uri) throws Exception {
Configuration config = new FileConfigurationParser().parseMainConfig(uri.openStream());
configuration.setSecurityRoles(config.getSecurityRoles());
configuration.setAddressesSettings(config.getAddressesSettings());
configuration.setDivertConfigurations(config.getDivertConfigurations());
configuration.setAddressConfigurations(config.getAddressConfigurations());
configuration.setQueueConfigurations(config.getQueueConfigurations());
if (isActive()) {
Configuration config = new FileConfigurationParser().parseMainConfig(uri.openStream());
ActiveMQServerLogger.LOGGER.reloadingConfiguration("security");
securityRepository.swap(config.getSecurityRoles().entrySet());
configuration.setSecurityRoles(config.getSecurityRoles());

ActiveMQServerLogger.LOGGER.reloadingConfiguration("address settings");
addressSettingsRepository.swap(config.getAddressesSettings().entrySet());
configuration.setAddressesSettings(config.getAddressesSettings());

ActiveMQServerLogger.LOGGER.reloadingConfiguration("diverts");
for (DivertConfiguration divertConfig : config.getDivertConfigurations()) {
if (postOffice.getBinding(new SimpleString(divertConfig.getName())) == null) {
deployDivert(divertConfig);
}
}
deployReloadableConfigFromConfiguration(configuration);
}
}
}

ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses");
undeployAddressesAndQueueNotInConfiguration(config);
deployAddressesFromConfiguration(config);
configuration.setAddressConfigurations(config.getAddressConfigurations());
configuration.setQueueConfigurations(config.getQueueConfigurations());
private void deployReloadableConfigFromConfiguration(Configuration config) throws Exception {
ActiveMQServerLogger.LOGGER.reloadingConfiguration("security");
securityRepository.swap(config.getSecurityRoles().entrySet());

ActiveMQServerLogger.LOGGER.reloadingConfiguration("address settings");
addressSettingsRepository.swap(config.getAddressesSettings().entrySet());

ActiveMQServerLogger.LOGGER.reloadingConfiguration("diverts");
for (DivertConfiguration divertConfig : config.getDivertConfigurations()) {
if (postOffice.getBinding(new SimpleString(divertConfig.getName())) == null) {
deployDivert(divertConfig);
}
}

ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses");
undeployAddressesAndQueueNotInConfiguration(config);
deployAddressesFromConfiguration(config);
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
}

public Set<ActivateCallback> getActivateCallbacks() {
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand All @@ -41,6 +42,7 @@
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
Expand Down Expand Up @@ -108,6 +110,12 @@ public void run() {

@Test
public void testRedeployWithFailover() throws Exception {

Set<Role> original = new HashSet<>();
original.add(new Role("a", false, true, false, false, false, false, false, false, false, false));
Set<Role> changed = new HashSet<>();
changed.add(new Role("b", false, true, false, false, false, false, false, false, false, false));

EmbeddedJMS live = new EmbeddedJMS();
EmbeddedJMS backup = new EmbeddedJMS();

Expand Down Expand Up @@ -135,6 +143,11 @@ public void testRedeployWithFailover() throws Exception {

Wait.waitFor(() -> backup.getActiveMQServer().isReplicaSync(), 10000, 200);

assertEquals("Test address settings original - live", AddressFullMessagePolicy.BLOCK, live.getActiveMQServer().getAddressSettingsRepository().getMatch("myQueue").getAddressFullMessagePolicy());
assertEquals("Test address settings original - backup", AddressFullMessagePolicy.BLOCK, backup.getActiveMQServer().getAddressSettingsRepository().getMatch("myQueue").getAddressFullMessagePolicy());
assertEquals("Test security settings original - live", original, live.getActiveMQServer().getSecurityRepository().getMatch("myQueue"));
assertEquals("Test security settings original - backup", original, backup.getActiveMQServer().getSecurityRepository().getMatch("myQueue"));

final ReusableLatch liveReloadLatch = new ReusableLatch(1);
Runnable liveTick = () -> liveReloadLatch.countDown();
live.getActiveMQServer().getReloadManager().setTick(liveTick);
Expand Down Expand Up @@ -162,9 +175,13 @@ public void testRedeployWithFailover() throws Exception {
Session session = connection.createSession();
Queue queue = session.createQueue("myQueue2");
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("text"));
producer.send(session.createTextMessage("text1"));
}

assertFalse(backup.getActiveMQServer().isActive());
assertEquals("Test address settings redeploy - live", AddressFullMessagePolicy.PAGE, live.getActiveMQServer().getAddressSettingsRepository().getMatch("myQueue").getAddressFullMessagePolicy());
assertEquals("Test security settings redeploy - live", changed, live.getActiveMQServer().getSecurityRepository().getMatch("myQueue"));

live.stop();

Wait.waitFor(() -> (backup.getActiveMQServer().isActive()), 5000, 100);
Expand All @@ -180,6 +197,8 @@ public void testRedeployWithFailover() throws Exception {
Assert.assertNotNull("Queue wasn't deployed accordingly", consumer.receive(5000));
Assert.assertNotNull(consumer.receive(5000));
}
assertEquals("Test address settings redeploy - backup", changed, backup.getActiveMQServer().getSecurityRepository().getMatch("myQueue"));
assertEquals("Test security settings redeploy - backup", AddressFullMessagePolicy.PAGE, backup.getActiveMQServer().getAddressSettingsRepository().getMatch("myQueue").getAddressFullMessagePolicy());
} finally {
live.stop();
backup.stop();
Expand Down
Expand Up @@ -78,5 +78,19 @@ under the License.
</anycast>
</address>
</addresses>

<address-settings>
<address-setting match="#" >
<address-full-policy>PAGE</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>

<security-settings>
<security-setting match="#" >
<permission type="consume" roles="b" />
</security-setting>
</security-settings>
</core>
</configuration>
Expand Up @@ -73,5 +73,19 @@ under the License.
</anycast>
</address>
</addresses>

<address-settings>
<address-setting match="#" >
<address-full-policy>BLOCK</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>

<security-settings>
<security-setting match="#" >
<permission type="consume" roles="a" />
</security-setting>
</security-settings>
</core>
</configuration>
14 changes: 14 additions & 0 deletions tests/integration-tests/src/test/resources/reload-live-changed.xml
Expand Up @@ -78,5 +78,19 @@ under the License.
</anycast>
</address>
</addresses>

<address-settings>
<address-setting match="#" >
<address-full-policy>PAGE</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>

<security-settings>
<security-setting match="#" >
<permission type="consume" roles="b" />
</security-setting>
</security-settings>
</core>
</configuration>
Expand Up @@ -73,5 +73,19 @@ under the License.
</anycast>
</address>
</addresses>

<address-settings>
<address-setting match="#" >
<address-full-policy>BLOCK</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>

<security-settings>
<security-setting match="#" >
<permission type="consume" roles="a" />
</security-setting>
</security-settings>
</core>
</configuration>

0 comments on commit a59a589

Please sign in to comment.