Skip to content
Permalink
Browse files
ARTEMIS-3759 Add mirror controller address filter support
Allow replication only certain addresses with mirror controller.
The configuration is similar to cluster address configuration.

Co-authored-by: Robbie Gemmell <robbie@apache.org>
  • Loading branch information
2 people authored and clebertsuconic committed May 5, 2022
1 parent dc51435 commit 0b321ab8ff62268d55df56e504cbfc043e2de8eb
Showing 9 changed files with 254 additions and 2 deletions.
@@ -82,6 +82,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
final boolean acks;
final boolean addQueues;
final boolean deleteQueues;
final MirrorAddressFilter addressFilter;
private final AMQPBrokerConnection brokerConnection;

final AMQPMirrorBrokerConnectionElement replicaConfig;
@@ -110,6 +111,7 @@ public AMQPMirrorControllerSource(ProtonProtocolManager protonProtocolManager, Q
this.idSupplier = protonProtocolManager.getReferenceIDSupplier();
this.addQueues = replicaConfig.isQueueCreation();
this.deleteQueues = replicaConfig.isQueueRemoval();
this.addressFilter = new MirrorAddressFilter(replicaConfig.getAddressFilter());
this.acks = replicaConfig.isMessageAcknowledgements();
this.brokerConnection = brokerConnection;
}
@@ -131,6 +133,11 @@ public void addAddress(AddressInfo addressInfo) throws Exception {
if (getControllerInUse() != null && !addressInfo.isInternal()) {
return;
}

if (ignoreAddress(addressInfo.getName())) {
return;
}

if (addQueues) {
Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, null, addressInfo.toJSON());
route(server, message);
@@ -145,6 +152,9 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception {
if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
return;
}
if (ignoreAddress(addressInfo.getName())) {
return;
}
if (deleteQueues) {
Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON());
route(server, message);
@@ -162,6 +172,12 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
}
return;
}
if (ignoreAddress(queueConfiguration.getAddress())) {
if (logger.isTraceEnabled()) {
logger.trace("Skipping create " + queueConfiguration + ", queue address " + queueConfiguration.getAddress() + " doesn't match filter");
}
return;
}
if (addQueues) {
Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON());
route(server, message);
@@ -178,6 +194,10 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti
return;
}

if (ignoreAddress(address)) {
return;
}

if (deleteQueues) {
Message message = createMessage(address, queue, DELETE_QUEUE, null, queue.toString());
route(server, message);
@@ -188,12 +208,18 @@ private boolean invalidTarget(MirrorController controller) {
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
}

private boolean ignoreAddress(SimpleString address) {
return !addressFilter.match(address);
}

private boolean sameNode(String remoteID, String sourceID) {
return (remoteID != null && sourceID != null && remoteID.equals(sourceID));
}

@Override
public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) {
SimpleString address = context.getAddress(message);

if (invalidTarget(context.getMirrorSource())) {
if (logger.isTraceEnabled()) {
logger.trace("server " + server + " is discarding send to avoid infinite loop (reflection with the mirror)");
@@ -208,6 +234,13 @@ public void sendMessage(Message message, RoutingContext context, List<MessageRef
return;
}

if (ignoreAddress(address)) {
if (logger.isTraceEnabled()) {
logger.trace("server " + server + " is discarding send to address " + address + ", address doesn't match filter");
}
return;
}

if (logger.isTraceEnabled()) {
logger.trace(server + " send message " + message);
}
@@ -301,6 +334,13 @@ public void postAcknowledge(MessageReference ref, AckReason reason) throws Excep
return;
}

if (ignoreAddress(ref.getQueue().getAddress())) {
if (logger.isTraceEnabled()) {
logger.trace(server + " rejecting postAcknowledge queue=" + ref.getQueue().getName() + ", ref=" + ref + ", queue address is excluded");
}
return;
}

if (logger.isTraceEnabled()) {
logger.trace(server + " postAcknowledge " + ref);
}
@@ -337,4 +377,5 @@ public boolean isMirrorController() {
return true;
}
}

}
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;

import java.util.HashSet;
import java.util.Set;

import org.apache.activemq.artemis.api.core.SimpleString;

public class MirrorAddressFilter {

private final SimpleString[] allowList;

private final SimpleString[] denyList;

public MirrorAddressFilter(String filter) {
Set<SimpleString> allowList = new HashSet<>();
Set<SimpleString> denyList = new HashSet<>();

if (filter != null && !filter.isEmpty()) {
String[] parts = filter.split(",");
for (String part : parts) {
if (!"".equals(part) && !"!".equals(part)) {
if (part.startsWith("!")) {
denyList.add(new SimpleString(part.substring(1)));
} else {
allowList.add(new SimpleString(part));
}
}
}
}

this.allowList = allowList.toArray(new SimpleString[]{});
this.denyList = denyList.toArray(new SimpleString[]{});
}

public boolean match(SimpleString checkAddress) {
if (denyList.length > 0) {
for (SimpleString pattern : denyList) {
if (checkAddress.startsWith(pattern)) {
return false;
}
}
}

if (allowList.length > 0) {
for (SimpleString pattern : allowList) {
if (checkAddress.startsWith(pattern)) {
return true;
}
}
return false;
}
return true;
}
}
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;

import org.apache.activemq.artemis.api.core.SimpleString;
import org.junit.Assert;
import org.junit.Test;

public class MirrorAddressFilterTest {

@Test
public void testAddressFilter() {
Assert.assertTrue(new MirrorAddressFilter("").match(new SimpleString("any")));
Assert.assertTrue(new MirrorAddressFilter("test").match(new SimpleString("test123")));
Assert.assertTrue(new MirrorAddressFilter("a,b").match(new SimpleString("b")));
Assert.assertTrue(new MirrorAddressFilter("!c").match(new SimpleString("a")));
Assert.assertTrue(new MirrorAddressFilter("!a,!").match(new SimpleString("b123")));
Assert.assertFalse(new MirrorAddressFilter("a,b,!ab").match(new SimpleString("ab")));
Assert.assertFalse(new MirrorAddressFilter("!a,!b").match(new SimpleString("b123")));
Assert.assertFalse(new MirrorAddressFilter("a,").match(new SimpleString("b")));
}
}
@@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme

SimpleString mirrorSNF;

String addressFilter;

public SimpleString getMirrorSNF() {
return mirrorSNF;
}
@@ -86,4 +88,14 @@ public AMQPMirrorBrokerConnectionElement setMessageAcknowledgements(boolean mess
this.messageAcknowledgements = messageAcknowledgements;
return this;
}

public String getAddressFilter() {
return addressFilter;
}

public AMQPMirrorBrokerConnectionElement setAddressFilter(String addressFilter) {
this.addressFilter = addressFilter;
return this;
}

}
@@ -2094,8 +2094,10 @@ private void parseAMQPBrokerConnections(final Element e,
boolean queueCreation = getBooleanAttribute(e2,"queue-creation", true);
boolean durable = getBooleanAttribute(e2, "durable", true);
boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true);
String addressFilter = getAttributeValue(e2, "address-filter");

AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement();
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable);
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter);
connectionElement = amqpMirrorConnectionElement;
connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR);
} else {
@@ -2442,6 +2442,14 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="address-filter" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
This defines a filter that mirror will use to determine witch events will be forwarded toward
target server based on source address.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>

</xsd:complexType>

@@ -442,7 +442,7 @@
<receiver address-match="TEST-RECEIVER" />
<peer address-match="TEST-PEER"/>
<receiver queue-name="TEST-WITH-QUEUE-NAME"/>
<mirror message-acknowledgements="false" queue-creation="false" durable="false" queue-removal="false"/>
<mirror message-acknowledgements="false" queue-creation="false" durable="false" queue-removal="false" address-filter="TEST-QUEUE,!IGNORE-QUEUE"/>
</amqp-connection>
<amqp-connection uri="tcp://test2:222" name="test2">
<mirror durable="false"/>
@@ -103,6 +103,25 @@ The following optional arguments can be utilized:
* `queue-removal`: Specifies whether a queue- or address-removal event is sent. The default value is `true`.
* `message-acknowledgements`: Specifies whether message acknowledgements are sent. The default value is `true`.
* `queue-creation`: Specifies whether a queue- or address-creation event is sent. The default value is `true`.
* `address-filter`: An optional comma-separated list of inclusion and/or exclusion filter entries used to govern which addresses (and related queues) mirroring events will be created for on this broker-connection. That is, events will only be mirrored to the target broker for addresses that match the filter.
An address is matched when it begins with an inclusion entry specified in this field, unless the address is also explicitly excluded by another entry. An exclusion entry is prefixed with `!` to denote any address beginning with that value does not match.
If no inclusion entry is specified in the list, all addresses not explicitly excluded will match. If the address-filter attribute is not specified, then all addresses (and related queues) will match and be mirrored.

Examples:

- 'eu'
matches all addresses starting with 'eu'
- '!eu'
matches all address except for those starting with 'eu'
- 'eu.uk,eu.de'
matches all addresses starting with either 'eu.uk' or 'eu.de'
- 'eu,!eu.uk'
matches all addresses starting with 'eu' but not those starting with 'eu.uk'

**Note:**

- Address exclusion will always take precedence over address inclusion.
- Address matching on mirror elements is prefix-based and does not support wild-card matching.

An example of a mirror configuration is shown below:
```xml
@@ -473,6 +473,70 @@ public void testNoAddressWithAnnotations() throws Exception {
}
}

@Test
public void testAddressFilter() throws Exception {
final String REPLICATED = "replicated";
final String NON_REPLICATED = "nonReplicated";
final String ADDRESS_FILTER = REPLICATED + "," + "!" + NON_REPLICATED;
final String MSG = "msg";

server.start();

server_2 = createServer(AMQP_PORT_2, false);
server_2.setIdentity("server_2");
server_2.getConfiguration().setName("server_2");

AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("mirror-source", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setDurable(true).setAddressFilter(ADDRESS_FILTER);
amqpConnection.addElement(replica);
server_2.getConfiguration().addAMQPConnection(amqpConnection);

server_2.start();

try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2).createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Send to non replicated address
try (MessageProducer producer = session.createProducer(session.createQueue(NON_REPLICATED))) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 2; i++) {
producer.send(session.createTextMessage("never receive"));
}
}

// Check nothing was added to SnF queue
Assert.assertEquals(0, server_2.locateQueue(replica.getMirrorSNF()).getMessagesAdded());

// Send to replicated address
try (MessageProducer producer = session.createProducer(session.createQueue(REPLICATED))) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 2; i++) {
producer.send(session.createTextMessage(MSG));
}
}

// Check some messages were sent to SnF queue
Assert.assertTrue(server_2.locateQueue(replica.getMirrorSNF()).getMessagesAdded() > 0);
}

try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT).createConnection()) {
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

try (MessageConsumer consumer = session.createConsumer(session.createQueue(REPLICATED))) {
Message message = consumer.receive(3000);
Assert.assertNotNull(message);
Assert.assertEquals(MSG, message.getBody(String.class));
}

try (MessageConsumer consumer = session.createConsumer(session.createQueue(NON_REPLICATED))) {
Assert.assertNull(consumer.receiveNoWait());
}
}

}

@Test
public void testRouteSurviving() throws Exception {
testRouteSurvivor(false);

0 comments on commit 0b321ab

Please sign in to comment.