diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java index fe6c16543eb..95d46157101 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java @@ -66,6 +66,17 @@ public class DivertConfiguration implements Serializable, EncodingSupport { public DivertConfiguration() { } + public DivertConfiguration(DivertConfiguration config) { + this.name = config.getName(); + this.routingName = config.getRoutingName(); + this.address = config.getAddress(); + this.forwardingAddress = config.getForwardingAddress(); + this.exclusive = config.isExclusive(); + this.filterString = config.getFilterString(); + this.transformerConfiguration = config.getTransformerConfiguration(); + this.routingType = config.getRoutingType(); + } + /** * Set the value of a parameter based on its "key" {@code String}. Valid key names and corresponding {@code static} * {@code final} are: @@ -108,7 +119,7 @@ public DivertConfiguration set(String key, String value) { setTransformerConfiguration(transformerConfiguration); } } else if (key.equals(ROUTING_TYPE)) { - setRoutingType(ComponentConfigurationRoutingType.valueOf(value)); + setRoutingType(value == null ? null : ComponentConfigurationRoutingType.valueOf(value)); } } return this; @@ -227,7 +238,9 @@ public String toJSON() { builder.add(TRANSFORMER_CONFIGURATION, tc.createJsonObjectBuilder()); } - if (getRoutingType() != null) { + if (getRoutingType() == null) { + builder.add(ROUTING_TYPE, JsonValue.NULL); + } else { builder.add(ROUTING_TYPE, getRoutingType().name()); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 9ff3f8ea383..30ef3233f97 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.io.SequentialFile; @@ -391,6 +392,8 @@ JournalLoadInformation loadBindingJournal(List queueBindingInf List recoverDivertConfigurations(); + DivertConfiguration getDivertConfiguration(String name); + void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception; void deleteBridgeConfiguration(String bridgeName) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 1389395b3e3..eb6f22717a0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -46,6 +46,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; @@ -845,6 +846,16 @@ public List recoverDivertConfigurations() { return new ArrayList<>(mapPersistedDivertConfigurations.values()); } + @Override + public DivertConfiguration getDivertConfiguration(String name) { + PersistedDivertConfiguration persistedDivertConfiguration = mapPersistedDivertConfigurations.get(name); + if (persistedDivertConfiguration != null) { + return new DivertConfiguration(persistedDivertConfiguration.getDivertConfiguration()); + } else { + return null; + } + } + @Override public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception { storeConfiguration(persistedBridgeConfiguration, mapPersistedBridgeConfigurations); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 47daeae68bc..e8d44ea53dd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; @@ -498,6 +499,11 @@ public List recoverDivertConfigurations() { return null; } + @Override + public DivertConfiguration getDivertConfiguration(String name) { + return null; + } + @Override public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception { } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 0956b37172a..93ead521c2d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1544,4 +1544,7 @@ void slowConsumerDetected(String sessionID, @LogMessage(id = 224163, value = "Failed to clone SHA256 MessageDigest, falling back to getInstance", level = LogMessage.Level.INFO) void sha256CloneNotSupported(CloneNotSupportedException cns); + + @LogMessage(id = 224164, value = "Failed to recover stored configuration for divert named '{}': {}. To repair this record create a new divert with the same name via the management API.", level = LogMessage.Level.WARN) + void failedToRecoverStoredDivertConfiguration(String divertName, String divert); } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 75e5e0284f7..3ae92b1a1bb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -3069,38 +3069,59 @@ public Divert updateDivert(DivertConfiguration config) throws Exception { return null; } - final Divert divert = divertBinding.getDivert(); + // The divert config may be in defined in the broker config (e.g. XML) or stored in the journal. If it's in the + // journal we want to make sure it's updated propertly otherwise we just update what's in memory. + DivertConfiguration onStorageDivert = storageManager.getDivertConfiguration(config.getName()); + final Divert inMemoryDivert = divertBinding.getDivert(); Filter filter = FilterImpl.createFilter(config.getFilterString()); if (filter == null) { - divert.setFilter(null); + inMemoryDivert.setFilter(null); + if (onStorageDivert != null) { + onStorageDivert.setFilterString(null); + } } else { - if (!filter.equals(divert.getFilter())) { - divert.setFilter(filter); + if (!filter.equals(inMemoryDivert.getFilter())) { + inMemoryDivert.setFilter(filter); + if (onStorageDivert != null) { + onStorageDivert.setFilterString(config.getFilterString()); + } } } if (config.getTransformerConfiguration() != null) { - getServiceRegistry().removeDivertTransformer(divert.getUniqueName().toString()); + getServiceRegistry().removeDivertTransformer(inMemoryDivert.getUniqueName().toString()); Transformer transformer = getServiceRegistry().getDivertTransformer( config.getName(), config.getTransformerConfiguration()); - divert.setTransformer(transformer); + inMemoryDivert.setTransformer(transformer); + if (onStorageDivert != null) { + onStorageDivert.setTransformerConfiguration(config.getTransformerConfiguration()); + } } if (config.getForwardingAddress() != null) { SimpleString forwardAddress = SimpleString.of(config.getForwardingAddress()); - if (!forwardAddress.equals(divert.getForwardAddress())) { - divert.setForwardAddress(forwardAddress); + if (!forwardAddress.equals(inMemoryDivert.getForwardAddress())) { + inMemoryDivert.setForwardAddress(forwardAddress); + if (onStorageDivert != null) { + onStorageDivert.setForwardingAddress(config.getForwardingAddress()); + } } } - if (config.getRoutingType() != null && divert.getRoutingType() != config.getRoutingType()) { - divert.setRoutingType(config.getRoutingType()); + if (config.getRoutingType() != null && inMemoryDivert.getRoutingType() != config.getRoutingType()) { + inMemoryDivert.setRoutingType(config.getRoutingType()); + if (onStorageDivert != null) { + onStorageDivert.setRoutingType(config.getRoutingType()); + } } - storageManager.storeDivertConfiguration(new PersistedDivertConfiguration(config)); + if (onStorageDivert != null) { + // this will replace the existing divert record in the journal using delete + add + storageManager.storeDivertConfiguration(new PersistedDivertConfiguration(onStorageDivert)); + } - return divert; + return inMemoryDivert; } @Override @@ -4465,15 +4486,20 @@ private void recoverStoredDiverts() throws Exception { if (storageManager.recoverDivertConfigurations() != null) { for (PersistedDivertConfiguration persistedDivertConfiguration : storageManager.recoverDivertConfigurations()) { - //has it been removed from config - boolean deleted = configuration.getDivertConfigurations().stream().noneMatch(divertConfiguration -> divertConfiguration.getName().equals(persistedDivertConfiguration.getName())); - // if it has remove it if configured to do so - if (deleted) { - if (addressSettingsRepository.getMatch(persistedDivertConfiguration.getDivertConfiguration().getAddress()).getConfigDeleteDiverts() == DeletionPolicy.FORCE) { - storageManager.deleteDivertConfiguration(persistedDivertConfiguration.getName()); - } else { - deployDivert(persistedDivertConfiguration.getDivertConfiguration()); + try { + //has it been removed from config + boolean deleted = configuration.getDivertConfigurations().stream().noneMatch(divertConfiguration -> divertConfiguration.getName().equals(persistedDivertConfiguration.getName())); + // if it has remove it if configured to do so + if (deleted) { + if (addressSettingsRepository.getMatch(persistedDivertConfiguration.getDivertConfiguration().getAddress()).getConfigDeleteDiverts() == DeletionPolicy.FORCE) { + storageManager.deleteDivertConfiguration(persistedDivertConfiguration.getName()); + } else { + deployDivert(persistedDivertConfiguration.getDivertConfiguration()); + } } + } catch (Exception e) { + logger.debug(e.getMessage(), e); + ActiveMQServerLogger.LOGGER.failedToRecoverStoredDivertConfiguration(persistedDivertConfiguration.getName(), String.valueOf(persistedDivertConfiguration.getDivertConfiguration())); } } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index 524f7656284..e99d97897b9 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.io.SequentialFile; @@ -746,6 +747,11 @@ public List recoverDivertConfigurations() { return null; } + @Override + public DivertConfiguration getDivertConfiguration(String name) { + return null; + } + @Override public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index 76e0c026870..31782823684 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; @@ -748,6 +749,11 @@ public List recoverDivertConfigurations() { return null; } + @Override + public DivertConfiguration getDivertConfiguration(String name) { + return null; + } + @Override public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertManagementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertManagementTest.java new file mode 100644 index 00000000000..d094618ffea --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertManagementTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.management; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; +import org.apache.activemq.artemis.core.server.Divert; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class DivertManagementTest extends ManagementTestBase { + + private ActiveMQServer server; + + @Override + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + Configuration conf = createDefaultNettyConfig().setJMXManagementEnabled(true); + server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer)); + server.start(); + } + + @Test + public void testBadJournalRecord() throws Exception { + final String name = RandomUtil.randomUUIDString(); + String routingName = RandomUtil.randomUUIDString(); + String address = RandomUtil.randomUUIDString(); + String forwardingAddress = RandomUtil.randomUUIDString(); + ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.getType((byte) RandomUtil.randomMax(3)); + boolean exclusive = RandomUtil.randomBoolean(); + + DivertConfiguration config = new DivertConfiguration() + .setName(name); + + // store an invalid divert config (i.e. missing address) + server.getStorageManager().storeDivertConfiguration(new PersistedDivertConfiguration(config)); + + server.stop(); + waitForServerToStop(server); + + // start the broker and verify the invalid divert config failed to load + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + server.start(); + waitForServerToStart(server); + assertTrue(loggerHandler.findText("AMQ224164", name)); + } + assertNull(server.getPostOffice().getBinding(SimpleString.of(name))); + + // create a new divert with the same name + ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer); + config.setAddress(address) + .setRoutingType(routingType) + .setRoutingName(routingName) + .setForwardingAddress(forwardingAddress) + .setExclusive(exclusive); + + serverControl.createDivert(config.toJSON()); + + // verify divert was created + assertInstanceOf(DivertBinding.class, server.getPostOffice().getBinding(SimpleString.of(name))); + assertEquals(1, server.getPostOffice().getBindingsForAddress(SimpleString.of(address)).size()); + + server.stop(); + waitForServerToStop(server); + + // start the broker and verify that the invalid divert config is no longer there + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + server.start(); + waitForServerToStart(server); + assertFalse(loggerHandler.findText("AMQ224164")); + } + + Binding binding = server.getPostOffice().getBinding(SimpleString.of(name)); + assertInstanceOf(DivertBinding.class, binding); + assertEquals(1, server.getPostOffice().getBindingsForAddress(SimpleString.of(address)).size()); + + Divert divert = ((DivertBinding) binding).getDivert(); + assertEquals(name, divert.getUniqueName().toString()); + assertEquals(address, divert.getAddress().toString()); + assertEquals(routingType, divert.getRoutingType()); + assertEquals(routingName, divert.getRoutingName().toString()); + assertEquals(forwardingAddress, divert.getForwardAddress().toString()); + assertEquals(exclusive, divert.isExclusive()); + } + + @Test + public void testPersistentUpdate() throws Exception { + String name = RandomUtil.randomUUIDString(); + String routingName = RandomUtil.randomUUIDString(); + String address = RandomUtil.randomUUIDString(); + String forwardingAddress = RandomUtil.randomUUIDString(); + ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.getType((byte) RandomUtil.randomMax(3)); + boolean exclusive = RandomUtil.randomBoolean(); + + ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer); + DivertConfiguration config = new DivertConfiguration() + .setName(name) + .setAddress(address) + .setRoutingType(routingType) + .setRoutingName(routingName) + .setForwardingAddress(forwardingAddress) + .setExclusive(exclusive); + + serverControl.createDivert(config.toJSON()); + + String updatedForwardingAddress = RandomUtil.randomUUIDString(); + serverControl.updateDivert(new DivertConfiguration() + .setName(name) + .setForwardingAddress(updatedForwardingAddress) + .setRoutingType(null) // must set to null to avoid updating since the default is non-null + .toJSON()); + + server.stop(); + waitForServerToStop(server); + server.start(); + waitForServerToStart(server); + + Binding binding = server.getPostOffice().getBinding(SimpleString.of(name)); + assertInstanceOf(DivertBinding.class, binding); + assertEquals(1, server.getPostOffice().getBindingsForAddress(SimpleString.of(address)).size()); + + Divert divert = ((DivertBinding) binding).getDivert(); + assertEquals(name, divert.getUniqueName().toString()); + assertEquals(address, divert.getAddress().toString()); + assertEquals(routingType, divert.getRoutingType()); + assertEquals(routingName, divert.getRoutingName().toString()); + assertEquals(updatedForwardingAddress, divert.getForwardAddress().toString()); + assertEquals(exclusive, divert.isExclusive()); + } +}